Here is my custom Apache Beam pipeline code (running on Dataflow).
    import datetime

    import apache_beam as beam
    from apache_beam.io import ReadFromBigQuery, WriteToBigQuery
    from apache_beam.io.jdbc import ReadFromJdbc

    # options, jdbc_url, username, password and the table references are defined elsewhere.
    p = beam.Pipeline(options=options)

    # Job 1: read the time of the previous sync.
    last_sync_data = (
        p
        | "Read last sync data" >> ReadFromBigQuery(
            query='SELECT MAX(time) as last_sync FROM '
                  '[wisdomcircle-350611:custom_test_data.last_sync]')
        | "Extract last sync time" >> beam.Map(lambda elem: elem['last_sync'])
    )

    # Job 2: read from PostgreSQL over JDBC.
    wisgen_data = p | "wisgen job" >> ReadFromJdbc(
        jdbc_url=jdbc_url,
        username=username,
        password=password,
        driver_class_name='org.postgresql.Driver',
        query="""SELECT users.id AS user_id, CONCAT(users.first_name,' ', users.last_name) AS full_name""",
        table_name="users"
    )

    recruiter_data = p | "recruiter job" >> ReadFromJdbc(
        jdbc_url=jdbc_url,
        username=username,
        password=password,
        driver_class_name='org.postgresql.Driver',
        query="""SELECT users.id AS user_id, '""",
        table_name="users"
    )

    # Job 3: write both results to BigQuery.
    _ = (
        wisgen_data
        | "Convert TableRow to dict(wisgen data)" >> beam.Map(lambda row: row._asdict())
        | "Write to BigQuery in wisgen data table" >> WriteToBigQuery(
            wisgen_table,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            schema='user_id:INTEGER,full_name:STRING')
    )

    _ = (
        recruiter_data
        | "Convert TableRow to dict(recruiter data)" >> beam.Map(lambda row: row._asdict())
        | "Write to BigQuery in recruiter data table" >> WriteToBigQuery(
            recruiter_table,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            schema='user_id:INTEGER,full_name:STRING')
    )

    # Job 4: record the time of this sync.
    _ = (
        p
        | "Create current timestamp" >> beam.Create([{'time': datetime.datetime.utcnow()}])
        | "Write to last_sync" >> WriteToBigQuery(
            last_sync_tab,
            schema='time:TIMESTAMP',
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )

The pipeline runs all of these jobs in parallel. What I want instead is this order: first the last_sync_data job (at the top of the code above), then the read and write jobs below it, and finally the timestamp job (at the end of the code above), which should run only after all the other operations have finished.

Can somebody help me rewrite the code above to meet these requirements?
Any step that is applied directly to the pipeline object (p | ...) will start at the beginning of the job. If you want to create dependencies between steps, you need to pipe those steps together instead of sourcing each of them from p.
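As a rough sketch of what that piping could look like: the example below uses beam.Create as a stand-in for the BigQuery and JDBC reads, passes the last sync time into a filter as a side input, and derives the final timestamp record from the filtered rows rather than from p. The names is_newer, make_marker, build_pipeline, updated_at and rows_synced are hypothetical and only serve the illustration; they do not come from the original code.

```python
import datetime


def is_newer(row, last_sync):
    """Keep a row only if it changed after the previous sync (assumed rule)."""
    return last_sync is None or row["updated_at"] > last_sync


def make_marker(row_count, now):
    """Build the record for the last_sync table once the rows are counted."""
    return {"time": now, "rows_synced": row_count}


def build_pipeline(p):
    # Imported here so the two helpers above stay usable without Beam installed.
    import apache_beam as beam

    # Stand-in for the ReadFromBigQuery step that yields the last sync time.
    last_sync = p | "Read last sync" >> beam.Create(
        [datetime.datetime(2023, 1, 1)])

    # Stand-in for a ReadFromJdbc step; `updated_at` is an assumed column.
    rows = p | "Read users" >> beam.Create([
        {"user_id": 1, "updated_at": datetime.datetime(2023, 1, 2)},
        {"user_id": 2, "updated_at": datetime.datetime(2022, 12, 31)},
    ])

    # The side input makes this step wait until `last_sync` is available.
    fresh = rows | "Keep new rows" >> beam.Filter(
        is_newer, last_sync=beam.pvalue.AsSingleton(last_sync))

    # Piped from `fresh` instead of `p`: the global count is emitted only
    # after every filtered row has been seen, so the marker comes last.
    marker = (
        fresh
        | "Count rows" >> beam.combiners.Count.Globally()
        | "Build marker" >> beam.Map(
            lambda n: make_marker(
                n, datetime.datetime.now(datetime.timezone.utc)))
    )
    return fresh, marker
```

To try it locally, call build_pipeline(p) inside a `with beam.Pipeline() as p:` block; in the real job, fresh and marker would each be piped into their own WriteToBigQuery step. Two caveats: the side input delays the filter, not the source read itself, and the marker is ordered after the rows have been filtered and counted, which does not by itself guarantee that the row writes to BigQuery have completed.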