
We are experimenting with Apache Beam (using the Go SDK) and Dataflow to parallelize one of our time-consuming tasks. For a little more context: we have a caching job that takes a set of queries, runs them against a database, and caches the results. Each query may take anywhere from a few seconds to many minutes, and we want to run the queries in parallel so the task completes faster.

We created a simple pipeline that looks like this:

    // Create initial PCollection.
    startLoad := beam.Create(s, "InitialLoadToStartPipeline")

    // Emits a unit of work along with query and date range.
    cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)

    // Emits a cache response which includes errCode, errMsg, time etc.
    cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads)
    ...

The number of units that getCachePayloadsFn emits is not large: mostly in the hundreds, and at most a few thousand in production.

The issue is that cacheQueryDoFn is not executed in parallel; the queries run sequentially, one by one. We confirmed this by adding logs in StartBundle and ProcessElement of the caching DoFn (goroutine id, process id, start and end times, etc.) and verifying that there is no overlap in execution.
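For reference, this is roughly the kind of instrumentation we added (a simplified sketch; the CachePayload/CacheResponse types and the runQuery helper below are placeholders, not our actual code):

    // imports assumed: "context", "os", "time",
    //   "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    // Sketch of the logging used to check for overlapping executions.
    func (f *cacheQueryDoFn) ProcessElement(ctx context.Context, p CachePayload) CacheResponse {
        start := time.Now()
        log.Infof(ctx, "query start pid=%d query=%q t=%s",
            os.Getpid(), p.Query, start.Format(time.RFC3339Nano))
        resp := f.runQuery(ctx, p) // placeholder for the actual DB query + caching
        log.Infof(ctx, "query end pid=%d query=%q took=%s",
            os.Getpid(), p.Query, time.Since(start))
        return resp
    }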

We want the queries to always run in parallel, even if there are only 10 of them. From our understanding of the documentation, the runner creates bundles from the overall input, bundles run in parallel, and elements within a bundle run sequentially. Is there a way to control the number of bundles created from the load, or any other way to increase parallelism?

Things we tried:

  1. Keeping num_workers=2 and autoscaling_algorithm=None. It starts two VMs, but the Setup method that initializes the DoFn runs on only one VM, and that VM handles the entire load.
  2. Found the sdk_worker_parallelism option here, but we are not sure how to set it correctly. We tried beam.PipelineOptions.Set("sdk_worker_parallelism", "50"), with no effect.

1 Answer


By default, Create is not parallel, and all the DoFns are fused into the same stage as the Create, so they have no parallelism either. See https://beam.apache.org/documentation/runtime/model/#dependent-parallellism for more info on this.

You can explicitly force a fusion break with the Reshuffle transform.
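For example (a minimal sketch based on the pipeline in the question; beam.Reshuffle is part of the Go SDK):

    // Insert a Reshuffle between the step that emits the payloads and the
    // step that runs the queries, so the two are no longer fused and the
    // runner can redistribute the elements across workers.
    cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)
    reshuffled := beam.Reshuffle(s, cachePayloads)
    cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, reshuffled)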


5 Comments

Thanks for suggesting this. I can now see some overlap in the DoFn's ProcessElement start and end times. Is there a way to know how many of these parallel executions are happening, and is there a way to control it? The DoFn we want to parallelize makes a DB connection to run the query. We have initialized the DB in Setup, but we would like to control the number of parallel queries.
I also experimented with the job time by increasing the --max_num_workers parameter. I see no major change in job time whether I set max workers to 5, 10, or 20. When set to 20, it autoscales up to 13 workers, but the overall job time remains the same. Please let me know if you have suggestions for this as well.
If you want to explicitly cap the parallelism, you can assign keys yourself and then use GroupByKey (a sketch of this follows after these comments).
Not sure I understood that correctly. Say I have 1000 query payloads to run and I want at most 20 of them running against the database at a time; can that be controlled with GroupByKey? Should I emit these 1000 payloads with 20 different keys and do a GroupByKey? Within each group of payloads I assume execution will then be sequential. But is there a way to ensure that all 20 keys are processed in parallel and not some smaller number?
Yeah, that understanding is correct. There's no way to guarantee exact parallelism, though; the runner will give you up to 20 keys in parallel depending on autoscaling decisions. You can likely get pretty close to what you want by specifying --numWorkers=20 to set the initial number of workers.
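
For reference, a rough sketch of the keyed GroupByKey approach discussed above (the CachePayload type, the key count of 20, and the runAndCache helper are illustrative assumptions, not a confirmed implementation):

    // imports assumed: "hash/fnv", "github.com/apache/beam/sdks/v2/go/pkg/beam"

    // addKeyFn assigns each payload one of NumKeys keys; after GroupByKey,
    // payloads sharing a key run sequentially, so parallelism is capped at
    // roughly NumKeys.
    type addKeyFn struct {
        NumKeys int
    }

    func (f *addKeyFn) ProcessElement(p CachePayload) (int, CachePayload) {
        h := fnv.New32a()
        h.Write([]byte(p.Query)) // assumes the payload carries the query text
        return int(h.Sum32() % uint32(f.NumKeys)), p
    }

    // runGroupFn processes all payloads for a single key, one at a time.
    type runGroupFn struct{}

    func (f *runGroupFn) ProcessElement(key int, iter func(*CachePayload) bool) {
        var p CachePayload
        for iter(&p) {
            runAndCache(p) // placeholder for the actual query + caching logic
        }
    }

    // Wiring it into the pipeline:
    keyed := beam.ParDo(s, &addKeyFn{NumKeys: 20}, cachePayloads)
    grouped := beam.GroupByKey(s, keyed)
    beam.ParDo0(s, &runGroupFn{}, grouped)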
