We use the Python Beam SDK with GCP's Dataflow. Our pipeline depends on an external system that we know has some delay. How can I write a pipeline that waits for N minutes (where N is a constant I provide when launching the job)?

Something like: Pub/Sub -> (sleep for 1 minute) -> read data from external system

My understanding of FixedWindows is that it groups data into time frames, so with a 60-second fixed window I can achieve a delay of "up to 60 seconds". What I want here, however, is a constant 60-second delay for all incoming data.

2 Answers

2

Since the windowing question was answered by @Kenn Knowles, allow me to answer the other half.

I think you could use stateful and timely processing, setting a timer of one minute for every element.

Bear in mind that timers are applied per key, so each key needs to be unique for this to work. I made this code sample so you can test it, reading from the topic projects/pubsub-public-data/topics/taxirides-realtime.

    p
        .apply("Read From PubSub", PubsubIO.readStrings().fromTopic(options.getTopic()))
        .apply("Parse and to KV", ParDo.of(new DoFn<String, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws ParseException {
                JSONObject json = new JSONObject(c.element());
                String rideStatus = json.getString("ride_status");
                // ride_id is unique for dropoff
                String rideId = json.getString("ride_id"); // this is the session
                if (rideStatus.equals("dropoff")) {
                    c.output(KV.of(rideId, "value"));
                }
            }
        }))
        // Stateful DoFn need to have a KV as input
        .apply("Timer", ParDo.of(new DoFn<KV<String, String>, String>() {
            private final Duration BUFFER_TIME = Duration.standardSeconds(60);

            @TimerId("timer")
            private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

            @StateId("buffer") // Elements will be saved here, with type String
            private final StateSpec<BagState<String>> bufferedEvents = StateSpecs.bag();

            @ProcessElement
            public void processElement(ProcessContext c,
                                       @TimerId("timer") Timer timer,
                                       @StateId("buffer") BagState<String> buffer) throws ParseException {
                // keys are unique, so no need to use counters to trigger the offset
                timer.offset(BUFFER_TIME).setRelative();
                buffer.add(c.element().getKey()); // add to buffer the unique id
                LOG.info("TIMER: Adding " + c.element().getKey() + " buffer at " + Instant.now().toString());
            }

            // This method is call when timers expire
            @OnTimer("timer")
            public void onTimer(OnTimerContext c,
                                @StateId("buffer") BagState<String> buffer) throws IOException {
                for (String id : buffer.read()) { // there should be only one since key is unique
                    LOG.info("TIMER: Releasing " + id + " from buffer at " + Instant.now().toString());
                    c.output(id);
                }
                buffer.clear(); // clearing buffer
            }
        }));

This was just a quick test, so there are probably things to improve in the code.

I am not sure, though, how this would perform with a lot of elements, since you are buffering every element for one minute with its own timer. I'm currently running this pipeline in Dataflow and so far so good; I will update this if something weird happens.

The advantage of this over using sleeps is that a sleep blocks the bundle, so every element in the bundle has to wait for the sleeps before it, while timers do the waiting in parallel. The disadvantage may be heavy shuffle usage, but I haven't tested this enough to be sure.
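To make the difference concrete, here is a small back-of-the-envelope model in plain Python (no Beam involved). It assumes a single thread works through a bundle one element at a time, which is a simplification of what a runner actually does; the function names are made up for illustration.

```python
def release_times_with_sleep(arrivals, delay):
    """Model: one thread sleeps `delay` seconds per element, in order."""
    clock = 0.0
    released = []
    for arrival in arrivals:
        # The thread cannot start on an element before it arrives,
        # nor before it has finished sleeping for the previous one.
        clock = max(clock, arrival) + delay
        released.append(clock)
    return released


def release_times_with_timers(arrivals, delay):
    """Model: each element gets its own timer, so the waits overlap."""
    return [arrival + delay for arrival in arrivals]


arrivals = [0.0, 1.0, 2.0]  # seconds at which three elements arrive
print(release_times_with_sleep(arrivals, 60.0))   # [60.0, 120.0, 180.0]
print(release_times_with_timers(arrivals, 60.0))  # [60.0, 61.0, 62.0]
```

Under this model the sleeping thread delays the third element by almost three minutes, while the timers keep the delay at 60 seconds for every element.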

Note that in "normal" stateful DoFns (1) keys are not expected to be unique, in which case more than one element would be added to the bag, and (2) you need a counter or something similar to know whether the timer has already been set. Here we didn't need one since the keys are unique.

Here is a screenshot of the pipeline working (image not included).


1

FixedWindows does not introduce any delay.

In Beam, windowing groups elements according to their timestamps. This is separate from when the elements arrive.
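As a rough illustration in plain Python (not Beam code), window assignment for fixed windows can be sketched as a function of the element timestamp alone. The helper name `fixed_window` is made up, and a window offset of zero is assumed.

```python
def fixed_window(timestamp, size):
    """Return the [start, end) fixed window containing `timestamp`.

    Both arguments are in seconds; the window offset is assumed to be 0.
    """
    start = timestamp - (timestamp % size)
    return start, start + size


# Arrival time is not an input: only the element's timestamp matters.
for ts in (0, 15, 59, 60):
    start, end = fixed_window(ts, 60)
    print(ts, (start, end), "seconds until window end:", end - ts)
```

The gap between a timestamp and the end of its window ranges from 60 seconds down to 1 second in this example, which is why a fixed window cannot give a constant delay per element.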

The PubsubIO transform maintains a "watermark" which measures the timestamps that are still remaining in the Pubsub queue. So the watermark will lag real time by 1 minute.

If the pubsub topic becomes empty for a long time, the watermark will sync up with real time. So in that case you may need to allow late data in your pipeline.

2 Comments

Thank you Kenn, this confirms my understanding but doesn't answer my question :) I know FixedWindows is not what I want; I'm looking for a solution for my case. Would calling sleep from one of the ParDos be a bad idea?
Oh I see, so the Pub/Sub message is just an indicator that it is time to read from the external system? If that is the case, you should be able to use a splittable DoFn. This is a DoFn where processing of a single element (the Pub/Sub message) can be split across workers (good for reading) and can set watermark holds.
