2

I have millions of log files that are generated every day, and I need to read all of them and combine them into a single file so that another app can process it.

I'm looking for the fastest way to do this. Currently I'm using threads, tasks and Parallel like this:

    Parallel.For(0, files.Length, new ParallelOptions { MaxDegreeOfParallelism = 100 }, i =>
    {
        ReadFiles(files[i]);
    });

    void ReadFiles(string file)
    {
        try
        {
            var txt = File.ReadAllText(file);
            filesTxt.Add(txt); // Keeps the content of every file in memory
        }
        catch { }
        GlobalCls.ThreadNo--;
    }

or

    foreach (var file in files)
    {
        //Int64 index = i;
        //var file = files[index];
        while (Process.GetCurrentProcess().Threads.Count > 100)
        {
            Thread.Sleep(100);
            Application.DoEvents();
        }
        new Thread(() => ReadFiles(file)).Start();
        GlobalCls.ThreadNo++;
        // Task.Run(() => ReadFiles(file));
    }

The problem is that after reading a few thousand files, the reading gets slower and slower!

Any idea why? And what is the fastest approach to reading millions of small files? Thank you.

5
  • 1
    Depending on what you want to do with them, I'd use some command-line tools, not C#, to combine them. Commented Sep 29, 2019 at 1:41
  • 2
    You can't read all the files in parallel. Even if your code could, your hard disk can't. As Daniel A. White mentioned, use a proper command-line tool. Commented Sep 29, 2019 at 1:52
  • 1
    Threads are good for CPU-bound problems, but not for IO-bound problems. You end up with many threads, with all their overhead, and most of them are just waiting for IO to complete. Use async instead. See e.g. stackoverflow.com/questions/13167934/… Commented Sep 29, 2019 at 6:47
  • To start with, the task as you described it doesn't parallelize very well: while you can read files simultaneously, the second part seems to be sequential, because you have to synchronize adding content to a single file (unless randomly shuffled content is your goal). Secondly, you blended operations with different latencies into a single one, where one can be a bottleneck for the other (I/O throughput can be limited, for example), and fixed the parallelism at a static high number that cannot be adjusted. And finally, gradual performance degradation is a typical indicator of GC pressure. Commented Sep 29, 2019 at 13:07
  • 1
    How much memory does your program consume? Commented Sep 30, 2019 at 9:12

3 Answers

3

It seems that you are loading the contents of all files in memory, before writing them back to the single file. This could explain why the process becomes slower over time.

A way to optimize the process is to separate the reading part from the writing part, and do them in parallel. This is called the producer-consumer pattern. It can be implemented with the Parallel class, or with threads, or with tasks, but I will instead demonstrate an implementation based on the powerful TPL Dataflow library, which is particularly well suited for jobs like this.

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    private static async Task MergeFiles(IEnumerable<string> sourceFilePaths,
        string targetFilePath, CancellationToken cancellationToken = default,
        IProgress<int> progress = null)
    {
        // The lambda is synchronous, so it is not marked async
        var readerBlock = new TransformBlock<string, string>(filePath =>
        {
            return File.ReadAllText(filePath); // Read the small file
        }, new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 2, // Reading is parallelizable
            BoundedCapacity = 100, // No more than 100 file-paths buffered
            CancellationToken = cancellationToken, // Cancel at any time
        });

        StreamWriter streamWriter = null;

        int filesProcessed = 0;
        var writerBlock = new ActionBlock<string>(text =>
        {
            streamWriter.Write(text); // Append to the target file
            filesProcessed++;
            if (filesProcessed % 10 == 0) progress?.Report(filesProcessed);
        }, new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 1, // We can't parallelize the writer
            BoundedCapacity = 100, // No more than 100 file-contents buffered
            CancellationToken = cancellationToken, // Cancel at any time
        });

        readerBlock.LinkTo(writerBlock,
            new DataflowLinkOptions() { PropagateCompletion = true });

        // This is a tricky part. We use BoundedCapacity, so we must propagate manually
        // a possible failure of the writer to the reader, otherwise a deadlock may occur.
        PropagateFailure(writerBlock, readerBlock);

        // Open the output stream
        using (streamWriter = new StreamWriter(targetFilePath))
        {
            // Feed the reader with the file paths
            foreach (var filePath in sourceFilePaths)
            {
                var accepted = await readerBlock.SendAsync(filePath,
                    cancellationToken); // Cancel at any time
                if (!accepted) break; // This will happen if the reader fails
            }
            readerBlock.Complete();
            await writerBlock.Completion;
        }

        async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
        {
            try { await block1.Completion.ConfigureAwait(false); }
            catch (Exception ex)
            {
                if (block1.Completion.IsCanceled) return; // On cancellation do nothing
                block2.Fault(ex);
            }
        }
    }

Usage example:

    var cts = new CancellationTokenSource();
    var progress = new Progress<int>(value =>
    {
        // Safe to update the UI
        Console.WriteLine($"Files processed: {value:#,0}");
    });
    var sourceFilePaths = Directory.EnumerateFiles(@"C:\SourceFolder", "*.log",
        SearchOption.AllDirectories); // Include subdirectories
    await MergeFiles(sourceFilePaths, @"C:\AllLogs.log", cts.Token, progress);

The BoundedCapacity limits how many items each block can buffer, which keeps the memory usage under control.

If the disk drive is an SSD, you can try reading with a MaxDegreeOfParallelism larger than 2.

For best performance you could consider writing to a different disk drive than the drive containing the source files.

The TPL Dataflow library is available as a package for .NET Framework, and is built-in for .NET Core.
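If taking a dependency on TPL Dataflow is not an option, the same bounded producer-consumer idea can be sketched with `BlockingCollection<T>` from the base class library. This is a swapped-in technique, not the Dataflow implementation shown in this answer. The names `BoundedMerger` and `MergeAsync` and the default capacity of 100 are assumptions made for illustration, and it uses a single reader rather than two.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of any library.
public static class BoundedMerger
{
    public static async Task<int> MergeAsync(IEnumerable<string> sourceFilePaths,
        string targetFilePath, int boundedCapacity = 100)
    {
        // The bounded collection blocks the producer when it is full,
        // so at most boundedCapacity file contents are held in memory.
        using (var buffer = new BlockingCollection<string>(boundedCapacity))
        using (var cts = new CancellationTokenSource())
        {
            var producer = Task.Run(() =>
            {
                try
                {
                    foreach (var path in sourceFilePaths)
                        buffer.Add(File.ReadAllText(path), cts.Token);
                }
                catch (OperationCanceledException)
                {
                    // The consumer failed; stop reading.
                }
                finally
                {
                    buffer.CompleteAdding(); // Lets the consumer finish
                }
            });

            var consumer = Task.Run(() =>
            {
                try
                {
                    int count = 0;
                    using (var writer = new StreamWriter(targetFilePath))
                    {
                        foreach (var text in buffer.GetConsumingEnumerable())
                        {
                            writer.Write(text); // Append to the target file
                            count++;
                        }
                    }
                    return count;
                }
                catch
                {
                    cts.Cancel(); // Unblock a producer waiting on a full buffer
                    throw;
                }
            });

            await Task.WhenAll(producer, consumer);
            return consumer.Result;
        }
    }
}
```

Calling `await BoundedMerger.MergeAsync(sourceFilePaths, @"C:\AllLogs.log")` returns the number of files written. The cancellation token plays the same role as `PropagateFailure` in the Dataflow version: without it, a failing writer would leave the reader blocked on a full buffer.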


2

When it comes to IO operations, CPU parallelism doesn't help. Your IO device (disk, network, whatever) is the bottleneck. By reading from the device concurrently, you even risk lowering your performance.
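Taken to its conclusion, that argument suggests trying a plain sequential merge first. The following is a minimal sketch under that assumption: it streams each file into the target one at a time, so memory usage stays constant no matter how many files there are. The names `SequentialMerger` and `MergeSequentially` and the buffer sizes are hypothetical choices, not measured optima.

```csharp
using System.Collections.Generic;
using System.IO;

// Hypothetical helper, not part of any library.
public static class SequentialMerger
{
    // Appends each source file to the target, one at a time, and
    // returns the number of files merged.
    public static int MergeSequentially(IEnumerable<string> sourceFilePaths,
        string targetFilePath)
    {
        int filesMerged = 0;
        // A 1 MB output buffer (an assumed value) reduces the number of write calls.
        using (var output = new FileStream(targetFilePath, FileMode.Create,
            FileAccess.Write, FileShare.None, 1 << 20))
        {
            foreach (var path in sourceFilePaths)
            {
                // SequentialScan hints that the file is read front to back.
                using (var input = new FileStream(path, FileMode.Open,
                    FileAccess.Read, FileShare.Read, 4096, FileOptions.SequentialScan))
                {
                    input.CopyTo(output); // Streams bytes; no string is allocated
                }
                filesMerged++;
            }
        }
        return filesMerged;
    }
}
```

It can be fed with `Directory.EnumerateFiles`, which yields paths lazily instead of building an array of millions of entries. The target file should live outside the enumerated folder, otherwise it may be picked up as a source.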


0

Perhaps you can just use PowerShell to concatenate the files, such as in this answer.

Another alternative is to write a program that uses the FileSystemWatcher class to watch for new files and append them as they are created.
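A rough sketch of the FileSystemWatcher idea might look like the following. The class name `LogAppender`, the `*.log` filter, and the decision to simply skip files that are still locked are all assumptions. A real implementation would need to retry locked files and handle the watcher's `Error` event, because the internal buffer can overflow when many files arrive at once.

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of any library.
public sealed class LogAppender : IDisposable
{
    private readonly FileSystemWatcher _watcher;
    private readonly string _targetFilePath;
    private readonly object _writeLock = new object();

    public LogAppender(string sourceFolder, string targetFilePath)
    {
        _targetFilePath = targetFilePath;
        _watcher = new FileSystemWatcher(sourceFolder, "*.log");
        _watcher.Created += (sender, e) =>
        {
            try { Append(e.FullPath); }
            catch (IOException)
            {
                // Assumption: the file is still being written by its creator.
                // This sketch skips it; real code would queue it for a retry.
            }
        };
        _watcher.EnableRaisingEvents = true; // Start receiving events
    }

    // Appends the bytes of one source file to the end of the target file.
    public void Append(string sourceFilePath)
    {
        lock (_writeLock) // Events may arrive on several threads
        {
            using (var input = File.OpenRead(sourceFilePath))
            using (var output = new FileStream(_targetFilePath,
                FileMode.Append, FileAccess.Write))
            {
                input.CopyTo(output);
            }
        }
    }

    public void Dispose() => _watcher.Dispose();
}
```

The target file should be outside the watched folder, or at least not match the filter, so that appending to it does not feed back into the watcher.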

