
I have a few thousand CSV files in a directory on my machine that need to be validated against a regex that I have formulated. The path_to_validator points to a Scala script that is run through a Windows .bat file on the command line. It reads the regex and the CSV file and gives the file a PASS/FAIL grade, which is printed to output.txt.

The constraint is that this Scala script takes a directory as its argument, not a Python list, so I cannot easily split the workload between processes. I could move each process's files to a temporary directory, but the details of the project are such that, ideally, my deployed program should not need write privileges to the CSV files.


This is the code:

    with open("output.txt", 'w') as output:
        for filename in os.listdir(path_to_csv_folder):
            print("Processing file " + str(current_file_count) + "/" + str(TOTAL_FILE_COUNT), end='\r')
            output.write(filename + ': ')
            validator = subprocess.Popen([path_to_validator, path_to_csv_folder + filename,
                                          path_to_csv_schema, "-x", CSV_ENCODING,
                                          "-y", CSV_SCHEMA_ENCODING],
                                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            result = validator.stdout.read()
            output.write(result.decode('windows-1252'))
            current_file_count += 1

The issue is that it takes 1 h 30 min+ while utilizing only about 20% CPU. This should be an obvious candidate for a parallelization speedup. The directory has 5000+ CSV files, and they all need to be processed. How can I split the workload onto 4 different processes in order to utilize all CPU power?

This is the code I actually made:

    """
    Command line API to CSV validator using Scala implementation from:
    http://digital-preservation.github.io/csv-validator/#toc7
    """
    PATH_TO_VALIDATOR = r"C:\prog\csv\csv-validator-cmd-1.2-RC2\bin\validate.bat"
    PATH_TO_CSV_FOLDER = r"C:\prog\csv\CSVFiles"
    PATH_TO_CSV_SCHEMA = r"C:\prog\csv\ocr-schema.csvs"

    # Set defaults
    CSV_ENCODING = "windows-1252"
    CSV_SCHEMA_ENCODING = "UTF-8"


    def open_csv(CSV_LIST):
        import subprocess

        # To be used to display a simple progress indicator
        TOTAL_FILE_COUNT = len(CSV_LIST)
        current_file_count = 1
        with open("output.txt", 'w') as output:
            for filename in CSV_LIST:
                print("Processing file " + str(current_file_count) + "/" + str(TOTAL_FILE_COUNT))
                output.write(filename + ': ')
                validator = subprocess.Popen(
                    [PATH_TO_VALIDATOR, PATH_TO_CSV_FOLDER + "/" + filename, PATH_TO_CSV_SCHEMA,
                     "--csv-encoding", CSV_ENCODING, "--csv-schema-encoding", CSV_SCHEMA_ENCODING,
                     '--fail-fast', 'true'],
                    stdout=subprocess.PIPE)
                result = validator.stdout.read()
                output.write(result.decode('windows-1252'))
                current_file_count += 1


    # Split a list into n sublists of roughly equal size
    def split_list(alist, wanted_parts=1):
        length = len(alist)
        return [alist[i * length // wanted_parts: (i + 1) * length // wanted_parts]
                for i in range(wanted_parts)]


    if __name__ == '__main__':
        import argparse
        import multiprocessing
        import os

        parser = argparse.ArgumentParser(description="Command line API to Scala CSV validator")
        parser.add_argument('-pv', '--PATH_TO_VALIDATOR',
                            help="Specify the path to csv-validator-cmd/bin/validator.bat",
                            required=True)
        parser.add_argument('-pf', '--PATH_TO_CSV_FOLDER',
                            help="Specify the path to the folder containing the csv files "
                                 "you want to validate", required=True)
        parser.add_argument('-ps', '--PATH_TO_CSV_SCHEMA',
                            help="Specify the path to the CSV schema you want to use to "
                                 "validate the given files", required=True)
        parser.add_argument('-cenc', '--CSV_ENCODING',
                            help="Optional parameter to specify the encoding used by the CSV "
                                 "files. Choose UTF-8 or windows-1252. Default windows-1252")
        parser.add_argument('-csenc', '--CSV_SCHEMA_ENCODING',
                            help="Optional parameter to specify the encoding used by "
                                 "the CSV Schema. Choose UTF-8 or windows-1252. "
                                 "Default UTF-8")
        args = vars(parser.parse_args())

        if args['CSV_ENCODING'] is not None:
            CSV_ENCODING = args['CSV_ENCODING']
        if args['CSV_SCHEMA_ENCODING'] is not None:
            CSV_SCHEMA_ENCODING = args['CSV_SCHEMA_ENCODING']
        PATH_TO_VALIDATOR = args["PATH_TO_VALIDATOR"]
        PATH_TO_CSV_SCHEMA = args["PATH_TO_CSV_SCHEMA"]
        PATH_TO_CSV_FOLDER = args["PATH_TO_CSV_FOLDER"]

        CPU_COUNT = multiprocessing.cpu_count()
        split_csv_directory = split_list(os.listdir(args["PATH_TO_CSV_FOLDER"]),
                                         wanted_parts=CPU_COUNT)

        # Spawn a Process for each CPU on the system
        for csv_list in split_csv_directory:
            p = multiprocessing.Process(target=open_csv, args=(csv_list,))
            p.start()

Please let me know of any pitfalls in my code.

  • There are options like Dask, Ray and Spark. Commented Dec 22, 2018 at 14:49

1 Answer


Have a look at this introduction to the multiprocessing package.

For example, try:

    import multiprocessing as mp
    import os

    def process_csv(csv):
        # process the csv
        return {csv: collected_debug_information}

    pool = mp.Pool(processes=4)
    results = pool.map(process_csv, os.listdir(path_to_csv_folder))

With the returned dicts, you can look over the results to evaluate any parsing errors and the like. The result will be a list of dicts with the CSV names as keys.

Also a good package for this is joblib; have a look at that too. Under the hood it uses the multiprocessing package.
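A joblib equivalent of the Pool.map call above might look like the sketch below. validate is again a hypothetical stand-in for the real validator subprocess call, and n_jobs=4 mirrors the four worker processes from the question:

```python
from joblib import Parallel, delayed

def validate(filename):
    # Hypothetical placeholder for the subprocess call to the validator
    return {filename: "PASS"}

# Parallel returns the results in input order, one entry per delayed call
results = Parallel(n_jobs=4)(delayed(validate)(f) for f in ["a.csv", "b.csv"])
```

One nicety of joblib is the verbose parameter on Parallel, which prints progress as tasks complete, replacing the hand-rolled "Processing file x/y" counter.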


2 Comments

results will end up being a plain list of boolean values with this approach, with no way to tell which csv file each list value corresponds to. Also, the if at the end of the process_csv() function could be replaced with return bool(success).
Yeah, of course, it was just a first draft. It would be smart to store some debug information in there; I'll edit it.