Converting Pandas dataframe into Spark dataframe error

I'm trying to convert a Pandas DataFrame into a Spark one. DataFrame head:

10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691

Code:

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext

dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)

And I got an error:

TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

You need to make sure your pandas DataFrame columns are appropriate for the types Spark is inferring. If your pandas DataFrame lists something like:

df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol                    5062 non-null object
Col2                       5062 non-null object

And you're getting that error, try:

df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)

Now, make sure .astype(str) is actually the type you want those columns to be. Basically, when the underlying Java code tries to infer the type of an object from Python, it samples a few observations and makes a guess. If that guess doesn't hold for all the data in the column(s) it is trying to convert from pandas to Spark, it will fail.
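
For example, a minimal sketch of how a mixed-type column triggers this (the column name and values here are made up; the "NA" strings play the same role as in the data above):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# An object column holding both numbers and strings: Spark samples rows,
# guesses DoubleType from the numbers, then fails to merge it with StringType.
pdf = pd.DataFrame({"mixed": [1.0, 2.0, "NA"]})

# spark.createDataFrame(pdf)                              # raises the merge TypeError
sdf = spark.createDataFrame(pdf.astype({"mixed": str}))   # works: one consistent type
sdf.show()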

Type-related errors can be avoided by imposing a schema, as follows:

Note: a text file (test.csv) was created with the original data shown above, and hypothetical column names were inserted ("col1", "col2", ..., "col25").

import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

pdDF = pd.read_csv("test.csv")

contents of the pandas data frame:

       col1     col2    col3    col4    col5    col6    col7    col8   ... 
0      10000001 1       0       1       12:35   OK      10002   1      ...
1      10000001 2       0       1       12:36   OK      10002   1      ...
2      10000002 1       0       4       12:19   PA      10003   1      ...

Next, create the schema:

from pyspark.sql.types import *

mySchema = StructType([StructField("col1", LongType(), True),
                       StructField("col2", IntegerType(), True),
                       StructField("col3", IntegerType(), True),
                       StructField("col4", IntegerType(), True),
                       StructField("col5", StringType(), True),
                       StructField("col6", StringType(), True),
                       StructField("col7", IntegerType(), True),
                       StructField("col8", IntegerType(), True),
                       StructField("col9", IntegerType(), True),
                       StructField("col10", IntegerType(), True),
                       StructField("col11", StringType(), True),
                       StructField("col12", StringType(), True),
                       StructField("col13", IntegerType(), True),
                       StructField("col14", IntegerType(), True),
                       StructField("col15", IntegerType(), True),
                       StructField("col16", IntegerType(), True),
                       StructField("col17", IntegerType(), True),
                       StructField("col18", IntegerType(), True),
                       StructField("col19", IntegerType(), True),
                       StructField("col20", IntegerType(), True),
                       StructField("col21", IntegerType(), True),
                       StructField("col22", IntegerType(), True),
                       StructField("col23", IntegerType(), True),
                       StructField("col24", IntegerType(), True),
                       StructField("col25", IntegerType(), True)])

Note: the third argument, True, means the column is nullable.

Create the PySpark DataFrame:

df = spark.createDataFrame(pdDF,schema=mySchema)

Confirm the pandas DataFrame is now a PySpark DataFrame:

type(df)

Output:

pyspark.sql.dataframe.DataFrame
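
You can also confirm the schema carried over as intended (abridged output shown):

df.printSchema()
# root
#  |-- col1: long (nullable = true)
#  |-- col2: integer (nullable = true)
#  ...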

Aside:

To address Kate's comment below: to impose a general (String) schema, you can do the following:

df=spark.createDataFrame(pdDF.astype(str)) 
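
If you go the all-String route, you can cast individual columns back to the types you need afterwards (a sketch using the hypothetical column names from above):

from pyspark.sql.functions import col

df = df.withColumn("col1", col("col1").cast("long")) \
       .withColumn("col2", col("col2").cast("int"))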

I made this script; it worked for my 10 pandas DataFrames.

from pyspark.sql.types import *

# Auxiliary functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return DateType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return DoubleType()   # float64 is double precision
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)


# Given a pandas dataframe, return a Spark dataframe.
# Assumes an existing sqlContext (or SparkSession) in scope.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types):
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)

You can see it also in this gist

With this, you just have to call spark_df = pandas_to_spark(pandas_df)
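
For example (a small illustrative DataFrame; it assumes the sqlContext used by the function already exists):

import pandas as pd

pandas_df = pd.DataFrame({"id": [1, 2, 3],
                          "name": ["a", "b", "c"],
                          "score": [0.5, 0.7, 0.9]})
spark_df = pandas_to_spark(pandas_df)
spark_df.printSchema()   # id: long, name: string, score: double (per the mapping above)
spark_df.show()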

I have tried this with your data and it works:

%pyspark
import pandas as pd
from pyspark.sql import SQLContext

print(sc)
df = pd.read_csv("test.csv")
print(type(df))
print(df)
sqlCtx = SQLContext(sc)
sqlCtx.createDataFrame(df).show()

In Spark version >= 3, you can convert a pandas DataFrame to a PySpark DataFrame in one line:

Use spark.createDataFrame(pandasDF)

dataset = pd.read_csv("data/AS/test_v2.csv")

sparkDf = spark.createDataFrame(dataset)

If you are confused about the spark session variable, it can be created as follows:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()
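
As an aside, on Spark 3.x you can also enable Arrow to speed up the pandas-to-Spark conversion (a hedged sketch; the config key below is the Spark 3 name):

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
sparkDf = spark.createDataFrame(dataset)   # falls back to the non-Arrow path if Arrow can't be used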

Comments
  • My first assumption is that the file contains both numbers and strings in one column and Spark gets confused by it. However, this should be handled by Pandas when importing.
  • does your DF have column names?
  • Yes it has. Should I disable them?
  • No, but it would be helpful if you included them in your DF head output. Try skipping the 11th column (the one with NAs) and rerun your code.
  • Why don't you use spark-csv?
  • I found this very helpful. Follow-up question: When I went through and followed these steps for my own dataframe, I did not see any change to the pd.info(). How exactly is the dataframe itself changing? How could I check to see the pandas DataFrame has changed after using the .astype(str)?
  • Is it possible to generalize the schema creation part to just have it create all columns as a certain type? For example, just tell it that all columns as StringType (instead of assigning each column individually)
  • df=spark.createDataFrame(pdDF.astype(str))
  • Verified this all works; also verified the output goes through from PySpark out to Parquet and into Scala. Thanks Gonzalo. Wouldn't begin to know how, but this seems like a brilliant contribution to the open source community. Maybe something like pd.to_sparkdf().
  • Gonzalo, I just forked your gist to support ArrayType[StringType]. Thanks again. Readers, this is great solution to go from pandas to pyspark and scala spark.
  • No problem tony! I'm glad it was useful
  • I turned it into a class on the fork. Thanks again!
  • For my data it takes like forever