Edit: A second iteration of this problem here.
I need to provide a service (either a Windows Service or an Azure Worker Role) which will handle the parallel execution of jobs. These jobs could be anything from importing data to compiling reports or sending out mass notifications. They are generally long-running and are not suitable to run on the web server.
Important factors
- The solution needs to be very extensible and maintainable, as new job implementations will be created by developers all the time.
- The execution of these jobs is to be abstracted away from the client code. All developers need to do is to create the new job and add it to the job data source (database, MSMQ, Azure Queue, etc).
Solution
A new job is created by deriving from AbstractJob and overriding the abstract Execute() method to provide the implementation:
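
    using System;
    using System.Diagnostics;

    // Not shown in the original post; the values are inferred from their usage below.
    public enum JobState { New, Executing, Successful, Failed, FailedOnException }

    public abstract class AbstractJob
    {
        public JobState State { get; protected set; }

        public AbstractJob()
        {
            State = JobState.New;
        }

        // Implemented by each concrete job; returns true on success.
        protected abstract bool Execute();

        // Non-virtual, so derived classes cannot change the state handling.
        // ("sealed" is only valid on an override and does not compile here.)
        public void ExecuteJob()
        {
            State = JobState.Executing;
            try
            {
                State = Execute() ? JobState.Successful : JobState.Failed;
            }
            catch (Exception)
            {
                // Note: the exception details are discarded; only the state records the failure.
                State = JobState.FailedOnException;
            }
        }
    }

    public class TestJob : AbstractJob
    {
        protected override bool Execute()
        {
            Debug.WriteLine("Test job!");
            return true;
        }
    }

The following JobConsumer would reside in a Windows Service or an Azure Worker Role. IJobSource provides the consumer with jobs. These could come from the database, an MSMQ queue, an Azure Queue, etc. (I have left this implementation out).
Every 5 seconds (the interval can obviously be made configurable), the JobConsumer starts new jobs if there are free execution slots ("available cores").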

    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public class JobConsumer
    {
        // Guards Tasks, which is touched by the polling loop and by job continuations.
        private readonly object _sync = new object();

        public List<AbstractJob> Jobs { get; set; }
        public IJobSource JobSource { get; private set; }
        public int ParallelExecutionCount { get; set; }
        protected List<Task> Tasks { get; set; }

        public JobConsumer(IJobSource jobSource, int parallelExecutionCount)
        {
            JobSource = jobSource;
            ParallelExecutionCount = parallelExecutionCount;
            Tasks = new List<Task>(); // must be initialized before StartConsumer() uses it
        }

        public void StartConsumer()
        {
            Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    int availableCores;
                    lock (_sync)
                    {
                        availableCores = ParallelExecutionCount - Tasks.Count;
                    }

                    IEnumerable<AbstractJob> newJobs = JobSource.ReadNewJobs().Take(availableCores);
                    foreach (var job in newJobs)
                    {
                        // Create the task cold so it is registered before it can finish.
                        Task task = new Task(job.ExecuteJob);
                        lock (_sync) { Tasks.Add(task); }
                        task.ContinueWith(t => { lock (_sync) { Tasks.Remove(t); } });
                        task.Start(); // execute the job on a thread pool thread
                    }

                    Thread.Sleep(5000);
                }
            }, TaskCreationOptions.LongRunning);
        }
    }

    public interface IJobSource
    {
        // Interface members are implicitly public; an explicit modifier does not compile in C# 5.
        IEnumerable<AbstractJob> ReadNewJobs();
    }

Input, critique and suggestions are very welcome!
EDIT: I am using .NET 4.5
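Since the IJobSource implementation was left out, here is a minimal in-memory sketch that could stand in for it during local testing. InMemoryJobSource and its Enqueue method are hypothetical names, and the AbstractJob, TestJob and IJobSource stand-ins below only mirror the shapes used in the question so that the block compiles on its own. A real source would read from a database or a queue instead.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Minimal stand-ins for the types from the question (assumed shapes).
public abstract class AbstractJob
{
    protected abstract bool Execute();
    public void ExecuteJob() { Execute(); }
}

public class TestJob : AbstractJob
{
    protected override bool Execute() { return true; }
}

public interface IJobSource
{
    IEnumerable<AbstractJob> ReadNewJobs();
}

// Hypothetical in-memory job source backed by a thread-safe queue.
public class InMemoryJobSource : IJobSource
{
    private readonly ConcurrentQueue<AbstractJob> _queue = new ConcurrentQueue<AbstractJob>();

    // Called by client code to submit a job.
    public void Enqueue(AbstractJob job)
    {
        if (job == null) throw new ArgumentNullException("job");
        _queue.Enqueue(job);
    }

    // Lazy iterator: a job leaves the queue only when the caller enumerates it,
    // so jobs that the caller never reaches stay queued for the next poll.
    public IEnumerable<AbstractJob> ReadNewJobs()
    {
        AbstractJob job;
        while (_queue.TryDequeue(out job))
        {
            yield return job;
        }
    }
}
```

The lazy iterator matters because the consumer calls ReadNewJobs().Take(availableCores): a source that dequeued everything eagerly would lose the jobs that Take never hands out.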
Why not change Execute to ExecuteAsync returning a Task, and AbstractJob to IJob? Task carries everything that you put in the JobState flag and more (e.g. if it failed, why). Then you could also drop the task creation on the consumer side and use either ConcurrentQueue<T> or BlockingCollection<T> to process the jobs, but that depends on the .NET version.
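One way the suggestion in this comment could look on .NET 4.5 is sketched below. This is only an illustration under stated assumptions, not the commenter's actual design: IJob, AsyncTestJob, QueueJobConsumer, RunAsync and Finished are all hypothetical names, and the sketch assumes ExecuteAsync reports failures through the returned Task rather than throwing synchronously.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical replacement for AbstractJob: the returned Task carries the outcome.
public interface IJob
{
    Task ExecuteAsync(CancellationToken cancellationToken);
}

public class AsyncTestJob : IJob
{
    public Task ExecuteAsync(CancellationToken cancellationToken)
    {
        Debug.WriteLine("Test job!");
        return Task.FromResult(true);
    }
}

public class QueueJobConsumer
{
    private readonly BlockingCollection<IJob> _jobs;
    private readonly int _parallelExecutionCount;

    // Tasks of jobs that have finished; Status and Exception describe the outcome.
    public ConcurrentBag<Task> Finished { get; private set; }

    public QueueJobConsumer(BlockingCollection<IJob> jobs, int parallelExecutionCount)
    {
        _jobs = jobs;
        _parallelExecutionCount = parallelExecutionCount;
        Finished = new ConcurrentBag<Task>();
    }

    // Starts one long-running worker per slot. The returned task completes once
    // CompleteAdding() has been called on the collection and it has been drained.
    public Task RunAsync(CancellationToken cancellationToken)
    {
        Task[] workers = Enumerable.Range(0, _parallelExecutionCount)
            .Select(i => Task.Factory.StartNew(
                () => Work(cancellationToken),
                cancellationToken,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default))
            .ToArray();
        return Task.WhenAll(workers);
    }

    private void Work(CancellationToken cancellationToken)
    {
        // Blocks until a job is available, so no polling interval is needed.
        foreach (IJob job in _jobs.GetConsumingEnumerable(cancellationToken))
        {
            // Assumption: failures surface through the task, not as a synchronous throw.
            Task task = job.ExecuteAsync(cancellationToken);
            try
            {
                task.Wait(); // blocking is acceptable on this dedicated worker thread
            }
            catch (AggregateException)
            {
                // The failure stays on the task itself (Status, Exception).
            }
            Finished.Add(task);
        }
    }
}
```

Client code would add jobs with jobs.Add(new AsyncTestJob()) on the shared BlockingCollection<IJob>. The number of workers plays the role of ParallelExecutionCount, and each finished task's Status (RanToCompletion, Faulted, Canceled) takes over from the JobState flag.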