I needed a custom Stream that yields items in a sliding window (i.e. [1, 2, 3] => [(1, 2), (2, 3)]). I implemented one and exposed it through an adapter called .tuple_windows(), which allows the following code:

    let iter = stream::iter(0..=3);
    assert_eq!(
        iter.tuple_windows().collect::<Vec<_>>().await,
        vec![(0, 1), (1, 2), (2, 3)]
    )
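For comparison, the same pairing can be sketched synchronously over a slice with the standard library's `windows(2)`. The helper name `pair_windows` is made up for this illustration and is not part of the code in the question.

```rust
/// Pairs each element of a slice with its successor.
/// `pair_windows` is a hypothetical helper used only for this illustration.
fn pair_windows<T: Clone>(items: &[T]) -> Vec<(T, T)> {
    items
        .windows(2) // overlapping sub-slices of length 2
        .map(|w| (w[0].clone(), w[1].clone()))
        .collect()
}

fn main() {
    let pairs = pair_windows(&[0, 1, 2, 3]);
    assert_eq!(pairs, vec![(0, 1), (1, 2), (2, 3)]);
}
```

Note that even this version needs `T: Clone`, because every element except the first and last appears in two tuples.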

I ran into a weird situation when chaining other adapters with it where the final type doesn't implement the Stream trait.

code (playground):

    use anyhow; // 1.0.52
    use futures; // 0.3.19
    use futures::{stream, Stream, StreamExt};
    use pin_project_lite; // 0.2.8
    use pin_project_lite::pin_project;
    use std::{
        pin::Pin,
        task::{Context, Poll},
    };
    use tokio; // 1.15.0

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let mut stream = stream::iter(0..20)
            .map(|_| stream::iter(2..10)) // this works with the custom Stream
            // .map(|_| stream::iter(2..10).enumerate()) // but this doesn't
            .enumerate(); // this works regardless what happens in `map`
            // .tuple_windows(); // this only works with the first map

        while let Some(_) = stream.next().await {}

        Ok(())
    }

    impl<T: Stream> TupleWindowsExt for T {}

    pub trait TupleWindowsExt: Stream + Sized {
        fn tuple_windows(self) -> TupleWindows<Self> {
            TupleWindows::new(self)
        }
    }

    pin_project! {
        #[derive(Debug)]
        pub struct TupleWindows<S: Stream> {
            #[pin]
            stream: S,
            previous: Option<S::Item>,
        }
    }

    impl<S: Stream> TupleWindows<S> {
        pub fn new(stream: S) -> Self {
            Self {
                stream,
                previous: None,
            }
        }
    }

    impl<S: Stream> Stream for TupleWindows<S>
    where
        S::Item: Clone,
    {
        type Item = (S::Item, S::Item);

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let mut this = self.project();

            let current = match futures::ready!(this.stream.as_mut().poll_next(cx)) {
                Some(next) => next,
                None => return Poll::Ready(None),
            };

            if let Some(previous) = this.previous {
                let res = (previous.clone(), current.clone());
                *this.previous = Some(current);
                Poll::Ready(Some(res))
            } else {
                let next = match this.stream.poll_next(cx) {
                    Poll::Ready(next) => next,
                    Poll::Pending => {
                        *this.previous = Some(current);
                        return Poll::Pending;
                    }
                };
                *this.previous = next.clone();
                Poll::Ready(next.map(|next| (current, next)))
            }
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            let (lower, upper) = self.stream.size_hint();
            (
                lower.saturating_mul(2),
                upper.and_then(|upper| upper.checked_mul(2)),
            )
        }
    }

The compiler error is not helpful either as it only tells me that Stream is not implemented for the newly created type:

error[E0599]: the method `next` exists for struct `TupleWindows<futures::stream::Map<futures::stream::Iter<std::ops::Range<{integer}>>, [closure@src/main.rs:16:11: 16:46]>>`, but its trait bounds were not satisfied 

What am I missing here?

2 Answers

Your Stream implementation requires that items from the inner stream be cloneable:

    impl<S: Stream> Stream for TupleWindows<S>
    where
        S::Item: Clone,

In the working case, stream::iter(0..20).map(|_| stream::iter(2..10)).tuple_windows(), you're passing a stream of futures::stream::Iter<std::ops::Range<i32>> items to tuple_windows(). Iter implements Clone when the inner iterator type implements Clone. The inner iterator type here is std::ops::Range<i32>, which does implement Clone.

When you change the code to add a call to enumerate() within the map(), you're now passing a stream of futures::stream::Enumerate<futures::stream::Iter<std::ops::Range<i32>>> items (i.e. a stream of streams) to tuple_windows(). Enumerate doesn't implement Clone at all (as of futures 0.3.19), so the S::Item: Clone bound fails and TupleWindows no longer implements Stream.

I can't see any reason why Enumerate couldn't implement Clone (with the appropriate trait bounds); I suppose it wasn't implemented because nobody asked for it.
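As a rough illustration of what "the appropriate trait bounds" could look like, here is a minimal sketch of an enumerate-style adapter with a conditional Clone impl. It is written over `Iterator` rather than `Stream` so that it needs no external crates, and `MyEnumerate` is a hypothetical type, not the one in the futures crate.

```rust
/// Hypothetical stand-in for an `Enumerate` adapter, written over `Iterator`
/// instead of `Stream` so that it compiles with the standard library alone.
struct MyEnumerate<I> {
    inner: I,
    count: usize,
}

impl<I: Iterator> Iterator for MyEnumerate<I> {
    type Item = (usize, I::Item);

    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.next()?;
        let index = self.count;
        self.count += 1;
        Some((index, item))
    }
}

// The adapter is cloneable exactly when the wrapped iterator is.
impl<I: Clone> Clone for MyEnumerate<I> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            count: self.count,
        }
    }
}

fn main() {
    let mut first = MyEnumerate { inner: 10..13, count: 0 };
    let _ = first.next(); // consume (0, 10)

    // The clone carries over both the remaining items and the running index.
    let second = first.clone();
    assert_eq!(second.collect::<Vec<_>>(), vec![(1, 11), (2, 12)]);
    assert_eq!(first.collect::<Vec<_>>(), vec![(1, 11), (2, 12)]);
}
```

With an impl of this shape, a stream of such adapters would satisfy an Item: Clone bound whenever the wrapped type is Clone.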

Got it: the Stream impl doesn't apply because the items don't satisfy Clone, which my custom Stream requires.

I should have put the Clone bound on the adapter method as well as on the impl block, so that the error points at the tuple_windows() call:

    impl<T: ?Sized> TupleWindowsExt for T where T: Stream {}

    pub trait TupleWindowsExt: Stream {
        fn tuple_windows(self) -> TupleWindows<Self>
        where
            Self: Sized,
            Self::Item: Clone,
        {
            TupleWindows::new(self)
        }
    }
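The effect of moving the bound can be tried without any async machinery. The following is a minimal sketch that mirrors the adapter over `Iterator` instead of `Stream`; it is an analogue under that assumption, not the Stream code itself, and `NotClone` is a made-up type for the failing case.

```rust
/// Iterator analogue of the `TupleWindows` adapter (std only, for illustration).
pub struct TupleWindows<I: Iterator> {
    iter: I,
    previous: Option<I::Item>,
}

impl<I: Iterator> Iterator for TupleWindows<I>
where
    I::Item: Clone,
{
    type Item = (I::Item, I::Item);

    fn next(&mut self) -> Option<Self::Item> {
        // On the first call, prime `previous` with the first element.
        if self.previous.is_none() {
            self.previous = Some(self.iter.next()?);
        }
        let current = self.iter.next()?;
        // Store a copy of `current` for the next window and take the old value.
        let previous = self.previous.replace(current.clone())?;
        Some((previous, current))
    }
}

pub trait TupleWindowsExt: Iterator {
    // The bound lives on the method, so a non-`Clone` item type is rejected
    // where `tuple_windows()` is called rather than later at `next()`.
    fn tuple_windows(self) -> TupleWindows<Self>
    where
        Self: Sized,
        Self::Item: Clone,
    {
        TupleWindows { iter: self, previous: None }
    }
}

impl<I: Iterator> TupleWindowsExt for I {}

/// Hypothetical item type without a `Clone` impl.
#[allow(dead_code)]
struct NotClone;

fn main() {
    let pairs: Vec<_> = (0..=3).tuple_windows().collect();
    assert_eq!(pairs, vec![(0, 1), (1, 2), (2, 3)]);

    // Uncommenting the next line should be rejected at this call, with an
    // error that names the missing `Clone` bound on `NotClone`:
    // let _ = std::iter::once(NotClone).tuple_windows();
}
```

Keeping the bound on the impl block as well is still useful, since the impl is what actually clones the items.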
