
I'm not completely new to C#, but I'm not familiar enough with the language to know how to do what I need to do.

I have a file, call it File1.txt. File1.txt has 100,000 lines or so. I will duplicate File1.txt and call it File1_untested.txt. I will also create an empty file, "Successes.txt". For each line in the file:

  • Remove this line from File1_untested.txt
  • If this line passes the test, write it to Successes.txt

So, my question is, how can I multithread this?

My approach so far has been to create an object (LineChecker), give the object its line to check, and pass the object into a ThreadPool. I understand how to use ThreadPools for a few tasks with a CountdownEvent. However, it seems unreasonable to queue up 100,000 tasks all at once. How can I gradually feed the pool? Maybe 1000 lines at a time or something like that.

Also, I need to ensure that no two threads are adding to Successes.txt or removing from File1_untested.txt at the same time. I can handle this with lock(), right? What should I be passing into lock()? Can I use a static member of LineChecker?

I'm just trying to get a broad understanding of how something like this can be designed.

  • Give us some idea about the nature of your "test". What does it do? How does it work? Is it computationally cheap or computationally expensive? Commented Sep 26, 2015 at 20:25
  • The test is web related. I expect most time will be spent waiting for a response from a webserver. Commented Sep 26, 2015 at 20:29
  • Take a look at the BlockingCollection class. msdn.microsoft.com/en-us/library/dd997371(v=vs.110).aspx It implements the producer/consumer pattern, which is what you want to use. Commented Sep 26, 2015 at 20:33
  • Ok Paul, thank you. This is exactly the type of response I'm looking for! Commented Sep 26, 2015 at 20:42

2 Answers

Since the test takes a relatively significant amount of time, it makes sense to run several tests in parallel. However, only the relatively expensive test should be parallelized, not reading or updating the files, because the file operations are relatively cheap.

Here is some example code that you can use:

Assuming you have a relatively expensive Test method:

private bool Test(string line)
{
    // This test is expensive; the body is a placeholder for the real check
    throw new NotImplementedException();
}

Here is a code sample that can run the test on multiple threads:

Here we limit the number of items in the collection to 10, so that the thread that is reading from the file will wait for the other threads to catch up before reading more lines from the file.

The input thread will read much faster than the other threads can test, so in the worst case we will have read 10 more lines than the testing threads have finished testing. This keeps memory consumption low.

CancellationTokenSource cancellation_token_source = new CancellationTokenSource();
CancellationToken cancellation_token = cancellation_token_source.Token;

BlockingCollection<string> blocking_collection = new BlockingCollection<string>(10);

using (StreamReader reader = new StreamReader(new FileStream(filename, FileMode.Open, FileAccess.Read)))
{
    using (StreamWriter writer = new StreamWriter(new FileStream(success_filename, FileMode.OpenOrCreate, FileAccess.Write)))
    {
        var input_task = Task.Factory.StartNew(() =>
        {
            try
            {
                while (!reader.EndOfStream)
                {
                    if (cancellation_token.IsCancellationRequested)
                        return;

                    // Passing the token lets an Add that is blocked on a full collection wake up on cancellation
                    blocking_collection.Add(reader.ReadLine(), cancellation_token);
                }
            }
            finally
            {
                // In all cases, even in the case of an exception, we need to mark that we are done
                // adding to the collection so that the Parallel.ForEach loop will exit.
                // Note that Parallel.ForEach will not exit until we call CompleteAdding.
                blocking_collection.CompleteAdding();
            }
        });

        try
        {
            Parallel.ForEach(blocking_collection.GetConsumingEnumerable(), (line) =>
            {
                bool test_result = Test(line);

                if (test_result)
                {
                    lock (writer)
                    {
                        writer.WriteLine(line);
                    }
                }
            });
        }
        catch
        {
            // If Parallel.ForEach throws an exception, we inform the input thread to stop
            cancellation_token_source.Cancel();
            throw;
        }

        // This makes sure that exceptions thrown in the input thread are propagated here
        input_task.Wait();
    }
}

7 Comments

Wow, this is really helpful, thank you! Why do you give an upper bound of 10 to BlockingCollection? Also, the Parallel.ForEach will block until it's tested each line?
I updated my answer to contain an explanation for the 10 value. Yes, Parallel.ForEach will block until it has tested all lines.
I see you've added the cancellation token, that makes sense. Nice addition
Yes. To make sure that we handle exceptions correctly.
The upper bound of the BlockingCollection does not affect the degree of parallelism. It just makes sure, for example, that the input thread does not read all 100,000 lines while the testing threads have completed only some 200 lines. Specifying 10, for example, makes sure that if the testing threads have finished 200 lines, then the reading/input thread has read at most 210 lines. So 10 is the number of lines that have been read and are waiting to be tested.

If your "test" was fast, then multithreading would not have given you any advantage whatsoever, because your code would be 100% disk-bound, and presumably you have all of your files on the same disk: you cannot improve the throughput of a single disk with multithreading.

But since your "test" will be waiting for a response from a webserver, this means that the test is going to be slow, so there is plenty of room for improvement by multithreading. Basically, the number of threads you need depends on how many requests the webserver can be servicing simultaneously without degrading the performance of the webserver. This number might still be low, so you might end up not gaining anything, but at least you can try.

If your file is not really huge, then you can read it all at once, and write it all at once. If each line is only 80 characters long, then this means that your file is only 8 megabytes, which is peanuts, so you can read all the lines into a list, work on the list, produce another list, and in the end write out the entire list.

This will allow you to create a structure, say, MyLine which contains the index of each line and the text of each line, so that you can sort all lines before writing them, so that you do not have to worry about out-of-order responses from the server.
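As a minimal sketch of that idea, assuming a `MyLine` type shaped roughly like the one described above (the class layout and the `LineLoader` helper names are illustrative, not from any library):

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical type: pairs each line with its original position in the file.
public sealed class MyLine
{
    public int Index { get; }
    public string Text { get; }

    public MyLine(int index, string text)
    {
        Index = index;
        Text = text;
    }
}

// Hypothetical helper, named here only for illustration.
public static class LineLoader
{
    // Wraps each line together with its zero-based position.
    public static List<MyLine> Index(IEnumerable<string> lines)
    {
        return lines.Select((text, index) => new MyLine(index, text)).ToList();
    }

    // Reads the whole file into memory; reasonable for a file of a few megabytes.
    public static List<MyLine> Load(string path)
    {
        return Index(File.ReadLines(path));
    }

    // Restores the original file order after out-of-order processing.
    public static List<string> InOriginalOrder(IEnumerable<MyLine> lines)
    {
        return lines.OrderBy(l => l.Index).Select(l => l.Text).ToList();
    }
}
```

Calling `LineLoader.Load("File1.txt")` would give the list to work on, and `InOriginalOrder` can be applied to whatever subset passes the test before writing it out.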

Then, what you need to do is use a bounded blocking queue like BlockingCollection, as @Paul suggested.

BlockingCollection accepts as a constructor parameter its maximum capacity. This means that once its maximum capacity has been reached, any further attempts to add to it are blocked (the caller sits there waiting) until some items are removed. So, if you want to have up to 10 simultaneously pending requests, you would construct it as follows:

var sourceCollection = new BlockingCollection<MyLine>(10); 
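To see the capacity limit without actually blocking a thread, a small sketch can use `TryAdd`, which returns `false` immediately when the collection is full, where `Add` would wait. The capacity of 2 and the `BoundedDemo` name are assumptions made only for this illustration:

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class, not part of the answer's design.
public static class BoundedDemo
{
    // Returns the outcome of four TryAdd calls against a collection of capacity 2.
    public static bool[] FillPastCapacity()
    {
        using (var collection = new BlockingCollection<int>(2))
        {
            bool first = collection.TryAdd(1);     // fits
            bool second = collection.TryAdd(2);    // fits, collection is now full
            bool third = collection.TryAdd(3);     // full: TryAdd gives up, Add would block here
            collection.Take();                     // a consumer removes one item
            bool afterTake = collection.TryAdd(3); // there is room again
            return new[] { first, second, third, afterTake };
        }
    }
}
```

The third call is the situation in which the producer thread would sit waiting if it used `Add`.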

Your main thread will be stuffing sourceCollection with MyLine objects, and you will have 10 threads which block waiting to read MyLines from the collection. Each thread sends a request to the server, waits for a response, saves the result into a thread-safe resultCollection, and attempts to fetch the next item from sourceCollection.

Instead of using multiple threads you could use the async features of C#, but I am not terribly familiar with them, so I cannot advise you on precisely how you would do that.
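For reference, one possible shape of the async variant is sketched here, offered as an assumption rather than tested advice. It assumes the test can be written as a `Func<string, Task<bool>>` (for example on top of an asynchronous HTTP call) and uses `SemaphoreSlim` to cap the number of requests in flight. `AsyncLineTester`, `testAsync`, and `maxConcurrency` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, named here only for illustration.
public static class AsyncLineTester
{
    // Runs testAsync on every line with at most maxConcurrency tests in flight
    // and returns the passing lines in their original order.
    public static async Task<List<string>> RunAsync(
        IReadOnlyList<string> lines, Func<string, Task<bool>> testAsync, int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            // One task per line; each waits at the gate before starting its test.
            var tasks = lines.Select(async line =>
            {
                await gate.WaitAsync();
                try { return await testAsync(line); }
                finally { gate.Release(); }
            }).ToList();

            // WhenAll returns results in the same order as the input tasks.
            bool[] passed = await Task.WhenAll(tasks);
            return lines.Where((line, i) => passed[i]).ToList();
        }
    }
}
```

This creates one task object per line up front, which should be acceptable for around 100,000 short strings, but only `maxConcurrency` tests run at any moment. No thread is blocked while a request is waiting on the server.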

In the end, copy the contents of resultCollection into a List, sort the list, and write it to the output file. (The copy into a separate List is probably a good idea because sorting the thread-safe resultCollection will probably be much slower than sorting a non-thread-safe List. I said probably.)
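The threaded design described above can be sketched roughly as follows. This is an illustration under stated assumptions, not a definitive implementation: `KeyValuePair<int, string>` stands in for the `MyLine` structure, `ConcurrentBag` plays the role of the thread-safe resultCollection, and `BoundedPipeline`, `test`, and `workerCount` are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, named here only for illustration.
public static class BoundedPipeline
{
    // Feeds lines through a bounded queue to workerCount consumers and
    // returns the lines that pass the test, in their original order.
    public static List<string> Run(IEnumerable<string> lines, Func<string, bool> test, int workerCount)
    {
        using (var cancellation = new CancellationTokenSource())
        using (var source = new BlockingCollection<KeyValuePair<int, string>>(workerCount))
        {
            var results = new ConcurrentBag<KeyValuePair<int, string>>();

            // Each worker blocks until an item is available and stops once
            // the queue is empty and CompleteAdding has been called.
            var workers = Enumerable.Range(0, workerCount)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    try
                    {
                        foreach (var item in source.GetConsumingEnumerable())
                        {
                            if (test(item.Value)) results.Add(item);
                        }
                    }
                    catch
                    {
                        cancellation.Cancel(); // unblock the producer if a worker fails
                        throw;
                    }
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            try
            {
                int index = 0;
                foreach (var line in lines)
                {
                    // Blocks while the queue is full, which throttles the producer.
                    source.Add(new KeyValuePair<int, string>(index++, line), cancellation.Token);
                }
            }
            catch (OperationCanceledException)
            {
                // A worker failed; its exception surfaces from WaitAll below.
            }
            finally
            {
                source.CompleteAdding();
            }

            Task.WaitAll(workers);

            // Copy out of the thread-safe bag, then sort by original position.
            return results.OrderBy(r => r.Key).Select(r => r.Value).ToList();
        }
    }
}
```

A call such as `BoundedPipeline.Run(File.ReadLines("File1.txt"), Test, 10)` would then return the successful lines ready to be written to Successes.txt in one go, with no locking around the output file.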

2 Comments

Excellent description Mike, thank you. Do you have any ideas as far as the file "File1_untested.txt"? I mean, a clean way to remove lines from it as I test them. (In case of an unexpected interruption of the program)
This is hard. Removing lines from text files is expensive. Essentially you have to read the entire file and write the entire file, skipping the line you want removed. It would probably be a lot easier to keep storing the maximum line number you have processed in a separate file which would contain just a single line, containing just a single number. Then you would consult this file each time before you start processing. But saving that file will be expensive too if done in a tight loop, so try updating it only every hundred lines or so.
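A minimal sketch of such a checkpoint file follows, assuming it holds a single integer: the number of lines processed so far. The `Checkpoint` class and its method names are hypothetical. With several threads finishing out of order, the stored value should only count lines up to the first one that is still unfinished, otherwise a restart could skip untested lines.

```csharp
using System;
using System.Globalization;
using System.IO;

// Hypothetical helper, named here only for illustration.
public static class Checkpoint
{
    // Returns the number of lines already processed, or 0 when there is
    // no usable checkpoint file yet.
    public static int Read(string path)
    {
        if (!File.Exists(path)) return 0;

        int value;
        bool ok = int.TryParse(File.ReadAllText(path).Trim(),
            NumberStyles.Integer, CultureInfo.InvariantCulture, out value);
        return ok ? value : 0;
    }

    // Overwrites the checkpoint with the number of lines processed so far.
    public static void Write(string path, int processedCount)
    {
        File.WriteAllText(path, processedCount.ToString(CultureInfo.InvariantCulture));
    }

    // True when the checkpoint should be saved, that is, every interval lines.
    public static bool IsDue(int processedCount, int interval)
    {
        return processedCount > 0 && processedCount % interval == 0;
    }
}
```

On startup the program would call `Checkpoint.Read` and skip that many lines of File1.txt. During processing it would call `Checkpoint.Write` only when `IsDue(count, 100)` is true, which keeps the extra disk writes rare.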
