Iterate rows and columns in Spark dataframe

I have the following Spark dataframe that is created dynamically:

val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)

val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)

val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)

val data = Seq(row1,row2,row3)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")

Now, I need to iterate over each row and column in sqlDF to print each column. This is my attempt:

sqlDF.foreach { row =>
  row.foreach { col => println(col) }
}

row is of type Row, but Row is not iterable, which is why this code throws a compilation error at row.foreach. How can I iterate over each column in a Row?

You can convert a Row to a Seq with toSeq. Once turned into a Seq you can iterate over it as usual with foreach, map or whatever you need:

sqlDF.foreach { row =>
  row.toSeq.foreach { col => println(col) }
}

Output:

Berta
bbb
30
Joe
Andy
aaa
20
ccc
40
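
The same toSeq conversion works with map as well if you want to build values rather than print them from inside the executors. A minimal sketch (it collects to the driver, so it is only suitable for small data like this example):

// Build one comma-separated string per row on the driver, then print them in order.
val lines: Array[String] = sqlDF.collect().map(row => row.toSeq.mkString(","))
lines.foreach(println)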

Consider that you have a DataFrame like the one below:

+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy|   aaa| 20|
|Berta|   bbb| 30|
|  Joe|   ccc| 40|
+-----+------+---+

To loop over your DataFrame and extract its elements, you can choose one of the approaches below.

Approach 1 - Loop using foreach

Looping over a DataFrame directly with a foreach loop is not possible. To do this, first you have to define the schema of the DataFrame using a case class, and then apply that schema to the DataFrame.

import spark.implicits._
import org.apache.spark.sql._

case class cls_Employee(name: String, sector: String, age: Int)

val df = Seq(cls_Employee("Andy", "aaa", 20), cls_Employee("Berta", "bbb", 30), cls_Employee("Joe", "ccc", 40)).toDF()

df.as[cls_Employee]
  .take(df.count.toInt)
  .foreach(t => println(s"name=${t.name},sector=${t.sector},age=${t.age}"))

Please see the result below:

name=Andy,sector=aaa,age=20
name=Berta,sector=bbb,age=30
name=Joe,sector=ccc,age=40

Approach 2 - Loop using rdd

Use rdd.collect on top of your DataFrame. The row variable will then hold each row of the DataFrame as an RDD Row. To get the elements of a row, use row.mkString(","), which gives the row's values as a comma-separated string. Using the built-in split function you can then access each column value of the row by index.

for (row <- df.rdd.collect) {
  val name   = row.mkString(",").split(",")(0)
  val sector = row.mkString(",").split(",")(1)
  val age    = row.mkString(",").split(",")(2)
}

Note that there are two drawbacks to this approach:
1. If there is a , in a column value, the data will be wrongly split across adjacent columns.
2. rdd.collect is an action that returns all of the data to the driver's memory, and the driver's memory might not be large enough to hold it, which can make the application fail.
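
One way to avoid the comma-splitting problem is to read each field by position with the typed Row getters instead of mkString/split. This is only a sketch, assuming the same name/sector/age schema shown above:

for (row <- df.rdd.collect) {
  // Access columns by index with the typed getters instead of splitting a string.
  val name   = row.getString(0)
  val sector = row.getString(1)
  val age    = row.getInt(2)
  println(s"$name,$sector,$age")
}

This still collects everything to the driver, so the second drawback above remains.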

I would recommend using Approach 1.

Approach 3 - Using where and select

You can directly use where and select, which will internally loop over and find the data. To avoid an index-out-of-bounds exception when no row matches, an if condition is used:

if(df.where($"name" === "Andy").select(col("name")).collect().length >= 1)
    name = df.where($"name" === "Andy").select(col("name")).collect()(0).get(0).toString
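
A variant of the same idea that runs the query only once and handles the no-match case with headOption could look like the sketch below (assuming the same df and the usual imports):

import org.apache.spark.sql.functions.col
import spark.implicits._   // for the $"..." column syntax

// Collect the (small) filtered result once, then read the first value if present.
val maybeName: Option[String] =
  df.where($"name" === "Andy")
    .select(col("name"))
    .collect()
    .headOption
    .map(_.getString(0))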

Approach 4 - Using temp tables

You can register the DataFrame as a temp table, which will be stored in Spark's memory. Then you can use a select query, just as with any other database, to query the data, and then collect and save the result in a variable:

df.registerTempTable("student")
name = sqlContext.sql("select name from student where name='Andy'").collect()(0).toString().replace("[","").replace("]","")
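
Note that registerTempTable is deprecated in Spark 2.x in favour of createOrReplaceTempView. A rough equivalent using the SparkSession would be:

df.createOrReplaceTempView("student")
val name = spark.sql("select name from student where name = 'Andy'")
  .collect()(0)
  .getString(0)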

sqlDF.foreach was not working for me, but Approach 1 from @Sarath Avanavu's answer did work, although it sometimes also changed the order of the records.

I found one more way that works:

df.collect().foreach { row =>
   println(row.mkString(","))
}
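
If the ordering matters, one hedged fix is to impose it explicitly before collecting, for example with orderBy (a sketch on the same df):

// Sort by a column before collecting so the printed order is deterministic.
df.orderBy("age").collect().foreach { row =>
  println(row.mkString(","))
}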

You should use mkString on your Row:

sqlDF.foreach { row =>
  println(row.mkString(",")) 
}

But note that this will be printed inside the executors' JVMs, so normally you won't see the output (unless you work with master = local).

You should iterate over the partitions, which allows Spark to process the data in parallel, and then you can run foreach on each row inside a partition.

You can further group the data in a partition into batches if need be:

sqlDF.foreachPartition { partitionedRows: Iterator[Row] =>
  if (partitionedRows.hasNext) {
    // numberOfRowsPerBatch is whatever batch size you choose
    partitionedRows.grouped(numberOfRowsPerBatch).foreach { batch =>
      batch.foreach { row =>
        // ... process each row here ...
      }
    }
  }
}
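
If you need to walk the rows on the driver without pulling the whole DataFrame into memory at once, another option (not part of the original answer, just a sketch) is toLocalIterator, which streams one partition at a time to the driver:

import scala.collection.JavaConverters._

// Only one partition's worth of rows is held on the driver at any time.
sqlDF.toLocalIterator().asScala.foreach { row =>
  println(row.mkString(","))
}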

Comments
  • For printing the dataframe, why don't you use sqlDF.show?
  • println is just for the question; I need to access the column data in the program.
  • Possible duplicate of Spark extracting values from a Row
  • @ShankarKoirala This question is different, it asks how to extract values from columns, the one you refer to is to extract values from Rows
  • This answer is not working for me, but Approach 1 from @Sarath Avanavu's answer works.
  • Yeah, this is shaky, because casting the record into something useful is prone to fail, even though standard DataFrame processing in Spark manages casting automatically, even to case classes.
  • While this works, it brings all the data back to the driver. If you're operating on a large DataFrame, this could crash your application.