7

I want a generic way to convert an asynchronous method to an observable. In my case, I'm dealing with methods that uses HttpClient to fetch data from an API.

Let's say we have the method Task<string> GetSomeData() that needs to become a single Observable<string> where the values is generated as a combination of:

  • Repeated periodic calls to GetSomeData() (for example every x seconds)
  • Manually triggered calls to GetSomeData() at any given time (for example when user hits refresh).

Since there is two ways to trigger execution of GetSomeData() concurrency can be an issue. To avoid demanding that GetSomeData() is thread-safe, I want to limit the concurrency so that only one thread is executing the method at the same time. As a consequence I need to handle overlapping requests with some strategy. I made a (kind of) marble diagram trying to describe the problem and wanted outcome

marble diagram

My instinct tells me there is a simple way to achieve this, so please give me some insights :)

This is the solution I've got so far. It unfortunately doesn't solve the concurrency problem.

 public class ObservableCreationWrapper<T> { private Subject<Unit> _manualCallsSubject = new Subject<Unit>(); private Func<Task<T>> _methodToCall; private IObservable<T> _manualCalls; public IObservable<T> Stream { get; private set; } public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period) { _methodToCall = methodToCall; _manualCalls = _manualCallsSubject.AsObservable() .Select(x => Observable.FromAsync(x => methodToCall())) .Merge(1); Stream = Observable.FromAsync(() => _methodToCall()) .DelayRepeat(period) .Merge(_manualCalls); } public void TriggerAdditionalCall() { _manualCallsSubject.OnNext(Unit.Default); } } 

Extension method for repeating with delay:

static class Extensions { public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) => source .Concat( Observable.Create<T>(async observer => { await Task.Delay(delay); observer.OnCompleted(); })) .Repeat(); } 

An example of a service containing the method to generate the observable

