Hot questions for using neural networks in PySpark


I'm using Spark 2.0.1 in Python, and my dataset is in a DataFrame, so I'm using the ML (not MLlib) library for machine learning. I have a multilayer perceptron classifier and only two labels.

My question is: is it possible to get not only the labels, but also (or only) the probability for each label? Not just 0 or 1 for every input, but something like 0.95 for 0 and 0.05 for 1. If this is not possible with MLP but is possible with another classifier, I can switch classifiers. I have only used MLP because I know it should be capable of returning the probability, but I can't find how to get it in PySpark.

I have found a similar topic about this, How to get classification probabilities from MultilayerPerceptronClassifier?, but it uses Java and the solution suggested there doesn't work in Python.



Indeed, as of version 2.0, MLP in Spark ML does not seem to provide classification probabilities; nevertheless, there are a number of other classifiers that do, e.g. Logistic Regression, Naive Bayes, Decision Tree, and Random Forest. Here is a short example with the first and the last one:

from import LogisticRegression, RandomForestClassifier
from import Vectors
from pyspark.sql import Row
df = sqlContext.createDataFrame([
     (0.0, Vectors.dense(0.0, 1.0)),
     (1.0, Vectors.dense(1.0, 0.0))], 
     ["label", "features"])
# +-----+---------+
# |label| features|
# +-----+---------+
# |  0.0|[0.0,1.0]|
# |  1.0|[1.0,0.0]|
# +-----+---------+

lr = LogisticRegression(maxIter=5, regParam=0.01, labelCol="label")
lr_model =

rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="label", seed=42)
rf_model =

# test data:
test = sc.parallelize([Row(features=Vectors.dense(0.2, 0.5)),
                       Row(features=Vectors.dense(0.5, 0.2))]).toDF()

lr_result = lr_model.transform(test)
# +---------+--------------------+--------------------+----------+
# | features|       rawPrediction|         probability|prediction|
# +---------+--------------------+--------------------+----------+
# |[0.2,0.5]|[0.98941878916476...|[0.72897310704261...|       0.0|
# |[0.5,0.2]|[-0.9894187891647...|[0.27102689295738...|       1.0|  
# +---------+--------------------+--------------------+----------+

rf_result = rf_model.transform(test)
# +---------+-------------+--------------------+----------+ 
# | features|rawPrediction|         probability|prediction| 
# +---------+-------------+--------------------+----------+ 
# |[0.2,0.5]|    [1.0,2.0]|[0.33333333333333...|       1.0| 
# |[0.5,0.2]|    [1.0,2.0]|[0.33333333333333...|       1.0| 
# +---------+-------------+--------------------+----------+
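
If what you need is the probability of one particular class as a plain double column, a small UDF over the probability vector will do (a sketch; prob_of_one is just an illustrative name, not a Spark API):

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# the "probability" column holds an, indexable like a list
prob_of_one = udf(lambda v: float(v[1]), DoubleType())"prob_of_one("probability").alias("prob_1")).show()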

For MLlib, see my answer here; for several undocumented & counter-intuitive features of PySpark classification, see my relevant blog post.


I am trying to make a predictor using the Auto Encoder (AE) and Alternating Least Squares (ALS) methods. ALS is created using the pyspark.mllib.recommendation package. I am able to save the ALS model and reuse it via the and pyspark.mllib.recommendation.MatrixFactorizationModel.load() methods.

The AE is created using the torch.nn.Module package and has 4 layers. Unlike with ALS, saving and loading the model is something I am missing for the AE. Can anyone help me find a way to save and load an auto encoder?


The torch module provides save and load methods.

It saves the model as a .pth file:, 'model.pth')

It also has a load method:

model = torch.load('model.pth')
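
Note that the PyTorch docs recommend saving just the state_dict rather than the whole module; a minimal sketch (AutoEncoder stands in for your own 4-layer nn.Module subclass):

import torch

# save only the learned parameters (the approach recommended in the PyTorch docs), 'ae_state.pth')

# to restore, re-create the model and load the parameters back in
model = AutoEncoder()                                # your nn.Module subclass
model.eval()                                         # switch to inference mode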



Our team is working on an NLP problem. We have a dataset with some labeled sentences and we must classify them into two classes, 0 or 1.

We preprocess the data and use word embeddings, so that we have 300 features for each sentence; then we train the model with a simple neural network.

Since the data are very skewed, we measure the model's performance with the F1-score, computing it on both the train set (80%) and the test set (20%).


We used the multilayer perceptron classifier featured in PySpark's ML library (

layers = [300, 600, 2]

from import MultilayerPerceptronClassifier

trainer = MultilayerPerceptronClassifier(featuresCol='features', labelCol='target',
                                         predictionCol='prediction', maxIter=10, layers=layers)
model =
result = model.transform(test_df)

from import MulticlassClassificationEvaluator

predictionAndLabels ="prediction", "target").withColumnRenamed("target", "label")
evaluator = MulticlassClassificationEvaluator(metricName="f1")
f1_score = evaluator.evaluate(predictionAndLabels)

This way we get F1-scores ranging between 0.91 and 0.93.


We then chose to switch (mainly for learning purposes) to TensorFlow, so we implemented a neural network using the same architecture and formulas as the Spark one:

import numpy as np
import tensorflow as tf

# Network Parameters
n_input = 300
n_hidden_1 = 600
n_classes = 2

# TensorFlow graph input
features = tf.placeholder(tf.float32, shape=(None, n_input), name='inputs')
labels = tf.placeholder(tf.float32, shape=(None, n_classes), name='labels')

# Initialize weights and biases (support function defined below)
initialize_weights_and_biases()

# Layers definition
layer_1 = tf.add(tf.matmul(features, weights['h1']), biases['b1'])
layer_1 = tf.nn.sigmoid(layer_1)

out_layer = tf.matmul(layer_1, weights['out']) + biases['out']
out_layer = tf.nn.softmax(out_layer)

# Optimizer definition
learning_rate_ph = tf.placeholder(tf.float32, shape=(), name='learning_rate')
loss_function = tf.losses.log_loss(labels=labels, predictions=out_layer)
optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate_ph).minimize(loss_function)

# Start TensorFlow session and initialize variables
init = tf.global_variables_initializer()
tf_session = tf.InteractiveSession()

# Train Neural Network
learning_rate = 0.01
iterations = 100
batch_size = 256

total_batch = int(len(y_train) / batch_size)
for epoch in range(iterations):
    avg_cost = 0.0
    for block in range(total_batch):
        batch_x = x_train[block * batch_size:min(block * batch_size + batch_size, len(x_train)), :]
        batch_y = y_train[block * batch_size:min(block * batch_size + batch_size, len(y_train)), :]
        _, c =[optimizer, loss_function], feed_dict={learning_rate_ph: learning_rate,
                                                                    features: batch_x,
                                                                    labels: batch_y})
        avg_cost += c
    avg_cost /= total_batch
    print("Iteration " + str(epoch + 1) + " Logistic-loss=" + str(avg_cost))

# Make predictions
predictions_train =, feed_dict={features: x_train, labels: y_train})
predictions_test =, feed_dict={features: x_test, labels: y_test})

# Compute F1-score
f1_score = f1_score_tf(y_test, predictions_test)

Support functions:

from math import sqrt

def initialize_weights_and_biases():
    global weights, biases
    # Xavier/Glorot-style uniform bounds
    epsilon_1 = sqrt(6) / sqrt(n_input + n_hidden_1)
    epsilon_2 = sqrt(6) / sqrt(n_classes + n_hidden_1)
    weights = {
        'h1': tf.Variable(tf.random_uniform([n_input, n_hidden_1],
                                            minval=-epsilon_1, maxval=epsilon_1, dtype=tf.float32)),
        'out': tf.Variable(tf.random_uniform([n_hidden_1, n_classes],
                                             minval=-epsilon_2, maxval=epsilon_2, dtype=tf.float32))
    }
    biases = {
        'b1': tf.Variable(tf.constant(1.0, shape=[n_hidden_1], dtype=tf.float32)),
        'out': tf.Variable(tf.constant(1.0, shape=[n_classes], dtype=tf.float32))
    }

def f1_score_tf(actual, predicted):
    # collapse one-hot rows to class indices
    actual = np.argmax(actual, 1)
    predicted = np.argmax(predicted, 1)

    tp = tf.count_nonzero(predicted * actual)
    fp = tf.count_nonzero(predicted * (actual - 1))
    fn = tf.count_nonzero((predicted - 1) * actual)
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)

    f1 = 2 * precision * recall / (precision + recall)
    return f1.eval()  # evaluated in the InteractiveSession opened above

This way we get F1-scores ranging between 0.24 and 0.25.


The only differences that I can see between the two neural networks are:

  • Optimizer: L-BFGS in Spark, Gradient Descent in TensorFlow (a drop-in Adam sketch follows this list)
  • Weights and biases initialization: Spark makes its own initialization while we initialize them manually in TensorFlow
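
One thing we could try to rule out the optimizer difference is swapping in Adam (a sketch, not something we have run yet):

# drop-in replacement for the GradientDescentOptimizer line above;
# Adam typically converges in far fewer iterations than plain SGD
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate_ph).minimize(loss_function)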

I don't think that these two parameters alone can cause such a big difference in performance between the models, but still Spark seems to reach very high scores in very few iterations.

I can't tell whether TensorFlow is performing very badly or whether Spark's scores are simply not truthful. In either case, I think we are missing something important.


Initializing the weights as uniform and the biases as 1 is certainly not a good idea, and it may very well be the cause of this discrepancy.

Use normal or truncated_normal instead, with the default zero mean and a small variance for the weights:

weights = {
        'h1': tf.Variable(tf.truncated_normal([n_input, n_hidden_1],
                                              stddev=0.01, dtype=tf.float32)),
        'out': tf.Variable(tf.truncated_normal([n_hidden_1, n_classes],
                                               stddev=0.01, dtype=tf.float32))
}

and zero for the biases:

biases = {
        'b1': tf.Variable(tf.constant(0.0, shape=[n_hidden_1], dtype=tf.float32)),
        'out': tf.Variable(tf.constant(0.0, shape=[n_classes], dtype=tf.float32))
}

That said, I am not sure about the correctness of using the MulticlassClassificationEvaluator for a binary classification problem, and I would suggest doing some further manual checks to confirm that the function indeed returns what you think it returns...
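
For example, a quick sanity check is to compare against scikit-learn's implementation (a sketch, assuming y_test and predictions_test are the one-hot arrays shown above):

import numpy as np
from sklearn.metrics import f1_score

y_true = np.argmax(y_test, axis=1)
y_pred = np.argmax(predictions_test, axis=1)
print(f1_score(y_true, y_pred))  # compare with the f1_score_tf output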