Generator Function Of Child Processes Runs In The Parent Process
Solution 1:
One way of achieving communication between two processes is by using a Queue
instance. In the following example, instead of creating two individual processes, I have opted to create a process pool of two processes:
from multiprocessing import Pool, Manager
import os

def p(q):
    pid = os.getpid()
    q.put(pid)
    for i in range(5):
        q.put(i)
    q.put(None)  # signify "end of file"

def main():
    manager = Manager()
    q1 = manager.Queue()
    q2 = manager.Queue()
    with Pool(2) as pool:  # create a pool of 2 processes
        pool.apply_async(p, args=(q1,))
        pool.apply_async(p, args=(q2,))
        q1_eof = False
        q2_eof = False
        while not q1_eof or not q2_eof:
            if not q1_eof:
                obj = q1.get()  # blocking get
                if obj is None:
                    q1_eof = True
                else:
                    print(obj)
            if not q2_eof:
                obj = q2.get()  # blocking get
                if obj is None:
                    q2_eof = True
                else:
                    print(obj)

if __name__ == '__main__':
    main()
Prints:
5588
24104
0
0
1
1
2
2
3
3
4
4
The code that uses explicit Process
instances rather than creating a pool follows (I don't tend to subclass the Process
class, as that requires more coding):
from multiprocessing import Process, Queue
import os

def p(q):
    pid = os.getpid()
    q.put(pid)
    for i in range(5):
        q.put(i)
    q.put(None)  # signify "end of file"

def main():
    q1 = Queue()
    q2 = Queue()
    p1 = Process(target=p, args=(q1,))
    p1.start()
    p2 = Process(target=p, args=(q2,))
    p2.start()
    q1_eof = False
    q2_eof = False
    while not q1_eof or not q2_eof:
        if not q1_eof:
            obj = q1.get()  # blocking get
            if obj is None:
                q1_eof = True
            else:
                print(obj)
        if not q2_eof:
            obj = q2.get()  # blocking get
            if obj is None:
                q2_eof = True
            else:
                print(obj)
    p1.join()
    p2.join()

if __name__ == '__main__':
    main()
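For comparison, a minimal sketch of what the subclassing approach mentioned above could look like (the Producer class and the single-queue driver here are purely illustrative, not part of the original answer):

from multiprocessing import Process, Queue
import os

class Producer(Process):
    # same behaviour as p() above, written as a Process subclass
    def __init__(self, q):
        super().__init__()
        self.q = q

    def run(self):
        self.q.put(os.getpid())
        for i in range(5):
            self.q.put(i)
        self.q.put(None)  # signify "end of file"

if __name__ == '__main__':
    q = Queue()
    prod = Producer(q)
    prod.start()
    while (obj := q.get()) is not None:  # read until the "end of file" sentinel
        print(obj)
    prod.join()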
Important Note
The two coding examples (one that uses a process pool and one that doesn't) use two different types of Queue instances.
See Python multiprocessing.Queue vs multiprocessing.manager().Queue()
You can always use multiprocessing.Manager().Queue()
in all cases (I generally do), but at a possible loss of some efficiency.
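As a quick illustration of that note (a minimal sketch, not part of the original answer): a Pool pickles the arguments it sends to its workers, and a plain multiprocessing.Queue refuses to be pickled that way, whereas a Manager().Queue() proxy pickles fine. In CPython the refusal is assumed to surface as a RuntimeError:

import pickle
from multiprocessing import Queue, Manager

if __name__ == '__main__':
    q = Queue()
    try:
        pickle.dumps(q)  # roughly what Pool does to apply_async arguments
    except RuntimeError as e:
        print('plain Queue cannot be pickled:', e)

    with Manager() as manager:
        mq = manager.Queue()
        pickle.dumps(mq)  # the proxy pickles, so pool workers can receive it
        print('Manager().Queue() proxy pickled OK')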
Solution 2:
I expect that, speaking about processing with generators, you really want the following things to be accomplished:
- The main process generates tasks lazily through some generator; each task is represented by some data (arg).
- These tasks might be produced by the generator very slowly, e.g. by fetching chunks of data from the Internet, hence they should be processed one by one as soon as they are ready.
- The main process sends these tasks to several child processes to be processed.
- Processing in the children might also take a slow and random amount of time.
- Children should report some results (successfully processed result data, or an encoded error in case of failure).
- The main process should also gather all results lazily, i.e. report each one as soon as it is ready.
- Results inside the main process could be gathered in strictly the same order as they were generated (strict order True) or in arbitrary order as soon as they are processed (strict order False); the second variant may be considerably faster.
- All CPU cores should be used for efficiency, one process per core.
For all these purposes, I created the following example template code that you can adapt to your specific problem:
def ProcessTask(arg):
    import time, os
    print('Started task', arg[0], arg[1], 'by', os.getpid())
    time.sleep(arg[1])
    print('Finished task', arg[0], arg[1], 'by', os.getpid())
    return (arg[0], arg[1] * 2)

def Main():
    import multiprocessing as mp

    def GenTasks(n):
        import random, os, time
        for i in range(n):
            t = round(random.random() * 2., 3)
            print('Created task', i, t, 'by', os.getpid())
            yield (i, t)
            time.sleep(random.random())

    num_tasks = 4
    for strict_order in [True, False]:
        print('\nIs strict order', strict_order)
        with mp.Pool() as pool:
            for res in (pool.imap_unordered, pool.imap)[strict_order](
                ProcessTask, GenTasks(num_tasks)
            ):
                print('Result from task', res)

if __name__ == '__main__':
    Main()
Outputs:
Is strict order True
Created task 0 0.394 by 10536
Created task 1 0.357 by 10536
Started task 0 0.394 by 8740
Started task 1 0.357 by 5344
Finished task 1 0.357 by 5344
Finished task 0 0.394 by 8740
Result from task (0, 0.788)
Result from task (1, 0.714)
Created task 2 0.208 by 10536
Started task 2 0.208 by 5344
Finished task 2 0.208 by 5344
Result from task (2, 0.416)
Created task 3 0.937 by 10536
Started task 3 0.937 by 8740
Finished task 3 0.937 by 8740
Result from task (3, 1.874)

Is strict order False
Created task 0 1.078 by 10536
Started task 0 1.078 by 7256
Created task 1 0.029 by 10536
Started task 1 0.029 by 5440
Finished task 1 0.029 by 5440
Result from task (1, 0.058)
Finished task 0 1.078 by 7256
Result from task (0, 2.156)
Created task 2 1.742 by 10536
Started task 2 1.742 by 5440
Created task 3 0.158 by 10536
Started task 3 0.158 by 7256
Finished task 3 0.158 by 7256
Result from task (3, 0.316)
Finished task 2 1.742 by 5440
Result from task (2, 3.484)
PS:
- In the previous code, and when using multiprocessing in general, the same single module/script is used by both the main process and the child processes; main and children all start by executing the whole script. The if __name__ == '__main__': block is run only by the main process, while the rest of the module's code is executed by both main and children.
- Good practice is to put everything that needs to be executed by the main process into one function (Main() in my case), everything to be executed by the children into another function (ProcessTask() in my case), and any other functions and variables that both need into the module's global scope, which is shared and run by both main and children (I don't have anything shared in this code); see the sketch after this list.
- The processing function (ProcessTask() in my code) should be at the global scope of the module.
- Other documentation regarding multiprocessing is available here.
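To make those points concrete, here is a minimal sketch of that layout (the names worker and main are purely illustrative, not from the answer above). Note that the "children re-execute the whole script" behaviour applies under the spawn start method, the default on Windows and macOS; fork-based children on Linux inherit the parent's state instead:

import multiprocessing as mp
import os

# Module-level code: under the spawn start method this line runs in the main
# process and again in every child, because each child re-imports the module.
print('module imported by pid', os.getpid())

def worker(x):
    # executed in a child process
    return x * x

def main():
    # everything that should run exactly once lives here
    with mp.Pool(2) as pool:
        print(pool.map(worker, range(5)))

if __name__ == '__main__':
    # only the main process enters this block; without the guard, spawn-based
    # children would recursively try to create pools of their own
    main()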