How can I save an RDD to a single Parquet file?


I work with PySpark 2.0 and Hadoop 2.7.2. Here is my code:

import pandas as pd

def func(df):
    # keep the 'id' column and add a scaled 'num' column
    new_df = pd.DataFrame(df['id'])
    new_df['num'] = df['num'] * 12
    return new_df

set = sqlContext.read.parquet("data_set.parquet")
columns = set.columns
map_res = set.rdd.mapPartitions(lambda iter_: func(pd.DataFrame(list(iter_), 
                                                   columns=columns)))

Now I need to save the map_res RDD as a Parquet file new.parquet. Is there any way to do this without creating a large DataFrame before saving? Or is there maybe a way to save each partition of the RDD separately and then merge all the saved files?

P.S. I want to manage without creating a DataFrame because of its really large size.

There are two ways to do this:

One is to use coalesce(1). This makes sure all the data is saved into one file rather than multiple files (200 is Spark's default number of shuffle partitions); then use dataframe.write.save("/this/is/path").
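
A minimal sketch of that first option, assuming map_res has already been converted to a DataFrame called result_df (the name is an assumption; the path is the illustrative one from above):

# Collapse all data into a single partition so that only one part file is
# written, then save it as Parquet (path is illustrative).
result_df.coalesce(1).write.parquet("/this/is/path")

Note that Spark still writes a directory at that path; coalescing to one partition just means it contains a single part-*.parquet file.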

The other option is to write the output to a Hive table and then use hive -e "select * from table" > data.tsv, which will be tab separated.

DataFrames can be saved as Parquet files, but RDDs cannot, because Parquet requires a schema and RDDs aren't required to have one. To save an RDD:

1. Create a case class containing the schema for the RDD.
2. Create a DataFrame using the sqlContext.createDataFrame method; here you need to convert the RDD elements to org.apache.spark.sql.Row with the help of the case class.
3. Save that DataFrame to a Parquet file.
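
A rough PySpark analogue of those three steps, as a sketch: the RDD name records_rdd and the integer column types are assumptions; only the column names id and num come from the question.

from pyspark.sql.types import StructType, StructField, LongType

# Explicit schema playing the role of the Scala case class (types are assumed).
schema = StructType([
    StructField("id", LongType(), True),
    StructField("num", LongType(), True),
])

# records_rdd is a hypothetical RDD of (id, num) records.
row_rdd = records_rdd.map(lambda rec: (int(rec[0]), int(rec[1])))
df = sqlContext.createDataFrame(row_rdd, schema)
df.write.parquet("new.parquet")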

I suggest this:

dataframes = []
# creating an index
map_res = map_res.zipWithIndex()
# setting the index as the key
map_res = map_res.map(lambda x: (x[1], x[0]))
# creating one Spark DataFrame per element
for i in range(0, map_res.count()):
    partial_dataframe_pd = map_res.lookup(i)
    partial_dataframe = sqlContext.createDataFrame(partial_dataframe_pd)
    dataframes.append(partial_dataframe)
# concatenation (union returns a new DataFrame, so reassign)
result_df = dataframes.pop()
for df in dataframes:
    result_df = result_df.union(df)
# saving
result_df.write.parquet("...")

If you have a small number of partitions (2-100), it should work rather fast.
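
A more compact way to write the same concatenation, as a sketch using functools.reduce (the elided output path stays as in the answer above):

from functools import reduce

# Fold all partial DataFrames into one by repeated union, then save.
result_df = reduce(lambda a, b: a.union(b), dataframes)
result_df.write.parquet("...")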

Spark provides the capability to append a DataFrame to existing Parquet files using the "append" save mode; if you want to overwrite instead, use "overwrite". A DataFrame can also be created by reading a Parquet file, e.g. scala> val parqfile = sqlContext.read.parquet("employee.parquet"), and then stored into a table for further queries.
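
A minimal PySpark sketch of those save modes, assuming a DataFrame such as result_df from the answer above (the path is illustrative):

# "append" adds new files to an existing Parquet dataset; "overwrite" replaces it.
result_df.write.mode("append").parquet("new.parquet")
# result_df.write.mode("overwrite").parquet("new.parquet")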

To save a file in Parquet format, you need to convert the RDD to a DataFrame, as a Parquet file always needs a schema for processing.
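
A small sketch of that conversion in PySpark, assuming records_rdd is an RDD of (id, num) tuples (the RDD name and the output path are illustrative):

# toDF is available on RDDs once a SQLContext/SparkSession has been created.
df = records_rdd.toDF(["id", "num"])
df.write.parquet("new.parquet")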

To convert an RDD to a DataFrame in Scala you can use toDF (an implicits import is required for it), e.g. val df: DataFrame = rdd.toDF(), and then write it with df.write.parquet(...). Pandas is great for reading relatively small datasets and writing out a single Parquet file; Spark is great for reading and writing huge datasets and processing tons of files in parallel.
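
If the data really is small enough for a single machine, a hedged sketch of the pandas route (it needs the pyarrow or fastparquet package installed; the paths are illustrative and the transformation just mirrors the question):

import pandas as pd

# Read a small Parquet file with pandas, apply the transformation from the
# question, and write the result back out as one Parquet file.
pdf = pd.read_parquet("data_set.parquet")
pdf["num"] = pdf["num"] * 12
pdf.to_parquet("new.parquet")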

You can use:

set.coalesce(1).write.parquet("myFile.parquet")

Similar to write, DataFrameReader provides a parquet() function (spark.read.parquet) to read Parquet files back into a Spark DataFrame.

You can also test this from the Scala shell instead of an IDE; in that case the Spark SQLContext object is used to read and write the Parquet files.

Parquet files are self-describing, so the schema is preserved, and the result of loading a Parquet file is again a DataFrame. In SparkR, for example: parquetFile <- read.parquet("people.parquet"). A Parquet file can also be used to create a temporary view, createOrReplaceTempView(parquetFile, "parquetFile"), and then queried in SQL statements.
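
The same pattern in PySpark, as a minimal sketch (the file name and view name are taken from the SparkR example above; the query itself is illustrative):

# Load the Parquet file, register it as a temporary view, and query it with SQL.
parquet_df = sqlContext.read.parquet("people.parquet")
parquet_df.createOrReplaceTempView("parquetFile")
names = sqlContext.sql("SELECT name FROM parquetFile")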

With the event log enabled you can look at how those Parquet files are worked with in the DAGs and metrics. Before you write some Spark SQL against that file, make sure you register a table name. If you don't want a write that will fail when the directory/file already exists, you can choose append mode to add to it. It depends on your use case.

Comments
  • @santon It seems it is required to merge all the single DataFrames into a large one while preserving the schema. Keeping them as elements of an RDD won't allow manipulating the results as a DataFrame.
  • @ИванСудос Correct, so I don't want all the data to be moved to one node.
  • @santon When you build pipelines, single Parquet files as parameters are easier to deal with.