I've written a Python script that will download files from a website. To speed it up, I've made the downloading of the files multithreaded. Obviously, this is faster than doing the downloads serially, but I've come across some effects that I cannot explain.
- The first x files downloaded (x seems proportional to the number of threads created) are incredibly fast—the output shows upwards of 40 files per second—but the rate slows down a lot after that.
- Up to a point (near 200 threads), the maximum speed at which I can download files averages 10 files per second. If I increase the thread count to, say, 700, it still maxes out at 10 files per second. Increasing the thread count to a very large number (over 1,000) seems to limit the download speed based on CPU speed.
So, my questions are:
- Why are the first files downloaded so much faster than the rest, and can I maintain that original speed?
- Why does the thread count have such diminishing returns for download speeds?
Here is my script:
#!/usr/bin/python
"""Multithreaded file downloader.

Fetches numbered JPEGs from Config.SOURCE_URL through a rotating pool of
proxies, one Downloader thread per file.  At most Config.MAX_THREADS
threads run at once; each finishing thread starts the next queued one,
so the pool size stays roughly constant until the work queue drains.
"""
import random
from ast import literal_eval
from datetime import timedelta
from queue import Empty, Queue
from threading import Lock, Thread, active_count
from time import sleep, time
from urllib.request import ProxyHandler, build_opener

# Shared state.  The Queues are thread-safe; the bare int counters are
# not (+= is a read/modify/write), so they are guarded by _counter_lock.
proxies = Queue()    # idle ProxyHandler instances, recycled between openers
threads = Queue()    # Downloader threads that have not been started yet
agents = []          # user-agent strings; one is chosen at random per opener
total_files = 0
finished_files = 0
downloaded_files = 0
start_time = 0
_counter_lock = Lock()


class Config(object):
    """Static settings for a run."""
    DEBUG = False
    PROXIES_PATH = '/home/shane/bin/proxies.txt'
    AGENTS_PATH = '/home/shane/bin/user-agents.txt'
    DESTINATION_PATH = '/home/shane/images/%d.jpg'
    SOURCE_URL = 'https://example.org/%d.jpg'
    MAX_THREADS = 500
    TIMEOUT = 62
    RETRIES = 1
    RETRIES_TIME = 1


def get_files_per_second():
    """Average download rate (files/second) since start_work() was called."""
    return float(downloaded_files) / (time() - start_time)


