I needed to implement a custom Stream that yields items in a sliding window (ie. [1, 2, 3] => [(1, 2), (2, 3)]). So I implemented and gave it an adapter called .tuple_windows(). Allowing the following code
let iter = stream::iter(0..=3); assert_eq!( iter.tuple_windows().collect::<Vec<_>>().await, vec![(0, 1), (1, 2), (2, 3)] ) I ran into a weird situation when chaining other adapters with it where the final type doesn't implement the Stream trait.
code (playground):
use anyhow; // 1.0.52 use futures; // 0.3.19 use futures::{stream, Stream, StreamExt}; use pin_project_lite; use pin_project_lite::pin_project; use std::{ pin::Pin, task::{Context, Poll}, }; use tokio; // 1.15.0 // 0.2.8 #[tokio::main] async fn main() -> anyhow::Result<()> { let mut stream = stream::iter(0..20) .map(|_| stream::iter(2..10)) // this works with the custom Stream // .map(|_| stream::iter(2..10).enumerate()) // but this doesn't .enumerate(); // this works regardless what happens in `map` // .tuple_windows(); // this only works with the first map while let Some(_) = stream.next().await {} Ok(()) } impl<T: Stream> TupleWindowsExt for T {} pub trait TupleWindowsExt: Stream + Sized { fn tuple_windows(self) -> TupleWindows<Self> { TupleWindows::new(self) } } pin_project! { #[derive(Debug)] pub struct TupleWindows<S: Stream> { #[pin] stream: S, previous: Option<S::Item>, } } impl<S: Stream> TupleWindows<S> { pub fn new(stream: S) -> Self { Self { stream, previous: None, } } } impl<S: Stream> Stream for TupleWindows<S> where S::Item: Clone, { type Item = (S::Item, S::Item); fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); let current = match futures::ready!(this.stream.as_mut().poll_next(cx)) { Some(next) => next, None => return Poll::Ready(None), }; if let Some(previous) = this.previous { let res = (previous.clone(), current.clone()); *this.previous = Some(current); Poll::Ready(Some(res)) } else { let next = match this.stream.poll_next(cx) { Poll::Ready(next) => next, Poll::Pending => { *this.previous = Some(current); return Poll::Pending; } }; *this.previous = next.clone(); Poll::Ready(next.map(|next| (current, next))) } } fn size_hint(&self) -> (usize, Option<usize>) { let (lower, upper) = self.stream.size_hint(); ( lower.saturating_mul(2), upper.and_then(|upper| upper.checked_mul(2)), ) } } The compiler error is not helpful either as it only tells me that Stream is not implemented for the newly created type:
error[E0599]: the method `next` exists for struct `TupleWindows<futures::stream::Map<futures::stream::Iter<std::ops::Range<{integer}>>, [closure@src/main.rs:16:11: 16:46]>>`, but its trait bounds were not satisfied What am I missing here ?