
Another confused parallel coder here!

Our internal Hive database has an API layer which we need to use to access the data. There is a 300-second query timeout, so I wanted to use multiprocessing to execute multiple queries in parallel:

from multiprocessing import Pool
import pandas as pd
import time
import datetime
from hive2pandas_anxpy import Hive2Pandas  # custom module for querying our Hive db and converting the results to a Pandas dataframe

def run_query(hour):
    start_time = time.time()
    start_datetime = datetime.datetime.now()
    query = """SELECT id, revenue from table where date='2014-05-20 %s' limit 50""" % hour
    h2p = Hive2Pandas(query, 'username')
    h2p.run()
    elapsed_time = int(time.time() - start_time)
    end_datetime = datetime.datetime.now()
    return {'query': query, 'start_time': start_datetime, 'end_time': end_datetime,
            'elapsed_time': elapsed_time, 'data': h2p.data_df}

if __name__ == '__main__':
    start_time = time.time()
    pool = Pool(4)
    hours = ['17', '18', '19']
    results = pool.map_async(run_query, hours)
    pool.close()
    pool.join()
    print int(time.time() - start_time)

The issue I'm having is that one of the queries always returns no data, but when I run the same query in the usual fashion, it returns data. Since I'm new to multiprocessing, I'm wondering if there are any obvious issues with how I'm using it above?

5 Comments
  • *cough* Parameterized queries *cough*? Commented May 21, 2014 at 14:05
  • Should days_back actually be hour? The code as written will crash because hour isn't defined in run_query when you use it. Try putting some debugging code into run_query to see if h2p is returning a valid response. If it's not, you probably need to debug what's going on in Hive2Pandas. Maybe that module isn't thread-safe? Commented May 21, 2014 at 14:33
  • @dano yes, it was supposed to be hour; fixed. I had edited it in the post to make the variables more readable. Regarding thread safety: is that a relevant concern when using multiprocessing (and not threads)? Commented May 21, 2014 at 14:50
  • You probably want to use .map, not .map_async. You want those things to happen in parallel, but you also want to wait for the results, i.e. to block (see the sketch after these comments). Commented May 21, 2014 at 15:06
  • multiprocessing does not share state (memory) between processes, which threads do. Therefore, it will be okay as long as you do not share state among processes. Commented May 21, 2014 at 15:08
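
To make the .map vs .map_async distinction from the comments concrete, here is a minimal sketch with a toy square function standing in for the Hive query:

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(4)

    # pool.map blocks until every result is ready and returns a plain list
    print(pool.map(square, [1, 2, 3]))       # [1, 4, 9]

    # pool.map_async returns an AsyncResult immediately; the values are
    # only available once you call .get(), which blocks until they arrive
    async_result = pool.map_async(square, [1, 2, 3])
    print(async_result.get(timeout=10))      # [1, 4, 9]

    pool.close()
    pool.join()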

1 Answer


I think the issue you are having is that the results object is not ready by the time you try to use it. Also, since you have a known timeout (300 seconds), I would suggest using it to your advantage in the code.

The code below shows how you can force a timeout after 300 seconds if all of the results have not been collected by then.

if __name__ == '__main__':
    start_time = time.time()
    hours = ['17', '18', '19']
    # Note: using Pool as a context manager requires Python 3.3+;
    # on Python 2, call pool.close() and pool.join() explicitly instead.
    with Pool(processes=4) as pool:
        results = pool.map_async(run_query, hours)
        print(results.get(timeout=300))  # blocks up to 300s, then raises TimeoutError
    print(int(time.time() - start_time))

Either way, you still need to call results.get() to retrieve your data, or pass a callback function to map_async.
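
For instance, here is a minimal sketch of the callback route, reusing run_query from the question (the handle_results name is just for illustration):

from multiprocessing import Pool

def handle_results(results):
    # invoked once in the parent process, with the full list of results,
    # after every worker has finished
    for r in results:
        print('%s -> %ss' % (r['query'], r['elapsed_time']))

if __name__ == '__main__':
    pool = Pool(4)
    hours = ['17', '18', '19']
    pool.map_async(run_query, hours, callback=handle_results)
    pool.close()
    pool.join()  # without join(), the script can exit before the callback runs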


1 Comment

this code gave me an error: AttributeError: __exit__
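
That AttributeError: __exit__ is what you get when the answer's code runs on Python 2: multiprocessing.Pool only became a context manager in Python 3.3. Here is a Python 2 compatible sketch of the same idea, again reusing run_query from the question:

from multiprocessing import Pool, TimeoutError
import time

if __name__ == '__main__':
    start_time = time.time()
    hours = ['17', '18', '19']
    pool = Pool(processes=4)
    results = pool.map_async(run_query, hours)
    try:
        output = results.get(timeout=300)  # raises TimeoutError after 300s
        pool.close()                       # no more tasks; let workers exit cleanly
        print(output)
    except TimeoutError:
        pool.terminate()                   # stop workers that blew the deadline
        raise
    finally:
        pool.join()
    print(int(time.time() - start_time))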
