
I'm starting to try Google Cloud Dataflow; after the classic wordcount example, I wrote my own script:

    import argparse
    import sys

    import apache_beam as beam
    from apache_beam.io import ReadFromText
    from apache_beam.io import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions


    class Split(beam.DoFn):
        def process(self, element):
            # Each line of the input file is a tab-separated record.
            (numfact, bag, type, owner, main_owner, client) = element.split('\t')
            return [{
                'numfact': int(numfact),
                'type': type,
                'owner': owner
            }]


    parser = argparse.ArgumentParser()
    parser.add_argument('--input')
    parser.add_argument('--output')
    known_args, extra_args = parser.parse_known_args(sys.argv[1:])

    options = PipelineOptions(extra_args)
    p = beam.Pipeline(options=options)

    print(known_args)
    print(extra_args)

    csv_lines = (p
                 | "Load" >> ReadFromText(known_args.input, skip_header_lines=1)
                 | "Process" >> beam.ParDo(Split())
                 | "Write" >> WriteToText(known_args.output))

Here's a sample from the input file:

    Numfact    BAG  TYPE            OWNER         MAIN OWNER                        CLIENT
    728632636  CNT  Alternativos    Kramer Ortiz  ACCIDENTES PERSONALES TELETICKET  Rimac
    704845964  CNT  Alternativos    Kramer Ortiz  SOAT                              Canal
    701387639  CNT  SIN ASIGNAR     Sin asignar   WEB VEHICULOS                     Canal
    692571746  CNT  Concesionarios  Kramer Ortiz  WEB VEHICULOS                     Canal
    682823453  CNT  Alternativos    Kramer Ortiz  WEB VEHICULOS                     Canal
    682823452  CNT  Alternativos    Kramer Ortiz  WEB VEHICULOS                     Canal
    682823451  CNT  Alternativos    Kramer Ortiz  WEB VEHICULOS                     Canal
    682823454  CNT  Alternativos    Kramer Ortiz  WEB VEHICULOS                     Canal
    706853395  CNT  Alternativos    Kramer Ortiz  ACCIDENTES PERSONALES - WEB       Canal
    706466281  CNT  Alternativos    Kramer Ortiz  SOAT                              Canal

Finally, I execute it like this (the file is saved as .txt):

    python -m beam --input gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt --output gs://dummy_bucket/outputs --runner DataflowRunner --project dummyproject-268120 --temp_location gs://dummy_bucket/tmp --region us-central1

After this, it shows the prints on the console, but the job never shows up in the Dataflow console.

Update

This is what the console shows:

    (gcp) gocht@~/script$ python -m beam --input gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt --output gs://dummy_bucket/outputs --runner DataflowRunner --project dummyproject-268120 --temp_location gs://dummy_bucket/tmp --region us-central1
    Namespace(input='gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt', output='gs://dummy_bucket/outputs')
    ['--runner', 'DataflowRunner', '--project', 'dummyproject-268120', '--temp_location', 'gs://dummy_bucket/tmp', '--region', 'us-central1']

This only shows the prints placed in the script.

What am I missing?

Thanks!

  • Can you provide the output of the command? Commented Feb 25, 2020 at 16:45
  • @Pievis updated. Commented Feb 25, 2020 at 16:51
  • First time I see the Python version, but could the p.run().wait_until_finish() be missing from the code? Commented Feb 25, 2020 at 16:55
  • @Pievis worked, dude! So simple, so effective. Consider posting it as an answer. Thanks! Commented Feb 25, 2020 at 17:18
  • I'm glad I helped! I'm following your advice and putting it as an answer :) Commented Feb 26, 2020 at 14:55

2 Answers


Since the answer is in the comments, I'm writing it here as well :)

You need to actually make the pipeline run by calling:

    p.run().wait_until_finish()

If you feel stuck and you're not sure what's wrong, have a look at the examples provided here - the Java version really helped me a lot when I was starting with Dataflow :)
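For reference, a minimal, self-contained sketch of a pipeline that ends with the missing call (this uses hypothetical toy data via beam.Create rather than the question's GCS input, and defaults to the DirectRunner when no runner flags are passed):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Picks up --runner, --project, etc. from the command line if given.
    options = PipelineOptions()
    p = beam.Pipeline(options=options)

    (p
     | "Create" >> beam.Create(["a", "b", "c"])  # hypothetical toy input
     | "Print" >> beam.Map(print))

    # Without the next line the pipeline is only constructed, never submitted:
    # run() submits the job, and wait_until_finish() blocks until it completes.
    p.run().wait_until_finish()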


You will need

    result = p.run()

at the end of your file to run the pipeline.

Basically, I think you have constructed your pipeline but never actually asked it to run.
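If you keep the PipelineResult returned by run(), you can also block on it and inspect the job afterwards; a small sketch, assuming the same p as in the question:

    result = p.run()            # submits the job and returns a PipelineResult
    result.wait_until_finish()  # optionally block until the job finishes
    print(result.state)         # e.g. DONE once the job has completed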

