I'm starting to try Google Cloud Dataflow. After the classic wordcount example, I wrote my own script:
import argparse
import sys

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class Split(beam.DoFn):
    def process(self, element):
        # Each input line is tab-separated: Numfact, BAG, TYPE, OWNER, MAIN OWNER, CLIENT
        (numfact, bag, type, owner, main_owner, client) = element.split('\t')
        return [{
            'numfact': int(numfact),
            'type': type,
            'owner': owner
        }]


# --input/--output are read here; any other flag (runner, project, ...) ends up in extra_args
parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
known_args, extra_args = parser.parse_known_args(sys.argv[1:])

options = PipelineOptions(extra_args)
p = beam.Pipeline(options=options)

print(known_args)
print(extra_args)

# Pipeline graph: read lines (skipping the header), parse each one, write the results
csv_lines = (p
             | "Load" >> ReadFromText(known_args.input, skip_header_lines=1)
             | "Process" >> beam.ParDo(Split())
             | "Write" >> WriteToText(known_args.output))

Here is a sample from the input file:
Numfact	BAG	TYPE	OWNER	MAIN OWNER	CLIENT
728632636	CNT	Alternativos	Kramer Ortiz	ACCIDENTES PERSONALES TELETICKET	Rimac
704845964	CNT	Alternativos	Kramer Ortiz	SOAT	Canal
701387639	CNT	SIN ASIGNAR	Sin asignar	WEB VEHICULOS	Canal
692571746	CNT	Concesionarios	Kramer Ortiz	WEB VEHICULOS	Canal
682823453	CNT	Alternativos	Kramer Ortiz	WEB VEHICULOS	Canal
682823452	CNT	Alternativos	Kramer Ortiz	WEB VEHICULOS	Canal
682823451	CNT	Alternativos	Kramer Ortiz	WEB VEHICULOS	Canal
682823454	CNT	Alternativos	Kramer Ortiz	WEB VEHICULOS	Canal
706853395	CNT	Alternativos	Kramer Ortiz	ACCIDENTES PERSONALES - WEB	Canal
706466281	CNT	Alternativos	Kramer Ortiz	SOAT	Canal

Finally, I run it like this (the file is saved as .txt):
python -m beam --input gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt --output gs://dummy_bucket/outputs --runner DataflowRunner --project dummyproject-268120 --temp_location gs://dummy_bucket/tmp --region us-central1

After this, the console shows the output of my print statements, but no job is registered in the Dataflow console.
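To check the parsing step of `Split.process` on its own, without Beam or Dataflow, the same unpacking logic can be run in plain Python on a couple of the sample rows. This is a minimal sketch: `parse_line` is a hypothetical helper that mirrors the DoFn body, and it assumes the file really is tab-separated, as the script's `split('\t')` expects.

```python
def parse_line(element):
    """Hypothetical helper mirroring Split.process: split one tab-separated
    record into six fields and keep numfact, type and owner."""
    (numfact, bag, type_, owner, main_owner, client) = element.split('\t')
    return {'numfact': int(numfact), 'type': type_, 'owner': owner}


# Two rows from the sample above, written with explicit tab separators
sample = [
    "728632636\tCNT\tAlternativos\tKramer Ortiz\tACCIDENTES PERSONALES TELETICKET\tRimac",
    "701387639\tCNT\tSIN ASIGNAR\tSin asignar\tWEB VEHICULOS\tCanal",
]

for line in sample:
    print(parse_line(line))
```

If a row does not split into exactly six fields (for example, because the file is space-separated rather than tab-separated), the tuple unpacking raises `ValueError`.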
Update
This is what the console looks like:
(gcp) gocht@~/script$ python -m beam --input gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt --output gs://dummy_bucket/outputs --runner DataflowRunner --project dummyproject-268120 --temp_location gs://dummy_bucket/tmp --region us-central1
Namespace(input='gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt', output='gs://dummy_bucket/outputs')
['--runner', 'DataflowRunner', '--project', 'dummyproject-268120', '--temp_location', 'gs://dummy_bucket/tmp', '--region', 'us-central1']

It only shows the output of the print statements in the script.
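The two printed lines come from `parse_known_args`: flags the parser declares (`--input`, `--output`) land in the namespace, and every undeclared flag and its value is returned unchanged in the second list, which the script then hands to `PipelineOptions`. A minimal sketch reproducing that split, using the argument list from the command above:

```python
import argparse

# Same parser as in the script
parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')

# Arguments copied from the command line above
argv = ['--input', 'gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt',
        '--output', 'gs://dummy_bucket/outputs',
        '--runner', 'DataflowRunner',
        '--project', 'dummyproject-268120',
        '--temp_location', 'gs://dummy_bucket/tmp',
        '--region', 'us-central1']

# Undeclared flags (and their values) are returned in extra_args in their original order
known_args, extra_args = parser.parse_known_args(argv)
print(known_args)
print(extra_args)
```

This suggests the Dataflow-related flags do reach `PipelineOptions`, so the argument handling itself matches what the console shows.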
What am I missing?
Thanks!