2

I have a simple script in which a function does some calculation on a Pandas.Series object which I want to parallel process. I have made the Pandas.Series object as a shared memory object so that different processess can use it.

My code is given below.

from multiprocessing import shared_memory import pandas as pd import numpy as np import multiprocessing s = pd.Series(np.random.randn(50)) s = s.to_numpy() # Create a shared memory variable shm which can be accessed by other processes shm_s = shared_memory.SharedMemory(create=True, size=s.nbytes) b = np.ndarray(s.shape, dtype=s.dtype, buffer=shm_s.buf) b[:] = s[:] # create a dictionary to store the results and which can be accessed after the processes works mgr = multiprocessing.Manager() pred_sales_all = mgr.dict() forecast_period =1000 # my sudo function to run parallel process def predict_model(b,model_list_str,forecast_period,pred_sales_all): c = pd.Series(b) temp_add = model_list_str + forecast_period temp_series = c.add(model_list_str) pred_sales_all[str(temp_add)] = temp_series # parallel processing with shared memory if __name__ == '__main__': model_list = [1, 2, 3, 4] all_process = [] for model_list_str in model_list: # setup a process to run process = multiprocessing.Process(target=predict_model, args=(b,model_list_str, forecast_period, pred_sales_all)) # start the process we need to join() them separately else they will finish execution before moving to next process process.start() # Append all process together all_process.append(process) # Finish execution of all process for p in all_process: p.join() 

This code is working in ubuntu I checked. But when I run this in windows I am getting the following error.

RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase. This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module: if __name__ == '__main__': freeze_support() ... The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable. 

Also I tried the solution mentioned here in the stack-overflow question

What is wrong with the code and can someone solve the issue ? Is my parallelization code wrong ?

10
  • Defining methods is fine, but you really don't want to do any processing outside of if __name__ == '__main__': when multiprcessing. Commented Sep 4, 2021 at 14:31
  • I am running multiprocessing inside the if __name__=='__main__': only. Commented Sep 4, 2021 at 14:44
  • First, mgr = multiprocessing.Manager() creates a process so if you are running under Windows, that statement needs to be moved to inside the if __name__ == '__main__': block. Also, buffer = s will clobber the previous assignment to buffer. You have other statements at global scope that should also be moved. I don't really think you are sharing anything. Commented Sep 4, 2021 at 14:46
  • will s = s.to_numpy() shm_s = shared_memory.SharedMemory(create=True, size=s.nbytes) b=np.ndarray(s.shape,dtype=s.dtype,buffer=shm_s.buf) b[:]=a[:]. Then passing b in the args of the pocess solve my shared memory issue ? Commented Sep 4, 2021 at 18:12
  • @Booboo Can you check my code, Is it correct i am copying the data to b and passing b. Is it sharable now ? I am getting the output though. Commented Sep 4, 2021 at 18:22

1 Answer 1

1

See my comments about moving statements at global scope to within the if __name__ == '__main__': block. Otherwise they will be executed by each subprocess as part of their initialization and there is no point in that. Moreover, the statement mgr = multiprocessing.Manager() has to be moved because this results in the creation of a new process.

from multiprocessing import shared_memory import pandas as pd import numpy as np import multiprocessing # my sudo function to run parallel process def predict_model(b,model_list_str,forecast_period,pred_sales_all): c = pd.Series(b) temp_add = model_list_str + forecast_period temp_series = c.add(model_list_str) pred_sales_all[str(temp_add)] = temp_series # parallel processing with shared memory if __name__ == '__main__': forecast_period =1000 s = pd.Series(np.random.randn(50)) s = s.to_numpy() # Create a shared memory variable shm which can be accessed by other processes shm_s = shared_memory.SharedMemory(create=True, size=s.nbytes) b = np.ndarray(s.shape, dtype=s.dtype, buffer=shm_s.buf) b[:] = s[:] # create a dictionary to store the results and which can be accessed after the processes works mgr = multiprocessing.Manager() pred_sales_all = mgr.dict() model_list = [1, 2, 3, 4] all_process = [] for model_list_str in model_list: # setup a process to run process = multiprocessing.Process(target=predict_model, args=(b,model_list_str, forecast_period, pred_sales_all)) # start the process we need to join() them separately else they will finish execution before moving to next process process.start() # Append all process together all_process.append(process) # Finish execution of all process for p in all_process: p.join() print(pred_sales_all) 

