I am trying to use RxSwift to calculate SMA(simple moving average) and EMA(exponentially weighted moving average)
The setup is like below, the methods take the close price stream as input Observable<Double>. so every time if there is a new close price being emitted, the sma obervable will emit a new calculated value to the stream
I finished the SMA version, which is working fine
func sma(_ source: Observable<Double>, _ length: Int) -> Observable<Double?> { let bag = DisposeBag() return Observable<Double?>.create { observer -> Disposable in source.scan([]) { Array($0 + [$1].suffix(length)) }.subscribe(onNext: { value in if value.count < length { observer.onNext(nil) } else { observer.onNext(value.reduce(0.0, { $0 + $1 / Double(length) })) } }).disposed(by: bag) return Disposables.create() } } But the EMA formula is a bit complex
the formula involve the previous EMA value.
I do not have clear idea how I can get the stream last value inside a Observable creation block :thinking
below is the code I tried to implement, but .withLatestFrom(ema(source, length)) did not work out
func ema(_ source: Observable<Double>, _ length: Int) -> Observable<Double?> { let bag = DisposeBag() return Observable<Double?>.create { observer -> Disposable in source.scan([]) { Array($0 + [$1].suffix(length)) }.withLatestFrom(ema(source, length)) { return ($0, $1) } .subscribe(onNext: { value in let alpha: Double = Double(2) / Double(length + 1) let src = value.0 var sum: Double? = 0.0 let sum1 = value.1 sum = na(sum1) ? sma(src, length) : alpha * src.last! + (1 - alpha) * nz(sum1) observer.onNext(sum) }).disposed(by: bag) return Disposables.create() } } any help is greatly appreciated :pray
