
I have code that downloads files from Google Storage, dumps them to JSON, turns that JSON into CSV, then into Parquet, and finally uploads it to AWS S3 (don't ask why it's like that; I'm not the one who wrote it).

I discovered from my logs that sometimes the run ends without all the subprocesses completing. Does anyone have any idea why that could happen? And if not, do you think switching ProcessPoolExecutor for plain multiprocessing could help?

In my main to start this whole thing I use:

with ProcessPoolExecutor(max_workers=NUM_OF_PROCESS_WORKERS) as process_executor:
    for table_type in TABLES_COLUMNS_MAPPER.keys():
        for node in nodes:
            process_executor.submit(handle_sstable_group_files_per_node, node, table_type)

I'm using Ubuntu, if that helps.

Thanks.

  • Aside from the core question: why are you creating a NEW process pool inside the nested loop? It looks like you are creating a new instance for every node & table and submitting a single job each time. If you want to know what's going on with your jobs, gather up and monitor the Futures returned by the submit calls; they'll tell you what happened. My 2c worth. Commented Apr 25, 2021 at 7:29
  • That doesn't look right. You create TABLES_COLUMNS_MAPPER × nodes executors. Create a single one, submit all your tasks, and then wait for the results. Commented Apr 25, 2021 at 7:32
  • You are both right; I'm sorry, I copied it wrong. I've fixed it. Commented Apr 25, 2021 at 8:04
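Following the comments' suggestion, here is a minimal sketch of keeping the returned Futures and checking them after the pool shuts down. The constants and the worker body are hypothetical stand-ins for the names in the question; calling `result()` re-raises any exception the worker hit, so silent failures surface:

```python
from concurrent.futures import ProcessPoolExecutor

# Hypothetical stand-ins for the question's names.
NUM_OF_PROCESS_WORKERS = 2
TABLES_COLUMNS_MAPPER = {"table_a": [], "table_b": []}
nodes = ["node1", "node2"]


def handle_sstable_group_files_per_node(node, table_type):
    # Placeholder for the real download/convert/upload pipeline.
    return f"{node}/{table_type}"


def main():
    futures = []
    with ProcessPoolExecutor(max_workers=NUM_OF_PROCESS_WORKERS) as process_executor:
        for table_type in TABLES_COLUMNS_MAPPER:
            for node in nodes:
                futures.append(
                    process_executor.submit(
                        handle_sstable_group_files_per_node, node, table_type
                    )
                )
    # result() re-raises any exception raised inside the worker,
    # so a failed task can no longer pass unnoticed.
    return [f.result() for f in futures]


if __name__ == "__main__":
    print(main())
```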

1 Answer


So, to continue from the comments: you still don't wait for the results, and because you use a context manager the exception will be "swallowed":

test.py:

from concurrent.futures import ProcessPoolExecutor


def worker(i):
    if i == 3:
        raise Exception(f"ERROR: {i}")
    print(f"TASK: {i}")
    return i * i


def main():
    futures = []
    with ProcessPoolExecutor() as executor:
        for i in range(10):
            futures.append(executor.submit(worker, i))
    # for future in futures:
    #     print(future.result())


if __name__ == "__main__":
    main()

Test:

$ python test.py
TASK: 0
TASK: 1
TASK: 2
TASK: 4
TASK: 6
TASK: 8
TASK: 7
TASK: 9

Now, when you uncomment these two lines:

for future in futures:
    print(future.result())

You can see the error now (assuming you don't handle the errors in the worker function):

$ python test.py
TASK: 0
TASK: 1
TASK: 2
TASK: 4
0
1
4
TASK: 8
TASK: 6
TASK: 7
TASK: 9
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 239, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "test.py", line 8, in worker
    raise Exception(f"ERROR: {i}")
Exception: ERROR: 3
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "f.py", line 30, in <module>
    main()
  File "test.py", line 25, in main
    print(future.result())
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
Exception: ERROR: 3
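If one bad task shouldn't abort the whole collection loop, `concurrent.futures.as_completed` yields each future as it finishes, and `future.exception()` returns the error instead of raising it, so you can log failures per task. A sketch building on the answer's `worker` (not the answer's exact code):

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def worker(i):
    # Same failure mode as the answer's example: task 3 blows up.
    if i == 3:
        raise Exception(f"ERROR: {i}")
    return i * i


def main():
    results, errors = [], []
    with ProcessPoolExecutor() as executor:
        # Map each future back to its input so failures are attributable.
        futures = {executor.submit(worker, i): i for i in range(10)}
        for future in as_completed(futures):
            exc = future.exception()  # returns the exception, doesn't raise
            if exc is not None:
                errors.append((futures[future], exc))
            else:
                results.append(future.result())
    return results, errors


if __name__ == "__main__":
    results, errors = main()
    print(f"ok={sorted(results)} failed={[i for i, _ in errors]}")
```

This way every completed task is accounted for, which is exactly the visibility missing from the original fire-and-forget `submit` loop.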