class SomeService { private int _ticks = 0; public async Task<string> GetSomeValueAsync() { //Just a hack to dermine if request was triggered manuall or by timer var initiatationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer"; //Here we have a data race! We would like to limit access to this method var valueToReturn = $"{_ticks} ({initiatationWay})"; await Task.Delay(500); _ticks += 1; return valueToReturn; } } 

Used like this (data race will occur):

static async Task Main(string[] args) { //Running this program will yield non deterministic results due to data-race in GetSomeValueAsync var someService = new SomeService(); var stopwatch = Stopwatch.StartNew(); var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync, TimeSpan.FromMilliseconds(2000)); observableWrapper.Stream .Take(6) .Subscribe(x => { Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} fininshed"); }); await Task.Delay(4000); observableWrapper.TriggerAdditionalCall(); observableWrapper.TriggerAdditionalCall(); Console.ReadLine(); } 
8
  • You're right that it's simple, but please give us some code to work with. What do the "regular async endpoints" look like? What does a "signal from the application" look like? A button click? A timer? Commented Nov 4, 2020 at 11:18
  • @Enigmativity I've added example code of what I've got so far. It works, but does not fulfill my requirements entirely. Also, I suspect there are some improvements to be made. Commented Nov 5, 2020 at 11:15
  • 1
    I have a hard time understanding the requirements for an acceptable solution. Could you design a marble diagram showing sample input and output data? You don't need to draw it in Photoshop, you can just use plain ASCII characters like this: Source: +--1-2-3--4--|, Result: +--A-B-C--D--|. Commented Nov 11, 2020 at 15:41
  • @TheodorZoulias Thanks for the feedback! I Re-wrote the question to make it clearer and added a (kind of) marble diagram. Commented Nov 11, 2020 at 21:04
  • Nice marble diagram! The question is much cleared now IMHO. I don't know why it was downvoted. Commented Nov 12, 2020 at 5:11

3 Answers 3

5
+100

Here is my take on this problem:


Update: I was able to simplify greatly my suggested solution by borrowing ideas from Enigmativity's answer. The Observable.StartAsync method handles the messy business of cancellation automatically¹, and the requirement of non-overlapping execution can be enforced simply by using a SemaphoreSlim.

/// <summary> /// Creates an observable sequence containing the results of an asynchronous /// function that is invoked periodically and manually. Overlapping invocations /// are prevented. Timer ticks that would cause overlapping are ignored. /// Manual invocations cancel previous invocations, and restart the timer. /// </summary> public static IObservable<T> PeriodicAndManual<T>( Func<bool, CancellationToken, Task<T>> functionAsync, TimeSpan period, out Action manualInvocation) { // Arguments validation omitted var manualSubject = new Subject<bool>(); manualInvocation = () => manualSubject.OnNext(true); return Observable.Defer(() => { var semaphore = new SemaphoreSlim(1, 1); // Ensure no overlapping return Observable .Interval(period) .Select(_ => false) // Not manual .Merge(manualSubject) .TakeUntil(isManual => isManual) // Stop on first manual .Repeat() // ... and restart the timer .Prepend(false) // Skip the initial interval delay .Select(isManual => { if (isManual) { // Triggered manually return Observable.StartAsync(async ct => { await semaphore.WaitAsync(ct); try { return await functionAsync(isManual, ct); } finally { semaphore.Release(); } }); } else if (semaphore.Wait(0)) { // Triggered by the timer and semaphore acquired synchronously return Observable .StartAsync(ct => functionAsync(isManual, ct)) .Finally(() => semaphore.Release()); } return null; // Otherwise ignore the signal }) .Where(op => op != null) .Switch(); // Pending operations are unsubscribed and canceled }); } 

The out Action manualInvocation argument is the mechanism that triggers a manual invocation.

Usage example:

int ticks = 0; var subscription = PeriodicAndManual(async (isManual, token) => { var id = $"{++ticks} " + (isManual ? "manual" : "periodic"); Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}"); await Task.Delay(500, token); return id; }, TimeSpan.FromMilliseconds(1000), out var manualInvocation) .Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}")) .Subscribe(); await Task.Delay(3200); manualInvocation(); await Task.Delay(200); manualInvocation(); await Task.Delay(3200); subscription.Dispose(); 

Output:

19:52:43.684 Begin 1 periodic 19:52:44.208 Received #1 periodic 19:52:44.731 Begin 2 periodic 19:52:45.235 Received #2 periodic 19:52:45.729 Begin 3 periodic 19:52:46.232 Received #3 periodic 19:52:46.720 Begin 4 periodic 19:52:46.993 Begin 5 manual 19:52:47.220 Begin 6 manual 19:52:47.723 Received #6 manual 19:52:48.223 Begin 7 periodic 19:52:48.728 Received #7 periodic 19:52:49.227 Begin 8 periodic 19:52:49.730 Received #8 periodic 19:52:50.226 Begin 9 periodic 

The technique of using the Scan and the DistinctUntilChanged operators in order to drop elements while the previous asynchronous operation is running, is borrowed from this question.

¹ It seems that the Rx library does not handle this messy business satisfactory though, since it just omits disposing of the CancellationTokenSources it creates.

Sign up to request clarification or add additional context in comments.

3 Comments

The implementation could become even simpler if the Observable.StartAsync returned a type that is both a Task and an IObservable. But alas it doesn't. Creating such a type is possible, but tricky.
Seriously impressive answer! I've tested and it works and solves my problem. I will most likely mark as correct answer, but first i want to understand all the suggested answers entirely. @TheodorZoulias is the CancellationToken necessary for preventing overlap in this version?
@figursagsmats thanks mate! No, observing the CancellationToken is optional, and will not affect the enforcement of the single-concurrent-execution restriction. But it is preferable that the functionAsync reacts promptly to a cancellation signal. Otherwise a discarded periodic operation may keep running in the background, preventing a manually requested operation from starting in a timely manner.
0

Here's the query that you need:

var subject = new Subject<Unit>(); var delay = TimeSpan.FromSeconds(1.0); IObservable<string> query = subject .StartWith(Unit.Default) .Select(x => Observable.Timer(TimeSpan.Zero, delay)) .Switch() .SelectMany(x => Observable.FromAsync(() => GetSomeData())); 

If any time you call subject.OnNext(Unit.Default) it will immediately trigger a call to GetSomeData and when then repeat the call based on the TimeSpan set in delay.

The use of .StartWith(Unit.Default) will set the query going immediately there is a subscriber.

The use of .Switch() cancels any pending operations based on a new subject.OnNext(Unit.Default) being called.

This should match your marble diagram.


The above version didn't introduce the delay between values.

Version 2 should.

var subject = new Subject<Unit>(); var delay = TimeSpan.FromSeconds(5.0); var source = Observable.FromAsync(() => GetSomeData()); IObservable<string> query = subject .StartWith(Unit.Default) .Select(x => source.Expand(n => Observable.Timer(delay).SelectMany(y => source))) .Switch(); 

I've used the Expand operator to introduce a delay between values. As long as source only produces a single value (which FromAsync does) this should work just fine.

6 Comments

There are some really neat and clever tricks in this implementation! But is seems that is doesn't prevent overlapping executions (caused by tasks having duration larger than delay), and also that it doesn't cancel the running operations when a manual invocation is triggered. So the results of all invocations are appearing eventually in the resulting stream.
@TheodorZoulias - It certainly does cancel with the .Switch(), but with the signature of Task<string> GetSomeData() there is no way to cancel that unless it is changed to Task<string> GetSomeData(CancellationToken ct) - then the final line of the query becomes .SelectMany(x => Observable.FromAsync(ct => GetSomeData(ct)));.
@TheodorZoulias - I've updated my answer with a query that should add the required delay. It worked in my testing.
I experimented with the Switch and the cancelable version of Observable.FromAsync. Very interesting! It requires though that the Switch affects the FromAsync observables, which is not the case in your Version 1 example. Instead it affects the Timer observables. The FromAsync are created after the Switch, and their cancellation occurs immediately after their normal completion (which is too late and ineffective). Also the cancellation is "best-effort", meaning that it is cooperative. The old tasks are not awaited before starting the new tasks, so the overlapping is still possible.
The Version 2 is also interesting. On the plus side the cancellation is functional and the overlapping is avoided (provided that the GetSomeData honors the token and completes synchronously when it receives the notification). The downsides are that it uses an Experimental operator, and that the invocation is not periodic. There is a constant delay between finishing one action and starting the next, instead of a constant delay between the starting points of subsequent actions (which is what the OP's marble diagram indicates).
|
0

I'd suggest not try to cancel an already started call. Things will get too messy. If the logic in GetSomeValueAsync involves database call and/or web API call, you simply cannot really cancel the call.

I think the key here is to make sure all the calls to GetSomeValueAsync are serialized.

I created the following solution based on Enigmativity's Version 1. It is tested on a webassembly blazor page on asp.net core 3.1, works fine.

private int _ticks = 0; //simulate a resource you want serialized access //for manual event, trigger will be 0; for Timer event, trigger will be 1,2,3... protected async Task<string> GetSomeValueAsync(string trigger) { var valueToReturn = $"{DateTime.Now.Ticks.ToString()}: {_ticks.ToString()} | ({trigger})"; await Task.Delay(1000); _ticks += 1; return valueToReturn; } //define two subjects private Subject<string> _testSubject = new Subject<string>(); private Subject<string> _getDataSubject = new Subject<string>(); //driving observable, based on Enigmativity's Version 1 var delay = TimeSpan.FromSeconds(3.0); IObservable<string> getDataObservable = _testSubject .StartWith("Init") .Select(x => Observable.Timer(TimeSpan.Zero, delay).Select(i => i.ToString())) .Switch() .WithLatestFrom(_getDataSubject.AsObservable().StartWith("IDLE")) .Where(a => a.Second == "IDLE") .Select(a => a.First); //_disposables is CompositeDisposable defined in the page _disposables.Add(getDataObservable.Subscribe(async t => { _getDataSubject.OnNext("WORKING"); //_service.LogToConsole is my helper function to log data to console await _service.LogToConsole(await GetSomeValueAsync(t)); _getDataSubject.OnNext("IDLE"); })); 

That is it. I used a button to trigger manual events. The _ticks in output is always in sequence, that is, no overlapping happened.

1 Comment

Just add a button on page and click it - <button @onclick="_ => _testSubject.OnNext(string.Empty)">Event test</button>

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.