Hot questions on using neural networks with multiprocessing

Question:

I'm using Keras with TensorFlow as the backend.

I am trying to save a model in my main process and then load/run (i.e. call model.predict) within another process.

I'm currently just trying the naive approach from the docs to save/load the model: https://keras.io/getting-started/faq/#how-can-i-save-a-keras-model. So basically:

  1. model.save() in main process
  2. model = load_model() in child process
  3. model.predict() in child process

However, it simply hangs on the load_model call.

Searching around, I've discovered a potentially related answer suggesting that Keras can only be used in one process (using multiprocessing with theano), but I'm unsure whether this is true (I can't seem to find much on this).

Is there a way to accomplish my goal? A high level description or short example is greatly appreciated.

Note: I've attempted approaches along the lines of passing a graph to the process, but failed since it seems TensorFlow graphs aren't picklable (related SO post for that here: Tensorflow: Passing a session to a python multiprocess). If there is indeed a way to pass the TensorFlow graph/model to the child process, then I am open to that as well.

Thanks!


Answer:

From my experience, the problem lies in loading Keras into one process and then spawning a new process after Keras has already been loaded into your main environment. But for some applications (e.g. training a mixture of Keras models) it's simply better to have all of these things in one process. So what I advise is the following (a little bit cumbersome, but working for me) approach:

  1. DO NOT LOAD KERAS INTO YOUR MAIN ENVIRONMENT. If you want to load Keras / Theano / TensorFlow, do it only inside the function environment. E.g. don't do this:

    import keras
    
    def training_function(...):
        ...
    

    but do the following:

    def training_function(...):
        import keras
        ...
    
  2. Run the work connected with each model in a separate process: I usually create workers that do the job (e.g. training, tuning, scoring) and run them in separate processes. What is nice about this is that all the memory used by such a process is completely freed when the process finishes. This helps with the many memory problems you usually run into when using multiprocessing, or even when running multiple models in one process. So it looks e.g. like this:

    import multiprocessing

    def _training_worker(train_params):
        # Keras is imported only inside the worker process
        import keras
        model = obtain_model(train_params)
        model.fit(train_params)
        send_message_to_main_process(...)

    def train_new_model(train_params):
        # args must be a tuple, hence the trailing comma
        training_process = multiprocessing.Process(target=_training_worker,
                                                   args=(train_params,))
        training_process.start()
        get_message_from_training_process(...)
        training_process.join()
    

A different approach is simply to prepare separate scripts for different model actions. But this may cause memory errors, especially when your models are memory consuming. NOTE that for this reason it's better to make your execution strictly sequential.
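
Applied to the original question (save in the main process, predict in a child process), a minimal sketch of the same pattern might look like the following; model_path, data, and the queue-based hand-off are just illustrative placeholders, not part of the original answer:

import multiprocessing

def _predict_worker(model_path, data, result_queue):
    # Keras / TensorFlow are imported only inside the child process
    from keras.models import load_model
    model = load_model(model_path)
    result_queue.put(model.predict(data))

def predict_in_child(model_path, data):
    result_queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=_predict_worker,
                                args=(model_path, data, result_queue))
    p.start()
    result = result_queue.get()   # blocks until the worker puts its prediction
    p.join()
    return result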

Question:

I'm currently trying to implement multi-GPU training for a TensorFlow network. One solution would be to run one model per GPU, each with its own data batches, and combine their weights after each training iteration; in other words, "data parallelism".

So for example if I use 2 GPUs, train with them in parallel, and combine their weights afterwards, then shouldn't the resulting weights be different compared to training with those two data batches in sequence on one GPU? Because both GPUs have the same input weights, whereas the single GPU has modified weights for the second batch.

Is this difference just marginal, and therefore not relevant for the end result after many iterations?


Answer:

The order of the batches fed into training makes some difference, but the difference may be small if you have a large number of batches. Each batch pulls the variables in the model a bit towards the minimum of the loss. A different order may make the path towards the minimum a bit different. But as long as the loss is decreasing, your model is training and its evaluation gets better and better.

Sometimes, to avoid having the same batches "pull" the model in the same direction, and to avoid it becoming good only for some of the input data, the input for each model replica is randomly shuffled before being fed into the training program.
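
To see how small the difference typically is, here is a minimal NumPy sketch (a toy one-parameter least-squares model with plain SGD, made up purely for illustration) comparing one synchronous data-parallel step, where both replicas start from the same weights and their updates are averaged, against processing the same two batches sequentially on one device:

import numpy as np

def grad(w, x, y):
    # gradient of the mean squared error for the toy model y ~ w * x
    return 2.0 * np.mean(x * (w * x - y))

rng = np.random.default_rng(0)
x1, y1 = rng.normal(size=32), rng.normal(size=32)
x2, y2 = rng.normal(size=32), rng.normal(size=32)
w0, lr = 0.5, 0.1

# data-parallel step: both "GPUs" start from w0, their updates are averaged
w_parallel = w0 - lr * (grad(w0, x1, y1) + grad(w0, x2, y2)) / 2.0

# sequential steps: the second batch sees weights already updated by the first
w_seq = w0 - lr * grad(w0, x1, y1)
w_seq = w_seq - lr * grad(w_seq, x2, y2)

print(w_parallel, w_seq)  # close, but generally not identical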

Question:

Due to RAM limitations, I followed these instructions and built a generator that draws small batches and passes them to Keras's fit_generator. But Keras can't prepare the queue with multiprocessing, even though I inherit from Sequence.

Here is my generator for multiprocessing.

class My_Generator(Sequence):
    def __init__(self, image_filenames, labels, batch_size):
        self.image_filenames, self.labels = image_filenames, labels
        self.batch_size = batch_size

    def __len__(self):
        return np.ceil(len(self.image_filenames) / float(self.batch_size))

    def __getitem__(self, idx):
        batch_x = self.image_filenames[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.labels[idx * self.batch_size:(idx + 1) * self.batch_size]

        return np.array([
            resize(imread(file_name), (200, 200))
            for file_name in batch_x]), np.array(batch_y)

The main function:

batch_size = 100
num_epochs = 10
training_filenames = []
mask_training = []
validation_filenames = []
mask_validation = []

I would like the generator to read batches from the folders separately in different threads, matched by IDs (where IDs look like: {number}.csv for raw images and {number}_label.csv for mask images). I initially built another, more elegant class that stores all the data in a single .h5 file instead of a directory, but it was blocked by the same problem. So if you have code to do this, I'm interested in that too.

for dirpath, _, fnames in os.walk('./train/'):
    for fname in fnames:
        if 'label' not in fname:
            training_filenames.append(os.path.abspath(os.path.join(dirpath, fname)))
        else:
            mask_training.append(os.path.abspath(os.path.join(dirpath, fname)))
for dirpath, _, fnames in os.walk('./validation/'):
    for fname in fnames:
        if 'label' not in fname:
            validation_filenames.append(os.path.abspath(os.path.join(dirpath, fname)))
        else:
            mask_validation.append(os.path.abspath(os.path.join(dirpath, fname)))


my_training_batch_generator = My_Generator(training_filenames, mask_training, batch_size)
my_validation_batch_generator = My_Generator(validation_filenames, mask_validation, batch_size)
num_training_samples = len(training_filenames)
num_validation_samples = len(validation_filenames)

The model itself is out of scope here; I don't believe the problem comes from the model, so I won't paste it.

model.compile(...)
model.fit_generator(generator=my_training_batch_generator,
              steps_per_epoch=(num_training_samples // batch_size),
              epochs=num_epochs,
              verbose=1,
              validation_data=None, #my_validation_batch_generator,
              # validation_steps=(num_validation_samples // batch_size),
              use_multiprocessing=True,
              workers=4,
              max_queue_size=2)

The error says that the class I created is not an iterator:

Traceback (most recent call last):
  File "test.py", line 141, in <module>
    max_queue_size=2)
  File "/anaconda3/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py", line 2177, in fit_generator
    initial_epoch=initial_epoch)
  File "/anaconda3/lib/python3.6/site-packages/tensorflow/python/keras/engine/training_generator.py", line 147, in fit_generator
    generator_output = next(output_generator)
  File "/anaconda3/lib/python3.6/site-packages/tensorflow/python/keras/utils/data_utils.py", line 831, in get
    six.reraise(value.__class__, value, value.__traceback__)
  File "/anaconda3/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
TypeError: 'My_Generator' object is not an iterator

Answer:

I was having the same problem; I managed to solve it by defining a __next__ method:

import numpy as np
from skimage.io import imread          # imread/resize assumed to come from scikit-image
from skimage.transform import resize
from tensorflow.keras.utils import Sequence   # or: from keras.utils import Sequence


class My_Generator(Sequence):
    def __init__(self, image_filenames, labels, batch_size):
        self.image_filenames, self.labels = image_filenames, labels
        self.batch_size = batch_size
        # extra state used by __next__ below
        self.n = 0
        self.max = self.__len__()

    def __len__(self):
        # __len__ must return an int, so wrap np.ceil
        return int(np.ceil(len(self.image_filenames) / float(self.batch_size)))

    def __getitem__(self, idx):
        batch_x = self.image_filenames[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.labels[idx * self.batch_size:(idx + 1) * self.batch_size]

        return np.array([
            resize(imread(file_name), (200, 200))
            for file_name in batch_x]), np.array(batch_y)

    def __next__(self):
        # wrap around once all batches have been served
        if self.n >= self.max:
            self.n = 0
        result = self.__getitem__(self.n)
        self.n += 1
        return result

Note that I have declared two new variables (self.n and self.max) in the __init__ method.
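
As a quick sanity check (a sketch that reuses the filename lists and batch_size from the question), you can verify that the instance now behaves as an iterator before handing it to fit_generator:

gen = My_Generator(training_filenames, mask_training, batch_size)

x_batch, y_batch = next(gen)  # no longer raises TypeError
print(len(gen), x_batch.shape, y_batch.shape)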

Question:

I am trying to implement an asynchronous version of the deep Q-learning algorithm in Python, which requires a neural network shared among different processes for asynchronous updates. I know that it is pretty difficult to share the object itself in Python due to the GIL, and I found that it may be possible to simply share its weights using multiprocessing.Array: https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Array.

But the problem with this Array object is that it is 1D and does not support reshape() or flatten() operations, which means that every time I want to copy the local weights to the global ones, I have to get all the weights, reshape them, and convert them to this Array. And when I want to copy the weights back, I need to do the opposite conversion, which would be quite computationally expensive. I am wondering whether there are good ways to directly integrate some shared arrays (they do not need to be this Array object) into the weights of the neural networks, so that every time I call update() it modifies the global weights directly.

Thanks!


Answer:

The key is to allocate the memory for the numpy array in some kind of shared memory space. The multiprocessing.Array object is actually a really good way of achieving this. You can then create a view of the Array object using numpy, and all the views will share memory. You can do this once in your main process, or have each child process do it once before beginning its work. I've written an example using the first method. Keep in mind that this is in no way "process safe", so you'll need to use your own locking.

from multiprocessing import Pool, Array
import numpy as np
import ctypes

shape = (10, 2)
# cast to a plain int so multiprocessing.Array accepts the size
_shared_array = Array(ctypes.c_double, int(np.prod(shape)), lock=False)
# create a numpy view onto the shared buffer; no data is copied
shared_array = np.frombuffer(_shared_array, dtype='double').reshape(shape)

def target_func(index, value):
    shared_array[index, :] = value

p = Pool(4)
for i in range(10):
    p.apply_async(target_func, args=(i, i**2))

p.close()
p.join()

print(shared_array)
# [[  0.   0.]
#  [  1.   1.]
#  [  4.   4.]
#  [  9.   9.]
#  [ 16.  16.]
#  [ 25.  25.]
#  [ 36.  36.]
#  [ 49.  49.]
#  [ 64.  64.]
#  [ 81.  81.]]
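
To connect this back to the original question about network weights, one option (a sketch only, assuming a Keras model with float32 weights, fork-based multiprocessing, and hypothetical helper names) is to allocate one shared buffer per weight tensor, matching the shapes returned by model.get_weights(), and to copy in and out with get_weights()/set_weights():

from multiprocessing import Array
import ctypes
import numpy as np

def make_shared_views(model):
    # one shared buffer + numpy view per weight tensor in the model
    views = []
    for w in model.get_weights():
        buf = Array(ctypes.c_float, int(w.size), lock=False)
        view = np.frombuffer(buf, dtype=np.float32).reshape(w.shape)
        view[...] = w                  # initialise with the current weights
        views.append(view)
    return views

def push_weights(model, views):
    # local -> shared (e.g. after a local update step)
    for view, w in zip(views, model.get_weights()):
        view[...] = w

def pull_weights(model, views):
    # shared -> local (e.g. before computing the next gradients)
    model.set_weights([v.copy() for v in views])

As in the example above, none of this is process-safe, so concurrent push/pull calls from several workers still need your own locking.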