Here is a Python program that uses the multiprocessing library to distribute the line counting across cores. My test improves counting a 20-million-line file from 26 seconds to 7 seconds on an 8-core Windows 64-bit server. Note: not using memory mapping makes things much slower.

import multiprocessing, sys, time, os, mmap
import logging, logging.handlers

def init_logger(pid):
    console_format = 'P{0} %(levelname)s %(message)s'.format(pid)
    logger = logging.getLogger()  # New logger at root level
    logger.setLevel(logging.INFO)
    logger.handlers.append(logging.StreamHandler())
    logger.handlers[0].setFormatter(logging.Formatter(console_format, '%d/%m/%y %H:%M:%S'))

def getFileLineCount(queues, pid, processes, file1):
    init_logger(pid)
    logging.info('start')

    physical_file = open(file1, "r")
    # mmap.mmap(fileno, length[, tagname[, access[, offset]]])
    m1 = mmap.mmap(physical_file.fileno(), 0, access=mmap.ACCESS_READ)

    # Work out file size to divide up line counting
    fSize = os.stat(file1).st_size
    chunk = (fSize / processes) + 1
    lines = 0

    # Get where I start and stop
    _seedStart = chunk * (pid)
    _seekEnd = chunk * (pid + 1)
    seekStart = int(_seedStart)
    seekEnd = int(_seekEnd)

    if seekEnd < int(_seekEnd + 1):
        seekEnd += 1

    if _seedStart < int(seekStart + 1):
        seekStart += 1

    if seekEnd > fSize:
        seekEnd = fSize

    # Find where to start
    if pid > 0:
        m1.seek(seekStart)
        # Read next line
        l1 = m1.readline()  # Need to use readline with memory mapped files
        seekStart = m1.tell()

    # Tell previous rank my seek start to make their seek end
    if pid > 0:
        queues[pid - 1].put(seekStart)
    if pid < processes - 1:
        seekEnd = queues[pid].get()

    m1.seek(seekStart)
    l1 = m1.readline()

    while len(l1) > 0:
        lines += 1
        l1 = m1.readline()
        if m1.tell() > seekEnd or len(l1) == 0:
            break

    logging.info('done')

    # Add up the results
    if pid == 0:
        for p in range(1, processes):
            lines += queues[0].get()
        queues[0].put(lines)  # The total lines counted
    else:
        queues[0].put(lines)

    m1.close()
    physical_file.close()

if __name__ == '__main__':
    init_logger('main')
    if len(sys.argv) > 1:
        file_name = sys.argv[1]
    else:
        logging.fatal('parameters required: file-name [processes]')
        exit()

    t = time.time()
    processes = multiprocessing.cpu_count()
    if len(sys.argv) > 2:
        processes = int(sys.argv[2])
    queues = []  # A queue for each process
    for pid in range(processes):
        queues.append(multiprocessing.Queue())
    jobs = []
    prev_pipe = 0
    for pid in range(processes):
        p = multiprocessing.Process(target=getFileLineCount,
                                    args=(queues, pid, processes, file_name,))
        p.start()
        jobs.append(p)

    jobs[0].join()  # Wait for counting to finish
    lines = queues[0].get()
    logging.info('finished {} Lines:{}'.format(time.time() - t, lines))
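To run it, pass the file name and, optionally, the number of processes; linecount.py is a placeholder name of my choosing, since the post does not name the script:

python linecount.py huge_file.txt 8

Without the second argument it defaults to multiprocessing.cpu_count() workers, and the elapsed time and total line count are reported through the logging handler set up in init_logger.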
Note: in some cases seekStart and seekEnd were coming back as floats (fSize / processes is true division under Python 3), so the code coerces them to integers before they are used as seek offsets.
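A minimal sketch of the underlying issue, assuming Python 3 semantics (the numbers are illustrative only):

fSize, processes = 20_000_000, 8
chunk = (fSize / processes) + 1       # True division: 2500001.0, a float
seekStart = int(chunk * 3)            # m1.seek() needs an int, hence the coercion
chunk_int = (fSize // processes) + 1  # Floor division keeps the arithmetic integral

Under Python 2, / on two ints was already integer division, so the coercion only matters on Python 3.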
Edit: This answer was refactored to use the standard multiprocessing library rather than MPI via pypar.
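To make the note about memory mapping concrete, here is a minimal single-process baseline using plain buffered reads; this is my own sketch for comparison, not part of the original answer:

def count_lines_plain(path):
    # Plain buffered reads: no mmap, no worker processes
    lines = 0
    with open(path, 'rb') as f:
        for block in iter(lambda: f.read(1024 * 1024), b''):
            lines += block.count(b'\n')
    return lines

One behavioral difference: this counts newline bytes, so a file whose last line lacks a trailing newline reports one fewer line than the readline loop above.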