Compiling Executable with dask or joblib multiprocessing with cython results in errors
I'm converting some serially processed Python jobs to multiprocessing with dask or joblib. Sadly, I need to work on Windows.
When running from within IPython, or from the command line by invoking the .py file with python, everything runs fine.
When compiling an executable with Cython, it no longer runs fine: step by step, more and more processes (unlimited, and more than the number of requested processes) get started and block my system.
It somehow feels like a multiprocessing bomb, but of course I used
`if __name__ == "__main__":` for the control block, which is confirmed by the script running fine when called with python from the command line.
My Cython call is

```
cython --embed --verbose --annotate THECODE.PY
```

and I'm compiling with

```
gcc -time -municode -DMS_WIN64 -mthreads -Wall -O -I"PATH_TO_\include" -L"PATH_TO_\libs" THECODE.c -lpython36 -o THECODE
```

resulting in a Windows executable.
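As a sketch of the same two build steps without hard-coded paths: the helper below (hypothetical name `build_commands`) only assembles the argument lists for `subprocess.run`. It assumes, for illustration, that the include directory can be taken from `sysconfig` and that the Windows import libraries live in `<base prefix>\libs`; the cython and gcc flags are copied unchanged from the commands above.

```python
import os
import sys
import sysconfig


def build_commands(module_stem):
    """Return (cython_cmd, gcc_cmd) argument lists mirroring the commands above.

    Assumption: include/lib directories and the import-library name are derived
    from the running interpreter instead of the hard-coded PATH_TO_ paths.
    """
    include_dir = sysconfig.get_paths()["include"]
    # assumed location of python3X import libraries on a Windows installation
    lib_dir = os.path.join(sys.base_prefix, "libs")
    py_lib = "python{}{}".format(sys.version_info.major, sys.version_info.minor)

    cython_cmd = ["cython", "--embed", "--verbose", "--annotate", module_stem + ".py"]
    gcc_cmd = [
        "gcc", "-time", "-municode", "-DMS_WIN64", "-mthreads", "-Wall", "-O",
        "-I" + include_dir,
        "-L" + lib_dir,
        module_stem + ".c",
        "-l" + py_lib,
        "-o", module_stem,
    ]
    return cython_cmd, gcc_cmd


if __name__ == "__main__":
    for cmd in build_commands("THECODE"):
        # print only; subprocess.run(cmd, check=True) would execute each step
        print(" ".join(cmd))
```

Deriving the paths from the interpreter that will be embedded keeps the headers and the `-lpythonXY` library in sync with that interpreter's version.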
Other (single-process) code compiled this way runs fine.
The problem seems to be the same for dask and joblib (which might mean that dask works like joblib or is based on it).
For those interested in an MCVE: just taking the first code from "Multiprocessing Bomb" and compiling it with my Cython commands above will result in an executable that blows up your system. (I just tried :-) )
I just found something interesting by adding one line to the code sample for showing the `__name__` of each process:

```python
import multiprocessing

def worker():
    """worker function"""
    print('Worker')
    return

print("-->" + __name__ + "<--")

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()
```
When running that piece of code with `python`, it shows

```
__main__
__mp_main__
__mp_main__
__mp_main__
__mp_main__
__mp_main__
```

(other output suppressed), which shows that the `if` check works. When running the executable after Cython compilation, it shows

```
__main__
__main__
__main__
__main__
__main__
__main__
```

and more and more. Thus the workers' calls to the module are no longer
masqueraded as an import, and so each worker tries to start five new ones in a recursive manner.
When a new Python process is started via the `spawn` method on Windows (this behavior can also be triggered on Linux by using `mp.set_start_method('spawn')`), command-line arguments are passed to the interpreter in the new process so that communication with the parent process can be established, for example:

```
python -c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork
```
The problem with embedded Cython modules (or with frozen modules in general, i.e. those created with cx_Freeze, py2exe and similar) is that passing command-line arguments to them corresponds more to

```
python my_script.py <arguments>
```

i.e. the command-line arguments aren't automatically processed by the interpreter, but need to be handled in the script.
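The difference can be seen in `sys.argv` itself. The sketch below asks a real interpreter how it fills `sys.argv` for `python -c <code> --multiprocessing-fork` and compares that with the argv an embedded executable would receive for the same spawn (the `bomb.exe` list is an assumed example built from the command line quoted above). It uses `multiprocessing.spawn.is_forking`, an internal, undocumented helper which, as far as I can tell, is what `spawn_main` and `freeze_support` use to recognize a child process.

```python
import ast
import subprocess
import sys
from multiprocessing.spawn import is_forking  # internal helper (undocumented)

WORKER_CODE = ("from multiprocessing.spawn import spawn_main; "
               "spawn_main(tracker_fd=5, pipe_handle=11)")

# 1) A real interpreter consumes "-c <code>": sys.argv[0] becomes "-c",
#    so the canary ends up in sys.argv[1].
completed = subprocess.run(
    [sys.executable, "-c", "import sys; print(sys.argv)", "--multiprocessing-fork"],
    stdout=subprocess.PIPE, universal_newlines=True, check=True,
)
interpreter_argv = ast.literal_eval(completed.stdout.strip())

# 2) An embedded executable gets its command line like a script would
#    (assumed example): nothing consumes "-c <code>", it is just data in argv.
embedded_argv = ["bomb.exe", "-c", WORKER_CODE, "--multiprocessing-fork"]

print(interpreter_argv, is_forking(interpreter_argv))  # ['-c', '--multiprocessing-fork'] True
print(is_forking(embedded_argv))                        # False: argv[1] is "-c"
```

In the embedded case the code string is never executed, and the module's own `__main__` block runs instead, which is exactly what starts the next generation of workers.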
`multiprocessing` provides a function called
`multiprocessing.freeze_support()`, which handles the command-line arguments correctly and which can be used as shown in Bastian's answer:

```python
import multiprocessing
import sys

if __name__ == '__main__':
    # needed for Cython, as it doesn't set the `frozen` attribute
    setattr(sys, 'frozen', True)
    # parse command-line options and execute them if needed
    multiprocessing.freeze_support()
```
However, this solution works only for Windows, as can be seen in the code (`multiprocessing/context.py`):

```python
def freeze_support(self):
    '''Check whether this is a fake forked process in a frozen executable.
    If so then run code specified by commandline and exit.
    '''
    if sys.platform == 'win32' and getattr(sys, 'frozen', False):
        from .spawn import freeze_support
        freeze_support()
```
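The `frozen` attribute also matters on the parent side. The sketch below calls `multiprocessing.spawn.get_command_line`, an internal, undocumented helper that the spawn start method uses to build a child's command line, once with `sys.frozen` false and once with it set to `True`. The values `tracker_fd=5, pipe_handle=11` are just the example numbers from above; `child_command_line` is a hypothetical wrapper.

```python
import sys
from multiprocessing import spawn  # get_command_line is internal and undocumented

_MISSING = object()


def child_command_line(frozen):
    """Command line the spawn start method would build, with sys.frozen temporarily set."""
    saved = getattr(sys, "frozen", _MISSING)
    sys.frozen = frozen
    try:
        return spawn.get_command_line(tracker_fd=5, pipe_handle=11)
    finally:
        # restore the interpreter state exactly as it was
        if saved is _MISSING:
            del sys.frozen
        else:
            sys.frozen = saved


# [<python>, <interpreter flags...>, '-c', 'from multiprocessing.spawn import ...', '--multiprocessing-fork']
print(child_command_line(False))
# [sys.executable, '--multiprocessing-fork', 'tracker_fd=5', 'pipe_handle=11']
print(child_command_line(True))
```

With `frozen` set, the child is started as `<executable> --multiprocessing-fork tracker_fd=5 pipe_handle=11`: the canary sits in `argv[1]` and the remaining `name=value` pairs are what `multiprocessing.spawn.freeze_support` splits and passes on to `spawn_main`.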
There is a bug report, "multiprocessing freeze_support needed outside win32", which may or may not be fixed soon.
As explained in the above bug report, it is not enough to set the
`frozen` attribute to
`True` and to call
`freeze_support` directly from
`multiprocessing.spawn`, because then the semaphore tracker isn't handled correctly.
There are two options I see: either patch your installation with a not-yet-released patch from the above bug report, or use the do-it-yourself approach presented below.
Here is an earlier version of this answer, which is more "experimental" but offers more insights/details and proposes a solution in a somewhat do-it-yourself style.
I'm on Linux, so I use
`mp.set_start_method('spawn')` to simulate the behavior of Windows.
What happens in
`spawn` mode? Let's add some
`sleep`s so we can investigate the processes:
```python
# bomb.py
import multiprocessing as mp
import sys
import time

def worker():
    time.sleep(50)
    print('Worker')
    return

if __name__ == '__main__':
    print("Starting...")
    time.sleep(20)
    mp.set_start_method('spawn')  ## use spawn!
    jobs = []
    for i in range(5):
        p = mp.Process(target=worker)
        jobs.append(p)
        p.start()
```
With `pgrep python` we can see that at first there is only one Python process, then 7(!) different
pids. We can see the command-line arguments via
`cat /proc/<pid>/cmdline`. Five of the new processes have the command line

```
-c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork
```

and one has

```
-c "from multiprocessing.semaphore_tracker import main;main(4)"
```
That means the parent process starts 6 new Python interpreter instances, and every newly started interpreter executes code sent from the parent via the command-line options; the information is shared via pipes. One of these 6 Python instances is a tracker, which observes the whole thing.
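The inspection with `pgrep`/`cat /proc/<pid>/cmdline` can also be done from Python. This is a Linux-only sketch with hypothetical helper names (`read_cmdline`, `processes_matching`); matching on the basename of `argv[0]` only approximates what `pgrep` does (it matches the process name), but it also lets you look for a cythonized `bomb` executable.

```python
import os


def read_cmdline(pid):
    """Return the argv of a running process by reading /proc/<pid>/cmdline (Linux only).

    The file stores the arguments as NUL-terminated byte strings.
    """
    with open("/proc/{}/cmdline".format(pid), "rb") as f:
        raw = f.read()
    args = raw.split(b"\0")
    if args and args[-1] == b"":
        args.pop()  # drop the empty piece after the final NUL terminator
    return [a.decode(errors="replace") for a in args]


def processes_matching(name="python"):
    """Rough equivalent of `pgrep <name>` followed by `cat /proc/<pid>/cmdline`: {pid: argv}."""
    result = {}
    for entry in os.listdir("/proc"):
        if not entry.isdigit():
            continue
        try:
            argv = read_cmdline(int(entry))
        except OSError:  # the process exited meanwhile, or access was denied
            continue
        if argv and name in os.path.basename(argv[0]):
            result[int(entry)] = argv
    return result


if __name__ == "__main__":
    for pid, argv in sorted(processes_matching("python").items()):
        print(pid, argv)
```

Running this while `bomb.py` sleeps should list the parent, the five `spawn_main` workers and the tracker with the command lines shown above.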
OK, what happens if the script is cythonized and embedded? The same as with normal Python; the only difference is that the
`bomb` executable is started instead of `python`. But unlike the Python interpreter, it doesn't execute (isn't aware of) the command-line arguments, so the
`main` function runs over and over and over again.
There is an easy fix: let the
`bomb` executable start the Python interpreter:

```python
...
if __name__ == '__main__':
    mp.set_executable(<PATH TO PYTHON>)
    ....
```

Now `bomb` is no longer a multiprocessing bomb!
However, the goal is probably not to have a Python interpreter around, so we need to make our program aware of the possible command lines:
```python
import re
# ...... (the rest of bomb.py: imports and worker() as above)

if __name__ == '__main__':
    if len(sys.argv) == 3:  # should start in semaphore_tracker mode
        nr = list(map(int, re.findall(r'\d+', sys.argv[2])))
        sys.argv[1] = '--multiprocessing-fork'  # this canary is needed for multiprocessing module to work
        # (multiprocessing.resource_tracker since Python 3.8)
        from multiprocessing.semaphore_tracker import main
        main(nr[0])

    elif len(sys.argv) > 3:  # should start in slave mode
        fd, pipe = map(int, re.findall(r'\d+', sys.argv[2]))
        print("I'm a slave!, fd=%d, pipe=%d" % (fd, pipe))
        sys.argv[1] = '--multiprocessing-fork'  # this canary is needed for multiprocessing module to work
        from multiprocessing.spawn import spawn_main
        spawn_main(tracker_fd=fd, pipe_handle=pipe)

    else:  # main mode
        print("Starting...")
        mp.set_start_method('spawn')
        jobs = []
        for i in range(5):
            p = mp.Process(target=worker)
            jobs.append(p)
            p.start()
```
Now our bomb doesn't need a stand-alone Python interpreter and stops after the workers are done. Please note the following:
- The way it is decided in which mode `bomb` should be started is not very error-safe, but I hope you get the gist.
- `--multiprocessing-fork` is just a canary: it doesn't do anything, it only must be there (`spawn_main` asserts that it is present in `sys.argv[1]`).
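Regarding the first note, one way to make the mode decision less fragile is to look at the code string after `-c` instead of only counting arguments. `detect_mode` is a hypothetical helper, not part of `multiprocessing`; the tracker prefixes are assumed from the command line shown above and from the rename of `semaphore_tracker` to `resource_tracker` in Python 3.8.

```python
# Command strings as multiprocessing builds them (as far as I know):
# Python <= 3.7 uses semaphore_tracker, Python >= 3.8 renamed it to resource_tracker.
TRACKER_PREFIXES = (
    "from multiprocessing.semaphore_tracker import main;",
    "from multiprocessing.resource_tracker import main;",
)
SPAWN_MARKER = "from multiprocessing.spawn import spawn_main;"
CANARY = "--multiprocessing-fork"


def detect_mode(argv):
    """Return (mode, code) where mode is "worker", "tracker" or "main".

    Inspects the code string following "-c" instead of only counting arguments,
    so ordinary user arguments do not trigger worker or tracker mode.
    """
    try:
        code = argv[argv.index("-c") + 1]
    except (ValueError, IndexError):  # no "-c", or nothing after it
        return "main", None
    if code.startswith(SPAWN_MARKER) and argv[-1] == CANARY:
        return "worker", code
    if code.startswith(TRACKER_PREFIXES):
        return "tracker", code
    return "main", None
```

The `tracker` and `worker` branches of the `__main__` block would then proceed as before: set the canary in `sys.argv[1]` and call the tracker's `main` or `spawn_main` with the numbers extracted from `code`.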
NB: The changed code can also be used with `python`, because after executing
`"from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork`, Python changes
`sys.argv`, so the code no longer sees the original command line and …
Based on the details from the submitted bug report, I think I can offer maybe the most elegant solution here:

```python
import multiprocessing
import sys

if __name__ == '__main__':
    if sys.argv[0][-4:] == '.exe':
        setattr(sys, 'frozen', True)
    multiprocessing.freeze_support()
    YOURMAINROUTINE()
```
The `freeze_support()` call is needed on Windows (see the Python multiprocessing documentation).
When running under Python, that line alone is already enough.
But Cython is obviously not aware of some of those things (the docs say it is tested with
cx_Freeze). This can be alleviated by the
`setattr` call, which may only be used when running compiled, hence the decision by file extension.
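A slightly more defensive version of the file-extension check, as a sketch: it compares the extension case-insensitively and, because `multiprocessing.freeze_support()` only acts on `win32` anyway, also checks the platform. `looks_like_compiled_exe` is a hypothetical name. Since `sys.argv[0]` generally reflects how the program was invoked, starting it as `THECODE` without the extension may not be detected, so treat this as a heuristic.

```python
import multiprocessing
import os
import sys


def looks_like_compiled_exe(argv0=None, platform=None):
    """Heuristic: are we running as a compiled Windows .exe rather than via python?"""
    argv0 = sys.argv[0] if argv0 is None else argv0
    platform = sys.platform if platform is None else platform
    # freeze_support() only does something on win32, so other platforms never qualify
    return platform == "win32" and os.path.splitext(argv0)[1].lower() == ".exe"


if __name__ == "__main__":
    if looks_like_compiled_exe():
        setattr(sys, "frozen", True)  # the embedded Cython main does not set it
    multiprocessing.freeze_support()
    # YOURMAINROUTINE() would be called here
```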
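Inspired by the answer (or rather the ideas in it) from ead, I found a very simple solution, or let's better call it a workaround. For me, just changing the `if` clause to the following worked:

```python
import sys

if __name__ == '__main__':
    if len(sys.argv) == 1:
        main()
    else:
        # worker call: keep ['<code>', '--multiprocessing-fork'] so the canary is argv[1]
        sys.argv = sys.argv[2:]
        exec(sys.argv[0])  # run the code that multiprocessing passed after -c
```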
The reason why that works is (in my case):
When calling the original .py file, the worker's
`__name__` is set to
`__mp_main__` (but all processes are just the plain .py file).
When running the (Cython-)compiled version, the worker's
`__name__` is not usable, but the workers get called differently, so we can identify them by there being more than one argument in `argv`. In my case, a worker's argv reads

```
['MYPROGRAMM.exe', '-c', 'from multiprocessing.spawn import spawn_main; spawn_main(parent_pid=9316, pipe_handle =392)', '--multiprocessing-fork']
```

The code for activating the worker is the third element of `argv`, and it gets executed by the commands above.
Of course, if you need arguments for your compiled file, it takes more effort, maybe parsing the `parent_pid` out of the call. But in my case, that would simply be overkill.
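If the compiled program does need its own arguments, a sketch of that extra effort is to recognize the worker invocation by its content and pass the parsed numbers to `spawn_main` instead of calling `exec`. Both function names are hypothetical; the keyword names (`parent_pid`, `pipe_handle`, `tracker_fd`) are taken from the command lines quoted in this thread.

```python
import re
import sys

CANARY = "--multiprocessing-fork"


def parse_spawn_main_kwargs(code):
    """Extract the keyword arguments of the spawn_main(...) call in a worker's code string."""
    match = re.search(r"spawn_main\((.*)\)", code)
    if match is None:
        raise ValueError("not a multiprocessing worker command: %r" % code)
    return {
        name: (None if value == "None" else int(value))
        for name, value in re.findall(r"(\w+)\s*=\s*(\d+|None)", match.group(1))
    }


def run_worker_if_requested(argv):
    """Hypothetical guard: hand over to multiprocessing if argv is a worker call.

    Returns False for any other argv, so the main routine can parse its own arguments.
    """
    if not (len(argv) >= 4 and argv[1] == "-c"
            and "spawn_main(" in argv[2] and argv[-1] == CANARY):
        return False
    kwargs = parse_spawn_main_kwargs(argv[2])
    sys.argv = [argv[0], CANARY]  # spawn_main checks for the canary in sys.argv[1]
    from multiprocessing.spawn import spawn_main
    spawn_main(**kwargs)  # runs the task, then calls sys.exit()
    return True  # not reached
```

In the `__main__` block one would call `run_worker_if_requested(sys.argv)` first and only run the normal main routine (with its own argument parsing) when it returns `False`.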
Since the solutions proposed didn't work for me, I am providing an additional answer with a workaround.
My frozen app also resulted in a multiprocessing bomb. I could solve it by
- using thread-based parallelism instead of process-based multiprocessing, and
- within joblib's Parallel execution, using
`Parallel(n_jobs=4, prefer="threads")`, as suggested by this answer (instead of the default, process-based `loky` backend).
I couldn't get
`multiprocessing.Pool` to work in the frozen app (neither with
`prefer="threads"` nor with
`prefer="multiprocessing"`), but one can switch to thread-based parallelism with `multiprocessing.dummy` (see the docs):
```python
# a dependency with joblib
from dep_with_joblib import BigJob

# multiprocessing wrapper for threading.Thread
from multiprocessing.dummy import Pool as ThreadPool
# instead of
# from multiprocessing import Pool

# thread-based parallelism,
# works if `Parallel(n_jobs=4, prefer="threads")` is used
# in joblib (e.g. inside big_job())
POOL = ThreadPool(processes=1)

# as far as I can tell,
# the following process-based parallelism
# does _not_ work with frozen app/joblib atm
# POOL = Pool(processes=1)


class MainClass():
    def __init__(self):
        """Init ClusterGen"""
        return

    @staticmethod
    def run_big_job(big_job, data):
        """Run big_job on parallel thread"""
        big_job()
        return big_job

    def big_job_exec(self):
        """Big job execution"""
        bigjob = BigJob()
        big_job_input_data = ...

        # Start big_job on different thread
        async_result = POOL.apply_async(
            MainClass.run_big_job, (bigjob, big_job_input_data))

        # get results from clusterer
        bigjob_results = async_result.get()
```
A more explicit example with `threading`:

```python
import threading
import queue

# a dependency with joblib
from dep_with_joblib import BigJob

job_queue = queue.Queue()


def store_in_queue(f):
    def wrapper(*args):
        job_queue.put(f(*args))
    return wrapper


class MainClass():
    def __init__(self):
        """Init ClusterGen"""
        return

    @staticmethod
    @store_in_queue
    def run_big_job(big_job, data):
        """Run big_job on parallel thread"""
        big_job()
        return big_job

    def big_job_exec(self):
        """Big job execution"""
        bigjob = BigJob()
        big_job_input_data = ...

        # Start big_job on different thread
        t = threading.Thread(
            target=MainClass.run_big_job,
            args=(bigjob, big_job_input_data),
            group=None,
            name="example-bigjob",
        )
        t.start()

        # get results from big_job
        bigjob_results = job_queue.get()
```
In both examples above,
`bigjob()` is run asynchronously on a different thread. The examples can easily be modified to use multiple threads.
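As a sketch of the multi-thread variant, the same `multiprocessing.dummy` pool can run several jobs at once with `map_async`. `BigJob` here is a hypothetical stand-in (a small CPU loop) for the class from the dependency, used only so the example runs on its own.

```python
from multiprocessing.dummy import Pool as ThreadPool  # Pool API backed by threads


class BigJob:
    """Hypothetical stand-in for the dependency's BigJob: a small CPU-bound loop."""

    def __init__(self, n):
        self.n = n

    def __call__(self):
        return sum(i * i for i in range(self.n))


def run_big_job(big_job):
    """Run one job on a worker thread and return its result."""
    return big_job()


jobs = [BigJob(n) for n in (10, 100, 1000)]
with ThreadPool(processes=3) as pool:
    async_result = pool.map_async(run_big_job, jobs)  # returns immediately
    results = async_result.get()  # blocks until every job has finished

print(results)  # [285, 328350, 332833500]
```

`async_result.get()` blocks; in a GUI one would rather poll `async_result.ready()` or pass `callback=` to `map_async` so the event loop stays responsive.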
Why async? In my case,
`BigJob()` comes from a dependency that uses
`joblib.Parallel` to improve speed, which wouldn't work when my app was frozen; in addition, I needed
`bigjob()` to run asynchronously to prevent my GUI from crashing.
- I think this might be worth reporting as a bug (github.com/cython/cython/issues).
- @DavidW Good option - I'll do exactly that now. Thanks!
- Found a solution that works for me: I had to use
`with multiprocessing.get_context("spawn").Pool() as pool: ...` in my app to force the Loky backend of the dependency to use
`n_jobs=1`. No mp-bombs in my frozen app anymore :thumbsup:
- Tried this and, unfortunately, I still get multiple spawns of my frozen command-line app using multiprocessing.Pool on Python 3.7 with a joblib dependency. I verified that
`setattr(sys, 'frozen', True)` is executed (as a bomb, however).
- This works only for Windows; see bugs.python.org/issue32146
- Your solution will only work for your minimal/similar examples. As soon as data must be shared between the master and the slaves, your workaround will break down...
- @ead oh sad - you are right. I just tested. Do you know why? What else can I do? (Okay, it is no longer a process bomb, but it is non-functional. :-( )
- What about my solution, which mimics the correct behavior of the Python interpreter? You will need to adjust it for Windows (
`pipe`), but I think it should set up the communication correctly.
- @ead your solution works - I just needed to understand it. I'm fine with that - but I think it could be done (working for me also with big worker functions) in a more readable (or understandable) fashion - I'll update my answer.