Skip to content

Commit 321ab3c

Browse files
author
Paul Betts
committed
Merge pull request #645 from reactiveui/command-improvements
Finish up ReactiveCommand
2 parents a055be5 + 5d44ea9 commit 321ab3c

10 files changed

+393
-20
lines changed
Lines changed: 341 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,341 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Diagnostics.Contracts;
4+
using System.Linq;
5+
using System.Reactive;
6+
using System.Reactive.Concurrency;
7+
using System.Reactive.Threading.Tasks;
8+
using System.Reactive.Linq;
9+
using System.Reactive.Subjects;
10+
using System.Text;
11+
using System.Threading;
12+
using System.Threading.Tasks;
13+
using System.Windows.Input;
14+
using System.Linq.Expressions;
15+
using ReactiveUI;
16+
17+
using LegacyRxCmd = ReactiveUI.Legacy.ReactiveCommand;
18+
19+
namespace ReactiveUI.Legacy
20+
{
21+
/// <summary>
22+
/// ReactiveCommand is the default Command implementation in ReactiveUI, which
23+
/// conforms to the spec described in IReactiveCommand.
24+
/// </summary>
25+
public class ReactiveCommand : IReactiveCommand, IObservable<object>
26+
{
27+
IDisposable innerDisp;
28+
29+
readonly Subject<bool> inflight = new Subject<bool>();
30+
readonly ScheduledSubject<Exception> exceptions;
31+
readonly Subject<object> executed = new Subject<object>();
32+
readonly IScheduler defaultScheduler;
33+
34+
public ReactiveCommand() : this(null, false, null) { }
35+
public ReactiveCommand(IObservable<bool> canExecute, bool initialCondition = true) : this(canExecute, false, null, initialCondition) { }
36+
37+
public ReactiveCommand(IObservable<bool> canExecute, bool allowsConcurrentExecution, IScheduler scheduler, bool initialCondition = true)
38+
{
39+
canExecute = canExecute ?? Observable.Return(true);
40+
defaultScheduler = scheduler ?? RxApp.MainThreadScheduler;
41+
AllowsConcurrentExecution = allowsConcurrentExecution;
42+
43+
canExecute = canExecute.Catch<bool, Exception>(ex => {
44+
exceptions.OnNext(ex);
45+
return Observable.Empty<bool>();
46+
});
47+
48+
ThrownExceptions = exceptions = new ScheduledSubject<Exception>(defaultScheduler, RxApp.DefaultExceptionHandler);
49+
50+
var isExecuting = inflight
51+
.Scan(0, (acc, x) => acc + (x ? 1 : -1))
52+
.Select(x => x > 0)
53+
.Publish(false)
54+
.PermaRef()
55+
.DistinctUntilChanged();
56+
57+
IsExecuting = isExecuting.ObserveOn(defaultScheduler);
58+
59+
var isBusy = allowsConcurrentExecution ? Observable.Return(false) : isExecuting;
60+
var canExecuteAndNotBusy = Observable.CombineLatest(canExecute, isBusy, (ce, b) => ce && !b);
61+
62+
var canExecuteObs = canExecuteAndNotBusy
63+
.Publish(initialCondition)
64+
.RefCount();
65+
66+
CanExecuteObservable = canExecuteObs
67+
.DistinctUntilChanged()
68+
.ObserveOn(defaultScheduler);
69+
70+
innerDisp = canExecuteObs.Subscribe(x => {
71+
if (canExecuteLatest == x) return;
72+
73+
canExecuteLatest = x;
74+
defaultScheduler.Schedule(() => this.raiseCanExecuteChanged(EventArgs.Empty));
75+
}, exceptions.OnNext);
76+
}
77+
78+
/// <summary>
79+
/// This creates a ReactiveCommand that calls several child
80+
/// ReactiveCommands when invoked. Its CanExecute will match the
81+
/// combined result of the child CanExecutes (i.e. if any child
82+
/// commands cannot execute, neither can the parent)
83+
/// </summary>
84+
/// <param name="canExecute">An Observable that determines whether the
85+
/// parent command can execute</param>
86+
/// <param name="commands">The commands to combine.</param>
87+
public static LegacyRxCmd CreateCombined(IObservable<bool> canExecute, params ReactiveCommand[] commands)
88+
{
89+
var childrenCanExecute = commands
90+
.Select(x => x.CanExecuteObservable)
91+
.CombineLatest(latestCanExecute => latestCanExecute.All(x => x != false));
92+
93+
var canExecuteSum = Observable.CombineLatest(
94+
canExecute.StartWith(true),
95+
childrenCanExecute,
96+
(parent, child) => parent && child);
97+
98+
var ret = new LegacyRxCmd(canExecuteSum);
99+
ret.Subscribe(x => {
100+
foreach (var cmd in commands) cmd.Execute(x);
101+
});
102+
103+
return ret;
104+
}
105+
106+
public static ReactiveCommand CreateCombined(params ReactiveCommand[] commands)
107+
{
108+
return CreateCombined(Observable.Return(true), commands);
109+
}
110+
111+
/// <summary>
112+
/// Registers an asynchronous method to be called whenever the command
113+
/// is Executed. This method returns an IObservable representing the
114+
/// asynchronous operation, and is allowed to OnError / should OnComplete.
115+
/// </summary>
116+
/// <returns>A filtered version of the Observable which is marshaled
117+
/// to the UI thread. This Observable should only report successes and
118+
/// instead send OnError messages to the ThrownExceptions property.</returns>
119+
/// <param name="asyncBlock">The asynchronous method to call.</param>
120+
/// <typeparam name="T">The 1st type parameter.</typeparam>
121+
public IObservable<T> RegisterAsync<T>(Func<object, IObservable<T>> asyncBlock)
122+
{
123+
var ret = executed.Select(x => {
124+
return asyncBlock(x)
125+
.Catch<T, Exception>(ex => {
126+
exceptions.OnNext(ex);
127+
return Observable.Empty<T>();
128+
})
129+
.Finally(() => { lock (inflight) { inflight.OnNext(false); } });
130+
});
131+
132+
return ret
133+
.Do(_ => { lock (inflight) { inflight.OnNext(true); } })
134+
.Merge()
135+
.ObserveOn(defaultScheduler)
136+
.Publish().RefCount();
137+
}
138+
139+
/// <summary>
140+
/// Gets a value indicating whether this instance is executing. This
141+
/// Observable is guaranteed to always return a value immediately (i.e.
142+
/// it is backed by a BehaviorSubject), meaning it is safe to determine
143+
/// the current state of the command via IsExecuting.First()
144+
/// </summary>
145+
/// <value>true</value>
146+
/// <c>false</c>
147+
public IObservable<bool> IsExecuting { get; protected set; }
148+
149+
public bool AllowsConcurrentExecution { get; protected set; }
150+
151+
/// <summary>
152+
/// Fires whenever an exception would normally terminate ReactiveUI
153+
/// internal state.
154+
/// </summary>
155+
/// <value>The thrown exceptions.</value>
156+
public IObservable<Exception> ThrownExceptions { get; protected set; }
157+
158+
public IDisposable Subscribe(IObserver<object> observer)
159+
{
160+
return executed.Subscribe(
161+
Observer.Create<object>(
162+
x => marshalFailures(observer.OnNext, x),
163+
ex => marshalFailures(observer.OnError, ex),
164+
() => marshalFailures(observer.OnCompleted)));
165+
}
166+
167+
bool canExecuteLatest;
168+
public bool CanExecute(object parameter)
169+
{
170+
return canExecuteLatest;
171+
}
172+
173+
public event EventHandler CanExecuteChanged;
174+
175+
public void Execute(object parameter)
176+
{
177+
lock(inflight) { inflight.OnNext(true); }
178+
executed.OnNext(parameter);
179+
lock(inflight) { inflight.OnNext(false); }
180+
}
181+
182+
public IObservable<bool> CanExecuteObservable { get; protected set; }
183+
184+
public void Dispose()
185+
{
186+
var disp = Interlocked.Exchange(ref innerDisp, null);
187+
if (disp != null) disp.Dispose();
188+
}
189+
190+
void marshalFailures<T>(Action<T> block, T param)
191+
{
192+
try {
193+
block(param);
194+
} catch (Exception ex) {
195+
exceptions.OnNext(ex);
196+
}
197+
}
198+
199+
void marshalFailures(Action block)
200+
{
201+
marshalFailures(_ => block(), Unit.Default);
202+
}
203+
204+
protected virtual void raiseCanExecuteChanged(EventArgs e)
205+
{
206+
var handler = this.CanExecuteChanged;
207+
208+
if (handler != null) {
209+
handler(this, e);
210+
}
211+
}
212+
}
213+
214+
public static class ReactiveCommandMixins
215+
{
216+
/// <summary>
217+
/// ToCommand is a convenience method for returning a new
218+
/// ReactiveCommand based on an existing Observable chain.
219+
/// </summary>
220+
/// <param name="scheduler">The scheduler to publish events on - default
221+
/// is RxApp.MainThreadScheduler.</param>
222+
/// <returns>A new ReactiveCommand whose CanExecute Observable is the
223+
/// current object.</returns>
224+
public static ReactiveCommand ToCommand(this IObservable<bool> This, bool allowsConcurrentExecution = false, IScheduler scheduler = null)
225+
{
226+
return new ReactiveCommand(This, allowsConcurrentExecution, scheduler);
227+
}
228+
229+
/// <summary>
230+
/// A utility method that will pipe an Observable to an ICommand (i.e.
231+
/// it will first call its CanExecute with the provided value, then if
232+
/// the command can be executed, Execute() will be called)
233+
/// </summary>
234+
/// <param name="command">The command to be executed.</param>
235+
/// <returns>An object that when disposes, disconnects the Observable
236+
/// from the command.</returns>
237+
public static IDisposable InvokeCommand<T>(this IObservable<T> This, ICommand command)
238+
{
239+
return This.ObserveOn(RxApp.MainThreadScheduler).Subscribe(x => {
240+
if (!command.CanExecute(x)) {
241+
return;
242+
}
243+
command.Execute(x);
244+
});
245+
}
246+
247+
/// <summary>
248+
/// A utility method that will pipe an Observable to an ICommand (i.e.
249+
/// it will first call its CanExecute with the provided value, then if
250+
/// the command can be executed, Execute() will be called)
251+
/// </summary>
252+
/// <param name="target">The root object which has the Command.</param>
253+
/// <param name="commandProperty">The expression to reference the Command.</param>
254+
/// <returns>An object that when disposes, disconnects the Observable
255+
/// from the command.</returns>
256+
public static IDisposable InvokeCommand<T, TTarget>(this IObservable<T> This, TTarget target, Expression<Func<TTarget, ICommand>> commandProperty)
257+
{
258+
return This.CombineLatest(target.WhenAnyValue(commandProperty), (val, cmd) => new { val, cmd })
259+
.ObserveOn(RxApp.MainThreadScheduler)
260+
.Subscribe(x => {
261+
if (!x.cmd.CanExecute(x.val)) {
262+
return;
263+
}
264+
265+
x.cmd.Execute(x.val);
266+
});
267+
}
268+
269+
/// <summary>
270+
/// RegisterAsyncFunction registers an asynchronous method that returns a result
271+
/// to be called whenever the Command's Execute method is called.
272+
/// </summary>
273+
/// <param name="calculationFunc">The function to be run in the
274+
/// background.</param>
275+
/// <param name="scheduler"></param>
276+
/// <returns>An Observable that will fire on the UI thread once per
277+
/// invoecation of Execute, once the async method completes. Subscribe to
278+
/// this to retrieve the result of the calculationFunc.</returns>
279+
public static IObservable<TResult> RegisterAsyncFunction<TResult>(this LegacyRxCmd This,
280+
Func<object, TResult> calculationFunc,
281+
IScheduler scheduler = null)
282+
{
283+
Contract.Requires(calculationFunc != null);
284+
285+
var asyncFunc = calculationFunc.ToAsync(scheduler ?? RxApp.TaskpoolScheduler);
286+
return This.RegisterAsync(asyncFunc);
287+
}
288+
289+
/// <summary>
290+
/// RegisterAsyncAction registers an asynchronous method that runs
291+
/// whenever the Command's Execute method is called and doesn't return a
292+
/// result.
293+
/// </summary>
294+
/// <param name="calculationFunc">The function to be run in the
295+
/// background.</param>
296+
public static IObservable<Unit> RegisterAsyncAction(this LegacyRxCmd This,
297+
Action<object> calculationFunc,
298+
IScheduler scheduler = null)
299+
{
300+
Contract.Requires(calculationFunc != null);
301+
302+
// NB: This PermaRef isn't exactly correct, but the people using
303+
// this method probably are Doing It Wrong, so let's let them
304+
// continue to do so.
305+
return This.RegisterAsyncFunction(x => { calculationFunc(x); return new Unit(); }, scheduler)
306+
.Publish().PermaRef();
307+
}
308+
309+
/// <summary>
310+
/// RegisterAsyncTask registers an TPL/Async method that runs when a
311+
/// Command gets executed and returns the result
312+
/// </summary>
313+
/// <returns>An Observable that will fire on the UI thread once per
314+
/// invoecation of Execute, once the async method completes. Subscribe to
315+
/// this to retrieve the result of the calculationFunc.</returns>
316+
public static IObservable<TResult> RegisterAsyncTask<TResult>(this LegacyRxCmd This, Func<object, Task<TResult>> calculationFunc)
317+
{
318+
Contract.Requires(calculationFunc != null);
319+
return This.RegisterAsync(x => calculationFunc(x).ToObservable());
320+
}
321+
322+
/// <summary>
323+
/// RegisterAsyncTask registers an TPL/Async method that runs when a
324+
/// Command gets executed and returns no result.
325+
/// </summary>
326+
/// <param name="calculationFunc">The function to be run in the
327+
/// background.</param>
328+
/// <returns>An Observable that signals when the Task completes, on
329+
/// the UI thread.</returns>
330+
public static IObservable<Unit> RegisterAsyncTask(this LegacyRxCmd This, Func<object, Task> calculationFunc)
331+
{
332+
Contract.Requires(calculationFunc != null);
333+
334+
// NB: This PermaRef isn't exactly correct, but the people using
335+
// this method probably are Doing It Wrong, so let's let them
336+
// continue to do so.
337+
return This.RegisterAsync(x => calculationFunc(x).ToObservable())
338+
.Publish().PermaRef();
339+
}
340+
}
341+
}

0 commit comments

Comments
 (0)