fix StageTuner naming consistency. flesh out concrete example.
kdbanman

Sample Pipeline

The point of this pipeline is to read a set of coordinates from a CSV (into String raw) and pass the string through a pipeline that will parse the string, cluster the points, and plot the clusters. The cluster data and plot images are extracted from the pipeline after it is run.

The string is parsed into a collection of Point objects at the "ExtractPoints" stage. That collection is clustered/segmented into a set of Cluster objects at the "ClusterPoints" stage. The Point and Cluster data is used to plot the points visually into a Plot image object at the "PlotClusters" stage.

    QueuedPipeline pipeline = new QueuedPipeline()
    String raw = readFile("points.csv")
    pipeline.setInput(raw)

    // addStage() takes the name of the stage, the stage tuner, and the
    // names of the stages it needs results from
    pipeline.addStage("ExtractPoints", new StageTuner(PointExtractor), [])
    pipeline.addStage("ClusterPoints", new StageTuner(PointClusterer), ["ExtractPoints"])
    pipeline.addStage("PlotClusters", new StageTuner(ClusterPlotter), ["ExtractPoints", "ClusterPoints"])

    // tune and execute each stage with the StageTuners
    PipelineResultList results = pipeline.run()

    // pipeline is done, collect the interesting results
    Result pointClusters = results.getResult("ClusterPoints")
    Result clusterPlot = results.getResult("PlotClusters")
    saveClusterFile(pointClusters)
    saveClusterPlotImage(clusterPlot)

  • The classes PointExtractor, PointClusterer, and ClusterPlotter all inherit from Stage.
  • Each StageTuner takes the class of the Stage to tune and implements the above feedback loop.
  • Variables pointClusters and clusterPlot are of type Result.
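
To make the sample above concrete, here is a minimal Python sketch of how such a pipeline could be wired. Everything in it is an assumption for illustration: the snake_case method names, the placeholder stage logic (a fixed split on x instead of real clustering, a string instead of a Plot image), and a StageTuner that constructs and runs its stage once without the tuning loop.

```python
class Stage:
    def compute_result(self):
        raise NotImplementedError


class PointExtractor(Stage):
    def __init__(self, raw):
        self.raw = raw

    def compute_result(self):
        # Parse "x,y" lines into (x, y) float tuples, skipping blank lines.
        return [tuple(float(v) for v in line.split(","))
                for line in self.raw.splitlines() if line.strip()]


class PointClusterer(Stage):
    def __init__(self, raw, points):
        self.points = points
        self.split_x = 5.0  # hypothetical tunable parameter

    def compute_result(self):
        # Placeholder "clustering": split the points at x = split_x.
        left = [p for p in self.points if p[0] < self.split_x]
        right = [p for p in self.points if p[0] >= self.split_x]
        return [cluster for cluster in (left, right) if cluster]


class ClusterPlotter(Stage):
    def __init__(self, raw, points, clusters):
        self.points = points
        self.clusters = clusters

    def compute_result(self):
        # Stand-in for a Plot image object.
        return f"plot of {len(self.points)} points in {len(self.clusters)} clusters"


class StageTuner:
    def __init__(self, stage_class):
        self.stage_class = stage_class

    def run(self, raw, *dependencies):
        # Construct the stage from the input and its dependency results.
        # The tuning feedback loop is omitted in this sketch.
        return self.stage_class(raw, *dependencies).compute_result()


class QueuedPipeline:
    def __init__(self):
        self._input = None
        self._stages = []

    def set_input(self, raw):
        self._input = raw

    def add_stage(self, name, tuner, needs):
        self._stages.append((name, tuner, needs))

    def run(self):
        # Run stages in insertion order, looking up dependencies by name.
        results = {}
        for name, tuner, needs in self._stages:
            dependencies = [results[n] for n in needs]
            results[name] = tuner.run(self._input, *dependencies)
        return results


pipeline = QueuedPipeline()
pipeline.set_input("1,1\n2,1\n8,9\n9,8\n")
pipeline.add_stage("ExtractPoints", StageTuner(PointExtractor), [])
pipeline.add_stage("ClusterPoints", StageTuner(PointClusterer), ["ExtractPoints"])
pipeline.add_stage("PlotClusters", StageTuner(ClusterPlotter), ["ExtractPoints", "ClusterPoints"])

results = pipeline.run()
point_clusters = results["ClusterPoints"]
cluster_plot = results["PlotClusters"]
```

In this sketch a stage that names a dependency which has not run yet fails with a KeyError, so the order of the add_stage calls matters.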

Problem

Simply passing classes around for construction is a headache in some languages. Also, I'm not sure how to generically construct each Stage subclass and set its parameters, because the number and types of parameters differ. Perhaps each Stage subclass needs a corresponding StageTuner subclass. Finally, casting Result objects to the type I need within the final two functions sounds dangerous. But these are just symptoms of my real problem: this is all getting complex and fuzzy for what I expected to be a straightforward problem.
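
To illustrate the casting concern, here is a small Python sketch of a result container that checks the type at retrieval instead of relying on an unchecked cast. The put() method and the expected_type argument are hypothetical additions, not part of the design above.

```python
from typing import Dict, Type, TypeVar

T = TypeVar("T")


class PipelineResultList:
    def __init__(self) -> None:
        self._results: Dict[str, object] = {}

    def put(self, name: str, result: object) -> None:
        # Hypothetical helper: store a stage result under its stage name.
        self._results[name] = result

    def get_result(self, name: str, expected_type: Type[T]) -> T:
        # Fail loudly at retrieval time rather than at a later cast.
        result = self._results[name]
        if not isinstance(result, expected_type):
            raise TypeError(
                f"result of stage {name!r} is {type(result).__name__}, "
                f"expected {expected_type.__name__}"
            )
        return result
```

This only moves the check to one place; the caller still has to know which type each stage name produces.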


Does this data processing pipeline design need to be so complex?

I have a data processing pipeline with well defined stages and IO boundaries. I can choose a language to suit the needs of this design. It starts with an InputObject. At the end of each stage, there is some additional data derived from some or all of the results of previous stages and the InputObject. That is, StageN extends Stage and produces an immutable ResultObjN, which extends Result. After the pipeline is complete, the final output I'm interested in is some subset of all results from each stage:

    InputObject
         |
         |
         v
    -----------
    | Stage 1 | <-- Tunable Parameters
    -----------
         |
         |
    InputObject
    ResultObj1
         |
         |
         v
    -----------
    | Stage 2 | <-- Tunable Parameters
    -----------
         |
         |
    InputObject
    ResultObj1
    ResultObj2
         |
         |
         v
        ...

Currently, I'm modeling each stage as a Stage object. I'll explain what I mean by "tunable" parameters soon.

  • Each Stage exposes its tunable parameters as attributes with getters and setters.
  • Each Stage is constructed with its input dependencies.
  • Each Stage has a method to compute its result based on the tunable parameters.

Here it is in almost-UML:

     --------------------------------------------------
    |                   <<abstract>>                   |
    |                      Stage                       |
    |--------------------------------------------------|
    |--------------------------------------------------|
    | +computeResult() : Result                        |
     --------------------------------------------------

For instance, Stage3 computes ResultObj3 using the InputObject and the results of Stage2. There are 2 parameters that can be set to change the results.

     --------------------------------------------------
    |                      Stage3                      |
    |--------------------------------------------------|
    | -param1 : Int                                    |
    | -param2 : Float                                  |
    |--------------------------------------------------|
    | +Stage3(raw : InputObject, corners : ResultObj2) |
    | +get/setParam1()                                 |
    | +get/setParam2()                                 |
    | +computeResult() : ResultObj3                    |
     --------------------------------------------------
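
The two boxes above could look roughly like this in Python. This is only a sketch under assumptions: the payload fields (corners, kept), the default parameter values, and the filtering logic are placeholders, and plain attributes stand in for the getters and setters.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass(frozen=True)
class Result:
    """Base type for immutable stage results."""


@dataclass(frozen=True)
class ResultObj2(Result):
    corners: tuple  # hypothetical payload


@dataclass(frozen=True)
class ResultObj3(Result):
    kept: tuple  # hypothetical payload


class Stage(ABC):
    @abstractmethod
    def compute_result(self) -> Result:
        """Compute this stage's result from its inputs and parameters."""


class Stage3(Stage):
    def __init__(self, raw: str, corners: ResultObj2) -> None:
        # Input dependencies are fixed at construction time.
        self._raw = raw
        self._corners = corners
        # Tunable parameters (placeholder defaults).
        self.param1: int = 1
        self.param2: float = 0.5

    def compute_result(self) -> ResultObj3:
        # Placeholder logic: keep at most param1 corners greater than param2.
        above = tuple(c for c in self._corners.corners if c > self.param2)
        return ResultObj3(kept=above[: self.param1])
```

Changing param1 or param2 and calling compute_result() again yields a new immutable ResultObj3, which is the property the tuning process relies on.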

I would like to reuse this processing pipeline pattern, with different stages in different quantities with different input dependencies. The tunable parameters may be tweaked by an automated optimizer or by a human using some UI. In either case, they follow the same feedback process. Here it is tuning stage 3:

    ResultObj3 performStage(InputObject raw, ResultObj2 corners):
        Stage processor = new Stage3(raw, corners)
        ResultObj3 result = processor.computeResult()
        while (notAcceptable(result)):
            tuneParams(processor, result) // tune to "more acceptable" params
            result = processor.computeResult()
        return result
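
The same feedback loop can be written once for any stage if the acceptance test and the tuning step are passed in as functions. The Python sketch below assumes that shape. The max_rounds guard and the CountAbove toy stage are additions for illustration only.

```python
def perform_stage(processor, is_acceptable, tune_params, max_rounds=100):
    """Recompute the stage result until it is acceptable.

    max_rounds is an added safety limit, not part of the loop above.
    """
    result = processor.compute_result()
    rounds = 0
    while not is_acceptable(result):
        if rounds >= max_rounds:
            raise RuntimeError("no acceptable result within max_rounds")
        tune_params(processor, result)  # tune to "more acceptable" params
        result = processor.compute_result()
        rounds += 1
    return result


class CountAbove:
    """Toy stage: counts the values above a tunable threshold."""

    def __init__(self, values):
        self.values = values
        self.threshold = 10.0  # tunable parameter

    def compute_result(self):
        return sum(1 for v in self.values if v > self.threshold)


def lower_threshold(processor, result):
    # Stand-in for an optimizer or a human adjusting the parameter.
    processor.threshold -= 1.0


stage = CountAbove([1, 5, 7, 9, 12])
count = perform_stage(stage, lambda r: r >= 3, lower_threshold)
```

Here the loop lowers the threshold until at least three values exceed it. An automated optimizer or a UI callback would take the place of lower_threshold.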

I feel like the tuning process should be performed for each stage by a queue executor, but I'm not yet sure how to accommodate the growing set of differently typed results, or the varying construction requirements of each stage.

Here is some sample code of such a pipeline being constructed. The classes PointExtractor, PointClusterer, and ClusterPlotter all inherit from Stage. The StageTuner takes the class of the Stage to tune and implements the above feedback loop.

    ...
    QueuedPipeline pipeline = new QueuedPipeline()
    pipeline.setInput(raw) // raw is an InputObject taken as input elsewhere

    // addStage() takes the name of the stage, the stage runner, and the
    // names of the stages it needs results from
    pipeline.addStage("ExtractPoints", new StageRunner(PointExtractor), [])
    pipeline.addStage("ClusterPoints", new StageRunner(PointClusterer), ["ExtractPoints"])
    pipeline.addStage("PlotClusters", new StageRunner(ClusterPlotter), ["ExtractPoints", "ClusterPoints"])

    // tune and execute each stage with the StageRunners
    PipelineResultList results = pipeline.run()

    Result someResult = results.getResult(1)
    Result anotherResult = results.getResult(4)
    saveResultFile(someResult, anotherResult)

Simply passing around classes for construction is a headache in some languages. Also, I'm not really sure how to generically construct and set parameters of each Stage subclass because of the different number/type of parameters - perhaps each Stage subclass needs a corresponding StageRunner subclass. ...but these are just symptoms of my real problem: This is all starting to get complex and fuzzy for what I expected to be a straightforward problem.

Am I defining my objects and behaviours badly? Or is this not as straightforward a problem as I thought? Something else entirely?