0

I am in need of processing several thousands small log files.

I opted for Databricks to handle this problem, because it has good parallel computing capacities and interacts nicely with the Azure Blob storage account where the files are hosted.

After some researching, I always retrieve the same snippet of code (in PySpark).

# Getting your list of files with custom function list_of_files = get_my_files() # Create a path_rdd and use a custom udf to parse it path_rdd = sc.parallelize(list_of_files) content = path_rdd.map(parse_udf).collect() 

Is there a better any method to do this? Would you opt for a flatmap if the logfiles are in a CSV format?

Thank you!

7
  • If all your files are in same directory (or you can move them in same directory), you can read at the folder level (spark.read.format('csv').load("folder_name")) - this way you will leverage spark internal parallel processing instead parsing every file as a UDF. Commented Dec 9, 2019 at 17:55
  • They are in a hierarchical directory structure "{location}/{YYYY}{MM}". Do I get any performance gain if I change my list_of_files to a list_of_lowest_dirs in that case? Commented Dec 9, 2019 at 18:23
  • If you can get directory structure updated to "location/year=YYYY/month=MM/date=DD/*.csv" - then while reading folder like df = spark.read.format("csv").option("header", "true").load("cars_data/") will automatically add year, month and date as a column, which you can utilize for filter and that will certainly provide you performance gain. Commented Dec 9, 2019 at 19:36
  • 2
    @HussainBohra You don’t have to use Hive partitioning. You could use the wildcard: spark.read.csv("location/*/*/"). Commented Dec 9, 2019 at 20:22
  • @OliverW. thanks for the tip! Can you also do custom comment header parsing with read.csv? Commented Dec 10, 2019 at 8:08

1 Answer 1

2

My current solution is:

content = sc.wholeTextFiles('/mnt/container/foo/*/*/', numPartitions=XX) parsed_content = content.flatMap(custom_parser).collect() 

I read all the content of the files as a string and keep their filenames. I then pass this to my parsing function "custom_parser" using a flatMap, "where custom_parser" is defined as

def custom_parser(*argv): file, content = argv # Apply magic return parsed_content_ 

I am currently finishing with a .collect() action, but I will alter this to save the output directly.

Sign up to request clarification or add additional context in comments.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.