Apply a method to a list of objects in parallel using multi-processing
I have created a class with a number of methods. One of the methods is very time consuming,
my_process, and I'd like to do that method in parallel. I came across Python Multiprocessing - apply class method to a list of objects but I'm not sure how to apply it to my problem, and what effect it will have on the other methods of my class.
class MyClass(): def __init__(self, input): self.input = input self.result = int def my_process(self, multiply_by, add_to): self.result = self.input * multiply_by self._my_sub_process(add_to) return self.result def _my_sub_process(self, add_to): self.result += add_to list_of_numbers = range(0, 5) list_of_objects = [MyClass(i) for i in list_of_numbers] list_of_results = [obj.my_process(100, 1) for obj in list_of_objects] # multi-process this for-loop print list_of_numbers print list_of_results [0, 1, 2, 3, 4] [1, 101, 201, 301, 401]
I'm going to go against the grain here, and suggest sticking to the simplest thing that could possibly work ;-) That is,
Pool.map()-like functions are ideal for this, but are restricted to passing a single argument. Rather than make heroic efforts to worm around that, simply write a helper function that only needs a single argument: a tuple. Then it's all easy and clear.
Here's a complete program taking that approach, which prints what you want under Python 2, and regardless of OS:
class MyClass(): def __init__(self, input): self.input = input self.result = int def my_process(self, multiply_by, add_to): self.result = self.input * multiply_by self._my_sub_process(add_to) return self.result def _my_sub_process(self, add_to): self.result += add_to import multiprocessing as mp NUM_CORE = 4 # set to the number of cores you want to use def worker(arg): obj, m, a = arg return obj.my_process(m, a) if __name__ == "__main__": list_of_numbers = range(0, 5) list_of_objects = [MyClass(i) for i in list_of_numbers] pool = mp.Pool(NUM_CORE) list_of_results = pool.map(worker, ((obj, 100, 1) for obj in list_of_objects)) pool.close() pool.join() print list_of_numbers print list_of_results
A big of magic
I should note there are many advantages to taking the very simple approach I suggest. Beyond that it "just works" on Pythons 2 and 3, requires no changes to your classes, and is easy to understand, it also plays nice with all of the
However, if you have multiple methods you want to run in parallel, it can get a bit annoying to write a tiny worker function for each. So here's a tiny bit of "magic" to worm around that. Change
worker() like so:
def worker(arg): obj, methname = arg[:2] return getattr(obj, methname)(*arg[2:])
Now a single worker function suffices for any number of methods, with any number of arguments. In your specific case, just change one line to match:
list_of_results = pool.map(worker, ((obj, "my_process", 100, 1) for obj in list_of_objects))
More-or-less obvious generalizations can also cater to methods with keyword arguments. But, in real life, I usually stick to the original suggestion. At some point catering to generalizations does more harm than good. Then again, I like obvious things ;-)
multiprocessing — Process-based parallelism, To select a start method you use the set_start_method() in the if __name__ Context objects have the same API as the multiprocessing module, and allow multiprocessing supports two types of communication channel between processes:. I have created a class with a number of methods. One of the methods is very time consuming, my_process, and I'd like to do that method in parallel. I came across Python Multiprocessing - apply class
Generally the easiest way to run the same calculation in parallel is the
map method of a
multiprocessing.Pool (or the
as_completed function from
concurrent.futures in Python 3).
map method applies a function that only takes one argument to an iterable of data using multiple processes.
So this function cannot be a normal method, because that requires at least two arguments; it must also include
self! It could be a staticmethod, however. See also this answer for a more in-depth explanation.
16.3. multiprocessing — Process-based parallelism, The Pipe() function returns a pair of connection objects connected by a pipe which by However, if you really do need to use some shared data then multiprocessing A manager returned by Manager() will support types list, dict, Namespace, Usually a pool is created using the function multiprocessing.Pool() or the Pool() method of a context object. In both cases context is set appropriately. Note that the methods of the pool object should only be called by the process which created the pool.
If your class is not "huge", I think process oriented is better. Pool in multiprocessing is suggested. This is the tutorial -> https://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers
Then seperate the
my_process since they are quick and you can wait util the end of the last process.
def my_process(input, multiby): return xxxx def add_to(result,a_list): xxx p = Pool(5) res =  for i in range(10): res.append(p.apply_async(my_process, (i,5))) p.join() # wait for the end of the last process for i in range(10): print res[i].get()
Parallel programming in Python: multiprocessing (part 1) – PDC Blog, To use the multiprocessing module, you need to import it first. A Pool object can be created by passing the desired number of processes to the we want to use the square method to calculate the squares of a list of integers. Parallel processing is a mode of operation where the task is executed simultaneously in multiple processors in the same computer. It is meant to reduce the overall processing time. In this tutorial, you’ll understand the procedure to parallelize any typical logic using python’s multiprocessing module. How many maximum parallel processes can
Based on the answer of Python Multiprocessing - apply class method to a list of objects and your code:
class simulation(multiprocessing.Process): def __init__(self, id, worker, *args, **kwargs): # must call this before anything else multiprocessing.Process.__init__(self) self.id = id self.worker = worker self.args = args self.kwargs = kwargs sys.stdout.write('[%d] created\n' % (self.id))
run what you want in
def run(self): sys.stdout.write('[%d] running ... process id: %s\n' % (self.id, os.getpid())) self.worker.my_process(*self.args, **self.kwargs) sys.stdout.write('[%d] completed\n' % (self.id))
list_of_numbers = range(0, 5) list_of_objects = [MyClass(i) for i in list_of_numbers] list_of_sim = [simulation(id=k, worker=obj, multiply_by=100*k, add_to=10*k) \ for k, obj in enumerate(list_of_objects)] for sim in list_of_sim: sim.start()
Multiprocessing.Pool() - Stuck in a Pickle, from multiprocessing import Pool def sqrt(x): return x**.5 numbers = [i for i we can parallelize the higher-order function map(f, iterable) with 1 line The above iterates over 1 million integers, and in parallel, calculates ndarray_bitarr_ls = list( map(int_to_bitarr_converter.convert, In Python, we use self . The threading.Thread object takes the list_append function as a parameter and then appends it to the jobs list. Finally, the jobs are sequentially started and then sequentially "joined". The join() method blocks the calling thread (i.e. the main Python interpreter thread) until the thread has terminated. This ensures that all of the threads are
If you don't absolutely need to stick with Multiprocessing module then, it can easily achieved using concurrents.futures library
here's the example code:
from concurrent.futures.thread import ThreadPoolExecutor, wait MAX_WORKERS = 20 class MyClass(): def __init__(self, input): self.input = input self.result = int def my_process(self, multiply_by, add_to): self.result = self.input * multiply_by self._my_sub_process(add_to) return self.result def _my_sub_process(self, add_to): self.result += add_to list_of_numbers = range(0, 5) list_of_objects = [MyClass(i) for i in list_of_numbers] With ThreadPoolExecutor(MAX_WORKERS) as executor: for obj in list_of_objects: executor.submit(obj.my_process, 100, 1).add_done_callback(on_finish) def on_finish(future): result = future.result() # do stuff with your result
here executor returns future for every task it submits. keep in mind that if you use
add_done_callback() finished task from thread returns to the main thread (which would block your main thread) if you really want true parallelism then you should wait for future objects separately. here's the code snippet for that.
futures =  with ThreadPoolExecutor(MAX_WORKERS) as executor: for objin list_of_objects: futures.append(executor.submit(obj.my_process, 100, 1)) wait(futures) for succeded, failed in futures: # work with your result here if succeded: print (succeeeded.result()) if failed: print (failed.result())
hope this helps.
Python Multiprocessing Example, In this tutorial we are going to learn Python Multiprocessing with examples. Python introduced multiprocessing module to let us write parallel code. up another Python process, provides it to run code and a way for the parent application to to use the Lock class to block multiple process to access the same queue object. It refers to a function that loads and executes a new child processes. For the child to terminate or to continue executing concurrent computing,then the current process hasto wait using an API, which is similar to threading module. Introduction. When we work with Multiprocessing,at first we create process object. Then it calls a start() method.
Parallel Processing in Python, Using the standard multiprocessing module, we can efficiently parallelize simple tasks To spawn the process, we need to initialize our Process object and invoke Process.start() method. To wait for the task completion, you can use Process.join() . Using Pool.map() you can map the function to the list and passing the Returns a list of id's +1, you can do the same with using a function to whatever you have in the select clause. Update: As suggested from Jon Skeet this is a better version of the snippet of code I just posted: var foo = context.footable.Select(foo => foo.fooID + 1);
Parallel Processing in Python, In parallel processing, there are two types of execution: Synchronous and There are 2 main objects in multiprocessing to implement parallel execution Parallel processing with Pool.apply_async() without callback function Parallel Foreach method in C#. In this article, I am going to discuss the Parallel Foreach Method in C# with some examples. As we already discussed in our previous article that the Task Parallel Library (TPL) provides two methods (i.e. Parallel.For and Parallel.Foreach) which are conceptually the “for” and “for each” loops, except that, they use multiple threads to execute multiple
Multiprocessing vs. Threading in Python: What you need to know., While threading in Python cannot be used for parallel CPU computation, You can use target as the callable object, args to pass parameters to the function, and When another function wants to use a variable, it must wait until that variable is Sometimes this nuance leads to issues. For example, if the current process size in memory is 4GB and the code is using Pool(4) on a four core machine, that 4GB Python process will be pickled and sent to 4 workers. This can increase the memory usage by up to 4GB * 4 workers = 16GB.