Hot questions for Using Neural networks in memory leaks

Question:

I'm getting this error

'ValueError: Tensor Tensor("Placeholder:0", shape=(1, 1), dtype=int32) is not an element of this graph.'

The code runs perfectly fine without the line with tf.Graph().as_default():. However, I need to call M.sample(...) multiple times, and each time the memory is not freed after session.close(). There is probably a memory leak, but I'm not sure where it is.

I want to restore a pre-trained neural network, set it as the default graph, and test it many times (e.g. 10000) on that graph without making the graph larger each time.

The code is:

def SessionOpener(save):
    grph = tf.get_default_graph()
    sess = tf.Session(graph=grph)
    ckpt = tf.train.get_checkpoint_state(save)
    saver = tf.train.import_meta_graph('./predictor/save/model.ckpt.meta')
    if ckpt and ckpt.model_checkpoint_path:
        saver.restore(sess, ckpt.model_checkpoint_path)
        tf.global_variables_initializer().run(session=sess)
    return sess

def LoadPredictor(save):
    with open(os.path.join(save, 'config.pkl'), 'rb') as f:
        saved_args = cPickle.load(f)
    with open(os.path.join(save, 'words_vocab.pkl'), 'rb') as f:
        words, vocab = cPickle.load(f)
    model = Model(saved_args, True)
    return model, words, vocab

if __name__ == '__main__':
    Save = './save'
    M, W, V = LoadPredictor(Save)
    Sess = SessionOpener(Save)
    word = M.sample(Sess, W, V, 1, str(123), 2, 1, 4)
    Sess.close()

And the model is:

class Model():
    def __init__(self, args, infer=False):
        with tf.Graph().as_default():
            self.args = args
            if infer:
                args.batch_size = 1
                args.seq_length = 1

            if args.model == 'rnn':
                cell_fn = rnn.BasicRNNCell
            elif args.model == 'gru':
                cell_fn = rnn.GRUCell
            elif args.model == 'lstm':
                cell_fn = rnn.BasicLSTMCell
            else:
                raise Exception("model type not supported: {}".format(args.model))

            cells = []
            for _ in range(args.num_layers):
                cell = cell_fn(args.rnn_size)
                cells.append(cell)

            self.cell = cell = rnn.MultiRNNCell(cells)

            self.input_data = tf.placeholder(tf.int32, [args.batch_size, args.seq_length])
            self.targets = tf.placeholder(tf.int32, [args.batch_size, args.seq_length])
            self.initial_state = cell.zero_state(args.batch_size, tf.float32)
            self.batch_pointer = tf.Variable(0, name="batch_pointer", trainable=False, dtype=tf.int32)
            self.inc_batch_pointer_op = tf.assign(self.batch_pointer, self.batch_pointer + 1)
            self.epoch_pointer = tf.Variable(0, name="epoch_pointer", trainable=False)
            self.batch_time = tf.Variable(0.0, name="batch_time", trainable=False)
            tf.summary.scalar("time_batch", self.batch_time)

            def variable_summaries(var):
                """Attach a lot of summaries to a Tensor (for TensorBoard visualization)."""
                with tf.name_scope('summaries'):
                    mean = tf.reduce_mean(var)
                    tf.summary.scalar('mean', mean)
                    tf.summary.scalar('max', tf.reduce_max(var))
                    tf.summary.scalar('min', tf.reduce_min(var))


            with tf.variable_scope('rnnlm'):
                softmax_w = tf.get_variable("softmax_w", [args.rnn_size, args.vocab_size])
                variable_summaries(softmax_w)
                softmax_b = tf.get_variable("softmax_b", [args.vocab_size])
                variable_summaries(softmax_b)
                with tf.device("/cpu:0"):
                    embedding = tf.get_variable("embedding", [args.vocab_size, args.rnn_size])
                    inputs = tf.split(tf.nn.embedding_lookup(embedding, self.input_data), args.seq_length, 1)
                    inputs = [tf.squeeze(input_, [1]) for input_ in inputs]

            def loop(prev, _):
                prev = tf.matmul(prev, softmax_w) + softmax_b
                prev_symbol = tf.stop_gradient(tf.argmax(prev, 1))
                return tf.nn.embedding_lookup(embedding, prev_symbol)

            outputs, last_state = legacy_seq2seq.rnn_decoder(inputs, self.initial_state, cell, loop_function=loop if infer else None, scope='rnnlm')
            output = tf.reshape(tf.concat(outputs, 1), [-1, args.rnn_size])
            self.logits = tf.matmul(output, softmax_w) + softmax_b
            self.probs = tf.nn.softmax(self.logits)
            loss = legacy_seq2seq.sequence_loss_by_example([self.logits],
                    [tf.reshape(self.targets, [-1])],
                    [tf.ones([args.batch_size * args.seq_length])],
                    args.vocab_size)
            self.cost = tf.reduce_sum(loss) / args.batch_size / args.seq_length
            tf.summary.scalar("cost", self.cost)
            self.final_state = last_state
            self.lr = tf.Variable(0.0, trainable=False)
            tvars = tf.trainable_variables()
            grads, _ = tf.clip_by_global_norm(tf.gradients(self.cost, tvars),
                args.grad_clip)
            optimizer = tf.train.AdamOptimizer(self.lr)
            self.train_op = optimizer.apply_gradients(zip(grads, tvars))

    def sample(self, sess, words, vocab, num=200, prime='first all', sampling_type=1, pick=0, width=4):
        def weighted_pick(weights):
            t = np.cumsum(weights)
            s = np.sum(weights)
            return(int(np.searchsorted(t, np.random.rand(1)*s)))

        ret = ''
        if pick == 1:
            state = sess.run(self.cell.zero_state(1, tf.float32))

            if not len(prime) or prime == ' ':
                prime  = random.choice(list(vocab.keys()))
            for word in prime.split()[:-1]:
                x = np.zeros((1, 1))
                x[0, 0] = vocab.get(word,0)
                feed = {self.input_data: x, self.initial_state:state}
                [state] = sess.run([self.final_state], feed)

            ret = prime
            word = prime.split()[-1]
            for n in range(num):
                x = np.zeros((1, 1))
                x[0, 0] = vocab.get(word, 0)
                feed = {self.input_data: x, self.initial_state:state}
                [probs, state] = sess.run([self.probs, self.final_state], feed)
                p = probs[0]

                if sampling_type == 0:
                    sample = np.argmax(p)
                elif sampling_type == 2:
                    if word == '\n':
                        sample = weighted_pick(p)
                    else:
                        sample = np.argmax(p)
                else: # sampling_type == 1 default:
                    sample = weighted_pick(p)

                ret = words[sample]
        return ret

and the output is:

Traceback (most recent call last):
  File "/rcg/software/Linux/Ubuntu/16.04/amd64/TOOLS/TENSORFLOW/1.2.1-GPU-PY352/lib/python3.5/site-packages/tensorflow/python/client/session.py", line 942, in _run
    allow_operation=False)
  File "/rcg/software/Linux/Ubuntu/16.04/amd64/TOOLS/TENSORFLOW/1.2.1-GPU-PY352/lib/python3.5/site-packages/tensorflow/python/framework/ops.py", line 2584, in as_graph_element
    return self._as_graph_element_locked(obj, allow_tensor, allow_operation)
  File "/rcg/software/Linux/Ubuntu/16.04/amd64/TOOLS/TENSORFLOW/1.2.1-GPU-PY352/lib/python3.5/site-packages/tensorflow/python/framework/ops.py", line 2663, in _as_graph_element_locked
    raise ValueError("Tensor %s is not an element of this graph." % obj)
ValueError: Tensor Tensor("Placeholder:0", shape=(1, 1), dtype=int32) is not an element of this graph.

Answer:

When you create a Model, no session exists yet and nothing has been restored. All placeholders, variables and ops defined in Model.__init__ are placed in a new graph, which is the default graph only inside the with block. This is the key line:

with tf.Graph().as_default():
  ...

This means that this tf.Graph() instance is what tf.get_default_graph() returns inside the with block, but not before or after it. From that point on, two different graphs exist: the one built by Model.__init__ and the global default graph.

When you later create a session and restore a graph into it, you can't access the previous instance of tf.Graph() in that session. Here's a short example:

with tf.Graph().as_default() as graph:
  var = tf.get_variable("var", shape=[3], initializer=tf.zeros_initializer)

# This works
with tf.Session(graph=graph) as sess:
  sess.run(tf.global_variables_initializer())
  print(sess.run(var))  # ok because `sess.graph == graph`

# This fails
saver = tf.train.import_meta_graph('/tmp/model.ckpt.meta')
with tf.Session() as sess:
  saver.restore(sess, "/tmp/model.ckpt")
  print(sess.run(var))   # var is from `graph`, not `sess.graph`!

The best way to deal with this is to give names to all nodes, e.g. 'input', 'target', etc., save the model, and then look up the tensors in the restored graph by name. A tensor name is the op name plus an output index, so the placeholder named 'input' is fetched as 'input:0'. Something like this:

saver = tf.train.import_meta_graph('/tmp/model.ckpt.meta')
with tf.Session() as sess:
  saver.restore(sess, "/tmp/model.ckpt")
  input_data = sess.graph.get_tensor_by_name('input:0')
  target = sess.graph.get_tensor_by_name('target:0')

This method guarantees that every tensor you use belongs to the session's graph.
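A fuller round trip may make the name-based lookup easier to follow. The sketch below is not the asker's model: it saves a tiny stand-in graph and restores it into a fresh one. The names input, output and w and the temporary checkpoint path are assumptions for illustration. It is written against tensorflow.compat.v1, which assumes TensorFlow 1.13 or later; on older 1.x releases a plain import tensorflow as tf should serve the same purpose.

```python
import os
import tempfile

import numpy as np
import tensorflow.compat.v1 as tf  # assumption: TensorFlow >= 1.13 or 2.x

# Hypothetical checkpoint location in a fresh temporary directory.
ckpt_path = os.path.join(tempfile.mkdtemp(), "model.ckpt")

# 1. Build and save a tiny model whose nodes carry explicit names.
with tf.Graph().as_default():
    inp = tf.placeholder(tf.float32, shape=[None, 3], name="input")
    w = tf.get_variable("w", shape=[3, 1], initializer=tf.ones_initializer())
    tf.matmul(inp, w, name="output")
    saver = tf.train.Saver()
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver.save(sess, ckpt_path)  # also writes ckpt_path + ".meta"

# 2. Restore into a fresh graph and look the tensors up by name there.
with tf.Graph().as_default() as restored_graph:
    saver = tf.train.import_meta_graph(ckpt_path + ".meta")
    with tf.Session(graph=restored_graph) as sess:
        saver.restore(sess, ckpt_path)
        # "<op name>:<output index>" identifies a tensor.
        input_data = restored_graph.get_tensor_by_name("input:0")
        output = restored_graph.get_tensor_by_name("output:0")
        batch = np.ones((2, 3), dtype=np.float32)
        result = sess.run(output, feed_dict={input_data: batch})
```

Because input_data and output are fetched from restored_graph, they are guaranteed to belong to the graph the session runs, which is exactly what the failing code in the question lacks.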

Question:

I was just trying some stuff for a quaternionic neural network when I realized that, even if I close my current Session in a for loop, my program slows down massively and I get a memory leak caused by ops being constructed. This is my code:

for step in xrange(0, 200):  # num_epochs * train_size // BATCH_SIZE):
    with tf.Session() as sess:

        offset = (BATCH_SIZE) % train_size
        # print "Offset : %d" % offset

        batch_data = []
        batch_labels = []
        batch_data.append(qtrain[0][offset:(offset + BATCH_SIZE)])
        batch_labels.append(qtrain_labels[0][offset:(offset + BATCH_SIZE)])
        retour = sess.run(test, feed_dict={x: batch_data})

        test2 = feedForwardStep(retour, W_to_output, b_output)
        # sess.close()

The problem seems to come from test2 = feedForwardStep(...). I need to declare these ops after executing retour once, because retour can't be a placeholder (I need to iterate through it). Without this line, the program runs fine: fast and without a memory leak. I can't understand why TensorFlow seems to "save" test2 even though I close the session.


Answer:

TL;DR: Closing a session does not free the tf.Graph data structure in your Python program, and if each iteration of the loop adds nodes to the graph, you'll have a leak.

Because your function feedForwardStep creates new TensorFlow operations and you call it inside the for loop, your code does leak, albeit subtly.
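One way to see the growth is to count the operations in the graph after each iteration. The sketch below is a minimal illustration, not the asker's code: the arithmetic stands in for feedForwardStep, and an explicit tf.Graph() is used instead of the global default so the count starts from an empty graph. It assumes tensorflow.compat.v1 is available (TensorFlow 1.13 or later).

```python
import tensorflow.compat.v1 as tf  # assumption: TensorFlow >= 1.13 or 2.x

graph = tf.Graph()
op_counts = []

with graph.as_default():
    for step in range(3):
        # The session is closed at the end of every iteration...
        with tf.Session() as sess:
            # ...but these calls add new nodes to `graph` each time.
            doubled = tf.constant(float(step)) * 2.0
            sess.run(doubled)
        # The graph outlives the session, so the count keeps rising.
        op_counts.append(len(graph.get_operations()))

print(op_counts)
```

The printed counts rise from one iteration to the next even though every session has been closed, because the nodes live in the graph rather than in the session.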

Unless you specify otherwise (using a with tf.Graph().as_default(): block), all TensorFlow operations are added to a global default graph. This means that every call to tf.constant(), tf.matmul(), tf.Variable() etc. adds objects to a global data structure. There are two ways to avoid this:

  1. Structure your program so that you build the graph once, then use tf.placeholder() ops to feed in different values in each iteration. You mention in your question that this might not be possible.

  2. Explicitly create a new graph in each for loop. This might be necessary if the structure of the graph depends on the data available in the current iteration. You would do this as follows:

    for step in xrange(200):
        with tf.Graph().as_default(), tf.Session() as sess:
            # Remainder of loop body goes here.
    

    Note that in this version, you cannot use Tensor or Operation objects from a previous iteration. (For example, it's not clear from your code snippet where test comes from.)
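For the first option, a rough sketch follows. It assumes the work done by feedForwardStep can be expressed as ops that read a placeholder of fixed shape, which may not hold for the asker's program. All names and sizes here (first_stage, second_stage, retour_in, BATCH_SIZE, N_FEATURES) are hypothetical. Calling graph.finalize() is optional, but it turns any accidental op creation inside the loop into an immediate RuntimeError instead of a slow leak.

```python
import numpy as np
import tensorflow.compat.v1 as tf  # assumption: TensorFlow >= 1.13 or 2.x

BATCH_SIZE = 4   # hypothetical sizes for illustration
N_FEATURES = 3

graph = tf.Graph()
with graph.as_default():
    # First stage: stands in for the op called `test` in the question.
    x = tf.placeholder(tf.float32, [None, N_FEATURES], name="x")
    first_stage = x * 2.0

    # Second stage: stands in for feedForwardStep. It reads a placeholder
    # instead of building new ops from the NumPy result.
    retour_in = tf.placeholder(tf.float32, [None, N_FEATURES], name="retour_in")
    second_stage = tf.reduce_sum(retour_in, axis=1)

graph.finalize()  # the graph is now read-only
ops_before = len(graph.get_operations())

with tf.Session(graph=graph) as sess:
    for step in range(200):
        batch = np.full((BATCH_SIZE, N_FEATURES), step, dtype=np.float32)
        retour = sess.run(first_stage, feed_dict={x: batch})
        # Any Python-side processing of `retour` can happen here.
        result = sess.run(second_stage, feed_dict={retour_in: retour})

ops_after = len(graph.get_operations())
```

Here one session serves all 200 iterations and the number of operations in the graph is the same before and after the loop.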

Question:

I have the following loop:

for (int i = 1; i <= epochs; ++i) {
    for (std::vector<std::filesystem::path>::iterator it = batchFiles.begin(); it != batchFiles.end(); ++it) {
        struct fann_train_data *data = fann_read_train_from_file(it->string().c_str());
        fann_shuffle_train_data(data);
        float error = fann_train_epoch(ann, data);
    }
}

ann is the network. batchFiles is a std::vector<std::filesystem::path>.

This code iterates through all the training data files in a folder and uses each one to train the ANN, repeating the whole pass as many times as the epochs variable specifies.

The following line causes a memory leak:

struct fann_train_data *data = fann_read_train_from_file(it->string().c_str());

The problem is that I must constantly switch between the training files, as I don't have enough memory to load them all at once, otherwise I would have loaded the training data just once.

Why does this happen? How can I resolve this?


Answer:

In C++, memory is automatically freed when the object managing it goes out of scope (assuming the class was properly written). That's called RAII.

But FANN presents a C API, not a C++ API. In C, you need to manually free memory when you're done with it. By extension, when a C library creates an object for you, it typically needs you to tell it when you're done with the object. The library doesn't have a good way to figure out on its own when the object's resources should be freed.

The convention is that whenever a C API gives you a function like struct foo* create_foo(), you should be looking for a corresponding function like void free_foo(struct foo* f). It's symmetrical.

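In your case, as originally noted by PaulMcKenzie, you need void fann_destroy_train_data(struct fann_train_data * train_data). Call it on data at the end of each inner-loop iteration, after fann_train_epoch has returned, so that each file's training data is released before the next file is read. From the documentation, emphasis mine: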

Destructs the training data and properly deallocates all of the associated data. Be sure to call this function after finished using the training data.