dask: shared memory in parallel model
dask threads vs processes
I've read the dask documentation, blogs and SO, but I'm still not 100% clear on how to do it. My use case:
- I have about 10GB of reference data. Once loaded they are read-only. Usually we are loading them into Dask/Pandas dataframes
- I need these ref-data to process (enrich, modify, transform) about 500 mio events a day (multiple files)
- The "process" is a pipeline of about 40 tasks. Execution sequence is relevant (dependencies).
- Each individual task is not complicated or time consuming, mostly lookups, enrichments, mappings, etc.
- There are no dependencies between the events. In theory I could process every event by a separate thread, combine the output into a single file and I'm done. The output events don't even need to be in the same order as the input events.
- we can massively parallalize event processing
- Every parallel thread needs the same 10 GB of (raw) ref-data
- Processing a single event means applying a sequence/pipeline of 40 tasks onto them
- Each individual Task is not time consuming (read ref-data and modify the event)
Possible pitfalls / issues:
- spend more time on serialization/deserialisation rather then processing the data (we did experience this in some of our trials which used a pipe-like approaches)
- ref-data are loaded multiple times, once by each (parallel) process
- preferabbly I would like to dev/test it on my laptop, but I don't have enough memory to load the ref-data. May be if the solution would leverage memory_maps?
The most efficient solution seems to be, if we were able to load the ref-data in memory only once, make it available read-only to multiple other processes processing the events
Scale out to multiple computers by loading the ref-data in each computer. Push filenames to the computers for execution.
Any idea how to achieve this?
Thanks a lot for your help
I have also came across the similar issue of running embarissingly parallel jobs that were all fetching data in the same lookup "reference" table (or any big-memory read-only variable needed by each instance of the parallel process. As long as you stay in an environment which follow the "copy-on-write" semantics (e.g. linux), placing the lookup table in the global scope always worked very efficiently as explained nicely here: Shared-memory objects in multiprocessing
Here is a simple parallel workflow:
from multiprocessing import Pool # Load your reference data, do that only once # here in the parent process my_ref_lookup = load_ref_data(your_data_file) def your_parallel_function(my_file_path): my_new_data = load_data(my_file_path) # process my_new_data with some lookup in my_ref_lookup # which is known from the parent process. processed_data = do_stuff(my_new_data) # you could here write something on disk # and/or return the processed_data return processed_data with Pool(processes = 5) as Pool: list_of_result = Pool.map(your_parallel_function, your_list_of_file_paths)
Here the execution of
your_parallel_function will execute in parallel over e.g. 5 workers, fetching 5 files inside
your_list_of_file_paths at a time and all child processes will have access to
my_ref_lookup without having to copy them.
After some time spent with Dask and bag collections, I never found a similar or simpler behavior than this. In my attempts at using Dask, the read-only variable shared this way in the global scope ended up being copied by as many workers which needed it, which exploded the memory and made my kernel crash. I have never seen this case handled in any of the Dask documention. The only remotely related reference to this in the Dask documentation is about avoiding global state: https://docs.dask.org/en/latest/delayed-best-practices.html#avoid-global-state but this shows the case of the shared variable being modified the delayed function, which is different from the current issue of just sharing "read-only" data.
dask: shared memory in parallel model, each dask worker process can have any number of threads. it is generally better to load data in the workers rather than pass from the client, even though replicating amongst the processes is fairly efficient. every task introduces some overhead, and may result in intermediates being moved from one machine to the other. Shared Memory¶ The asynchronous scheduler requires an apply_async function and a Queue. These determine the kind of worker and parallelism that we exploit. apply_async functions can be found in the following places: multithreading.Pool().apply_async - uses multiple processes; multithreading.pool.ThreadPool().apply_async - uses multiple threads
I've found a blog post about the (python) Ray framework. Even though Ray's business purpose is very different, they faced the same core requirements: read-only shared-memory dataframes leveraged by many parallel processes. They are describing and explaining why they settled on Apache Arrow and pyarrow. Sounds interesting and we'll give it a try for our use case.
Parallelizing Feature Engineering with Dask, How to scale Featuretools using parallel processing Simply getting a bigger machine — in terms of RAM or cores — will not solve the Once we have the individual feature matrices, we can directly use them for modeling if we are using an A Medium publication sharing concepts, ideas, and codes. The design objective for Dask is really to support parallel data analytics and exploration on data that was too big to keep in memory. Dask was not on our radar when we wrote the drafts for our book, but it certainly worth discussing now. Dask in Action. This is not intended as a full Dask tutorial.
Speeding up your Algorithms Part 4— Dask, Run your Pandas/Numpy/Sklearn/Python code in parallel with Dask Large Models: Data fits in RAM, but training takes too long. While embarassingly parallel, this can also result in repeated work, as earlier stages in the pipeline are refit multiple times on the same parameter + data combinations. In contrast, the dask version hashes all inputs (forming a sort of Merkle DAG), resulting in the intermediate results being shared. Keeping with the pseudocode above, the dask
Easy Distributed Training with Joblib and Dask, Scikit-learn uses joblib for simple parallelism in many places. workloads, threads are better than processes because of shared memory. y_pred is Dask arary. Workers can write the predicted values to a shared file system, without ever having to collect the data on a single machine. Or we can check the models score on the entire large dataset. The computation will be done in parallel, and no single machine will have to hold all the data.
(PDF) Parallel Programming in the Cloud with Python Dask, We broke these down into five models: (1) HPC-style "Single The design objective for Dask is really to support parallel data missing module s3fs in the dask container, but it did work on our massive shared memory VM It will show three different ways of doing this with Dask: dask.delayed. concurrent.Futures. dask.bag. This example focuses on using Dask for building large embarrassingly parallel computation as often seen in scientific communities and on High Performance Computing facilities, for example with Monte Carlo methods.
DASK, Dask is a parallel computing python library that can run across a cluster of machines. Have you ever tried working with a large dataset on a 4GB RAM machine? It starts Machine learning models have multiple hyperparameters and it is not easy to figure out which Thank you very much for sharing this. Shared Memory Model (without threads) In this programming model, processes/tasks share a common address space, which they read and write to asynchronously. Various mechanisms such as locks / semaphores are used to control access to the shared memory, resolve contentions and to prevent race conditions and deadlocks.
- See if joblib can do the job : joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html . Ability to use shared memory efficiently with worker processes for large numpy-based datastructures.
- More details here : github.com/joblib/joblib/blob/master/doc/parallel_numpy.rst
- The question is about multiple machines.
- multiple machine is not the core question. The core question is: share read-only ref-data across multiple processes which process events parallel
- Thumbs up. It addresses my issue and it's easy to understand. Also thanks for your insight in Dask.
- If you ever come accross a similar easy way in Dask, i'll be happy to hear more about it.
- Just short sidenote on ray, if of interest: It requires all code to be fed into the processing from inside a controller node. There is (afaik) no external control option, just in case you might need ray as a remote processing system. Also, it looks like a one-shot solution, clusters are hardly manageable across multiple runs, because ray does not clean its redis storage.
- - we are using SDDs and our problem is CPU bound. Because of GIL, threads are no option - we want to pass the filenames only to the workers. But all workers need the same ref-data. Hence the idea to use share read-only memory thanks for your comments.
- You could access unix shared memory from each worker function, if you know how to do that,
- multiple processes can also share memory, checkout memory map.
- Yes, I know, I said so: it is called shared memory (not memory map) man7.org/linux/man-pages/man7/shm_overview.7.html