
Here is my custom Apache Beam pipeline code (running on Dataflow).

    last_sync_data = (
        p
        | "Read last sync data" >> ReadFromBigQuery(
            query='SELECT MAX(time) as last_sync FROM '
                  '[wisdomcircle-350611:custom_test_data.last_sync]')
        | "Extract last sync time" >> beam.Map(lambda elem: elem['last_sync'])
    )

    p = beam.Pipeline(options=options)

    wisgen_data = p | "wisgen job" >> ReadFromJdbc(
        jdbc_url=jdbc_url,
        username=username,
        password=password,
        driver_class_name='org.postgresql.Driver',
        query="""SELECT users.id AS user_id, CONCAT(users.first_name,' ', users.last_name) AS full_name""",
        table_name="users"
    )

    recruiter_data = p | "recruiter job" >> ReadFromJdbc(
        jdbc_url=jdbc_url,
        username=username,
        password=password,
        driver_class_name='org.postgresql.Driver',
        query="""SELECT users.id AS user_id, '""",
        table_name="users"
    )

    wisgen_data | "Convert TableRow to dict(wisgen data)" >> beam.Map(
        lambda row: row._asdict()
    ) | "Write to BigQuery in wisgen data table" >> WriteToBigQuery(
        wisgen_table,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        schema='user_id:INTEGER,full_name:STRING'
    )

    recruiter_data | "Convert TableRow to dict(recruiter data)" >> beam.Map(
        lambda row: row._asdict()
    ) | "Write to BigQuery in recruiter data table" >> WriteToBigQuery(
        recruiter_table,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        schema='user_id:INTEGER,full_name:STRING'
    )

    _ = (
        p
        | "Create current timestamp" >> beam.Create([{'time': datetime.datetime.utcnow()}])
        | "Write to last_sync" >> WriteToBigQuery(
            last_sync_tab,
            schema='time:TIMESTAMP',
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )

The pipeline runs all of these steps in parallel. I want the first step, last_sync_data (at the top of the code above), to run first. Only then should the jobs below it run. Finally, the timestamp step (at the end of the code above) should run only after all the other operations have finished. Can somebody help me rewrite the code above to meet these requirements?

  • It is not clear what you want to do. Can you please clarify what you are seeing versus what you expected? Anything that you pipe from the original pipeline (p | ...) will start at the beginning of the job. If you want to create dependencies between steps, you need to pipe those steps together instead of sourcing each one from p. Commented Mar 5, 2023 at 16:38

1 Answer


Why is p = beam.Pipeline(options=options) created again in your code? Also, have you tried the approach from this question: "Wait.On() on Apache Beam Python SDK version"?