Prints:

{'1004': 0 4.084857 1 2.871219 2 5.644114 3 2.146666 4 3.395946 5 3.362894 6 2.366361 7 3.209334 8 4.226132 9 3.158135 10 4.090616 11 5.299314 12 3.155669 13 5.602719 14 3.107825 15 1.809457 16 3.938050 17 1.144159 18 3.286502 19 4.302809 20 3.917498 21 5.055629 22 2.230594 23 3.255307 24 2.459930 25 3.591691 26 3.248188 27 3.635262 28 4.547589 29 4.883547 30 2.635874 31 5.551306 32 2.434944 33 5.358516 34 4.606322 35 5.383417 36 2.886735 37 4.267562 38 2.053871 39 3.863734 40 3.233764 41 4.089593 42 4.754793 43 4.125400 44 2.174840 45 7.207996 46 2.925736 47 4.604850 48 4.067672 49 4.397330 dtype: float64, '1001': 0 1.084857 1 -0.128781 2 2.644114 3 -0.853334 4 0.395946 5 0.362894 6 -0.633639 7 0.209334 8 1.226132 9 0.158135 10 1.090616 11 2.299314 12 0.155669 13 2.602719 14 0.107825 15 -1.190543 16 0.938050 17 -1.855841 18 0.286502 19 1.302809 20 0.917498 21 2.055629 22 -0.769406 23 0.255307 24 -0.540070 25 0.591691 26 0.248188 27 0.635262 28 1.547589 29 1.883547 30 -0.364126 31 2.551306 32 -0.565056 33 2.358516 34 1.606322 35 2.383417 36 -0.113265 37 1.267562 38 -0.946129 39 0.863734 40 0.233764 41 1.089593 42 1.754793 43 1.125400 44 -0.825160 45 4.207996 46 -0.074264 47 1.604850 48 1.067672 49 1.397330 dtype: float64, '1002': 0 2.084857 1 0.871219 2 3.644114 3 0.146666 4 1.395946 5 1.362894 6 0.366361 7 1.209334 8 2.226132 9 1.158135 10 2.090616 11 3.299314 12 1.155669 13 3.602719 14 1.107825 15 -0.190543 16 1.938050 17 -0.855841 18 1.286502 19 2.302809 20 1.917498 21 3.055629 22 0.230594 23 1.255307 24 0.459930 25 1.591691 26 1.248188 27 1.635262 28 2.547589 29 2.883547 30 0.635874 31 3.551306 32 0.434944 33 3.358516 34 2.606322 35 3.383417 36 0.886735 37 2.267562 38 0.053871 39 1.863734 40 1.233764 41 2.089593 42 2.754793 43 2.125400 44 0.174840 45 5.207996 46 0.925736 47 2.604850 48 2.067672 49 2.397330 dtype: float64, '1003': 0 3.084857 1 1.871219 2 4.644114 3 1.146666 4 2.395946 5 2.362894 6 1.366361 7 2.209334 8 3.226132 9 2.158135 10 3.090616 11 4.299314 12 2.155669 13 4.602719 14 2.107825 15 0.809457 16 2.938050 17 0.144159 18 2.286502 19 3.302809 20 2.917498 21 4.055629 22 1.230594 23 2.255307 24 1.459930 25 2.591691 26 2.248188 27 2.635262 28 3.547589 29 3.883547 30 1.635874 31 4.551306 32 1.434944 33 4.358516 34 3.606322 35 4.383417 36 1.886735 37 3.267562 38 1.053871 39 2.863734 40 2.233764 41 3.089593 42 3.754793 43 3.125400 44 1.174840 45 6.207996 46 1.925736 47 3.604850 48 3.067672 49 3.397330 dtype: float64} 
Sign up to request clarification or add additional context in comments.

1 Comment

Apologies for the intrusion, Can you help me with a variation of your previous solution please

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.