Since the Window question was answered by @Kenn Knowles, allow me to answer the other half.
I think you could use Stateful and Timely processing, setting a one-minute Timer for every element.
Bear in mind that Timers are scoped per key, so each key needs to be unique for this to work. I made this code sample so you can test it; it reads from the topic projects/pubsub-public-data/topics/taxirides-realtime.
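```java
import java.io.IOException;
import java.text.ParseException;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.json.JSONObject;

// LOG is assumed to be an SLF4J logger declared in the enclosing class, e.g.
// private static final Logger LOG = LoggerFactory.getLogger(MyPipeline.class);
// (MyPipeline is a placeholder.) options.getTopic() comes from custom pipeline options.

p
    .apply("Read From PubSub", PubsubIO.readStrings().fromTopic(options.getTopic()))
    .apply("Parse and to KV", ParDo.of(new DoFn<String, KV<String, String>>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws ParseException {
            JSONObject json = new JSONObject(c.element());
            String rideStatus = json.getString("ride_status");
            // ride_id is unique for dropoff
            String rideId = json.getString("ride_id"); // this is the session
            if (rideStatus.equals("dropoff")) {
                c.output(KV.of(rideId, "value"));
            }
        }
    }))
    // A stateful DoFn needs KV input
    .apply("Timer", ParDo.of(new DoFn<KV<String, String>, String>() {
        private final Duration BUFFER_TIME = Duration.standardSeconds(60);

        @TimerId("timer")
        private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        // Elements will be saved here, with type String
        @StateId("buffer")
        private final StateSpec<BagState<String>> bufferedEvents = StateSpecs.bag();

        @ProcessElement
        public void processElement(ProcessContext c,
                                   @TimerId("timer") Timer timer,
                                   @StateId("buffer") BagState<String> buffer) throws ParseException {
            // keys are unique, so no counter is needed to decide whether to set the timer
            timer.offset(BUFFER_TIME).setRelative();
            buffer.add(c.element().getKey()); // add the unique id to the buffer
            LOG.info("TIMER: Adding " + c.element().getKey() + " to buffer at " + Instant.now().toString());
        }

        // This method is called when the timer expires
        @OnTimer("timer")
        public void onTimer(OnTimerContext c,
                            @StateId("buffer") BagState<String> buffer) throws IOException {
            for (String id : buffer.read()) {
                // there should be only one, since the key is unique
                LOG.info("TIMER: Releasing " + id + " from buffer at " + Instant.now().toString());
                c.output(id);
            }
            buffer.clear(); // clearing buffer
        }
    }));
```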
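To see why the keys need to be unique, here is a small framework-free Java model of the "Timer" DoFn (this is not the Beam API; `ResetTimerModel` and its methods are hypothetical names). It assumes, per my understanding of Beam's timer semantics, that each key has a single timer per timer id and that setting it again replaces the earlier deadline. Times are plain seconds on a simulated clock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free model of the "Timer" DoFn; NOT the Beam API.
// Assumption: one timer per key, and re-setting it replaces the previous deadline.
class ResetTimerModel {
    private final Map<String, List<String>> bags = new HashMap<>(); // per-key BagState
    private final Map<String, Long> deadlines = new HashMap<>();    // per-key timer
    private final long delaySeconds;

    ResetTimerModel(long delaySeconds) {
        this.delaySeconds = delaySeconds;
    }

    // Mirrors processElement: re-set the key's timer, then buffer the element.
    void process(String key, String id, long arrivalSeconds) {
        deadlines.put(key, arrivalSeconds + delaySeconds);
        bags.computeIfAbsent(key, k -> new ArrayList<>()).add(id);
    }

    // When the key's timer would fire, or -1 if no timer was set for it.
    long releaseTime(String key) {
        Long deadline = deadlines.get(key);
        return deadline == null ? -1L : deadline;
    }

    // How many elements the key's timer would emit together.
    int bufferedCount(String key) {
        List<String> bag = bags.get(key);
        return bag == null ? 0 : bag.size();
    }

    public static void main(String[] args) {
        ResetTimerModel model = new ResetTimerModel(60);
        model.process("ride-1", "ride-1", 0);   // unique keys: released 60 s after arrival
        model.process("ride-2", "ride-2", 10);
        model.process("shared", "a", 0);        // duplicate key: "b" pushes "a" back
        model.process("shared", "b", 45);
        System.out.println(model.releaseTime("ride-1"));   // 60
        System.out.println(model.releaseTime("ride-2"));   // 70
        System.out.println(model.releaseTime("shared"));   // 105
        System.out.println(model.bufferedCount("shared")); // 2
    }
}
```

With unique keys every ride is released exactly one minute after it arrives. With a shared key, element "a" ends up waiting 105 seconds because the second element moved the key's only timer.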
This was just a quick test, so there are probably things to improve in the code.
I am not sure, though, how this would perform with a lot of elements, since every element is cached for one minute behind its own timer. I'm currently running this pipeline in Dataflow and so far so good; I will update this if something weird happens.
The advantage of this over using sleeps is that a sleep blocks the bundle, so each element has to wait for the sleeps of all the elements before it, whereas the timers do the waiting in parallel. The disadvantage may be extra shuffle, since a stateful DoFn needs its input grouped by key, but I haven't tested this enough to be sure.
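The sleep-versus-timer difference can be illustrated with a simplified arithmetic model (not measured Dataflow behavior). It assumes one DoFn instance handles a bundle's elements one at a time, processing itself takes no time, and arrival times are sorted, in seconds. `SleepVsTimerModel` and its methods are hypothetical names.

```java
import java.util.Arrays;

// Simplified model, NOT measured Dataflow behavior. Assumptions: elements of one bundle
// are processed sequentially by a single DoFn instance, processing cost is zero,
// and arrivals are sorted ascending. All values are seconds.
class SleepVsTimerModel {
    // Sleeping in processElement: an element cannot start waiting until the previous sleep ends.
    static long[] releaseWithSleep(long[] arrivals, long delay) {
        long[] release = new long[arrivals.length];
        long busyUntil = 0;
        for (int i = 0; i < arrivals.length; i++) {
            long start = Math.max(arrivals[i], busyUntil);
            busyUntil = start + delay;
            release[i] = busyUntil;
        }
        return release;
    }

    // Per-key timers: every element's wait runs concurrently from its own arrival.
    static long[] releaseWithTimers(long[] arrivals, long delay) {
        long[] release = new long[arrivals.length];
        for (int i = 0; i < arrivals.length; i++) {
            release[i] = arrivals[i] + delay;
        }
        return release;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 5, 10};
        System.out.println(Arrays.toString(releaseWithSleep(arrivals, 60)));  // [60, 120, 180]
        System.out.println(Arrays.toString(releaseWithTimers(arrivals, 60))); // [60, 65, 70]
    }
}
```

Under these assumptions the sleep-based release times grow with the position in the bundle, while the timer-based ones stay one minute after each arrival.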
Note that in "normal" Stateful DoFns (1) keys are not expected to be unique, so more than one element would be added to the bag, and (2) you would need a counter or flag to know whether the timer has already been set. Here we didn't need one because the keys are unique.
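For the non-unique case, a framework-free model of the "set the timer only once" idea might look like this (not the Beam API; `FlaggedTimerModel` and its methods are hypothetical names). In Beam the "timer already set" check would typically live in something like a `ValueState` flag or counter; here, as an assumption for the sketch, the presence of a pending deadline plays that role.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Framework-free model (NOT the Beam API) of a stateful DoFn whose keys repeat.
// Per key: a bag of values plus a timer that is only set when none is pending.
class FlaggedTimerModel {
    private final Map<String, List<String>> bags = new TreeMap<>();
    private final Map<String, Long> deadlines = new TreeMap<>(); // key present == "timer already set"
    private final long delaySeconds;

    FlaggedTimerModel(long delaySeconds) {
        this.delaySeconds = delaySeconds;
    }

    // processElement: buffer every value, but only set the timer for the first one.
    void process(String key, String value, long nowSeconds) {
        bags.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        if (!deadlines.containsKey(key)) {
            deadlines.put(key, nowSeconds + delaySeconds);
        }
    }

    // onTimer for every key whose deadline has passed: emit the bag, then clear bag and flag.
    List<String> advanceTo(long nowSeconds) {
        List<String> released = new ArrayList<>();
        for (String key : new ArrayList<>(deadlines.keySet())) {
            if (deadlines.get(key) <= nowSeconds) {
                released.addAll(bags.remove(key));
                deadlines.remove(key);
            }
        }
        return released;
    }

    public static void main(String[] args) {
        FlaggedTimerModel model = new FlaggedTimerModel(60);
        model.process("k", "a", 0);   // first element sets the timer for t = 60
        model.process("k", "b", 45);  // flag already set: timer stays at t = 60
        System.out.println(model.advanceTo(59));  // []
        System.out.println(model.advanceTo(60));  // [a, b]
        model.process("k", "c", 70);  // state was cleared, so a new timer is set for t = 130
        System.out.println(model.advanceTo(130)); // [c]
    }
}
```

The trade-off of this design is that later elements for the same key ("b" above) wait less than the full minute, because they are released together with the first element's batch.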
Here is a screenshot of the pipeline working:
