Recommendations for a subset of users using PySpark mllib ALS/MatrixFactorizationModel

I have built a model using the below code:

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

model1 = ALS.train(ratings=ratingsR, rank=model_params['rank'], \
                   iterations=model_params['iterations'], lambda_=model_params['lambda'], \
                   blocks=model_params['blocks'], nonnegative=model_params['nonnegative'], \
                   seed=model_params['seed'])

Now I want to predict campaigns for all users (or a subset of users) using the distributed environment provided by Spark.

I tried recommendProductsForUsers, which is taking ages to compute recommendations for 32M users × 4,000 products.

preds = model1.recommendProductsForUsers(num=4000)

I really don't need recommendations for all 32M users. I am fine with 100k-200k users as well.
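
For illustration, here is a rough, untested sketch of the kind of subsetting I mean, assuming subset_user_ids is a plain Python collection holding the 100k-200k user IDs of interest and sc is the active SparkContext. It only filters the output of recommendProductsForUsers, so it still computes recommendations for every user first, which is exactly the cost I would like to avoid:

# Hypothetical sketch: filter the full output down to the wanted users.
# subset_user_ids is an assumed collection of the 100k-200k user IDs.
wanted_users = sc.broadcast(set(subset_user_ids))
preds_subset = (model1.recommendProductsForUsers(num=4000)
                      .filter(lambda kv: kv[0] in wanted_users.value))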

So, to modify my process, I tried the Spark UDF way to process each user one by one, while still using the distribution mechanism of the Spark cluster:

import pyspark.sql.functions as F
def udf_preds(sameModel):
    return F.udf(lambda x: get_predictions(x, sameModel))

def get_predictions(x, sameModel):
    preds = sameModel.recommendProducts(user=x, num=4000) # per user it takes around 4s
    return preds

test.withColumn('predictions', udf_preds(model1)(F.col('user_id')))

The test DataFrame contains around 200,000 users. The above fails with the following error:

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

What is a better way to perform the recommendations for a subset of users?

(EDIT)

Following @piscall's response, I tried to do the same using an RDD:

preds_rdd = test.rdd.map(lambda x: (x.user_id, sameModel.recommendProducts(x.user_id, 4000)))
preds_rdd.take(2)
 File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 330, in __getnewargs__
    "It appears that you are attempting to reference SparkContext from a broadcast "
 Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

 PicklingErrorTraceback (most recent call last)
<ipython-input-17-e114800a26e7> in <module>()
----> 1 preds_rdd.take(2)

 /usr/hdp/current/spark2-client/python/pyspark/rdd.py in take(self, num)
   1356 
   1357             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1358             res = self.context.runJob(self, takeUpToNumLeft, p)
   1359 
   1360             items += res

 /usr/hdp/current/spark2-client/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
   1038         # SparkContext#runJob.
   1039         mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1040         sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
   1041         return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
   1042 

 /usr/hdp/current/spark2-client/python/pyspark/rdd.py in _jrdd(self)
   2470 
   2471         wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
-> 2472                                       self._jrdd_deserializer, profiler)
   2473         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
   2474                                              self.preservesPartitioning)

 /usr/hdp/current/spark2-client/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
   2403     assert serializer, "serializer should not be empty"
   2404     command = (func, profiler, deserializer, serializer)
-> 2405     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
   2406     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
   2407                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)

 /usr/hdp/current/spark2-client/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
   2389     # the serialized command will be compressed by broadcast
   2390     ser = CloudPickleSerializer()
-> 2391     pickled_command = ser.dumps(command)
   2392     if len(pickled_command) > (1 << 20):  # 1M
   2393         # The broadcast will have same life cycle as created PythonRDD

 /usr/hdp/current/spark2-client/python/pyspark/serializers.py in dumps(self, obj)
    573 
    574     def dumps(self, obj):
--> 575         return cloudpickle.dumps(obj, 2)
    576 
    577 

 /usr/hdp/current/spark2-client/python/pyspark/cloudpickle.py in dumps(obj, protocol)
    916 
    917     cp = CloudPickler(file,protocol)
--> 918     cp.dump(obj)
    919 
    920     return file.getvalue()

 /usr/hdp/current/spark2-client/python/pyspark/cloudpickle.py in dump(self, obj)
    247                 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    248             print_exec(sys.stderr)
--> 249             raise pickle.PicklingError(msg)
    250 
    251 

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

What I would do is use the predictAll method. Assume df_products is a DataFrame containing all 4,000 products and df_users a DataFrame with the 100-200K selected users. Do a crossJoin to form every (user, product) combination as the test data, then call predictAll, which will yield Rating objects for the selected users over the 4,000 products:

from pyspark.sql.functions import broadcast
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

testdata = broadcast(df_products).crossJoin(df_users).select('user', 'product').rdd.map(tuple)

model.predictAll(testdata).collect()

Using the example from the documentation, which has 4 products (1, 2, 3, 4) and 4 users (1, 2, 3, 4):

df_products.collect()                                                                                              
# [Row(product=1), Row(product=3), Row(product=2), Row(product=4)]

# a subset of all users:
df_users.collect()                                                                                                 
# [Row(user=1), Row(user=3)]

testdata.collect()                                                                                                 
# [(1, 1), (1, 3), (1, 2), (1, 4), (3, 1), (3, 3), (3, 2), (3, 4)]

model.predictAll(testdata).collect()
#[Rating(user=1, product=4, rating=0.9999459747142155),
# Rating(user=3, product=4, rating=4.99555263974573),
# Rating(user=1, product=1, rating=4.996821463543848),
# Rating(user=3, product=1, rating=1.000199620693615),
# Rating(user=1, product=3, rating=4.996821463543848),
# Rating(user=3, product=3, rating=1.000199620693615),
# Rating(user=1, product=2, rating=0.9999459747142155),
# Rating(user=3, product=2, rating=4.99555263974573)]
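
If collecting everything to the driver is too heavy, a possible follow-up (a sketch, untested) is to keep the result distributed and take only the top-N products per user with a window function; the column names and the cut-off of 100 below are assumptions:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Turn the Rating objects into a DataFrame and rank products per user by predicted rating.
preds_df = model.predictAll(testdata).toDF(['user', 'product', 'rating'])
w = Window.partitionBy('user').orderBy(F.desc('rating'))
top_n = (preds_df
         .withColumn('rn', F.row_number().over(w))
         .where(F.col('rn') <= 100)   # arbitrary cut-off, adjust as needed
         .drop('rn'))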

Note: you might want to screen out users which are not in the existing model before creating testdata and process them separately.
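
One way to do that screening (a sketch, untested) is to keep only the users that have latent factors in the trained model, using userFeatures():

# Users absent from userFeatures() have no latent factors, so predictAll cannot score them;
# filter them out here and handle them separately (e.g. with a fallback).
known_users = model.userFeatures().keys().map(lambda uid: (uid,)).toDF(['user'])
df_users = df_users.join(known_users, on='user', how='inner')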

You can try the following:

"""
Convert the test df to rdd, and then map a function that returns 
(user_id, array(Rating(user, product, rating)))
"""
preds_rdd = test.rdd.map(lambda x: (x.user_id, sameModel.recommendProducts(x.user_id, 4000)))

# Convert (user_id, array(Rating(user, product, rating))) to 
# (user_id, array(product_names))
preds_rdd2 = preds_rdd.map(lambda row: (row[0], [x.product for x in row[1]]))
# Convert above RDD to DF with user_id and predicted_products columns
preds_df = preds_rdd2.toDF(["user_id", "predicted_products"])

I have not tested it, but it's straight from the documentation over here:

You can then choose to join it back to the original DataFrame, or keep only these columns.

You can then explode the array of products into separate rows with explode() if you need to.
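
For example, a minimal sketch (untested) of that flattening step, using the preds_df built above:

import pyspark.sql.functions as F

# One row per (user_id, product) pair instead of an array column per user.
exploded = preds_df.select('user_id', F.explode('predicted_products').alias('product'))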

What about preds = sameModel.predict(x)? I don't know Spark or even Scala, but that is how we do it in Python and I guess it's the same for Spark. If you want to predict with sub-samples of x, you can do something like preds = sameModel.predict(x[0:200, ::]), or vice versa for the columns.
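
For reference, a small, untested sketch of how this looks in pyspark.mllib: MatrixFactorizationModel exposes predict() for a single (user, product) pair and predictAll() for an RDD of pairs, rather than NumPy-style slicing; the IDs below are made up:

# Score a single (user, product) pair:
one_score = sameModel.predict(1, 4)

# Score many pairs at once, passed as an RDD of (user, product) tuples:
pairs = sc.parallelize([(1, 1), (1, 4), (3, 2)])
many_scores = sameModel.predictAll(pairs).collect()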

Comments
  • Can you show the entire stack-trace?
  • The stack trace does not show anything beyond the lines where the error occurs; the pickling error is raised at test.withColumn('predictions', udf_preds(sameModel)(F.col('user_id'))). From what I read, this is because sameModel (the model object holds a SparkContext) cannot be serialised.
  • Please post the get_predictions function.
  • Do you mean: def get_predictions(x, sameModel): preds = sameModel.recommendProducts(user=x, num=4000) # per user it takes around 4s return preds
  • Will need more information I guess. What object is sameModel? Is it the ALS instance? I don't know if you can pass an ML model into a UDF but I can't see a broadcast variable either. What do you think of taking an RDD approach? I have something in mind that I think can help
  • Hi @jxc, so ultimately I did exactly this: created an RDD of each user (in the validation set) paired with every campaign and used the predictAll method. However, it is extremely slow. :( But at least it is something that solves the issue.
  • @AnkitaMehta understood! As the testdata will contain 400-800M records, that's a very big list. But if you want to do it with a fixed user list and a fixed product list, I don't think there is a better way to handle this task. :( IMO, for general recommendation systems, top-100 should be enough for most tasks; 4000 is just too big.
  • So the issue remains the same: sameModel (the model object) cannot be serialized. Adding the full stack trace in the answer.
  • @AnkitaMehta can you check whether it works correctly for one user?
  • For one user, sameModel.recommendProducts(singleUser, 4000) works like a charm. The reason is that sameModel is not shipped out for distributed execution in that case. I guess you are reading this incorrectly.
  • @AnkitaMehta no, I understand what's happening. I will try to work on it with samples.
  • The same is done via sameModel.recommendProducts(user=x, num=4000). Ordered recommendations and their predicted ratings are what we are looking for. And a Spark DataFrame cannot be sliced like a list, since it is not sequential but distributed.