
I was looking for a bidirectional/omnidirectional queue to send jobs back and forth between processes.

The best solution I could come up with was to use two multiprocessing queues, each filled by one process and read by the other (or a Pipe, which is apparently faster, though I haven't tried it yet).
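For reference, a minimal sketch of that two-queue idea (the queue names and the worker function are my own illustration, not from any library):

```python
import multiprocessing as mp


def worker(inbox, outbox):
    # B reads a job from its inbox and replies on the outbox
    job = inbox.get()
    outbox.put(f"done: {job}")


def main():
    a_to_b = mp.Queue()  # jobs from A to B
    b_to_a = mp.Queue()  # replies from B to A
    p = mp.Process(target=worker, args=(a_to_b, b_to_a))
    p.start()
    a_to_b.put("job-1")   # A sends a job to B
    print(b_to_a.get())   # A reads B's reply
    p.join()


if __name__ == "__main__":
    main()
```

Because each queue flows in only one direction, neither process can accidentally read back a job it sent itself.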

I came across this answer that describes the difference between a Pipe and a Queue; it states that

A Queue() can have multiple producers and consumers.

I know a queue can be shared between multiple processes (more than 2), but how should I organize the communication so that a message has a targeted process, or at least so that a process does not read the jobs it put on the queue itself? And how do I scale this to more than 2 processes?

Example: I have 2 (or more) processes (A, B) that share the same queue. A needs to send a job to B, and B needs to send a job to A. If I simply use queue.put(job), the job might be read by either process, depending on who calls queue.get() first. So a job that A put on the queue for B might be read back by A, which is not the targeted process. If I added a flag indicating which process should execute each job, it would destroy the sequentiality of the queue.
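One common way to get targeted delivery with more than 2 processes (a sketch of my own, not taken from the answer below) is to give every process its own dedicated inbox queue and "address" a job by putting it on the target's queue:

```python
import multiprocessing as mp


def send(inboxes, target, job):
    # address a job by putting it on the target process's dedicated queue
    inboxes[target].put(job)


def worker(name, inboxes, results):
    # each process blocks only on its own inbox, so it can never read
    # a job it addressed to another process
    job = inboxes[name].get()
    results.put((name, job))


def main():
    names = ["A", "B", "C"]
    inboxes = {n: mp.Queue() for n in names}  # one inbox per process
    results = mp.Queue()
    procs = [mp.Process(target=worker, args=(n, inboxes, results))
             for n in names]
    for p in procs:
        p.start()
    send(inboxes, "B", "job for B")
    send(inboxes, "C", "job for C")
    send(inboxes, "A", "job for A")
    # drain results before joining, to avoid blocking on the queue's
    # feeder thread
    print(sorted(results.get() for _ in names))
    for p in procs:
        p.join()


if __name__ == "__main__":
    main()
```

Each per-target queue stays sequential on its own, so the ordering of jobs addressed to one process is preserved; only the global interleaving across targets is given up.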

1 Answer
For those facing the same problem: I found a solution, multiprocessing.Pipe(). It is faster than queues, but it only works between 2 processes.

Here is a simple example to help:

    import multiprocessing as mp
    from time import time


    def process1_function(conn, events):
        for event in events:
            # send jobs to process_2
            conn.send((event, time()))
            print(f"Event Sent: {event}")
            # check if there are any messages in the pipe from process_2
            if conn.poll():
                # read the message from process_2
                print(conn.recv())
        # keep draining replies from process_2; the short timeout gives it
        # a chance to answer the last events before we stop listening
        while conn.poll(timeout=1):
            print(conn.recv())


    def process2_function(conn):
        while True:
            # check if there are any messages in the pipe from process_1
            if conn.poll():
                # read messages in the pipe from process_1
                event, sent = conn.recv()
                # send messages to process_1
                conn.send(f"{event} complete, {time() - sent}")
                if event == "eod":
                    break
        conn.send("all events finished")


    def run():
        events = ["get up", "brush your teeth", "shower", "work", "eod"]
        conn1, conn2 = mp.Pipe()
        process_1 = mp.Process(target=process1_function, args=(conn1, events))
        process_2 = mp.Process(target=process2_function, args=(conn2,))
        process_1.start()
        process_2.start()
        process_1.join()
        process_2.join()


    if __name__ == "__main__":
        run()

1 Comment

This does not appear to work in Windows.
