Hot questions for Using Neural networks in pyspark

Question:

I'm using Spark 2.0.1 in python, my dataset is in DataFrame, so I'm using the ML (not MLLib) library for machine learning. I have a multilayer perceptron classifier and I have only two labels.

My question is, is it possible to get not only the labels, but also (or only) the probability for that label? Like not just 0 or 1 for every input, but something like 0.95 for 0 and 0.05 for 1. If this is not possible with MLP, but is possible with other classifier, I can change the classifier. I have only used MLP because I know they should be capable of returning the probability, but I can't find it in PySpark.

I have found a similar topic about this, How to get classification probabilities from MultilayerPerceptronClassifier? but they use Java and the solution they suggested doesn't work in python.

Thx


Answer:

Indeed, as of version 2.0, MLP in Spark ML does not seem to provide classification probabilities; nevertheless, there are a number of other classifiers doing so, i.e. Logistic Regression, Naive Bayes, Decision Tree, and Random Forest. Here is a short example with the first and the last one:

from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
df = sqlContext.createDataFrame([
     (0.0, Vectors.dense(0.0, 1.0)),
     (1.0, Vectors.dense(1.0, 0.0))], 
     ["label", "features"])
df.show()
# +-----+---------+ 
# |label| features| 
# +-----+---------+ 
# | 0.0 |[0.0,1.0]| 
# | 1.0 |[1.0,0.0]| 
# +-----+---------+

lr = LogisticRegression(maxIter=5, regParam=0.01, labelCol="label")
lr_model = lr.fit(df)

rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="label", seed=42)
rf_model = rf.fit(df)

# test data:
test = sc.parallelize([Row(features=Vectors.dense(0.2, 0.5)),
                       Row(features=Vectors.dense(0.5, 0.2))]).toDF()

lr_result = lr_model.transform(test)
lr_result.show()
# +---------+--------------------+--------------------+----------+
# | features|       rawPrediction|         probability|prediction|
# +---------+--------------------+--------------------+----------+
# |[0.2,0.5]|[0.98941878916476...|[0.72897310704261...|       0.0|
# |[0.5,0.2]|[-0.9894187891647...|[0.27102689295738...|       1.0|  
# +---------+--------------------+--------------------+----------+

rf_result = rf_model.transform(test)
rf_result.show()
# +---------+-------------+--------------------+----------+ 
# | features|rawPrediction|         probability|prediction| 
# +---------+-------------+--------------------+----------+ 
# |[0.2,0.5]|    [1.0,2.0]|[0.33333333333333...|       1.0| 
# |[0.5,0.2]|    [1.0,2.0]|[0.33333333333333...|       1.0| 
# +---------+-------------+--------------------+----------+

For MLlib, see my answer here; for several undocumented & counter-intuitive features of PySpark classification, see my relevant blog post.

Question:

I am trying to make a predictor using Auto Encoder (AE) and Alternating Least Squares (ALS) methods. ALS is created using pyspark.mllib.recommendation package. I am able to save the ALS model and reuse it by model.save() and pyspark.mllib.recommendation.MatrixFactorizationModel.load() methods.

AE is created using torch.nn.Module package and has 4 layers.Unlike ALS for AE saving and loading model is not something I am missing.Can anyone help me to find a way to save and load an auto encoder.


Answer:

The torch module provides save and load methods.

It saves the model as .pth files

torch.save(model,'model.pth')

It also has load method

torch.load(.pth_file)

Question:

Our team is working on a NLP problem. We have a dataset with some labeled sentences and we must classify them into two classes, 0 or 1.

We preprocess the data and use word embeddings so that we have 300 features for each sentence, then we use a simple neural network to train the model.

Since the data are very skewed we measure the model score with the F1-score, computing it both on the train set (80%) and the test set (20%).

Spark

We used the multilayer perceptron classifier featured in PySpark's MLlib:

layers = [300, 600, 2]

trainer = MultilayerPerceptronClassifier(featuresCol='features', labelCol='target',
                                         predictionCol='prediction', maxIter=10, layers=layers,
                                         blockSize=128)
model = trainer.fit(train_df)
result = model.transform(test_df)

predictionAndLabels = result.select("prediction", "target").withColumnRenamed("target", "label")
evaluator = MulticlassClassificationEvaluator(metricName="f1")
f1_score = evaluator.evaluate(predictionAndLabels)

This way we get F1-scores ranging between 0.91 and 0.93.

TensorFlow

We then chose to switch (mainly for learning purpose) to TensorFlow, so we implemented a neural network using the same architecture and formulas of the MLlib's one:

# Network Parameters
n_input = 300
n_hidden_1 = 600
n_classes = 2

