
A simple API, and a Rich API

johnmcclean-aol edited this page Nov 23, 2016 · 9 revisions

SimpleReact Streams

LazyFutureStream has all the functionality described on this page.

SimpleReactStream is a simpler, more focused API with only the SimpleReact Core API features (and some extensions).

SimpleReact Core API

The core SimpleReact API remains very simple. It has expanded slightly since the initial release, and today it consists of:

  • with
  • then
  • doOnEach
  • retry
  • onFail
  • capture
  • block
  • allOf
  • anyOf
  • run
  • toQueue
  • flatMap
  • peek
  • filter
  • merge

These are the concurrent non-blocking operations (except for block!) that represent the core of the API.
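The operators listed above have rough counterparts on the JDK's CompletableFuture. The sketch below is plain JDK code, not SimpleReact code. It assumes that `then` behaves like `thenApply`, `onFail` like `exceptionally`, and `block` like joining every future; the class name `CoreApiAnalogy` and the helper `run` are hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class CoreApiAnalogy {

    // Hypothetical helper: starts one async task per input, transforms each
    // result, substitutes a fallback on failure, then blocks for all results.
    static List<Integer> run(List<Integer> inputs) {
        List<CompletableFuture<Integer>> futures = inputs.stream()
            .map(i -> CompletableFuture.supplyAsync(() -> i)   // start async work
                .thenApply(v -> {                               // assumed analogue of `then`
                    if (v == 3) {
                        throw new IllegalStateException("boom");
                    }
                    return v * 10;
                })
                .exceptionally(t -> -1))                        // assumed analogue of `onFail`
            .collect(Collectors.toList());

        return futures.stream()
            .map(CompletableFuture::join)                       // assumed analogue of `block`
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4))); // prints [10, 20, -1, 40]
    }
}
```

Only the final `join` waits for results; every earlier stage is registered as a callback and does not block the calling thread.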

java.util.stream.Stream

With SimpleReact v0.3 we have also added all the methods of the Stream API:

  • filter(Predicate<? super T>)
  • map(Function<? super T, ? extends R>)
  • mapToInt(ToIntFunction<? super T>)
  • mapToLong(ToLongFunction<? super T>)
  • mapToDouble(ToDoubleFunction<? super T>)
  • flatMap(Function<? super T, ? extends Stream<? extends R>>)
  • flatMapToInt(Function<? super T, ? extends IntStream>)
  • flatMapToLong(Function<? super T, ? extends LongStream>)
  • flatMapToDouble(Function<? super T, ? extends DoubleStream>)
  • distinct()
  • sorted()
  • sorted(Comparator<? super T>)
  • peek(Consumer<? super T>)
  • limit(long)
  • skip(long)
  • forEach(Consumer<? super T>)
  • forEachOrdered(Consumer<? super T>)
  • toArray()
  • toArray(IntFunction<A[]>)
  • reduce(T, BinaryOperator)
  • reduce(BinaryOperator)
  • reduce(U, BiFunction<U, ? super T, U>, BinaryOperator)
  • collect(Supplier, BiConsumer<R, ? super T>, BiConsumer<R, R>)
  • collect(Collector<? super T, A, R>)
  • min(Comparator<? super T>)
  • max(Comparator<? super T>)
  • count()
  • anyMatch(Predicate<? super T>)
  • allMatch(Predicate<? super T>)
  • noneMatch(Predicate<? super T>)
  • findFirst()
  • findAny()
  • builder()
  • empty()
  • of(T)
  • of(T...)
  • iterate(T, UnaryOperator)
  • generate(Supplier)
  • concat(Stream<? extends T>, Stream<? extends T>)
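Because these are the standard java.util.stream.Stream methods, their shapes can be tried out on a plain JDK stream. The sketch below uses only the JDK (no SimpleReact types); the class name `StreamMethodsSketch` and its helpers are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamMethodsSketch {

    // filter -> map -> sorted -> limit -> collect, all taken from the list above.
    static List<Integer> smallestEvenSquares(Stream<Integer> source) {
        return source.filter(i -> i % 2 == 0)
                     .map(i -> i * i)
                     .sorted()
                     .limit(2)
                     .collect(Collectors.toList());
    }

    // reduce(T, BinaryOperator) with an identity value.
    static int sum(Stream<Integer> source) {
        return source.reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(smallestEvenSquares(Stream.of(4, 1, 2, 6, 3))); // prints [4, 16]
        System.out.println(sum(Stream.of(1, 2, 3)));                       // prints 6
    }
}
```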

org.jooq.lambda.Seq

We have also implemented Seq, which adds the following functions:

  • stream()
  • concat(Stream)
  • concat(T)
  • concat(T...)
  • cycle()
  • zip(Seq)
  • zip(Seq, BiFunction<T, U, R>)
  • zipWithIndex()
  • foldLeft(U, BiFunction<U, ? super T, U>)
  • foldRight(U, BiFunction<? super T, U, U>)
  • scanLeft(U, BiFunction<U, ? super T, U>)
  • scanRight(U, BiFunction<? super T, U, U>)
  • reverse()
  • shuffle()
  • shuffle(Random)
  • skipWhile(Predicate<? super T>)
  • skipUntil(Predicate<? super T>)
  • limitWhile(Predicate<? super T>)
  • limitUntil(Predicate<? super T>)
  • intersperse(T)
  • duplicate()
  • partition(Predicate<? super T>)
  • splitAt(long)
  • splitAtHead()
  • slice(long, long)
  • toCollection(Supplier)
  • toList()
  • toSet()
  • toMap(Function<T, K>, Function<T, V>)
  • toString(String)
  • minBy(Function<T, U>)
  • minBy(Function<T, U>, Comparator<? super U>)
  • maxBy(Function<T, U>)
  • maxBy(Function<T, U>, Comparator<? super U>)
  • ofType(Class)
  • cast(Class)
  • groupBy(Function<? super T, ? extends K>)
  • groupBy(Function<? super T, ? extends K>, Collector<? super T, A, D>)
  • groupBy(Function<? super T, ? extends K>, Supplier, Collector<? super T, A, D>)
  • join()
  • join(CharSequence)
  • join(CharSequence, CharSequence, CharSequence)
  • of(T)
  • of(T...)
  • empty()
  • iterate(T, UnaryOperator)
  • generate()
  • generate(T)
  • generate(Supplier)
  • seq(Stream)
  • seq(Iterable)
  • seq(Iterator)
  • seq(Map<K, V>)
  • seq(Optional)
  • cycle(Stream)
  • unzip(Stream<Tuple2<T1, T2>>)
  • unzip(Stream<Tuple2<T1, T2>>, Function<T1, U1>, Function<T2, U2>)
  • unzip(Stream<Tuple2<T1, T2>>, Function<Tuple2<T1, T2>, Tuple2<U1, U2>>)
  • unzip(Stream<Tuple2<T1, T2>>, BiFunction<T1, T2, Tuple2<U1, U2>>)
  • zip(Stream, Stream)
  • zip(Stream, Stream, BiFunction<T1, T2, R>)
  • zipWithIndex(Stream)
  • foldLeft(Stream, U, BiFunction<U, ? super T, U>)
  • foldRight(Stream, U, BiFunction<? super T, U, U>)
  • scanLeft(Stream, U, BiFunction<U, ? super T, U>)
  • scanRight(Stream, U, BiFunction<? super T, U, U>)
  • unfold(U, Function<U, Optional<Tuple2<T, U>>>)
  • reverse(Stream)
  • shuffle(Stream)
  • shuffle(Stream, Random)
  • concat(Stream...)
  • duplicate(Stream)
  • toString(Stream<?>)
  • toString(Stream<?>, String)
  • toCollection(Stream, Supplier)
  • toList(Stream)
  • toSet(Stream)
  • toMap(Stream<Tuple2<K, V>>)
  • toMap(Stream, Function<T, K>, Function<T, V>)
  • slice(Stream, long, long)
  • skip(Stream, long)
  • skipWhile(Stream, Predicate<? super T>)
  • skipUntil(Stream, Predicate<? super T>)
  • limit(Stream, long)
  • limitWhile(Stream, Predicate<? super T>)
  • limitUntil(Stream, Predicate<? super T>)
  • intersperse(Stream, T)
  • partition(Stream, Predicate<? super T>)
  • splitAt(Stream, long)
  • splitAtHead(Stream)
  • ofType(Stream, Class)
  • cast(Stream, Class)
  • groupBy(Stream, Function<? super T, ? extends K>)
  • groupBy(Stream, Function<? super T, ? extends K>, Collector<? super T, A, D>)
  • groupBy(Stream, Function<? super T, ? extends K>, Supplier, Collector<? super T, A, D>)
  • join(Stream<?>)
  • join(Stream<?>, CharSequence)
  • join(Stream<?>, CharSequence, CharSequence, CharSequence)
  • filter(Predicate<? super T>)
  • map(Function<? super T, ? extends R>)
  • mapToInt(ToIntFunction<? super T>)
  • mapToLong(ToLongFunction<? super T>)
  • mapToDouble(ToDoubleFunction<? super T>)
  • flatMap(Function<? super T, ? extends Stream<? extends R>>)
  • flatMapToInt(Function<? super T, ? extends IntStream>)
  • flatMapToLong(Function<? super T, ? extends LongStream>)
  • flatMapToDouble(Function<? super T, ? extends DoubleStream>)
  • distinct()
  • sorted()
  • sorted(Comparator<? super T>)
  • peek(Consumer<? super T>)
  • limit(long)
  • skip(long)
  • onClose(Runnable)
  • close()
  • sequential()
  • parallel()
  • unordered()
  • spliterator()
  • forEach(Consumer<? super T>)
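To make the difference between folding and scanning concrete, here is a plain JDK sketch of `foldLeft` and `scanLeft` over a List. It is not the Seq implementation: the class `FoldScanSketch` is hypothetical, and the sketch assumes that a left scan emits the seed first and then every intermediate accumulator value, which should be checked against the Seq Javadoc.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class FoldScanSketch {

    // Folds from the left: returns only the final accumulator.
    static <T, U> U foldLeft(List<T> input, U seed, BiFunction<U, ? super T, U> fn) {
        U acc = seed;
        for (T t : input) {
            acc = fn.apply(acc, t);
        }
        return acc;
    }

    // Scans from the left: returns the seed followed by every intermediate
    // accumulator (assumption about the scan semantics, see the lead-in).
    static <T, U> List<U> scanLeft(List<T> input, U seed, BiFunction<U, ? super T, U> fn) {
        List<U> out = new ArrayList<>();
        U acc = seed;
        out.add(acc);
        for (T t : input) {
            acc = fn.apply(acc, t);
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("a", "b", "c");
        System.out.println(foldLeft(letters, "", (u, t) -> u + t)); // prints abc
        System.out.println(scanLeft(letters, "", (u, t) -> u + t)); // elements: "", "a", "ab", "abc"
    }
}
```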

com.aol.cyclops.sequence.ReactiveSeq

  • flatten()
  • Optional<List<T>> toOptional()
  • CompletableFuture<List<T>> toCompletableFuture()
  • cycle(int times)
  • cycle()
  • cycle(Monoid<T> m, int times)
  • <R> ReactiveSeq<R> cycle(Class<R> monadC, int times)
  • ReactiveSeq<T> cycleWhile(Predicate<? super T> predicate)
  • ReactiveSeq<T> cycleUntil(Predicate<? super T> predicate)
  • <U> ReactiveSeq<Tuple2<T, U>> zipStream(Stream<U> other)
  • <S,U> ReactiveSeq<Tuple3<T,S,U>> zip3(Stream<? extends S> second, Stream<? extends U> third)
  • <T2,T3,T4> ReactiveSeq<Tuple4<T,T2,T3,T4>> zip4(Stream<T2> second, Stream<T3> third, Stream<T4> fourth)
  • <S, R> ReactiveSeq<R> zipSequence(ReactiveSeq<? extends S> second, BiFunction<? super T, ? super S, ? extends R> zipper)
  • <S, R> ReactiveSeq<R> zipAnyM(AnyM<? extends S> second, BiFunction<? super T, ? super S, ? extends R> zipper)
  • <S, R> ReactiveSeq<R> zipStream(BaseStream<? extends S,? extends BaseStream<? extends S,?>> second, BiFunction<? super T, ? super S, ? extends R> zipper)
  • Tuple2<ReactiveSeq<T>,ReactiveSeq<T>> duplicateSequence()
  • Tuple3<ReactiveSeq<T>,ReactiveSeq<T>,ReactiveSeq<T>> triplicate()
  • Tuple4<ReactiveSeq<T>,ReactiveSeq<T>,ReactiveSeq<T>,ReactiveSeq<T>> quadruplicate()
  • Tuple2<Optional<T>,ReactiveSeq<T>> splitSequenceAtHead()
  • Tuple2<ReactiveSeq<T>,ReactiveSeq<T>> splitBy(Predicate<T> splitter)
  • ReactiveSeq<List<T>> sliding(int windowSize)
  • ReactiveSeq<List<T>> sliding(int windowSize, int increment)
  • ReactiveSeq<List<T>> grouped(int groupSize)
  • ReactiveSeq<T> scanLeft(Monoid<T> monoid)
  • ReactiveSeq<T> scanRight(Monoid<T> monoid)
  • boolean xMatch(int num, Predicate<? super T> c)
  • HeadAndTail<T> headAndTail()
  • Optional<HeadAndTail<T>> headAndTailOptional()
  • <R> R mapReduce(Monoid<R> reducer)
  • <R> R mapReduce(Function<? super T,? extends R> mapper, Monoid<R> reducer)
  • List collectStream(Stream<Collector> collectors)
  • <R> List<R> collectIterable(Iterable<Collector> collectors)
  • T reduce(Monoid<T> reducer)
  • List<T> reduce(Stream<Monoid<T>> reducers)
  • List<T> reduce(Iterable<Monoid<T>> reducers)
  • T foldLeft(Monoid<T> reducer)
  • <T> T foldLeftMapToType(Monoid<T> reducer)
  • T foldRight(Monoid<T> reducer)
  • <T> T foldRightMapToType(Monoid<T> reducer)
  • Streamable<T> toStreamable()
  • <T> Stream<T> toStream()
  • startsWith(Iterable<T> iterable)
  • startsWith(Iterator<T> iterator)
  • AnyM<T> anyM()
  • flatMapAnyM(Function<? super T,AnyM<? extends R>> fn)
  • flatMapCollection(Function<? super T,Collection<? extends R>> fn)
  • flatMapStream(Function<? super T,BaseStream<? extends R,?>> fn)
  • flatMapOptional(Function<? super T,Optional<? extends R>> fn)
  • flatMapCompletableFuture(Function<? super T,CompletableFuture<? extends R>> fn)
  • flatMapCharSequence(Function<? super T,CharSequence> fn)
  • flatMapFile(Function<? super T,File> fn)
  • flatMapURL(Function<? super T, URL> fn)
  • flatMapBufferedReader(Function<? super T,BufferedReader> fn)
  • Collection<T> toLazyCollection()
  • Collection<T> toConcurrentLazyCollection()
  • Streamable<T> toConcurrentLazyStreamable()
  • ReactiveSeq<T> appendStream(Stream<T> stream)
  • ReactiveSeq<T> prependStream(Stream<T> stream)
  • ReactiveSeq<T> append(T... values)
  • ReactiveSeq<T> prepend(T... values)
  • ReactiveSeq<T> insertAt(int pos, T... values)
  • ReactiveSeq<T> deleteBetween(int start, int end)
  • ReactiveSeq<T> insertStreamAt(int pos, Stream<T> stream)
  • FutureOperations<T> futureOperations(Executor exec)
  • boolean endsWith(Iterable<T> iterable)
  • boolean endsWith(Stream<T> stream)
  • ReactiveSeq<T> skip(long time, final TimeUnit unit)
  • ReactiveSeq<T> limit(long time, final TimeUnit unit)
  • ReactiveSeq<T> skipLast(int num)
  • ReactiveSeq<T> limitLast(int num)
  • HotStream<T> hotStream(Executor e)
  • T firstValue()
  • T single()
  • Optional<T> elementAt(long index)
  • Tuple2<T,ReactiveSeq<T>> get(long index)
  • ReactiveSeq<Tuple2<T,Long>> elapsed()
  • ReactiveSeq<Tuple2<T,Long>> timestamp()
  • <T> CyclopsSubscriber<T> subscriber()
  • ReactiveSeq<T> xPer(int x, long time, TimeUnit t)
  • ReactiveSeq<T> onePer(long time, TimeUnit t)
  • ReactiveSeq<T> fixedDelay(long l, TimeUnit unit)
  • ReactiveSeq<T> jitter(long maxJitterPeriodInNanos)
  • ReactiveSeq<T> debounce(long time, TimeUnit t)
  • ReactiveSeq<List<T>> batchBySizeAndTime(int size, long time, TimeUnit t)
  • <C extends Collection<T>> ReactiveSeq<C> batchBySizeAndTime(int size, long time, TimeUnit unit, Supplier<C> factory)
  • ReactiveSeq<List<T>> batchByTime(long time, TimeUnit t)
  • <C extends Collection<T>> ReactiveSeq<C> batchByTime(long time, TimeUnit unit, Supplier<C> factory)
  • ReactiveSeq<List<T>> batchBySize(int size)
  • <C extends Collection<T>> ReactiveSeq<C> batchBySize(int size, Supplier<C> supplier)
  • ReactiveSeq<Streamable<T>> windowBySizeAndTime(int maxSize, long maxTime, TimeUnit maxTimeUnit)
  • ReactiveSeq<Streamable<T>> windowWhile(Predicate<T> predicate)
  • ReactiveSeq<Streamable<T>> windowUntil(Predicate<T> predicate)
  • ReactiveSeq<Streamable<T>> windowStatefullyWhile(BiPredicate<Streamable<T>,T> predicate)
  • ReactiveSeq<Streamable<T>> windowByTime(long time, TimeUnit t)
  • ReactiveSeq<List<T>> batchUntil(Predicate<T> predicate)
  • ReactiveSeq<List<T>> batchWhile(Predicate<T> predicate)
  • <C extends Collection<T>> ReactiveSeq<C> batchWhile(Predicate<T> predicate, Supplier<C> factory)
  • <C extends Collection<T>> ReactiveSeq<C> batchUntil(Predicate<T> predicate, Supplier<C> factory)
  • ReactiveSeq<T> recover(final Function<Throwable, T> fn)
  • <EX extends Throwable> ReactiveSeq<T> recover(Class<EX> exceptionClass, final Function<EX, T> fn)
  • <R> ReactiveSeq<R> retry(Function<T,R> fn)

Other operators

(many of these are also available on ReactiveSeq)


Zipping operators

  • combineLatest
  • withLatest

Sharding operators

  • shard (map, fn)

Control operators

  • debounce
  • onePer
  • xPer
  • control (fn)
  • skipUntil (stream)
  • takeUntil (stream)
  • jitter
  • fixedDelay

Batching operators

  • batchBySize
  • batchByTime
  • batch (fn)

Chunking operators

  • chunkSinceLastRead
  • chunkSinceLastReadIterator

Futures operators

  • limitFutures
  • skipFutures
  • sliceFutures
  • duplicateFutures
  • partitionFutures
  • splitAtFutures
  • zipFutures
  • zipFuturesWithIndex
  • firstOf

batchBySizeAndTime : batches results until either a time limit or a size limit is reached

e.g. batch in groups of 10, or whatever has been returned within 5 seconds

    lazyReact.from(urls)
             .map(this::load)
             .batchBySizeAndTime(10, 5, TimeUnit.SECONDS)
             .toList();
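The size-or-time rule can be sketched in plain JDK code. This is not the library's implementation: the class `BatchSketch` and its helper are hypothetical, the helper works on an Iterator rather than a stream, and it only checks the clock when an element arrives, so a batch is never closed while the source is idle.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

class BatchSketch {

    // Hypothetical helper: closes the current batch when it holds `size`
    // elements or when `time` has elapsed since the batch was opened,
    // whichever happens first. The clock is checked only on element arrival.
    static <T> List<List<T>> batchBySizeAndTime(Iterator<T> source, int size, long time, TimeUnit unit) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long limitNanos = unit.toNanos(time);
        long openedAt = System.nanoTime();

        while (source.hasNext()) {
            current.add(source.next());
            boolean full = current.size() >= size;
            boolean timedOut = System.nanoTime() - openedAt >= limitNanos;
            if (full || timedOut) {
                batches.add(current);
                current = new ArrayList<>();
                openedAt = System.nanoTime();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // flush the final partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            input.add(i);
        }
        // With an in-memory source the 5 second limit is never hit,
        // so the batches are cut by size alone: 10, 10 and 5 elements.
        batchBySizeAndTime(input.iterator(), 10, 5, TimeUnit.SECONDS)
            .forEach(batch -> System.out.println(batch.size()));
    }
}
```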

switchOnNextValue : creates a new stream that takes the latest value from a number of streams

    LazyFutureStream<Integer> fast = ... // [1,2,3,4,5,6,7..]
    LazyFutureStream<Integer> slow = ... // [100,200,300,400,500,600..]
    LazyFutureStream<Integer> merged = fast.switchOnNextValue(Stream.of(slow));
    // [1,2,3,4,5,6,7,8,100,9,10,11,12,13,14,15,16,200..]

copy : copies a Stream the specified number of times

    LazyFutureStream.of(1,2,3,4,5,6)
                    .map(i -> i+2)
                    .copy(5)
                    .forEach(s -> System.out.println(s.toList()));
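One simple way to picture copying is to buffer the source once and hand out a fresh stream per copy. The plain JDK sketch below does exactly that; it is an assumption for illustration only (the library may well copy lazily without buffering everything), and the class `CopySketch` and its helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CopySketch {

    // Hypothetical helper: consumes the source eagerly, then returns `times`
    // independent streams over the buffered elements.
    static <T> List<Stream<T>> copy(Stream<T> source, int times) {
        List<T> buffered = source.collect(Collectors.toList());
        List<Stream<T>> copies = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            copies.add(buffered.stream());
        }
        return copies;
    }

    public static void main(String[] args) {
        // Each of the 5 copies prints [3, 4, 5, 6, 7, 8]
        copy(Stream.of(1, 2, 3, 4, 5, 6).map(i -> i + 2), 5)
            .forEach(s -> System.out.println(s.collect(Collectors.toList())));
    }
}
```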

toLazyCollection : creates a Collection placeholder without blocking. EagerFutureStreams and SimpleReactStreams can start populating the Collection asynchronously straight away, whereas LazyFutureStreams won't populate it until a method is invoked on it.

    Collection<Integer> col = LazyFutureStream.of(1,2,3,4,5,6)
                                              .map(i -> i+2)
                                              .toLazyCollection();

toConcurrentLazyCollection : creates a lazy collection that can be shared across threads

    Collection<Integer> col = LazyFutureStream.of(1,2,3,4,5,6)
                                              .map(i -> i+2)
                                              .toConcurrentLazyCollection();

firstValue : returns the first value in a stream. The value must be present; no Optional is returned.

    int first = LazyFutureStream.of(1,2,3,4)
                                .firstValue();
    // first is 1

single : returns the single entry in a stream; throws an exception if there are no entries or more than one

    int num = LazyFutureStream.of(1)
                              .single();
    // num is 1
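The contracts of firstValue and single can be sketched on a plain JDK stream. The class `SingleSketch` is hypothetical, and the exception types thrown here are assumptions; the page only says that an exception is raised.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

class SingleSketch {

    // Returns the first element, failing if the stream is empty.
    static <T> T firstValue(Stream<T> source) {
        return source.findFirst()
                     .orElseThrow(() -> new NoSuchElementException("stream is empty"));
    }

    // Returns the only element, failing if there are none or more than one.
    static <T> T single(Stream<T> source) {
        Iterator<T> it = source.iterator();
        if (!it.hasNext()) {
            throw new NoSuchElementException("expected exactly one element, found none");
        }
        T value = it.next();
        if (it.hasNext()) {
            throw new IllegalStateException("expected exactly one element, found more");
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(firstValue(Stream.of(1, 2, 3, 4))); // prints 1
        System.out.println(single(Stream.of(1)));              // prints 1
    }
}
```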

futureOperations() & futureOperations(executor) : give access to terminal operations that return a future and execute asynchronously

    CompletableFuture<Integer> sum = LazyFutureStream.of(1,2,3,4,5)
                                                     .map(it -> it*100)
                                                     .futureOperations()
                                                     .reduce(50, (acc,next) -> acc+next);
    // sum is CompletableFuture[1550]
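A terminal operation that returns a future can be approximated in plain JDK code by running the reduction inside `CompletableFuture.supplyAsync`. This is only an analogy, not how futureOperations is implemented; the class `AsyncReduceSketch` and the helper `sumAsync` are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

class AsyncReduceSketch {

    // Hypothetical helper: runs the whole map/reduce pipeline on `exec`
    // and returns immediately with a future of the result.
    static CompletableFuture<Integer> sumAsync(List<Integer> values, Executor exec) {
        return CompletableFuture.supplyAsync(
            () -> values.stream()
                        .map(it -> it * 100)
                        .reduce(50, (acc, next) -> acc + next),
            exec);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> sum =
            sumAsync(Arrays.asList(1, 2, 3, 4, 5), ForkJoinPool.commonPool());
        System.out.println(sum.join()); // prints 1550
    }
}
```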

sliding(size) : creates a sliding window over the data in the stream

 //futureStream 
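As a rough illustration of what a sliding window produces, here is a plain JDK sketch over a List. It is not the library's code: the class `SlidingSketch` is hypothetical, and the sketch assumes that only full windows are emitted, which may differ from how the library treats a trailing partial window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SlidingSketch {

    // Hypothetical helper: emits every full window of `windowSize` elements,
    // advancing the start position by `increment` each time.
    static <T> List<List<T>> sliding(List<T> input, int windowSize, int increment) {
        if (windowSize <= 0 || increment <= 0) {
            throw new IllegalArgumentException("windowSize and increment must be positive");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int start = 0; start + windowSize <= input.size(); start += increment) {
            windows.add(new ArrayList<>(input.subList(start, start + windowSize)));
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(sliding(data, 2, 1)); // prints [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6]]
        System.out.println(sliding(data, 3, 2)); // prints [[1, 2, 3], [3, 4, 5]]
    }
}
```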
