
Background

I am trying to run an Apache Beam pipeline as part of an Airflow DAG. The DAG run is triggered manually with a user_id parameter. Typically, in my PythonOperator I can access my DAG run conf as part of the arguments to the python_callable. For example:

    def python_operator_example(ds, conf, dag_run):
        # Can easily access my user_id parameter
        user_id = dag_run.conf['user_id']
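For reference, here is a minimal sketch of how I wire this up, assuming Airflow 2.x, where context variables like ds, conf, and dag_run are injected into the callable by name (the task id is illustrative):

    from airflow.operators.python import PythonOperator

    def python_operator_example(ds, conf, dag_run):
        # dag_run.conf holds the parameters supplied when the run was triggered
        return dag_run.conf['user_id']

    get_user_id = PythonOperator(
        dag=dag,
        task_id="get_user_id",
        python_callable=python_operator_example,
    )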

Problem

Now, I have created a BeamRunPythonPipelineOperator in which I want to access the user_id as well.

    process_data = BeamRunPythonPipelineOperator(
        dag=dag,
        task_id="process_data",
        pipeline_options={
            "input": "/Users/myuser/Desktop/test.txt",
            "output": "/Users/myuser/Desktop/foo/"
        },
        py_file='pipeline/process_data.py'
    )

However, I can't seem to find a way to access the DAG run conf within the BeamRunPythonPipelineOperator. There seems to be no way to pass inputs from dag_run.conf to a Beam pipeline at runtime. Ideally, I want to do something like this:

    # 1. Define a function to set pipeline inputs dynamically using dag_run
    def create_beam_pipeline_options(ds, conf, dag_run):
        return {
            "user_id": dag_run.conf['user_id'],
            "input": "/Users/myuser/Desktop/test.txt",
            "output": "/Users/myuser/Desktop/foo/"
        }

    # 2. Pass the function into the operator to generate options when the pipeline task is invoked
    process_data = BeamRunPythonPipelineOperator(
        dag=dag,
        task_id="process_data",
        pipeline_options=create_beam_pipeline_options,
        py_file='pipeline/process_data.py'
    )

Question

What is the correct way to pass values like dag_run.conf that are defined at runtime as pipeline options into the BeamRunPythonPipelineOperator?

What I have already tried

  • I checked whether argv in the pipeline's run function contains dag_run.conf, but it does not.
  • I looked into whether I could use XCom to pass data from a previous DAG task into the BeamRunPythonPipelineOperator, but didn't find anything.
  • I explored some solutions that dynamically create a new DAG for each user_id, but that seems wasteful and like an anti-pattern.
  • I read the docs and source code for suggestions.

My sincere apologies in advance if this is a silly question; I am quite new to using Beam / Airflow!

1 Answer


I achieved this by using a PythonOperator that calls a BeamHook. It looks something like this:

    from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType

    beam_hook = BeamHook(BeamRunnerType.DirectRunner)

    def process_data(ds, conf, dag_run):
        beam_hook.start_python_pipeline(
            # variables are rendered as command-line options for the pipeline script
            variables={"user_id": dag_run.conf["user_id"]},
            py_file='dags/pipeline/process_data.py',
            py_options=[],
            py_requirements=['apache-beam'],
            py_interpreter='python3',
            py_system_site_packages=False,
        )
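The snippet above shows the hook call; for completeness, a minimal sketch of how that callable could be wired into the DAG with a plain PythonOperator (the task id here is illustrative):

    from airflow.operators.python import PythonOperator

    process_data_task = PythonOperator(
        dag=dag,
        task_id="process_data",
        python_callable=process_data,
    )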
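On the pipeline side, BeamHook passes each entry in variables to the script as a --key=value command-line argument, so process_data.py can pick user_id up with a standard argparse / PipelineOptions pattern. A minimal, illustrative sketch of such a script (the pipeline body is a placeholder):

    import argparse

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    def run(argv=None):
        parser = argparse.ArgumentParser()
        # user_id arrives as --user_id=... because it was set in `variables`
        parser.add_argument('--user_id', required=True)
        known_args, pipeline_args = parser.parse_known_args(argv)

        with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline:
            (pipeline
             | beam.Create([known_args.user_id])
             | beam.Map(print))

    if __name__ == '__main__':
        run()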
