Python multiprocessing: understanding logic behind `chunksize`

What factors determine an optimal chunksize argument to methods like multiprocessing.Pool.map()? The .map() method seems to use an arbitrary heuristic for its default chunksize (explained below); what motivates that choice and is there a more thoughtful approach based on some particular situation/setup?

Example - say that I am:

  • Passing an iterable to .map() that has ~15 million elements;
  • Working on a machine with 24 cores and using the default processes = os.cpu_count() within multiprocessing.Pool().

My naive thinking is to give each of 24 workers an equally-sized chunk, i.e. 15_000_000 / 24 or 625,000. Large chunks should reduce turnover/overhead while fully utilizing all workers. But it seems that this is missing some potential downsides of giving large batches to each worker. Is this an incomplete picture, and what am I missing?


Part of my question stems from the default logic for if chunksize=None: both .map() and .starmap() call .map_async(), which looks like this:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    # ... (materialize `iterable` to list if it's an iterator)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0

What's the logic behind divmod(len(iterable), len(self._pool) * 4)? This implies that the chunksize will be closer to 15_000_000 / (24 * 4) == 156_250. What's the intention in multiplying len(self._pool) by 4?

This makes the resulting chunksize a factor of 4 smaller than my "naive logic" from above, which consists of just dividing the length of the iterable by number of workers in pool._pool.
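For concreteness, here is a small snippet (my own illustration, not taken from Pool's source) that reproduces both numbers for ~15 million elements and 24 workers:

len_iterable = 15_000_000
n_workers = 24

# "naive" chunksize: one equally-sized chunk per worker
naive_chunksize = len_iterable // n_workers                  # 625_000

# Pool's default, mirroring the divmod logic in _map_async
default_chunksize, extra = divmod(len_iterable, n_workers * 4)
if extra:                                                    # extra == 0 here
    default_chunksize += 1

print(naive_chunksize, default_chunksize)                    # 625000 156250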

Lastly, there is also this snippet from the Python docs on .imap() that further drives my curiosity:

The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.


Related answer that is helpful but a bit too high-level: Python multiprocessing: why are large chunksizes slower?.

About this answer

This answer is Part II of the accepted answer above.


7. Naive vs. Pool's Chunksize-Algorithm

Before going into details, consider the two gifs below. For a range of different iterable lengths, they show how the two compared algorithms chunk the passed iterable (it will be a sequence by then) and how the resulting tasks might be distributed. The order of workers is random, and in reality the number of distributed tasks per worker can differ from these images for light taskels and/or taskels in a Wide Scenario. As mentioned earlier, overhead is also not included here. For heavy enough taskels in a Dense Scenario with negligible transmitted data-sizes, real computations draw a very similar picture, though.

As shown in chapter "5. Pool's Chunksize-Algorithm", with Pool's chunksize-algorithm the number of chunks will stabilize at n_chunks == n_workers * 4 for big enough iterables, while with the naive approach it keeps switching between n_chunks == n_workers and n_chunks == n_workers + 1. For the naive algorithm: because n_chunks % n_workers == 1 is True for n_chunks == n_workers + 1, a new section will be created in which only a single worker is employed.
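You can reproduce this behaviour with a few lines of Python. The sketch below assumes the naive algorithm is plain floor division (chunksize = len_iterable // n_workers), as in the question; the helper names naive_chunksize and pool_chunksize are made up for illustration:

from math import ceil

def naive_chunksize(len_iterable, n_workers):
    # one "equal" chunk per worker; a remainder of up to n_workers - 1
    # taskels ends up in one extra, smaller chunk
    return len_iterable // n_workers

def pool_chunksize(len_iterable, n_workers, factor=4):
    # mirrors Pool._map_async: divide by n_workers * 4 and round up
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize

for len_iterable in (40, 41, 210, 5000):
    n_workers = 4
    cs_naive = naive_chunksize(len_iterable, n_workers)
    cs_pool = pool_chunksize(len_iterable, n_workers)
    print(f"len={len_iterable:>4}: "
          f"naive {cs_naive:>4} -> {ceil(len_iterable / cs_naive):>2} chunks | "
          f"pool {cs_pool:>4} -> {ceil(len_iterable / cs_pool):>2} chunks")

For big iterables the pool variant settles at n_workers * 4 chunks, while the naive variant flips between n_workers and n_workers + 1 chunks depending on the remainder.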

Naive Chunksize-Algorithm:

You might think you created as many tasks as there are workers, but this will only be true for cases where there is no remainder for len_iterable / n_workers. If there is a remainder, there will be a new section with only one task for a single worker. At that point your computation will not be parallel anymore.

Below you see a figure similar to the one shown in chapter 5, but displaying the number of sections instead of the number of chunks. For Pool's full chunksize-algorithm (n_pool2), n_sections will stabilize at the infamous, hard-coded factor 4. For the naive algorithm, n_sections will alternate between one and two.

For Pool's chunksize-algorithm, the stabilization at n_chunks = n_workers * 4 through the extra-treatment mentioned before prevents the creation of a new section here and keeps the Idling Share limited to one worker for long enough iterables. Not only that, but the algorithm will keep shrinking the relative size of the Idling Share, which leads to an RDE value converging towards 100%.

"Long enough" for n_workers=4 is len_iterable=210, for example. For iterables equal to or bigger than that, the Idling Share will be limited to one worker, a trait originally lost because of the 4-multiplication within the chunksize-algorithm in the first place.

The naive chunksize-algorithm also converges towards 100%, but it does so more slowly. The converging effect solely depends on the fact that the relative portion of the tail shrinks for cases where there will be two sections. This tail with only one employed worker is limited to an x-axis length of n_workers - 1, the possible maximum remainder for len_iterable / n_workers.

How do actual RDE values differ for the naive and Pool's chunksize-algorithm?

Below you find two heatmaps showing the RDE values for all iterable lengths up to 5000, for all numbers of workers from 2 up to 100. The color-scale goes from 0.5 to 1 (50%-100%). You will notice much more dark areas (lower RDE values) for the naive algorithm in the left heatmap. In contrast, Pool's chunksize-algorithm on the right draws a much more sunshiny picture.

The diagonal gradient of lower-left dark corners vs. upper-right bright corners, is again showing the dependence on the number of workers for what to call a "long iterable".

How bad can it get with each algorithm?

With Pool's chunksize-algorithm an RDE value of 81.25 % is the lowest value for the range of workers and iterable lengths specified above:

With the naive chunksize-algorithm, things can turn much worse. The lowest calculated RDE here is 50.72 %. In this case, for nearly half of the computation time just a single worker is running! So, watch out, proud owners of Knights Landing. ;)


8. Reality Check

In the previous chapters we considered a simplified model for the purely mathematical distribution problem, stripped of the nitty-gritty details which make multiprocessing such a thorny topic in the first place. To better understand how far the Distribution Model (DM) alone can contribute to explaining observed worker utilization in reality, we will now take a look at some Parallel Schedules drawn from real computations.

Setup

The following plots all deal with parallel executions of a simple, cpu-bound dummy-function, which gets called with various arguments so we can observe how the drawn Parallel Schedule varies depending on the input values. The "work" within this function consists only of iteration over a range object. This is already enough to keep a core busy, since we pass huge numbers in. Optionally the function takes some taskel-unique extra data which is just returned unchanged. Since every taskel comprises the exact same amount of work, we are still dealing with a Dense Scenario here.

The function is decorated with a wrapper taking timestamps with ns-resolution (Python 3.7+). The timestamps are used to calculate the timespan of a taskel and therefore enable the drawing of an empirical Parallel Schedule.

from functools import wraps
from multiprocessing import current_process
from time import time_ns


def stamp_taskel(func):
    """Decorator for taking timestamps on start and end of decorated
    function execution.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time_ns()
        result = func(*args, **kwargs)
        end_time = time_ns()
        return (current_process().name, (start_time, end_time)), result
    return wrapper


@stamp_taskel
def busy_foo(i, it, data=None):
    """Dummy function for CPU-bound work."""
    for _ in range(int(it)):
        pass
    return i, data

Pool's starmap method is also decorated in such a way that only the starmap-call itself is timed. "Start" and "end" of this call determine the minimum and maximum on the x-axis of the produced Parallel Schedule.
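For completeness, a minimal sketch of how that outer timing could be done (the timed_starmap helper is a made-up name; the actual wrapper used for the plots isn't shown here, this only illustrates the idea of stamping nothing but the starmap-call):

from time import time_ns

def timed_starmap(pool, func, iterable):
    # stamps only the outer starmap-call, not the individual taskels
    start_time = time_ns()
    results = pool.starmap(func, iterable)
    end_time = time_ns()
    return (start_time, end_time), results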

We're going to observe computation of 40 taskels on four worker processes on a machine with these specs: Python 3.7.1, Ubuntu 18.04.2, Intel® Core™ i7-2600K CPU @ 3.40GHz × 8

The input values which will be varied are the number of iterations in the for-loop (30k, 30M, 600M) and the additionally sent data size (per taskel, numpy-ndarray: 0 MiB, 50 MiB).

...
N_WORKERS = 4
LEN_ITERABLE = 40
ITERATIONS = 30e3  # 30e6, 600e6
DATA_MiB = 0  # 50

iterable = [
    # extra created data per taskel
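    # np.arange is assumed to yield 8-byte integers here (as on 64-bit Linux),
    # so dividing the byte count by 8 makes this array ~DATA_MiB MiB in size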
    (i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8)))  # taskel args
    for i in range(LEN_ITERABLE)
]


with Pool(N_WORKERS) as pool:
    results = pool.starmap(busy_foo, iterable)

The runs shown below were handpicked to have the same ordering of chunks, so you can spot the differences better compared to the Parallel Schedule from the Distribution Model, but don't forget that the order in which the workers get their tasks is non-deterministic.

DM Prediction

To reiterate, the Distribution Model "predicts" a Parallel Schedule like the one we've already seen in chapter 6.2:

1st RUN: 30k iterations & 0 MiB data per taskel

Our first run here is very short; the taskels are very "light". The whole pool.starmap()-call only took 14.5 ms in total. You will notice that, contrary to the DM, the idling is not restricted to the tail-section, but also takes place between tasks and even between taskels. That's because our real schedule here naturally includes all sorts of overhead. Idling here means just everything outside of a taskel. Possible real idling during a taskel is not captured, as already mentioned before.

Further you can see that not all workers get their tasks at the same time. That's due to the fact that all workers are fed over a shared inqueue and only one worker can read from it at a time. The same applies for the outqueue. This can cause bigger upsets as soon as you're transmitting non-marginal sizes of data, as we will see later.

Furthermore you can see that despite the fact that every taskel comprises the same amount of work, the actual measured timespan for a taskel varies greatly. The taskels distributed to worker-3 and worker-4 need more time than the ones processed by the first two workers. For this run I suspect it is due to turbo boost not being available anymore on the cores for worker-3/4 at that moment, so they processed their tasks with a lower clock-rate.

The whole computation is so light that hardware or OS-introduced chaos-factors can skew the PS drastically. The computation is a "leaf on the wind" and the DM-prediction has little significance, even for a theoretically fitting scenario.

2nd RUN: 30M iterations & 0 MiB data per taskel

Increasing the number of iterations in the for-loop from 30,000 to 30 million results in a real Parallel Schedule which is close to a perfect match with the one predicted from data provided by the DM, hurray! The computation per taskel is now heavy enough to marginalize the idling parts at the start and in between, leaving only the big Idling Share visible which the DM predicted.

3rd RUN: 30M iterations & 50 MiB data per taskel

Keeping the 30M iterations, but additionally sending 50 MiB per taskel back and forth, skews the picture again. Here the queueing-effect is clearly visible. Worker-4 needs to wait longer for its second task than Worker-1. Now imagine this schedule with 70 workers!

In case the taskels are computationally very light but carry a notable amount of data as payload, the bottleneck of a single shared queue can prevent any additional benefit of adding more workers to the Pool, even if they are backed by physical cores. In such a case, Worker-1 could be done with its first task and awaiting a new one even before Worker-40 has gotten its first task.

It should become obvious now why computation times in a Pool don't always decrease linearly with the number of workers. Sending relatively big amounts of data along can lead to scenarios where most of the time is spent waiting for the data to be copied into the address space of a worker, and only one worker can be fed at a time.

4th RUN: 600M iterations & 50 MiB data per taskel

Here we send 50 MiB again, but raise the number of iterations from 30M to 600M, which brings the total computation time up from 10 s to 152 s. The drawn Parallel Schedule is again close to a perfect match with the predicted one; the overhead from the data copying is marginalized.


9. Conclusion

The discussed multiplication by 4 increases scheduling flexibility, but also leverages the unevenness in taskel-distributions. Without this multiplication, the Idling Share would be limited to a single worker even for short iterables (for a DM with Dense Scenario). Pool's chunksize-algorithm needs input-iterables to be of a certain size to regain that trait.

As this answer has hopefully shown, Pool's chunksize-algorithm leads to better core utilization on average compared to the naive approach, at least for the average case and as long as overhead is not considered. The naive algorithm here can have a Distribution Efficiency (DE) as low as ~51%, while Pool's chunksize-algorithm has its low at ~81%. DE however doesn't comprise Parallelization Overhead (PO) like IPC. Chapter 8 has shown that DE can still have great predictive power for the Dense Scenario with marginalized overhead.

Despite the fact that Pool's chunksize-algorithm achieves a higher DE compared to the naive approach, it does not provide optimal taskel distributions for every input constellation. While a simple static chunking-algorithm cannot optimize (overhead-including) Parallelization Efficiency (PE), there is no inherent reason why it could not always provide a Relative Distribution Efficiency (RDE) of 100 %, that is, the same DE as with chunksize=1. A simple chunksize-algorithm consists only of basic math and is free to "slice the cake" in any way.

Unlike Pool's implementation of an "equal-size-chunking" algorithm, an "even-size-chunking" algorithm would provide an RDE of 100% for every len_iterable / n_workers combination. An even-size-chunking algorithm would be slightly more complicated to implement in Pool's source, but can be modulated on top of the existing algorithm just by packaging the tasks externally (I'll link from here in case I drop a Q/A on how to do that).
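For illustration, here is a hedged sketch of what such external "even-size" packaging could look like (the helper even_chunks is a made-up name; the behaviour is similar in spirit to numpy.array_split, splitting a sequence into parts whose lengths differ by at most one):

def even_chunks(seq, n_chunks):
    """Split seq into n_chunks parts whose lengths differ by at most one."""
    q, r = divmod(len(seq), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        size = q + 1 if i < r else q   # the first r chunks get one extra item
        chunks.append(seq[start:start + size])
        start += size
    return chunks

# e.g. even_chunks(list(range(10)), 4) gives chunk sizes [3, 3, 2, 2],
# whereas equal-size chunking with chunksize=3 would give [3, 3, 3, 1]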

I think that part of what you're missing is that your naive estimate assumes that each unit of work takes the same amount of time, in which case your strategy would be the best. But if some jobs finish sooner than others, then some cores may become idle waiting for the slow jobs to finish.

Thus, by breaking the chunks up into 4 times more pieces, if one chunk finishes early that core can start the next chunk (while the other cores keep working on their slower chunks).

I don't know why they picked the factor 4 exactly, but it would be a trade-off between minimising the overhead of the map code (which wants the largest chunks possible) and balancing chunks taking different amounts of time (which wants the smallest chunks possible).
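To illustrate that trade-off, here is a toy simulation (purely illustrative, not taken from the answers above): taskels get random durations, chunks are handed greedily to the least busy worker, and the makespan is the finishing time of the slowest worker. Dispatch overhead is deliberately ignored.

import random

def makespan(task_times, chunksize, n_workers):
    # sum up the duration of each chunk of consecutive taskels
    chunks = [sum(task_times[i:i + chunksize])
              for i in range(0, len(task_times), chunksize)]
    workers = [0.0] * n_workers
    for chunk in chunks:
        # greedy dispatch: the next chunk goes to the least busy worker
        workers[workers.index(min(workers))] += chunk
    return max(workers)

random.seed(0)
times = [random.uniform(0.5, 1.5) for _ in range(96)]   # uneven taskels
for cs in (24, 6, 1):                                    # 4 workers
    print(f"chunksize {cs:>2}: makespan {makespan(times, cs, 4):.2f}")

Typically the smaller chunksizes bring the makespan closer to the theoretical optimum of sum(times) / n_workers, which is the balancing effect described above, while in reality each extra chunk also adds dispatch overhead.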

Comments
  • 4 is arbitrary and the whole calculation of chunksize is a heuristic. The relevant factor is how much your actual processing time may vary. A bit more on this here until I have time for an answer, if still needed then.
  • Have you checked this question?
  • Thanks @AndrewNaguib, I had actually not stumbled across that one somehow
  • Just to let you know: I didn't forget this question. In fact, I'm working on a canonical answer of biblical dimensions (lots of useful code snippets and fancy graphics) since the day you asked. The bounty still came 1-2 weeks too early to have it all complete, but I'm confident I will be able to drop something close enough before deadline.
  • @BradSolomon You're welcome :). Does it answer your question tho?
  • One of the most epic answers I've seen on SO.
  • Oh this was your short answer :P
  • But forreal.. this is an excellent answer. I have starred the question for future cases where I want to understand this better. Skimming through it taught me a great deal already! Thanks
  • Thanks for your answer. I want to know which tool you used to draw those figures. Thanks.
  • @L.Iridium You're welcome! I did use matplotlib where possible and otherwise ... LibreOffice calc + Pinta (basic image editing). Yeah I know ... but it works, somehow. ;)