How to speed up my TensorFlow execution on Hadoop?

The following script executes very slowly. I just want to count the total number of lines in the Twitter follower graph (a text file of ~26 GB).

I need to perform a machine learning task; this is just a test of accessing data on HDFS from TensorFlow.

import tensorflow as tf
import time

# Queue of input file names; a single epoch, in order.
filename_queue = tf.train.string_input_producer(
    ["hdfs://default/twitter/twitter_rv.net"], num_epochs=1, shuffle=False)

def read_filename_queue(filename_queue):
    reader = tf.TextLineReader()
    _, line = reader.read(filename_queue)
    return line

line = read_filename_queue(filename_queue)

session_conf = tf.ConfigProto(intra_op_parallelism_threads=1500,
                              inter_op_parallelism_threads=1500)

with tf.Session(config=session_conf) as sess:
    # num_epochs creates a local counter variable, so local variables
    # must be initialized before the queue runners start.
    sess.run(tf.local_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    start = time.time()
    i = 0
    while True:
        try:
            sess.run([line])
        except tf.errors.OutOfRangeError:
            print('end of file')
            break
        i = i + 1
        if i % 100000 == 0:
            print(i)
            print(time.time() - start)

    coord.request_stop()
    coord.join(threads)
    print('total number of lines = ' + str(i))
    print(time.time() - start)

The process needs about 40 seconds for the first 100,000 lines. I tried setting intra_op_parallelism_threads and inter_op_parallelism_threads to 0, 4, 8, 40, 400, and 1500, but none of these settings affected the execution time significantly.

Can you help me?


system specs:

  • 16 GB RAM
  • 4 CPU cores

You can split the big file into smaller ones; that may help. Also, set intra_op_parallelism_threads and inter_op_parallelism_threads to 0 so that TensorFlow chooses an appropriate number of threads itself.

On many systems, reading a single raw text file from multiple processes is not easy: TensorFlow reads one file with only one thread, so adjusting TensorFlow's thread settings won't help. Spark can process a file with multiple threads because it divides the file into blocks; each thread reads the lines of its own block, ignoring the characters before the first \n since they belong to the last line of the previous block. For batch data processing, Spark is the better choice, while TensorFlow is better suited to machine learning / deep learning tasks.
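For example, here is a minimal sketch of the split-file idea using the same queue-based API as the question. The shard names are hypothetical (split the 26 GB file beforehand, e.g. with the Unix split command, and upload the pieces to HDFS). Reading lines in large batches also cuts the per-line sess.run overhead, which dominates the original loop:

import tensorflow as tf

# Hypothetical shard names produced by splitting the original file.
shards = ["hdfs://default/twitter/twitter_rv_%02d.net" % i for i in range(8)]
filename_queue = tf.train.string_input_producer(shards, num_epochs=1,
                                                shuffle=False)

# One TextLineReader per parallel reader thread; batch_join merges their
# output so each sess.run returns up to 10000 lines instead of one.
lines = [tf.TextLineReader().read(filename_queue)[1] for _ in range(4)]
batch = tf.train.batch_join([[l] for l in lines], batch_size=10000,
                            allow_smaller_final_batch=True)

Counting then accumulates sess.run(batch)[0].shape[0] inside the same try/except loop as in the question.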

In a real-world setting, the input data may be stored remotely (for example, on GCS or HDFS). Many such use cases are built on TensorFlow, a popular deep learning framework written by Google. In the beginning, our internal TensorFlow users ran the framework on small, unmanaged "bare metal" clusters, but we quickly realized the need to connect TensorFlow to the massive compute and storage power of our Hadoop-based big data platform: with hundreds of petabytes of data stored on our Hadoop clusters that could be leveraged for deep learning, we needed a scalable way to process it. That is why we built TonY:


https://github.com/linkedin/TonY

With TonY, you can submit a TensorFlow job and specify the number of workers and whether they require CPUs or GPUs.

We were able to get an almost-linear speedup when running the Inception v3 model on multiple servers with TonY.

Below is an example of how to use it from the README:

In the tony directory there’s also a tony.xml which contains all of your TonY job configurations. For example:

$ cat tony/tony.xml
<configuration>
  <property>
    <name>tony.worker.instances</name>
    <value>4</value>
  </property>
  <property>
    <name>tony.worker.memory</name>
    <value>4g</value>
  </property>
  <property>
    <name>tony.worker.gpus</name>
    <value>1</value>
  </property>
  <property>
    <name>tony.ps.memory</name>
    <value>3g</value>
  </property>
</configuration>

For a full list of configurations, please see the wiki.

Model code
$ ls src/models/ | grep mnist_distributed
  mnist_distributed.py

Then you can launch your job:

$ java -cp "`hadoop classpath --glob`:tony/*:tony" \
            com.linkedin.tony.cli.ClusterSubmitter \
            -executes src/models/mnist_distributed.py \
            -task_params '--input_dir /path/to/hdfs/input --output_dir /path/to/hdfs/output --steps 2500 --batch_size 64' \
            -python_venv my-venv.zip \
            -python_binary_path Python/bin/python \
            -src_dir src \
            -shell_env LD_LIBRARY_PATH=/usr/java/latest/jre/lib/amd64/server

The command-line arguments are as follows:

  • executes describes the location of the entry point of your training code.
  • task_params describes the command-line arguments that will be passed to your entry point.
  • python_venv describes the name of the local zip that will invoke your python script.
  • python_binary_path describes the relative path in your python virtual environment that contains the python binary, or an absolute path to a python binary already installed on all worker nodes.
  • src_dir specifies the name of the local root directory containing all of your python model source code. This directory will be copied to all worker nodes.
  • shell_env specifies key-value pairs for environment variables that will be set in your python worker/ps processes.

In short: TonY uses Hadoop's native scheduler to run TensorFlow jobs, which makes fault tolerance and GPU scheduling easier. Building TensorFlow from source for your hardware may also speed up execution.


I am also a beginner working with TensorFlow, but since you asked for answers drawing on credible and/or official sources, here is what I found that might help:

  1. Build and install from source
  2. Utilize queues for reading data
  3. Preprocessing on the CPU
  4. Use NCHW image data format
  5. Place shared parameters on the GPU
  6. Use fused batch norm

Note: the points listed above are explained in greater detail in the TensorFlow performance guide. Point 4 is illustrated with a short sketch below.
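As a small illustration of point 4 (my own hedged sketch, TF 1.x, not taken from the guide), a convolution can be fed in NCHW / channels-first layout, which the cuDNN-backed GPU kernels usually prefer:

import tensorflow as tf

# Inputs arrive in the default NHWC layout; transpose once to NCHW so
# the cuDNN convolution kernels run in their preferred layout.
images_nhwc = tf.placeholder(tf.float32, [None, 224, 224, 3])
images_nchw = tf.transpose(images_nhwc, [0, 3, 1, 2])  # NHWC -> NCHW
conv = tf.layers.conv2d(images_nchw, filters=64, kernel_size=3,
                        data_format='channels_first')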

Another thing you might want to look into is quantization, which can reduce model size both in storage and at runtime, and can improve performance, especially on mobile hardware. A short sketch follows.
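For example, a minimal sketch of post-training quantization with the TFLite converter (TF 2.x; the SavedModel path is hypothetical):

import tensorflow as tf

# Convert a hypothetical exported SavedModel with default optimizations,
# which quantizes the weights and shrinks the model file.
converter = tf.lite.TFLiteConverter.from_saved_model("saved_model_dir")
converter.optimizations = [tf.lite.Optimize.DEFAULT]
with open("model_quant.tflite", "wb") as f:
    f.write(converter.convert())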

Also note that in TensorFlow 2, eager execution is turned on by default. The user interface is intuitive and flexible (running one-off operations is much easier and faster), but this can come at the expense of performance and deployability. To get performant and portable models, use tf.function to make graphs out of your programs.
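As a minimal sketch of that advice applied to the original problem (TF 2.x; the HDFS path is the one from the question and assumes TensorFlow was built with HDFS support), counting lines as a single graph call removes the per-line Python overhead of the original loop:

import tensorflow as tf

ds = tf.data.TextLineDataset("hdfs://default/twitter/twitter_rv.net")

@tf.function
def count_lines(dataset):
    # The reduction runs entirely inside the graph, so there is no
    # per-line round trip through Python.
    return dataset.reduce(tf.constant(0, tf.int64), lambda n, _: n + 1)

print('total number of lines =', count_lines(ds).numpy())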


I bypassed this performance problem by using Spark instead.
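For reference, a minimal PySpark version of the same line count (my own sketch; it assumes a running Spark installation with access to the same HDFS path):

from pyspark.sql import SparkSession

# Spark splits the file into blocks and counts the lines in parallel.
spark = SparkSession.builder.appName("count-lines").getOrCreate()
n = spark.sparkContext.textFile("hdfs://default/twitter/twitter_rv.net").count()
print("total number of lines =", n)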

Make sure to check out the tensorflow/models repo at the tag that matches your TensorFlow version, e.g. if you installed TensorFlow 1.13, use the models at tag v1.13.0. If you check out the master branch, you may find that the code does not run on the GPU, as happened to me.


Try this and it should improve your timing:

session_conf = tf.ConfigProto(intra_op_parallelism_threads=0,
                              inter_op_parallelism_threads=0)

It is better not to set these values by hand when you do not know the optimum; a value of 0 tells TensorFlow to pick an appropriate number of threads for the system.
