1

I am trying to use RxSwift to calculate SMA(simple moving average) and EMA(exponentially weighted moving average)

The setup is like below, the methods take the close price stream as input Observable<Double>. so every time if there is a new close price being emitted, the sma obervable will emit a new calculated value to the stream

I finished the SMA version, which is working fine

func sma(_ source: Observable<Double>, _ length: Int) -> Observable<Double?> { let bag = DisposeBag() return Observable<Double?>.create { observer -> Disposable in source.scan([]) { Array($0 + [$1].suffix(length)) }.subscribe(onNext: { value in if value.count < length { observer.onNext(nil) } else { observer.onNext(value.reduce(0.0, { $0 + $1 / Double(length) })) } }).disposed(by: bag) return Disposables.create() } } 

But the EMA formula is a bit complex

https://www.investopedia.com/ask/answers/122314/what-exponential-moving-average-ema-formula-and-how-ema-calculated.asp

the formula involve the previous EMA value.

I do not have clear idea how I can get the stream last value inside a Observable creation block :thinking

below is the code I tried to implement, but .withLatestFrom(ema(source, length)) did not work out

enter image description here

func ema(_ source: Observable<Double>, _ length: Int) -> Observable<Double?> { let bag = DisposeBag() return Observable<Double?>.create { observer -> Disposable in source.scan([]) { Array($0 + [$1].suffix(length)) }.withLatestFrom(ema(source, length)) { return ($0, $1) } .subscribe(onNext: { value in let alpha: Double = Double(2) / Double(length + 1) let src = value.0 var sum: Double? = 0.0 let sum1 = value.1 sum = na(sum1) ? sma(src, length) : alpha * src.last! + (1 - alpha) * nz(sum1) observer.onNext(sum) }).disposed(by: bag) return Disposables.create() } } 

any help is greatly appreciated :pray

2 Answers 2

1

First let's cleanup your sma operator. You are creating dispose bags inside the function which is inappropriate. The subscribe returns a disposable and the create's closure needs to return a disposable. Just return the subscribe's disposable...

func sma(_ source: Observable<Double>, _ length: Int) -> Observable<Double?> { Observable<Double?>.create { observer -> Disposable in source .scan([]) { Array($0 + [$1].suffix(length)) } .subscribe(onNext: { value in if value.count < length { observer.onNext(nil) } else { observer.onNext(value.reduce(0.0, { $0 + $1 / Double(length) })) } }) } } 

But since you are outputting the same number of events as you input, we can simplify even more. Whenever you are outputting the same number of events as you input, think map.

func sma(_ source: Observable<Double>, _ length: Int) -> Observable<Double?> { source .scan([]) { Array($0 + [$1].suffix(length)) } .map { value in if value.count < length { return nil } else { return value.reduce(0.0, { $0 + $1 / Double(length) }) } } } 

And whenever you only have one Observable as input, consider making it an extension on the Observable type so it can be easily inserted into a chain...

extension ObservableType where Element == Double { func sma(_ length: Int) -> Observable<Double?> { scan([]) { Array($0 + [$1].suffix(length)) } .map { $0.count < length ? nil : $0.reduce(0.0, { $0 + $1 / Double(length) }) } } } 

Now that we have done all that, let's tackle your actual question. First express your formula as a function...

func getEMA(prices: [Double], k: Double? = nil) -> Double { guard !prices.isEmpty else { return 0 } let k = k ?? Double(2 / (prices.count + 1)) return prices[0] * k + getEMA(prices: prices.suffix(prices.count - 1), k: k) * (1 - k) } 

The above should be easy to test using some sample values. I'll leave you to do that. Once we have the above, we can use the same pattern you used for your sma to create the operator:

extension ObservableType where Element == Double { func ema(_ length: Int) -> Observable<Double?> { scan([]) { Array([$1] + $0).suffix(length) } // put the most recent price in front to correctly handle the formula .map { $0.count < length ? nil : getEMA(prices: $0) } } } 
Sign up to request clarification or add additional context in comments.

3 Comments

My initial expression of the getEMA formula was wrong. I updated the answer with the correct expression.
Huge thanks @Daniel T. The revised version of SMA looks much cleaner. Regarding to the EMA one, the problem is, it take EMA-1 in order to calculate the next EMA, its like a recursive problem. I could not find a rx operator to directly retrieve current stream's last value at the stream creation stage. So I came up with this, stackoverflow.com/a/65549401/1005570
The code I posted is recursive and solves the problem.
0

I found a solution (not sure if its the cleanest way)

which is to create a BehaviourRelay inside the function declaration while outside of the Observable.create block, which keeps a local copy of the latest EMA calculated.

in this way, it does not need function's consumer to dependency inject an Observable nor having a complex stream transformation.

Below is the EMA(exponentially weighted moving average) implementation using RxSwift

func ema(_ source: Observable<Double>, _ length: Int) -> Observable<Double?> { let bag = DisposeBag() let lastEMA: BehaviorRelay<Double?> = BehaviorRelay.init(value: nil) return Observable<Double?>.create { observer -> Disposable in source.scan([]) { Array($0 + [$1].suffix(length)) } .subscribe(onNext: { value in let alpha: Double = Double(2) / Double(length + 1) let src = value var sum: Double? = 0.0 sum = na(lastEMA.value) ? sma(src, length) : alpha * src.last! + (1 - alpha) * nz(lastEMA.value) observer.onNext(sum) lastEMA.accept(sum) }).disposed(by: bag) return Disposables.create() } } 

Remarks:

na and nz are method copied from TradingView pinescript

https://www.tradingview.com/pine-script-reference/#var_na

https://www.tradingview.com/pine-script-reference/#fun_nz

https://www.tradingview.com/pine-script-reference/#fun_ema

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.