I have a few thousand files and I use a function to extract some information, then I enter the information into a database. Reading the file from the disk is fast. Extracting the data is slow. Updating the database is fast. I want to perform this operation on multiple threads, to make better use of the CPU. I set up a semaphore, but I don't see the expected behavior. I expect to see the program start processing three files, then completing one and only then starting another one. In the beginning I see much more than three files that started processing at the same time and none of them are completed yet.
using System; using System.Threading; using System.IO; using System.Collections.Generic; namespace Threads { class Program { static Semaphore semaphore = new Semaphore(3, 3); static Queue<string> queue = new Queue<string>(); public static void Main(string[] args) { string[] files = Directory.GetFiles(@"C:\MyFolder"); foreach (string file in files) { queue.Enqueue(file); } while (queue.Count > 0) { string fileName1 = NextFile(); semaphore.WaitOne(); Thread thread1 = new Thread(() => ProcessFile(fileName1)); thread1.Start(); semaphore.Release(); } Console.Write("Press any key to continue . . . "); Console.ReadKey(true); } public static void ProcessFile(string fileName) { Console.WriteLine("Processing file " + fileName); string value = ExtractData(fileName); InsertInDatabase(value); Console.WriteLine("Completed processing file " + fileName); } public static string NextFile() { string fileName = queue.Dequeue(); return fileName; } /// <summary> /// This function takes a long time /// </summary> /// <param name="fileName"></param> /// <returns></returns> static string ExtractData(string fileName) { Thread.Sleep(5000); return "value"; } static void InsertInDatabase(string value) { Thread.Sleep(100); // do some work } } }
var processed = (from file in Directory.GetFiles(@"C:\MyFolder").ToObservable() from data in Observable.Start(() => ExtractData(file)) from insert in Observable.Start(() => InsertInDatabase(data)) select file).ToArray().Wait();.using System.Reqctive.Linq;.