
I have a function that is free of side effects. I would like to run it for every element in an array and return an array with all of the results.

Does Python have something that can generate all of the values in parallel?

  • Do you mean map as in map-reduce? Can you give an example of an input and an output row? Commented Apr 1, 2010 at 18:44
  • No, I do not mean map-reduce, since I want all of the data from each individual function call returned. It's just that each value can be calculated independently of the others. Although, since I think I want the max of the set, maybe I can use map-reduce here... Commented Apr 1, 2010 at 20:28
  • map() would do what you say: operate on each element independently (with the GIL caveats mentioned below). Commented Apr 1, 2010 at 22:17
  • streams.fastmap() from pyxtension (github.com/asuiu/pyxtension) does exactly this: a multithreaded map. Commented May 24, 2020 at 13:13
  • The pyxtension library can now be found on PyPI as well: pypi.org/project/pyxtension Commented Nov 15, 2022 at 22:59

6 Answers


Try the Pool.map function from multiprocessing:

http://docs.python.org/library/multiprocessing.html#using-a-pool-of-workers

It's not multithreaded per se, but that's actually good, since multithreading is severely crippled in Python by the GIL.
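A minimal sketch of what that can look like (square is just a stand-in for your side-effect-free function):

    from multiprocessing import Pool

    def square(x):
        # Stand-in for any side-effect-free function.
        return x * x

    if __name__ == '__main__':
        # Guard the entry point: on platforms that spawn workers,
        # the module gets re-imported in each child process.
        with Pool(processes=4) as pool:
            results = pool.map(square, [1, 2, 3, 4, 5])
        print(results)  # [1, 4, 9, 16, 25]

Note that the function passed to Pool.map has to be picklable, which in practice means a module-level function rather than a lambda or closure.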


3 Comments

Very cool, that last line sold me. I saw this multiprocessing library before, but I thought it was too heavyweight for my needs. I think I see the light now :) Thank you.
Is this answer still up to date with regard to multithreading and the GIL?
Yes, for some values of "severe." I think it's a matter of opinion how bad it is, but I still prefer multiple processes on Linux where processes aren't too heavy.

Try concurrent.futures.ThreadPoolExecutor.map in the Python standard library (new in version 3.2).

Similar to map(func, *iterables) except:

  • the iterables are collected immediately rather than lazily;
  • func is executed asynchronously and several calls to func may be made concurrently.

A simple example (modified from ThreadPoolExecutor Example):

    import concurrent.futures
    import urllib.request

    URLS = [
        'http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
    ]

    # Retrieve a single page and return its contents.
    def load_url(url, timeout):
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            try:
                data = conn.read()
            except Exception:
                # You may need a better error handler.
                return b''
            else:
                return data

    # We can use a with statement to ensure threads are cleaned up promptly.
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        results = list(executor.map(lambda url: load_url(url, 60), URLS))

    print('Done.')



Python now has the concurrent.futures module, which is the simplest way of getting map to work with either multiple threads or multiple processes.

https://docs.python.org/3/library/concurrent.futures.html
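A minimal sketch under that API (square is a placeholder; swap in your own function):

    import concurrent.futures

    def square(x):
        # Stand-in for any side-effect-free function.
        return x * x

    if __name__ == '__main__':
        # Threads are fine for I/O-bound work.
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            print(list(executor.map(square, range(5))))  # [0, 1, 4, 9, 16]

        # Processes sidestep the GIL for CPU-bound work.
        with concurrent.futures.ProcessPoolExecutor() as executor:
            print(list(executor.map(square, range(5))))  # [0, 1, 4, 9, 16]

Switching between threads and processes is a one-line change, which is the main appeal of this module.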



You can use the multiprocessing Python package (http://docs.python.org/library/multiprocessing.html). The cloud Python package, available from PiCloud (http://www.picloud.com), offers a multiprocessing map() function as well, which can offload your map to the cloud.
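For the multiprocessing route, a rough sketch using Pool.imap, which yields results lazily but still in input order (work is a made-up placeholder):

    from multiprocessing import Pool

    def work(x):
        # Placeholder for the real side-effect-free function.
        return x + 1

    if __name__ == '__main__':
        with Pool() as pool:
            # imap yields each result as soon as it is ready,
            # preserving the input order.
            for result in pool.imap(work, range(10)):
                print(result)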



Below is my map_parallel function. It works just like map, except that it can run f on each element in a separate thread, in parallel (but see the note below). This answer builds upon another SO answer.

    import sys
    import threading
    import traceback
    import logging
    from queue import Queue, Empty

    def map_parallel(f, iterable, max_parallel=10):
        """Just like map(f, iterable), but each call runs in a separate thread."""
        # Put all of the items in the queue; keep track of order.
        total_items = 0
        queue = Queue()
        for i, arg in enumerate(iterable):
            queue.put((i, arg))
            total_items += 1
        # No point in creating more thread objects than necessary.
        if max_parallel > total_items:
            max_parallel = total_items

        # The worker thread: pull items until the queue is empty
        # or any worker has recorded an error.
        res = {}
        errors = {}
        class Worker(threading.Thread):
            def run(self):
                while not errors:
                    try:
                        num, arg = queue.get(block=False)
                        try:
                            res[num] = f(arg)
                        except Exception:
                            errors[num] = sys.exc_info()
                    except Empty:
                        break

        # Create the threads.
        threads = [Worker() for _ in range(max_parallel)]
        # Start the threads.
        for t in threads:
            t.start()
        # Wait for the threads to finish.
        for t in threads:
            t.join()

        if errors:
            if len(errors) > 1:
                logging.warning("map_parallel multiple errors: %d:\n%s" % (
                    len(errors), errors))
            # Just raise the first one.
            item_i = min(errors.keys())
            exc_type, value, tb = errors[item_i]
            # Log the original traceback.
            logging.info("map_parallel exception on item %s/%s:\n%s" % (
                item_i, total_items, "\n".join(traceback.format_tb(tb))))
            raise value
        return [res[i] for i in range(len(res))]

NOTE: One thing to be careful of is exceptions. Like normal map, the function above raises an exception if one of its sub-threads raises an exception, and stops iteration. However, due to the parallel nature, there's no guarantee that the exception raised comes from the earliest element.
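A quick usage sketch, assuming the map_parallel above is in scope (slow_double is a made-up stand-in for real work):

    import time

    def slow_double(x):
        time.sleep(0.1)  # Simulate I/O-bound work.
        return 2 * x

    # Runs up to 5 calls concurrently; results come back in input order.
    print(map_parallel(slow_double, range(10), max_parallel=5))
    # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]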



Maybe try the Unladen Swallow Python 3 implementation? That might be a major project, and not guaranteed to be stable, but if you're inclined it could work. Then list or set comprehensions seem like the proper functional structure to use.
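For reference, the sequential shape that answer is pointing at (f is a placeholder for your function):

    def f(x):
        # Any side-effect-free function.
        return x * x

    values = [1, 2, 3]
    results = [f(v) for v in values]    # list comprehension
    same = list(map(f, values))         # equivalent built-in map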

