Background
I am trying to run an Apache Beam pipeline as part of an Airflow DAG. The DAG run is triggered manually with a user_id parameter. Typically, in my PythonOperator I can access my DAG run conf as part of the arguments to the python_callable. For example:
    def python_operator_example(ds, conf, dag_run):
        # Can easily access my user_id parameter
        user_id = dag_run.conf['user_id']

Problem
Now, I have created a BeamRunPythonPipelineOperator in which I want to access the user_id as well.
    process_data = BeamRunPythonPipelineOperator(
        dag=dag,
        task_id="process_data",
        pipeline_options={
            "input": "/Users/myuser/Desktop/test.txt",
            "output": "/Users/myuser/Desktop/foo/"
        },
        py_file='pipeline/process_data.py'
    )

However, I can't seem to find a way to access the DAG run conf within the BeamRunPythonPipelineOperator. There seems to be no way to pass inputs from dag_run.conf to a Beam pipeline at runtime. Ideally, I want to do something like this:
    # 1. Define a function to set pipeline inputs dynamically using dag_run
    def create_beam_pipeline_options(ds, conf, dag_run):
        return {
            "user_id": dag_run.conf['user_id'],
            "input": "/Users/myuser/Desktop/test.txt",
            "output": "/Users/myuser/Desktop/foo/"
        }

    # 2. Pass the function into operator to generate options when pipeline task is invoked
    process_data = BeamRunPythonPipelineOperator(
        dag=dag,
        task_id="process_data",
        pipeline_options=create_beam_pipeline_options,
        py_file='pipeline/process_data.py'
    )

Question
What is the correct way to pass values like dag_run.conf that are defined at runtime as pipeline options into the BeamRunPythonPipelineOperator?
What I have already tried
- I checked whether argv in the pipeline's run function contains dag_run.conf, but it does not.
- I looked into whether I could use XCom to pass data from a previous DAG task using BeamRunPythonPipelineOperator, but didn't find anything.
- I explored some solutions that allow me to dynamically create a new DAG for each user_id, but that seems wasteful and an anti-pattern.
- I read the docs and code for suggestions.
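
For reference, the argv check mentioned above can be sketched roughly as follows. This is a minimal stand-in for my pipeline's entry point, not the real pipeline/process_data.py: it assumes the operator forwards each pipeline_options key as a --key=value flag, and parse_pipeline_args and the --user_id flag are hypothetical names I am using only for illustration.

```python
import argparse


def parse_pipeline_args(argv=None):
    """Split argv into the options this script declares and everything else."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input")
    parser.add_argument("--output")
    # Hypothetical flag: it is only populated if user_id actually reaches
    # the script as a pipeline option (an assumption, not observed behaviour).
    parser.add_argument("--user_id", default=None)
    # Unrecognised flags (e.g. runner settings) are returned untouched so
    # they could still be handed on to Beam's own option parsing.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


if __name__ == "__main__":
    # Mirrors the static options from the operator definition above.
    known_args, pipeline_args = parse_pipeline_args([
        "--input=/Users/myuser/Desktop/test.txt",
        "--output=/Users/myuser/Desktop/foo/",
    ])
    print(known_args.user_id)  # None
```

With only the static input and output options, user_id comes back as None, which matches what I see: nothing from dag_run.conf shows up in argv unless it is explicitly placed in pipeline_options.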
My sincere apologies in advance if this is a silly question; I am quite new to using Beam / Airflow!