Add PySpark RDD as new column to pyspark.sql.dataframe

I have a pyspark.sql.dataframe where each row is a news article. I also have an RDD that represents the words contained in each article. I want to add the RDD of words as a column named 'words' to my DataFrame of news articles. I tried

df.withColumn('words', words_rdd )

but I get the error

AssertionError: col should be Column

The DataFrame looks something like this

Articles
the cat and dog ran
we went to the park
today it will rain

but I have 3k news articles.

I applied a function to clean the text, such as removing stop words, and I now have an RDD that looks like this:

[[cat, dog, ran],[we, went, park],[today, will, rain]]

I'm trying to get my Dataframe to look like this:

Articles                 Words
the cat and dog ran      [cat, dog, ran]
we went to the park      [we, went, park]
today it will rain       [today, will, rain]

Why do you want to join the RDD back to the DataFrame? I would rather create a new column from "Articles" directly. There are multiple ways to do it; here are my 5 cents:

from pyspark.sql import Row
from pyspark.sql.context import SQLContext
sqlCtx = SQLContext(sc)    # sc is the sparkcontext

x = [Row(Articles='the cat and dog ran'),Row(Articles='we went to the park'),Row(Articles='today it will rain')]
df = sqlCtx.createDataFrame(x)

# Use df.rdd.map here; DataFrame.map was removed in Spark 2.x.
df2 = df.rdd.map(lambda x: (x.Articles, x.Articles.split(' '))).toDF(['Articles', 'words'])
df2.show()

You get the following output:

Articles                 words
the cat and dog ran      [the, cat, and, dog, ran]
we went to the park      [we, went, to, the, park]
today it will rain       [today, it, will, rain]

Let me know if you were looking to achieve something else.
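As a side note: if you only need to split on spaces, you can also skip the RDD round-trip entirely and use the built-in split function from pyspark.sql.functions (a minimal sketch, using the same df as above):

from pyspark.sql import functions as F

# Split the 'Articles' column on single spaces directly.
df2 = df.withColumn('words', F.split(F.col('Articles'), ' '))
df2.show(truncate=False)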

A simple but effective approach would be to use a udf. You can:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

df = spark.createDataFrame(["the cat and dog ran", "we went to the park", "today it will rain", None], 
"string" ).toDF("Articles")

# The udf returns a list of words, so declare the return type as an array of strings.
split_words = udf(lambda x: x.split(' ') if x is not None else x, ArrayType(StringType()))
df = df.withColumn('Words', split_words(df['Articles']))

df.show(10,False)
>>
+-------------------+-------------------------+
|Articles           |Words                    |
+-------------------+-------------------------+
|the cat and dog ran|[the, cat, and, dog, ran]|
|we went to the park|[we, went, to, the, park]|
|today it will rain |[today, it, will, rain]  |
|null               |null                     |
+-------------------+-------------------------+

I added a check for None because it is very common to have bad lines in your data. You can drop them easily, before or after splitting, with dropna.
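For example, a minimal sketch of dropping the null rows before splitting, reusing the df and split_words defined above:

# Drop rows whose 'Articles' value is null, then apply the udf.
df_clean = df.dropna(subset=['Articles'])
df_clean = df_clean.withColumn('Words', split_words(df_clean['Articles']))
df_clean.show(truncate=False)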

But in my opinion, if you want to do this as a preparation task for text analytics, it would probably be in your best interest to build a Pipeline, as @user9613318 suggests in his answer.
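For reference, such a pipeline might look roughly like the sketch below. This assumes Spark ML's Tokenizer and StopWordsRemover cover the cleaning you need; it is not necessarily the exact pipeline from that answer.

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Tokenize the raw text, then strip stop words; each stage adds a new column.
tokenizer = Tokenizer(inputCol='Articles', outputCol='tokens')
remover = StopWordsRemover(inputCol='tokens', outputCol='Words')

pipeline = Pipeline(stages=[tokenizer, remover])
df_words = pipeline.fit(df).transform(df)
df_words.select('Articles', 'Words').show(truncate=False)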

If you want to add the content of an arbitrary RDD as a column, you can zip the two RDDs together. For example:

rdd1 = spark.sparkContext.parallelize([1, 2, 3, 5])
# Make some transformation on rdd1:
rdd2 = rdd1.map(lambda n: True if n % 2 else False)
# Pair each element of rdd1 with the corresponding element of rdd2.
rdd1.zip(rdd2).collect()
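The same idea works for the original question: zip the DataFrame's underlying RDD with the words RDD and rebuild the DataFrame. A rough sketch, assuming df and words_rdd are the objects from the question; note that zip requires both RDDs to have the same number of partitions and the same number of elements per partition, so the row order must line up:

# Pair each article Row with its list of words, then rebuild the DataFrame.
combined = df.rdd.zip(words_rdd).map(lambda pair: (pair[0].Articles, pair[1]))
df_with_words = combined.toDF(['Articles', 'Words'])
df_with_words.show(truncate=False)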

Comments
  • please share example data, you probably need to join though.
  • How are they matched? why are the words for [the, cat, and, dog, ran] matched to the article the cat and dog ran and not another article?
  • In my case all rows are randomly generated, so order is unimportant. This should work well, thank you.
  • @bendl In general you should still prefer methods which don't require combining structures. If you generate random data, then udf or map should work perfectly.
  • That's pretty much what I want but I have 3k articles and I want to apply a function to each of those articles to perform some cleaning (not just split) and put it in a dataframe like you have above. This is my first time using pyspark so I'm not sure of the best approaches.
  • Can you provide a sample file of your actual data? Any arbitrary function can be applied with the help of a udf in Spark.
  • got it to work using udf: newdf = df.withColumn("words", udf_clean_text("articles")) thanks!! (a sketch of such a cleaning udf is shown after these comments)
  • If this solved your problem, make sure to mark this answer as correct.
  • Please explain how and why your code solves the problem or improves the code from the question.
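For anyone landing here later: the udf_clean_text mentioned in the comments is not shown anywhere in the question, but a hypothetical version might look roughly like this (the stop-word list and cleaning steps here are assumptions, not the asker's actual code):

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

stop_words = {'the', 'and', 'to', 'it', 'will'}  # assumed stop-word list

def clean_text(text):
    # Lower-case, split on spaces, and drop stop words; keep None for bad rows.
    if text is None:
        return None
    return [w for w in text.lower().split(' ') if w not in stop_words]

udf_clean_text = udf(clean_text, ArrayType(StringType()))
newdf = df.withColumn('words', udf_clean_text(df['Articles']))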