Skip to content Skip to sidebar Skip to footer

A Threadpoolexecutor Inside A Processpoolexecutor

I am new to the futures module and have a task that could benefit from parallelization; but I don't seem to be able to figure out exactly how to setup the function for a thread and

Solution 1:

I'll give you working code that mixes processes with threads for solving the problem, but it's not what you're expecting ;-) First thing is to make a mock program that doesn't endanger your real data. Experiment with something harmless. So here's the start:

classParticle:def__init__(self, i):
        self.i = i
        self.fitness = None
    defgetfitness(self):
        self.fitness = 2 * self.i

Now we have something to play with. Next some constants:

MAX_PROCESSES = 3MAX_THREADS = 2# per processCHUNKSIZE = 100

Fiddle those to taste. CHUNKSIZE will be explained later.

The first surprise for you is what my lowest-level worker function does. That's because you're overly optimistic here:

Since the side-effects of calling p.getFitness are stored in each particle itself, I don't have to worry about getting a return from futures.ProcessPoolExecutor().

Alas, nothing done in a worker process can have any effect on the Particle instances in your main program. A worker process works on copies of Particle instances, whether via a copy-on-write implementation of fork() or because it's working on a copy made from unpickling a Particle pickle passed across processes.

So if you want your main program to see the fitness results, you need to arrange to send information back to the main program. Because I don't know enough about your actual program, here I'm assuming that Particle().i is a unique integer, and that the main program can easily map integers back to Particle instances. With that in mind, the lowest-level worker function here needs to return a pair: the unique integer and the fitness result:

defthread_worker(p):
    p.getfitness()
    return (p.i, p.fitness)

Given that, it's easy to spread a list of Particles across threads, and return a list of (particle_id, fitness) results:

defproc_worker(ps):
    import concurrent.futures as cf
    with cf.ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
        result = list(e.map(thread_worker, ps))
    return result

Notes:

  1. That's the function each worker process will run.
  2. I'm using Python 3, so use list() to force e.map() to materialize all the results in a list.
  3. As mentioned in a comment, under CPython spreading CPU-bound tasks across threads is slower than doing them all in a single thread.

It only remains to write code to spread a list of Particles across processes, and retrieve the results. This is dead easy to do with multiprocessing, so that's what I'm going to use. I have no idea whether concurrent.futures can do it (given that we're also mixing in threads), but don't care. But because I'm giving you working code, you can play with that and report back ;-)

if __name__ == "__main__":
    import multiprocessing

    particles = [Particle(i) for i inrange(100000)]
    # Note the code below relies on that particles[i].i == iassertall(particles[i].i == i for i inrange(len(particles)))

    pool = multiprocessing.Pool(MAX_PROCESSES)
    for result_list in pool.imap_unordered(proc_worker,
                      (particles[i: i+CHUNKSIZE]
                       for i inrange(0, len(particles), CHUNKSIZE))):
        for i, fitness in result_list:
            particles[i].fitness = fitness

    pool.close()
    pool.join()

    assertall(p.fitness == 2*p.i for p in particles)

Notes:

  1. I'm breaking the list of Particles into chunks "by hand". That's what CHUNKSIZE is for. That's because a worker process wants a list of Particles to work on, and in turn that's because that's what the futuresmap() function wants. It's a Good Idea to chunk up work regardless, so you get some real bang for the buck in return for the per-invocation interprocess overheads.
  2. imap_unordered() makes no guarantees about the order in which results are returned. That gives the implementation more freedom to arrange work as efficiently as possible. And we don't care about the order here, so that's fine.
  3. Note that the loop retrieves the (particle_id, fitness) results, and modifies the Particle instances accordingly. Perhaps your real .getfitness makes other mutations to Particle instances - can't guess. Regardless, the main program will never see any mutations made in workers "by magic" - you have to explicitly arrange for that. In the limit, you could return (particle_id, particle_instance) pairs instead, and replace the Particle instances in the main program. Then they'd reflect all mutations made in worker processes.

Have fun :-)

Futures all the way down

