Reading a CSV file using Akka Streams

I'm reading a CSV file using Akka Streams so that I can create a graph of actions to perform on each line. I've got the following toy example up and running.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink

import scala.io.Source

def main(args: Array[String]): Unit = {
  implicit val system = ActorSystem("MyAkkaSystem")
  implicit val materializer = ActorMaterializer()

  val source = akka.stream.scaladsl.Source.fromIterator(() => Source.fromFile("a.csv").getLines())
  val sink = Sink.foreach(println)
  source.runWith(sink)
}

The two Source types don't sit easy with me. Is this idiomatic, or is there a better way to write this?

Actually, Akka Streams provides a function to read directly from a file.

import java.nio.file.Paths
import akka.stream.scaladsl.{FileIO, Framing}
import akka.util.ByteString

FileIO.fromPath(Paths.get("a.csv"))
  .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true).map(_.utf8String))
  .runForeach(println)

Here, runForeach is used to print the lines. If you have a proper Sink to process these lines, use it instead. For example, if you want to split each line on ',' and print the number of fields in it:

val sink = Sink.foreach[String](x => println(x.split(",").size))

FileIO.fromPath(Paths.get("a.csv"))
  .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true).map(_.utf8String))
  .to(sink)
  .run()

The idiomatic way to read a CSV file with Akka Streams is to use the Alpakka CSV connector. The following example reads a CSV file, converts each line into a map from column names (taken from the first line of the file) to ByteString values, turns those ByteString values into Strings, and prints each resulting map:

import java.nio.file.Paths

import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.FileIO

FileIO.fromPath(Paths.get("a.csv"))
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMap())
  .map(_.mapValues(_.utf8String))
  .runForeach(println)
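
If your file has no header row, you can supply the column names in code instead of taking them from the first line. A minimal sketch, assuming a headerless two-column file where "id" and "value" are example column names:

FileIO.fromPath(Paths.get("a.csv"))
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.withHeaders("id", "value")) // "id" and "value" are assumed names
  .map(_.mapValues(_.utf8String))
  .runForeach(println)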

Try this:

package ru.io

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object ReadStreamApp extends App {
  implicit val actorSystem = ActorSystem()
  import actorSystem.dispatcher
  implicit val flowMaterializer = ActorMaterializer()

  // read lines from the log file
  val logFile = Paths.get("src/main/resources/a.csv")

  val source = FileIO.fromPath(logFile)

  // parse the byte chunks into lines
  val flow = Framing
    .delimiter(ByteString(System.lineSeparator()), maximumFrameLength = 512, allowTruncation = true)
    .map(_.utf8String)

  val sink = Sink.foreach(println)

  source
    .via(flow)
    .runWith(sink)
    .andThen {
      case _ =>
        actorSystem.terminate()
        Await.ready(actorSystem.whenTerminated, 1.minute)
    }
}

Yeah, it's OK, because these are two different Sources. But if you don't like scala.io.Source, you can read the file yourself (which is sometimes necessary anyway, e.g. when the source CSV file is zipped) and then parse it from the resulting InputStream, like this:

import akka.stream.scaladsl.{Framing, StreamConverters}
import akka.util.ByteString

// `input` is the InputStream you opened yourself (e.g. a java.util.zip.GZIPInputStream)
StreamConverters.fromInputStream(() => input)
  .via(Framing.delimiter(ByteString("\n"), 4096))
  .map(_.utf8String)
  .collect { case line => line } // placeholder for per-line filtering/transformation
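
For the zipped case specifically, Akka Streams can also do the decompression itself, so you don't need to hand-roll the InputStream at all. A minimal sketch, assuming a gzip-compressed file at the example path a.csv.gz:

import java.nio.file.Paths

import akka.stream.scaladsl.{Compression, FileIO, Framing}
import akka.util.ByteString

// a.csv.gz is an assumed example path; Compression.gunzip() decompresses the byte stream in flight
FileIO.fromPath(Paths.get("a.csv.gz"))
  .via(Compression.gunzip())
  .via(Framing.delimiter(ByteString("\n"), 4096, allowTruncation = true))
  .map(_.utf8String)
  .runForeach(println)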

Having said that, consider using Apache Commons CSV with akka-stream. You may end up writing less code :)
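
A minimal sketch of that combination, assuming the commons-csv library is on the classpath, that a.csv has a header row, and that the implicit materializer from the earlier examples is in scope:

import java.io.FileReader

import akka.stream.scaladsl.{Sink, Source}
import org.apache.commons.csv.CSVFormat

import scala.collection.JavaConverters._

// Commons CSV handles quoting and escaping; Akka Streams adds backpressure on top.
val rows = Source
  .fromIterator(() => CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(new FileReader("a.csv")).iterator().asScala)
  .map(_.toMap) // each CSVRecord becomes a java.util.Map[String, String]

rows.runWith(Sink.foreach(println))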

Comments
  • Yeah, I came across this the other day. I will probably use it. I tried using the PureCSV library, but it loads the whole file into memory before processing, which defeats the purpose of using a stream-based approach.
  • Hi, I was trying to run this example for a few minutes, but I didn't succeed (implicits and all). Could you please provide the missing imports and setup?