Novice to threading here. I'm borrowing a lot of the code from this thread while trying to build my first script using threading/queue:

import threading, urllib2
import Queue
import sys
from PIL import Image
import io, sys

def avhash(url, queue):
    if not isinstance(url, Image.Image):
        try:
            im = Image.open(url)
        except IOError:
            fd = urllib2.urlopen(url)
            image_file = io.BytesIO(fd.read())
            im = Image.open(image_file)
    im = im.resize((8, 8), Image.ANTIALIAS).convert('L')
    avg = reduce(lambda x, y: x + y, im.getdata()) / 64.
    hash = reduce(lambda x, (y, z): x | (z << y),
                  enumerate(map(lambda i: 0 if i < avg else 1, im.getdata())),
                  0)
    queue.put({url: hash})
    queue.task_done()

def fetch_parallel(job_list):
    q = Queue.Queue()
    threads = [threading.Thread(target=avhash, args=(job, q)) for job in job_list[0:50]]
    for t in threads:
        t.daemon = True
        t.start()
    for t in threads:
        t.join()
    return [q.get() for _ in xrange(len(job_list))]

In this case job_list is a list of URLs. This code works fine when the list has 50 or fewer entries, but it hangs when the list has more than 50. Is there something I'm fundamentally not understanding about how threading works?

1 Answer

Your problem is this line:

return [q.get() for _ in xrange(len(job_list))] 

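Only the first 50 jobs are ever started, so at most 50 results are put into the queue. If job_list has more than 50 elements, you try to read more results from the queue than were put in, and q.get() blocks forever once the queue is empty. That is the hang you are seeing. Therefore: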

return [q.get() for _ in xrange(len(job_list[:50]))] 

or, even better:

MAX_LEN = 50
...
threads = [... for job in job_list[:MAX_LEN]]
...
return [q.get() for _ in job_list[:MAX_LEN]]
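
To see the blocking behaviour in isolation, here is a minimal sketch, written for Python 3 (where the module is named queue rather than Queue). The helper name drain and the timeout value are made up for illustration; the timeout is only there so the demo returns instead of hanging the way a plain q.get() would.

```python
import queue  # this module is called Queue in Python 2


def drain(q, count, timeout=0.1):
    """Try to read `count` items; store None wherever get() found nothing."""
    items = []
    for _ in range(count):
        try:
            # With no timeout, this call would block forever on an empty queue.
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            items.append(None)
    return items


q = queue.Queue()
for n in range(3):          # only three results are ever produced...
    q.put(n)

result = drain(q, 5)        # ...but five are requested
print(result)               # [0, 1, 2, None, None]
```

The two None entries mark the reads that would never have returned in the original code.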

[EDIT]

It seems you want your program to do something different from what it actually does. Your program takes the first 50 entries in job_list, handles each of them in its own thread, and disregards all other jobs. From your comment below I assume you want to handle all jobs, but only 50 at a time. For this, you should use a thread pool. In Python >= 3.2 you could use concurrent.futures.ThreadPoolExecutor.
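
A rough sketch of the ThreadPoolExecutor approach, assuming Python 3. The worker parameter and the fake_hash function are placeholders I made up so the example runs without network access; in your case the worker would be a version of avhash that returns its result instead of putting it on a queue.

```python
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 50  # at most 50 jobs run at the same time


def fetch_parallel(job_list, worker, max_workers=MAX_WORKERS):
    """Run worker(job) for every job, with at most max_workers threads."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() hands out all jobs and yields results in job_list order.
        return list(pool.map(worker, job_list))


def fake_hash(url):
    # Hypothetical stand-in for avhash: returns {url: some_value}.
    return {url: len(url)}


urls = ["http://example.com/%d.png" % i for i in range(120)]
results = fetch_parallel(urls, fake_hash)
print(len(results))  # 120
```

The pool processes all 120 jobs while keeping no more than 50 threads alive, and no manual queue bookkeeping is needed. Note that if a worker raises, pool.map re-raises that exception when its result is read.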

In Python < 3.2 you have to roll your own:

CHUNK_SIZE = 50

def fetch_parallel(job_list):
    results = []
    queue = Queue.Queue()
    while job_list:
        threads = [threading.Thread(target=avhash, args=(job, queue))
                   for job in job_list[:CHUNK_SIZE]]
        job_list = job_list[CHUNK_SIZE:]
        for thread in threads:
            thread.daemon = True
            thread.start()
        for thread in threads:
            thread.join()
        results.extend(queue.get() for _ in threads)
    return results

(untested)
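
One drawback of chunking is that each batch waits for its slowest job, and a job that raises before calling queue.put() would make the matching queue.get() hang again. A possible alternative, sketched here for Python 3 (in Python 2 the module is Queue and range is xrange), is a fixed set of worker threads that pull jobs from a queue and always report exactly one result per job. The names fetch_with_workers, worker and run are my own, not from the question.

```python
import queue
import threading

NUM_WORKERS = 50  # assumed cap, matching the 50 used above


def fetch_with_workers(job_list, worker, num_workers=NUM_WORKERS):
    """Process every job with a fixed number of threads; return (job, result) pairs."""
    jobs = queue.Queue()
    results = queue.Queue()
    for job in job_list:
        jobs.put(job)

    def run():
        while True:
            try:
                job = jobs.get_nowait()
            except queue.Empty:
                return  # nothing left to do, let the thread finish
            try:
                results.put((job, worker(job)))
            except Exception as exc:
                # Report the failure so the result count always matches.
                results.put((job, exc))

    threads = [threading.Thread(target=run)
               for _ in range(min(num_workers, len(job_list)))]
    for t in threads:
        t.daemon = True
        t.start()
    for t in threads:
        t.join()
    # Exactly one result was put per job, so these reads cannot block.
    return [results.get() for _ in job_list]


out = fetch_with_workers(list(range(120)), lambda n: n * n, num_workers=8)
print(len(out))  # 120
```

Results arrive in completion order rather than input order, which is why each one is paired with its job.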

[/EDIT]

1 Comment

Hi, the code runs without error, but it only returns the first 50 results.