Turns out it was very easy to replace multiprocessing. Here are the changes. This also (as mentioned earlier) replaces the original Particle instances, so as to capture all mutations. There's a tradeoff here, though: pickling an instance requires "a lot more" bytes than pickling a single "fitness" result. More network traffic. Pick your poison ;-)

Returning the mutated instance just requires replacing the last line of thread_worker(), like so:

return (p.i, p)

Then replace all of the "main" block with this:

defupdate_fitness():
    import concurrent.futures as cf
    with cf.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as e:
        for result_list in e.map(proc_worker,
                      (particles[i: i+CHUNKSIZE]
                       for i inrange(0, len(particles), CHUNKSIZE))):
            for i, p in result_list:
                particles[i] = p

if __name__ == "__main__":
    particles = [Particle(i) for i inrange(500000)]
    assertall(particles[i].i == i for i inrange(len(particles)))

    update_fitness()

    assertall(particles[i].i == i for i inrange(len(particles)))
    assertall(p.fitness == 2*p.i for p in particles)

The code is very similar to the multiprocessor dance. Personally, I'd use the multiprocessing version, because imap_unordered is valuable. That's a problem with simplified interfaces: they often buy simplicity at the cost of hiding useful possibilities.

Solution 2:

First, are you sure to leverage from running multiple thread while loading all your cores with processes? If it is cpu-bound, hardly yes. At least some tests has to be made.

If adding threads leverage your performance, the next question is whether one can achive better performance with hand-made load balancing, or automatic. By hand-made I mean careful workload partitioning into chunks of similar computational complexity and instatiating a new task processor per chunk, your orinal but doubted solution. By automatic, creation of pool of processes/threads and communication on work queue for new tasks, that one you strive for. In my view, first approach is one of Apache Hadoop paradigm, second is implemented by works queue processors, such as Celery. First approach may suffer from some tasks chunks being slower and running while others completed, second adds commutication and waiting-on-task overheads, and this is second point of performance tests to be made.

Last, if you wish to have a static collection of processes with multithreads within, AFAIK, you can't achive it with concurrent.futures as is, and have to modify it a bit. I don't know, whether there are existing solutions for this task, but as concurrent is a pure python solution (with no C code), it can easely be done. Work processor is defined in _adjust_process_countroutine of ProcessPoolExecutor class, and subclassing and overriding it with multi-threaded approach is rather straigtforward, you just have to supply your custom _process_worker, based on concurrent.features.thread

Original ProcessPoolExecutor._adjust_process_count for reference:

def_adjust_process_count(self):
    for _ inrange(len(self._processes), self._max_workers):
        p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue))
        p.start()
        self._processes[p.pid] = p

Solution 3:

This is a generalized answer that leverages the threadedprocess package which implements ThreadedProcesPoolExecutor, allowing a combined use of a thread pool inside a process pool. Below is a somewhat general-purpose utility function which uses it:

import concurrent.futures
import logging
from typing importCallable, Iterable, Optionalimport threadedprocess

log = logging.getLogger(__name__)


defconcurrently_execute(fn: Callable, fn_args: Iterable, max_processes: Optional[int] = None, max_threads_per_process: Optional[int] = None) -> None:
    """Execute the given callable concurrently using multiple threads and/or processes."""# Ref: https://stackoverflow.com/a/57999709/if max_processes == 1:
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_threads_per_process)
    elif max_threads_per_process == 1:
        executor = concurrent.futures.ProcessPoolExecutor(max_workers=max_processes)  # type: ignoreelse:
        executor = threadedprocess.ThreadedProcessPoolExecutor(max_processes=max_processes, max_threads=max_threads_per_process)

    if max_processes and max_threads_per_process:
        max_workers = max_processes * max_threads_per_process
        log.info("Using %s with %s processes and %s threads per process, i.e. with %s workers.", executor.__class__.__name__, max_processes, max_threads_per_process, max_workers)

    with executor:
        futures = [executor.submit(fn, *fn_args_cur) for fn_args_cur in fn_args]

    for future in concurrent.futures.as_completed(futures):
        future.result()  # Raises exception if it occurred in process worker.

Post a Comment for "A Threadpoolexecutor Inside A Processpoolexecutor"