def get_time_remaining():
    """Estimated time left, formatted as 'DD:HH:MM:SS', assuming the
    average rate so far continues."""
    delta = timedelta(seconds=float(total_files - finished_files) /
                      get_files_per_second())
    seconds = delta.total_seconds()
    days, remainder = divmod(seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    return ':'.join(str(int(part)).zfill(2)
                    for part in (days, hours, minutes, seconds))


def release_proxy(opener):
    """Return *opener*'s ProxyHandler to the idle pool.

    Raises Exception if the opener has no ProxyHandler (should not
    happen for openers built by get_new_opener()).
    """
    if Config.DEBUG:
        print('Releasing proxy')
    for handler in opener.handlers:
        if type(handler) is ProxyHandler:
            proxies.put(handler)
            return
    raise Exception('No proxy found')


def get_new_proxy():
    """Take an idle ProxyHandler from the pool; raise if none are loaded."""
    if Config.DEBUG:
        print('Getting new proxy')
    if proxies.empty():
        raise Exception('No proxies')
    return proxies.get()


def get_new_agent():
    """Pick a random user-agent string; raise if none are loaded."""
    if len(agents) == 0:
        raise Exception('No user agents')
    return random.choice(agents)


def get_new_opener():
    """Build a urllib opener routed through a pooled proxy with a
    randomly chosen User-Agent header."""
    opener = build_opener(get_new_proxy())
    opener.addheaders = [('User-Agent', get_new_agent())]
    return opener


def download(opener, source, destination, tries=0):
    """Fetch *source* via *opener* and write the bytes to *destination*.

    On success the proxy is recycled and a progress line is printed.  On
    failure the same opener is retried up to Config.RETRIES times; after
    that a fresh opener/proxy is drawn and the cycle restarts (note:
    there is no overall attempt cap, so an unreachable URL retries
    indefinitely by design).
    """
    global finished_files, downloaded_files
    if Config.DEBUG:
        print('Downloading %s to %s' % (source, destination))
    try:
        result = opener.open(source, timeout=Config.TIMEOUT).read()
        with open(destination, 'wb') as d:
            d.write(result)
        release_proxy(opener)
        # += on a plain int is not atomic; serialize counter updates.
        with _counter_lock:
            finished_files += 1
            downloaded_files += 1
        to_print = '(%d/%d files) (%d proxies) (%f files/second, %s left) (%d threads) %s'
        print(to_print % (finished_files, total_files, proxies.qsize(),
                          round(get_files_per_second(), 2),
                          get_time_remaining(), active_count(), source))
    except Exception as e:
        if Config.DEBUG:
            print(e)
        if tries < Config.RETRIES:
            sleep(Config.RETRIES_TIME)
            download(opener, source, destination, tries + 1)
        else:
            # Recycle the (possibly bad) proxy unless the pool is already
            # oversized, then start over with a fresh opener.
            if proxies.qsize() < Config.MAX_THREADS * 2:
                release_proxy(opener)
            download(get_new_opener(), source, destination, 0)


class Downloader(Thread):
    """Worker thread: downloads one file, then starts the next queued
    Downloader so the number of live threads stays roughly constant."""

    def __init__(self, source, destination):
        Thread.__init__(self)
        self.source = source
        self.destination = destination

    def run(self):
        if Config.DEBUG:
            print('Running thread')
        download(get_new_opener(), self.source, self.destination)
        # Bug fix: the original `if threads.qsize() > 0: threads.get()`
        # races with other finishing threads -- a blocking get() can hang
        # forever if the queue empties between the check and the get.
        try:
            threads.get_nowait().start()
        except Empty:
            pass


def populate_proxies():
    """Load one ProxyHandler per line of PROXIES_PATH.

    Each line must be a Python dict literal (e.g. {'http': 'host:port'});
    literal_eval parses it safely without executing code.
    """
    if Config.DEBUG:
        print('Populating proxies')
    with open(Config.PROXIES_PATH, 'r') as fh:
        for line in fh:
            line = line.replace('\n', '')
            if Config.DEBUG:
                print('Adding %s to proxies' % line)
            proxies.put(ProxyHandler(literal_eval(line)))


def populate_agents():
    """Load one user-agent string per line of AGENTS_PATH."""
    if Config.DEBUG:
        print('Populating agents')
    with open(Config.AGENTS_PATH, 'r') as fh:
        for line in fh:
            line = line.replace('\n', '')
            if Config.DEBUG:
                print('Adding %s to agents' % line)
            agents.append(line)


def populate_threads():
    """Queue one Downloader per file index and count the total work.

    Bug fixes vs. the original: `source` was never assigned (NameError),
    `destination` was wrongly built from SOURCE_URL instead of
    DESTINATION_PATH, and `total_files` was never incremented (leaving
    the progress/ETA math dividing against zero).
    """
    global total_files
    if Config.DEBUG:
        print('Populating threads')
    for x in range(0, 100000):
        source = Config.SOURCE_URL % x
        destination = Config.DESTINATION_PATH % x
        print('Queueing %s' % source)
        threads.put(Downloader(source, destination))
        total_files += 1


def start_work():
    """Kick off up to MAX_THREADS queued Downloaders; each one chains
    the next as it finishes.  Raises if nothing was queued."""
    global start_time
    if threads.qsize() == 0:
        raise Exception('No work to be done')
    start_time = time()
    for x in range(0, min(threads.qsize(), Config.MAX_THREADS)):
        if Config.DEBUG:
            print('Starting thread %d' % x)
        threads.get().start()


if __name__ == '__main__':
    populate_proxies()
    populate_agents()
    populate_threads()
    start_work()