# TensorFlow graph input
features = tf.placeholder(tf.float32, shape=(None, n_input), name='inputs')
labels = tf.placeholder(tf.float32, shape=(None, n_classes), name='labels')

# Initializes weights and biases
init_biases_and_weights()

# Layers definition
layer_1 = tf.add(tf.matmul(features, weights['h1']), biases['b1'])
layer_1 = tf.nn.sigmoid(layer_1)

out_layer = tf.matmul(layer_1, weights['out']) + biases['out']
out_layer = tf.nn.softmax(out_layer)

# Optimizer definition
learning_rate_ph = tf.placeholder(tf.float32, shape=(), name='learning_rate')
loss_function = tf.losses.log_loss(labels=labels, predictions=out_layer)
optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate_ph).minimize(loss_function)

# Start TensorFlow session
init = tf.global_variables_initializer()
tf_session = tf.InteractiveSession()
tf_session.run(init)

# Train Neural Network
learning_rate = 0.01
iterations = 100
batch_size = 256

total_batch = int(len(y_train) / batch_size)
for epoch in range(iterations):
    avg_cost = 0.0
    for block in range(total_batch):
        batch_x = x_train[block * batch_size:min(block * batch_size + batch_size, len(x_train)), :]
        batch_y = y_train[block * batch_size:min(block * batch_size + batch_size, len(y_train)), :]
        _, c = tf_session.run([optimizer, loss_function], feed_dict={learning_rate_ph: learning_rate,
                                                                     features: batch_x,
                                                                     labels: batch_y})
        avg_cost += c
    avg_cost /= total_batch
    print("Iteration " + str(epoch + 1) + " Logistic-loss=" + str(avg_cost))

# Make predictions
predictions_train = tf_session.run(out_layer, feed_dict={features: x_train, labels: y_train})
predictions_test = tf_session.run(out_layer, feed_dict={features: x_test, labels: y_test})

# Compute F1-score
f1_score = f1_score_tf(y_test, predictions_test)

Support functions:

def initialize_weights_and_biases():
    global weights, biases
    epsilon_1 = sqrt(6) / sqrt(n_input + n_hidden_1)
    epsilon_2 = sqrt(6) / sqrt(n_classes + n_hidden_1)
    weights = {
        'h1': tf.Variable(tf.random_uniform([n_input, n_hidden_1],
                                        minval=0 - epsilon_1, maxval=epsilon_1, dtype=tf.float32)),
        'out': tf.Variable(tf.random_uniform([n_hidden_1, n_classes],
                                         minval=0 - epsilon_2, maxval=epsilon_2, dtype=tf.float32))
    }
    biases = {
        'b1': tf.Variable(tf.constant(1, shape=[n_hidden_1], dtype=tf.float32)),
        'out': tf.Variable(tf.constant(1, shape=[n_classes], dtype=tf.float32))
    }

def f1_score_tf(actual, predicted):
    actual = np.argmax(actual, 1)
    predicted = np.argmax(predicted, 1)

    tp = tf.count_nonzero(predicted * actual)
    fp = tf.count_nonzero(predicted * (actual - 1))
    fn = tf.count_nonzero((predicted - 1) * actual)
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)

    f1 = 2 * precision * recall / (precision + recall)
    return tf.Tensor.eval(f1)

This way we get F1-scores ranging between 0.24 and 0.25.

Question

The only differences that I can see between the two neural networks are:

  • Optimizer: L-BFGS in Spark, Gradient Descent in TensorFlow
  • Weights and biases initialization: Spark makes its own initialization while we initialize them manually in TensorFlow

I don't think that these two parameters can cause a so big difference in performance between the models, but still Spark seems to get very high scores in very few iterations.

I can't understand if TensorFlow is performing very bad or maybe Spark's scores are not truthful. And in both cases I think we aren't seeing something important.


Answer:

Initializing weights as uniform and bias as 1 is certainly not a good idea, and it may very well be the cause of this discrepancy.

Use normal or truncated_normal instead, with the default zero mean and a small variance for the weights:

weights = {
        'h1': tf.Variable(tf.truncated_normal([n_input, n_hidden_1],
                                        stddev=0.01, dtype=tf.float32)),
        'out': tf.Variable(tf.truncated_normal([n_hidden_1, n_classes],
                                         stddev=0.01, dtype=tf.float32))
    }

and zero for the biases:

biases = {
        'b1': tf.Variable(tf.constant(0, shape=[n_hidden_1], dtype=tf.float32)),
        'out': tf.Variable(tf.constant(0, shape=[n_classes], dtype=tf.float32))
    }

That said, I am not sure about the correctness of using the MulticlassClassificationEvaluator for a binary classification problem, and I would suggest doing some further manual checks to confirm that the function indeed returns what you think it returns...