23

The below method doesn't compile. Alternatives?

    public static async Task<IEnumerable<object[]>> GetRecordsAsync(
        this Transaction transaction, string commandText, params SqlParameter[] parameters)
    {
        // Get a SqlDataReader
        var reader = await transaction.GetReaderAsync(commandText, parameters);
        var fieldCount = -1;

        // Begin iterating through records asynchronously
        while (await reader.ReadAsync()) // Note we don't loop until .ReadAsync returns a boolean
        {
            // Grab all the field values out
            if (fieldCount < 0)
                fieldCount = reader.FieldCount;
            var fields = new object[fieldCount];
            reader.GetValues(fields);

            // Yield return the field values from this record
            yield return fields;
        }
    }

Error message:

The body of 'TransactionExtensions.GetRecordsAsync(Transaction, string, params SqlParameter[])' cannot be an iterator block because 'Task<IEnumerable<object[]>>' is not an iterator interface type

I don't see a way to adapt this answer to a similar sounding (but different) question, because I don't know a priori how many times the loop will go.

Edit: fixed formatting

6
  • 6
    IEnumerable<T> itself doesn't support that. Use Reactive Extensions. Commented Mar 19, 2017 at 1:58
  • 1
    You can use ObservableCollection to monitor elements being added. Create it and pass it to GetRecordsAsync, which would then return only a Task, and add to it whenever you are ready to yield fields. Now that I think about it, simply passing an "on fields received" delegate to the method is also possible. Commented Mar 19, 2017 at 2:02
  • @IllidanS4 I think that would boil down to the comment given by SLaks. Both are good ideas, but Reactive Extensions brings a lot of other goodies to bear. Commented Mar 19, 2017 at 14:56
  • 2
    @MattThomas, also check via Using async / await with DataReader ? ( without middle buffers!) for some alternative ideas. Commented Mar 20, 2017 at 4:29
  • 1
    @Noseratio thanks for the link. The best option I took from that was Rx. To me the answer felt like something that Rx (or more generally, pub-sub) does Commented Mar 20, 2017 at 11:52
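
The "on fields received" delegate idea from the comments could look roughly like the sketch below. `ForEachRecordAsync` is a hypothetical helper name, not something from the question, and a `DataTable` reader stands in for the `SqlDataReader` so the example runs without a database (top-level statements, so C# 9 or later is assumed).

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Threading.Tasks;

// Hypothetical helper: reads rows asynchronously and hands each one to a callback.
// It returns a plain Task, so no iterator block is involved.
static async Task ForEachRecordAsync(DbDataReader reader, Action<object[]> onRecord)
{
    while (await reader.ReadAsync())
    {
        var fields = new object[reader.FieldCount];
        reader.GetValues(fields);
        onRecord(fields);
    }
}

// In-memory sample data standing in for a real query result
var table = new DataTable();
table.Columns.Add("Id", typeof(int));
table.Rows.Add(10);
table.Rows.Add(20);

var ids = new List<int>();
using (var reader = table.CreateDataReader())
{
    // The callback runs once per record, as soon as that record has been read
    await ForEachRecordAsync(reader, fields => ids.Add((int)fields[0]));
}
Console.WriteLine(string.Join(", ", ids));
```

The trade-off is that the caller loses `foreach` and LINQ composition; the callback is the only place where records can be handled.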

3 Answers

13

Don't return a Task<IEnumerable<T>> and don't even use Task at all for this; instead, return an IAsyncEnumerable<T>. No need for third-party libraries or other workarounds, no need to even alter the body of your original method.

    public static async IAsyncEnumerable<object[]> GetRecordsAsync(
        this Transaction transaction, string commandText, params SqlParameter[] parameters)
    {
        // Get a SqlDataReader
        var reader = await transaction.GetReaderAsync(commandText, parameters);
        var fieldCount = -1;

        // Begin iterating through records asynchronously
        while (await reader.ReadAsync()) // Note we don't loop until .ReadAsync returns a boolean
        {
            // Grab all the field values out
            if (fieldCount < 0)
                fieldCount = reader.FieldCount;
            var fields = new object[fieldCount];
            reader.GetValues(fields);

            // Yield return the field values from this record
            yield return fields;
        }
    }
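
On the calling side, an `IAsyncEnumerable<T>` is consumed with `await foreach`. The following is a minimal sketch of that, assuming C# 9 or later for the top-level statements; `ReadRecordsAsync` is a hypothetical stand-in for `GetRecordsAsync` that takes a `DbDataReader` directly, and a `DataTable` reader replaces the real database.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Threading.Tasks;

// Hypothetical stand-in for the answer's GetRecordsAsync: same loop body,
// but it accepts any DbDataReader so the sketch needs no SQL connection.
static async IAsyncEnumerable<object[]> ReadRecordsAsync(DbDataReader reader)
{
    while (await reader.ReadAsync())
    {
        var fields = new object[reader.FieldCount];
        reader.GetValues(fields);
        yield return fields;
    }
}

// In-memory sample data standing in for a real query result
var table = new DataTable();
table.Columns.Add("Id", typeof(int));
table.Columns.Add("Name", typeof(string));
table.Rows.Add(1, "alpha");
table.Rows.Add(2, "beta");

var names = new List<string>();
using (var reader = table.CreateDataReader())
{
    // Each iteration awaits the next record instead of blocking on it
    await foreach (var fields in ReadRecordsAsync(reader))
    {
        names.Add((string)fields[1]);
    }
}
Console.WriteLine(string.Join(", ", names));
```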

1 Comment

For those not using .NET Core 3.0 or later, this is available via the Microsoft.Bcl.AsyncInterfaces NuGet package. You also need C# 8 or later, so those of us still mucking around in .NET Framework have some manual project editing to do.
11

Based on @SLaks's comment to the question, here's a general alternative using Reactive Extensions:

    /// <summary>
    /// Turns the given asynchronous functions into an IObservable
    /// </summary>
    static IObservable<T> ToObservable<T>(
        Func<Task<bool>> shouldLoopAsync,
        Func<Task<T>> getAsync)
    {
        return Observable.Create<T>(
            observer => Task.Run(async () =>
                {
                    while (await shouldLoopAsync())
                    {
                        var value = await getAsync();
                        observer.OnNext(value);
                    }
                    observer.OnCompleted();
                }
            )
        );
    }

Example usage, tailored to solve the question's specific case:

    /// <summary>
    /// Asynchronously processes each record of the given reader using the given handler
    /// </summary>
    static async Task ProcessResultsAsync(this SqlDataReader reader, Action<object[]> fieldsHandler)
    {
        // Set up async functions for the reader
        var shouldLoopAsync = (Func<Task<bool>>)reader.ReadAsync;
        var getAsync = new Func<SqlDataReader, Func<Task<object[]>>>(_reader =>
        {
            var fieldCount = -1;
            return () => Task.Run(() =>
            {
                Interlocked.CompareExchange(ref fieldCount, _reader.FieldCount, -1);
                var fields = new object[fieldCount];
                _reader.GetValues(fields);
                return fields;
            });
        })(reader);

        // Turn the async functions into an IObservable
        var observable = ToObservable(shouldLoopAsync, getAsync);

        // Process the fields as they become available
        var finished = new ManualResetEventSlim(); // This will be our signal for when the observable completes
        using (observable.Subscribe(
            onNext: fieldsHandler, // Invoke the handler for each set of fields
            onCompleted: finished.Set // Set the gate when the observable completes
        )) // Don't forget best practice of disposing IDisposables
            // Asynchronously wait for the gate to be set
            await Task.Run((Action)finished.Wait);
    }

(Note that getAsync could be simplified in the above code block, but I like how explicit it is about the closure that's being created.)

...and finally:

    // Get a SqlDataReader
    var reader = await transaction.GetReaderAsync(commandText, parameters);

    // Do something with the records
    await reader.ProcessResultsAsync(fields => { /* Code here to process each record */ });

Comments

0

I solved it without third-party extensions:

    public async Task<IEnumerable<Item>> GetAllFromDb()
    {
        OracleConnection connection = null;
        DbDataReader reader = null;
        try
        {
            connection = new OracleConnection(connectionString);
            var command = new OracleCommand(queryString, connection);
            connection.Open();

            reader = await command.ExecuteReaderAsync();

            return this.BuildEnumerable(connection, reader);
        }
        catch (Exception)
        {
            reader?.Dispose();
            connection?.Dispose();
            throw;
        }
    }

    private IEnumerable<Item> BuildEnumerable(OracleConnection connection, DbDataReader reader)
    {
        using (connection)
        using (reader)
        {
            while (reader.Read())
            {
                var item = new Item()
                {
                    Prop = reader.GetString(0),
                };
                yield return item;
            }
        }
    }

This example uses the Oracle data reader, but the same approach applies to any asynchronous operation combined with yield return.

2 Comments

But at the heart of this you call reader.Read(), the blocking method. It may compile and run, but I don't think you are getting the real async benefits.
Just wanted to point out that we do have an asynchronous operation: reader = await command.ExecuteReaderAsync(); so, if implemented properly, this operation will unblock the thread. Regarding the Read operation, I agree with @Henk-Holterman: it is synchronous. But it is the best trade-off possible if you want to use standard C#, LINQ, foreach, etc.
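
For comparison, the `using`-inside-the-iterator idea from this answer can be combined with `ReadAsync` when `IAsyncEnumerable<T>` is available. This is a rough sketch under the same assumptions as the rest of the thread's C# 8+ discussion, with top-level statements (C# 9+) and a `DataTable` reader in place of the Oracle one; `ReadAndDisposeAsync` is a hypothetical name. It relies on `DbDataReader` implementing `IAsyncDisposable`, which is the case on .NET Core 3.0 and later.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Threading.Tasks;

// Hypothetical iterator that takes ownership of the reader: the reader is
// disposed when enumeration finishes or when the caller stops early.
static async IAsyncEnumerable<object[]> ReadAndDisposeAsync(DbDataReader reader)
{
    await using (reader)
    {
        while (await reader.ReadAsync())
        {
            var fields = new object[reader.FieldCount];
            reader.GetValues(fields);
            yield return fields;
        }
    }
}

// In-memory sample data standing in for a real query result
var table = new DataTable();
table.Columns.Add("Prop", typeof(string));
table.Rows.Add("first");
table.Rows.Add("second");

var sampleReader = table.CreateDataReader();
var seen = 0;
await foreach (var fields in ReadAndDisposeAsync(sampleReader))
{
    seen++;
    break; // Leaving the loop early disposes the enumerator, which disposes the reader
}
Console.WriteLine($"Records seen: {seen}, reader closed: {sampleReader.IsClosed}");
```

As in the answer above, cleanup only happens if the caller actually enumerates; a sequence that is requested but never iterated leaves the reader open.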
