Skip to main content
1 of 2
kdbanman
  • 1.5k
  • 1
  • 13
  • 20

Does this data processing pipeline design need to be so complex?

I have a data processing pipeline with well defined stages and IO boundaries. I can choose a language to suit the needs of this design. It starts with an InputObject. At the end of each stage, there is some additional data derived from some or all of the results of previous steps and the InputObject. That is, StageN extends Stage and produces an immutable ResultsObjN, which extends Result. After the pipeline is complete, the final output I'm interested in is some subset of all results from each step:

InputObject | | v ----------- | Stage 1 | <-- Tunable Parameters ----------- | | InputObject ResultsObj1 | | v ----------- | Stage 2 | <-- Tunable Parameters ----------- | | InputObject ResultsObj1 ResultsObj2 | | v ... 

Currently, I'm modeling each stage as a Stage object. I'll explain what I mean by "tunable" parameters soon.

  • Each Stage has each tunable parameters as attributes exposed by getters and setters.
  • Each Stage is constructed with its input dependencies.
  • Each Stage has a method to compute its result based on the tunable parameters

Here it is in almost-UML:

 -------------------------------------------------- | <<abstract>> | | Stage | |--------------------------------------------------| |--------------------------------------------------| | +computeResult() : Result | -------------------------------------------------- 

For instance, Stage3 computes ResultObj3 using the InputObject and the results of Stage2. There are 2 parameters that can be set to change the results.

 -------------------------------------------------- | Stage3 | |--------------------------------------------------| | -param1 : Int | | -param2 : Float | |--------------------------------------------------| | +Stage3(raw : InputObject, corners : ResultObj2) | | +get/setParam1() | | +get/setParam2() | | +computeResult() : ResultObj3 | -------------------------------------------------- 

I would like to reuse this processing pipeline pattern, with different stages in different quantities with different input dependencies. The tunable parameters may be tweaked by an automated optimizer or by a human using some UI. In either case, they follow the same feedback process. Here it is tuning stage 3:

ResultObj3 performStage(InputObject raw, ResultObj2 corners): 1. Stage processor = new Stage3(raw, corners) 2. ResultObj result = processor.computeResult() 3. while (notAcceptable(result)): 4. tuneParams(processor, result) // tune to "more acceptable" params 5. result = processor.computeResult() 6. return result 

I feel like the tuning process should be performed for each step by a queue executor, but I'm not yet sure how to accommodate the growing set of differently typed results , or the varying construction requirements of each stage.

Here is some sample code of such a pipeline being constructed. The classes PointExtractor, PointClusterer, and ClusterPlotter all inherit from Stage. The StageTuner takes the class of the Stage to tune and implements the above feedback loop.

... QueuedPipeline pipeline = new QueuedPipeline() pipeline.setInput(raw) <-- raw is an InputObject taken as input elsewhere // addStage() takes the name of the stage, the stage runner, and the names of the stages it needs results from pipeline.addStage("ExtractPoints", new StageRunner(PointExtractor, ), []) pipeline.addStage("ClusterPoints", new StageRunner(PointClusterer), ["ExtractPoints"]) pipeline.addStage("PlotClusters", new StageRunner(ClusterPlotter), ["ExtractPoints", "ClusterPoints"]) // tune and execute each stage with the StageRunners PipelineResultList results = pipeline.run() Result someResult = results.getResult(1) Result anotherResult = results.getResult(4) saveResultFile(someResult, anotherResult) 

Simply passing around classes for construction is a headache in some languages. Also, I'm not really sure how to generically construct and set parameters of each Stage subclass because of the different number/type of parameters - perhaps each Stage subclass needs a corresponding StageRunner subclass. ...but these are just symptoms of my real problem: This is all starting to get complex and fuzzy for what I expected to be a straightforward problem.

Am I defining my objects and behaviours badly? Or is this not as straightforward a problem as I thought? Something else entirely?

kdbanman
  • 1.5k
  • 1
  • 13
  • 20