40

I would like to write a method which accept several parameters, including an action and a retry amount and invoke it.

So I have this code:

public static IEnumerable<Task> RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method) { object lockObj = new object(); int index = 0; return new Action(async () => { while (true) { T item; lock (lockObj) { if (index < source.Count) { item = source[index]; index++; } else break; } int retry = retries; while (retry > 0) { try { bool res = await action(item); if (res) retry = -1; else //sleep if not success.. Thread.Sleep(200); } catch (Exception e) { LoggerAgent.LogException(e, method); } finally { retry--; } } } }).RunParallel(threads); } 

RunParallel is an extention method for Action, its look like this:

public static IEnumerable<Task> RunParallel(this Action action, int amount) { List<Task> tasks = new List<Task>(); for (int i = 0; i < amount; i++) { Task task = Task.Factory.StartNew(action); tasks.Add(task); } return tasks; } 

Now, the issue: The thread is just disappearing or collapsing without waiting for the action to finish.

I wrote this example code:

private static async Task ex() { List<int> ints = new List<int>(); for (int i = 0; i < 1000; i++) { ints.Add(i); } var tasks = RetryComponent.RunWithRetries(ints, 100, async (num) => { try { List<string> test = await fetchSmthFromDb(); Console.WriteLine("#" + num + " " + test[0]); return test[0] == "test"; } catch (Exception e) { Console.WriteLine(e.StackTrace); return false; } }, 5, "test"); await Task.WhenAll(tasks); } 

The fetchSmthFromDb is a simple Task> which fetches something from the db and works perfectly fine when invoked outside of this example.

Whenever the List<string> test = await fetchSmthFromDb(); row is invoked, the thread seems to be closing and the Console.WriteLine("#" + num + " " + test[0]); not even being triggered, also when debugging the breakpoint never hit.

The Final Working Code

private static async Task DoWithRetries(Func<Task> action, int retryCount, string method) { while (true) { try { await action(); break; } catch (Exception e) { LoggerAgent.LogException(e, method); } if (retryCount <= 0) break; retryCount--; await Task.Delay(200); }; } public static async Task RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method) { Func<T, Task> newAction = async (item) => { await DoWithRetries(async ()=> { await action(item); }, retries, method); }; await source.ParallelForEachAsync(newAction, threads); } 
7
  • Are you sure your logger is thread safe? I'm getting "Thread was being aborted" when I replace it with Console.WriteLine... also what is the lock for? What are you trying to do? Commented Aug 28, 2016 at 15:09
  • I'm really confused with the example above. Why are trying to run the same action 100 times in parallel? (the RunParallel method) Is it some sort of load testing on a DB? Commented Aug 28, 2016 at 15:14
  • Logger is not thread safe, but doesn't crash for me. @SergeSemenov Im using mongodb and i cant update 100 files in one procedure like we can do in SQL, so i built a method to accept list of actions on a single enumerable and operate as a single procedure Commented Aug 28, 2016 at 15:54
  • And you did it in a wrong way because you run your while (true) loop 100 times Commented Aug 28, 2016 at 15:59
  • i would be happy for an insight Commented Aug 28, 2016 at 16:00

2 Answers 2

91

The problem is in this line:

return new Action(async () => ... 

You start an async operation with the async lambda, but don't return a task to await on. I.e. it runs on worker threads, but you'll never find out when it's done. And your program terminates before the async operation is complete -that's why you don't see any output.

It needs to be:

return new Func<Task>(async () => ... 

UPDATE

First, you need to split responsibilities of methods, so you don't mix retry policy (which should not be hardcoded to a check of a boolean result) with running tasks in parallel.

Then, as previously mentioned, you run your while (true) loop 100 times instead of doing things in parallel.

As @MachineLearning pointed out, use Task.Delay instead of Thread.Sleep.

Overall, your solution looks like this:

using System.Collections.Async; static async Task DoWithRetries(Func<Task> action, int retryCount, string method) { while (true) { try { await action(); break; } catch (Exception e) { LoggerAgent.LogException(e, method); } if (retryCount <= 0) break; retryCount--; await Task.Delay(millisecondsDelay: 200); }; } static async Task Example() { List<int> ints = new List<int>(); for (int i = 0; i < 1000; i++) ints.Add(i); Func<int, Task> actionOnItem = async item => { await DoWithRetries(async () => { List<string> test = await fetchSmthFromDb(); Console.WriteLine("#" + item + " " + test[0]); if (test[0] != "test") throw new InvalidOperationException("unexpected result"); // will be re-tried }, retryCount: 5, method: "test"); }; await ints.ParallelForEachAsync(actionOnItem, maxDegreeOfParalellism: 100); } 

You need to use the AsyncEnumerator NuGet Package in order to use the ParallelForEachAsync extension method from the System.Collections.Async namespace.

Sign up to request clarification or add additional context in comments.

6 Comments

Thanks for the replay, but what do I do with the extension method, creates another extension for Fun<Task> ?
I suggest you to update your question with the code your changed
but why the DoWithRetries inside the thread? i could already solve it if i wrote the retries mechanism inside every method. I want something to wrap the original task and not break it. I need the Threads amount to use as input, the purpese was to save all this code wrapping the main task and not copy paste it
Thanks for mentioning my suggestion in your answer! :-)
neither the less, i modified the code so it be more generic and its working. Thanks alot
|
3

Besides the final complete reengineering, I think it's very important to underline what was really wrong with the original code.

0) First of all, as @Serge Semenov immediately pointed out, Action has to be replaced with

Func<Task> 

But there are still other two essential changes.

1) With an async delegate as argument it is necessary to use the more recent Task.Run instead of the older pattern new TaskFactory.StartNew (or otherwise you have to add Unwrap() explicitly)

2) Moreover the ex() method can't be async since Task.WhenAll must be waited with Wait() and without await.

At that point, even though there are logical errors that need reengineering, from a pure technical standpoint it does work and the output is produced.

A test is available online: http://rextester.com/HMMI93124

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.