
I try to use Python's ProcessPoolExecutor to calculate some FFTs in parallel; see the following code:

    import concurrent.futures
    import numpy as np
    from scipy.fft import fft

    def fuc(sig):
        C = fft(sig, axis=-1)
        return C

    def main():
        P, M, K = 20, 30, 1024
        FKP = np.array([P, M, K], dtype='cdouble')
        fkp = np.array([P, M, K], dtype='float32')
        fkp = np.random.rand(P, M, K)
        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as ex:
            results = ex.map(fuc, (fkp[p, m].reshape(1, K)
                                   for p in range(P)
                                   for m in range(M)))
            FKP = list(results)

    if __name__ == '__main__':
        main()

questions:

  1. why the kernel keeps busy, but I did not see 4 workers from windows task manager?
  2. do I use the right way to get parallel calculated results in line "FKP = list(results)"?

1 Answer

Q1 :
" why the kernel keeps busy, but I did not see 4 workers from windows task manager? "

A1 :
Let's solve this in code itself :

    import os
    import time
    ...

    def fuc(sig):
        print(("INF[{0:}]: fuc() starts "
               + "running in process[{1:}]"
               + "-called-from-process[{2:}]"
               ).format(time.perf_counter_ns(), os.getpid(), os.getppid()))
        C = fft(sig, axis=-1)
        print(("INF[{0:}]: fuc() FFT done "
               + "running in process[{1:}]"
               + "-called-from-process[{2:}]"
               ).format(time.perf_counter_ns(), os.getpid(), os.getppid()))
        return C

This code will self-document when, in which process, and for how long the FFT-part of the plan actually computes.


Q2 :
" do I use the right way to get parallel calculated results in line "FKP = list(results)"? "

A2 :
Yes, yet at a remarkable set of add-on overhead costs for each SER/COMMS/DES process-to-process border-crossing: all data gets SER/DES-coded (pickle.dumps()-alike CPU/RAM costs in both the [TIME]- and [SPACE]-domains, plus nonzero ipc-p2p transfer times):
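The SER/DES part of that border-crossing cost is easy to make visible. A minimal sketch (variable names are illustrative, not from the original code) that pickles one chunk of the same shape as each per-task payload and times the round-trip:

```python
import pickle
import time

import numpy as np

# One (1, K) chunk -- the same shape that crosses the
# process-to-process border once per ex.map() work-item.
K = 1024
chunk = np.random.rand(1, K)

t0 = time.perf_counter_ns()
blob = pickle.dumps(chunk)   # SER cost (CPU + RAM) on the sender side
arr = pickle.loads(blob)     # DES cost on the receiver side
t1 = time.perf_counter_ns()

print(f"SER/DES round-trip: {t1 - t0} ns for {len(blob)} bytes")
```

Multiply that by 600 work-items (twice, since results travel back too), and add the actual ipc-p2p transfer time, to estimate the total overhead the pool pays before any FFT work is even counted.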

    def Pinf():
        print(("NEW[{0:}]: ProcessPoolExecutor process-pool has "
               + "started process[{1:}]"
               + "-called-from-process[{2:}]"
               ).format(time.perf_counter_ns(), os.getpid(), os.getppid()))

    def main():
        ...
        # ----------------------------------------------------------------
        print(("INF[{0:}]: context-manager" + 30 * "_" + " entry point"
               ).format(time.perf_counter_ns()))
        # ----------------------------------------------------------------
        with concurrent.futures.ProcessPoolExecutor(max_workers=4,
                                                    initializer=Pinf) as ex:
            # ------------------------------------------------------------
            print(("INF[{0:}]: context-manager" + " is to start .map()"
                   ).format(time.perf_counter_ns()))
            # ------------------------------------------------------------
            results = ex.map(fuc, (fkp[p, m].reshape(1, K)
                                   for p in range(P)
                                   for m in range(M)))
            ...
            # ------------------------------------------------------------
            print(("INF[{0:}]: context-manager"
                   + " .map() returned / __main__ has received all <_results_>"
                   ).format(time.perf_counter_ns()))
            # ------------------------------------------------------------
            pass
        # ----------------------------------------------------------------
        print(("INF[{0:}]: context-manager" + 30 * "_" + " exited"
               ).format(time.perf_counter_ns()))
        ...
        print(type(results))
        ...

For the actual add-on costs of each process-pool process instantiation, see the reported ns-traces. The details are platform-specific, as the { MacOS | Linux | Windows } methods for spawning new processes differ a lot. The same holds across Python versions: recent Py3 releases copy a different scope of the calling Python-interpreter process than was common in Py2 and earlier Py3.x versions. Some start methods copy the whole, stateful calling Python-interpreter, with a complete replica of its data, file-descriptors and the like, bearing even larger process-instantiation costs due to all the associated RAM-allocations for storing the n-many replicas of the calling Python-interpreter.
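You can check which start method your platform uses before reasoning about instantiation costs. A minimal sketch: Windows (and recent macOS) default to "spawn" (a fresh interpreter per worker), while Linux defaults to "fork" (a copy of the calling interpreter's state):

```python
import multiprocessing as mp

# Report the default start method for new worker processes.
# Typical results: "spawn" on Windows/macOS, "fork" on Linux.
print(mp.get_start_method())
```

Under "spawn", each of the 4 pool workers re-imports the __main__ module from scratch, which is exactly why the `if __name__ == '__main__':` guard is mandatory on Windows.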

Given the scaling :

    >>> len([(p, m) for p in range(P) for m in range(M)])
    600

efficiency matters. Passing each of the 4 processes just one tuple of ( p_start, p_end, m_start, m_end ) indices of the sub-range in which that worker shall FFT-process its signal-sections and return the sub-list of FFT-results thereof, will avoid passing the same static data many times in small chunks and will completely avoid 596 of the 600 crossings of the ( CPU-, RAM- and latency-wise ) expensive SER/COMMS/DES ipc-p2p DATA-passing corridor.
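A minimal sketch of that batching idea (helper names are illustrative; numpy's FFT is used in place of scipy's so the sketch is self-contained, scipy.fft.fft drops in identically). Instead of 600 tiny payloads, each of the 4 workers receives one contiguous block and returns one block of results:

```python
import concurrent.futures

import numpy as np

def fft_block(block):
    # One call FFT-processes a whole (n_rows, K) block of signals,
    # so only 4 SER/COMMS/DES border-crossings happen instead of 600.
    return np.fft.fft(block, axis=-1)

def main():
    P, M, K = 20, 30, 1024
    fkp = np.random.rand(P, M, K)
    flat = fkp.reshape(P * M, K)          # all 600 signals as rows
    blocks = np.array_split(flat, 4)      # one block per pool worker
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as ex:
        parts = list(ex.map(fft_block, blocks))
    FKP = np.concatenate(parts).reshape(P, M, K)
    print(FKP.shape)                      # (20, 30, 1024)

if __name__ == '__main__':
    main()
```

Whether even this pays off depends on the FFT size: for K = 1024, scipy's FFT is so fast that the process-pool overheads may still dominate, and a single vectorised `fft(fkp, axis=-1)` call in one process can beat the pool outright.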

For more details you may like to re-read this and this.


1 Comment

Many thanks for your detailed answers! I use Windows 10, Python 3.10.2. I copied the above code, but it gives an error (I am new to Python): Input In [10] with concurrent.futures.ProcessPoolExecutor(max_workers = 4,initializer = Pinf) as ex: ^ SyntaxError: invalid syntax
