I would like to write a method which accept several parameters, including an action and a retry amount and invoke it.
So I have this code:
public static IEnumerable<Task> RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method) { object lockObj = new object(); int index = 0; return new Action(async () => { while (true) { T item; lock (lockObj) { if (index < source.Count) { item = source[index]; index++; } else break; } int retry = retries; while (retry > 0) { try { bool res = await action(item); if (res) retry = -1; else //sleep if not success.. Thread.Sleep(200); } catch (Exception e) { LoggerAgent.LogException(e, method); } finally { retry--; } } } }).RunParallel(threads); } RunParallel is an extention method for Action, its look like this:
public static IEnumerable<Task> RunParallel(this Action action, int amount) { List<Task> tasks = new List<Task>(); for (int i = 0; i < amount; i++) { Task task = Task.Factory.StartNew(action); tasks.Add(task); } return tasks; } Now, the issue: The thread is just disappearing or collapsing without waiting for the action to finish.
I wrote this example code:
private static async Task ex() { List<int> ints = new List<int>(); for (int i = 0; i < 1000; i++) { ints.Add(i); } var tasks = RetryComponent.RunWithRetries(ints, 100, async (num) => { try { List<string> test = await fetchSmthFromDb(); Console.WriteLine("#" + num + " " + test[0]); return test[0] == "test"; } catch (Exception e) { Console.WriteLine(e.StackTrace); return false; } }, 5, "test"); await Task.WhenAll(tasks); } The fetchSmthFromDb is a simple Task> which fetches something from the db and works perfectly fine when invoked outside of this example.
Whenever the List<string> test = await fetchSmthFromDb(); row is invoked, the thread seems to be closing and the Console.WriteLine("#" + num + " " + test[0]); not even being triggered, also when debugging the breakpoint never hit.
The Final Working Code
private static async Task DoWithRetries(Func<Task> action, int retryCount, string method) { while (true) { try { await action(); break; } catch (Exception e) { LoggerAgent.LogException(e, method); } if (retryCount <= 0) break; retryCount--; await Task.Delay(200); }; } public static async Task RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method) { Func<T, Task> newAction = async (item) => { await DoWithRetries(async ()=> { await action(item); }, retries, method); }; await source.ParallelForEachAsync(newAction, threads); }
RunParallelmethod) Is it some sort of load testing on a DB?while (true)loop 100 times