Handle Akka stream's first element specially

akka-stream recover
restartsource akka
akka streams logging
akka streams book
akka stream retry
akka stream error handling
akka streams killswitch
akka source

Is there an idiomatic way of handling Akka stream's Source first element in a special way? What I have now is:

    var firstHandled = false
    source.map { elem =>
      if(!firstHandled) {
        //handle specially
        firstHandled = true
      } else {
        //handle normally
      }
    }

Thanks

While I would generally go with Ramon's answer, you could also use prefixAndTail, with a prefix of 1, together with flatMapConcat to achieve something similar:

val src = Source(List(1, 2, 3, 4, 5))
val fst = Flow[Int].map(i => s"First: $i")
val rst = Flow[Int].map(i => s"Rest:  $i")

val together = src.prefixAndTail(1).flatMapConcat { case (head, tail) =>
  // `head` is a Seq of the prefix elements, which in our case is
  // just the first one. We can convert it to a source of just
  // the first element, processed via our fst flow, and then
  // concatenate `tail`, which is the remainder...
  Source(head).via(fst).concat(tail.via(rst))
}

Await.result(together.runForeach(println), 10.seconds)
// First: 1
// Rest:  2
// Rest:  3
// Rest:  4
// Rest:  5

This of course works not just for the first item, but for the first N items, with the proviso that those items will be taken up as a strict collection.

Dynamic stream handling • Akka Documentation, Introduction. Akka Streams provides a way of handling File IO and TCP connections with Streams. While the general approach is� We start with the tweet stream of authors: var authors = tweets .Where(t => t.HashTags.Contains("Akka.Net")) .Select(t => t.Author); Assume that we can lookup their email address using: Task<string> LookupEmail(string handle) The Task is completed with Failure if the email is not found.

Using zipWith

You could zip the original Source with a Source of Booleans that only returns true the first time. This zipped Source can then be processed.

First we'll need a Source that emits the Booleans:

//true, false, false, false, ...
def firstTrueIterator() : Iterator[Boolean] = 
  (Iterator single true) ++ (Iterator continually false)

def firstTrueSource : Source[Boolean, _] = 
  Source fromIterator firstTrueIterator

We can then define a function that handles the two different cases:

type Data = ???
type OutputData = ???

def processData(data : Data, firstRun : Boolean) : OutputData = 
  if(firstRun) { ... }
  else { ... }

This function can then be used in a zipWith of your original Source:

val originalSource : Source[Data,_] = ???    

val contingentSource : Source[OutputData,_] =
  originalSource.zipWith(firstTrueSource)(processData)

Using Stateful Flow

You could create a Flow that contains state similar to the example in the question but with a more functional approach:

def firstRunner(firstCall : (Data) => OutputData,
                otherCalls : (Data) => OutputData) : (Data) => OutputData = {
  var firstRun = true
  (data : Data) => {
    if(firstRun) {
      firstRun = false
      firstCall(data)
    }
    else
      otherCalls(data)
  }
}//end def firstRunner

def firstRunFlow(firstCall :  (Data) => OutputData, 
                 otherCalls : (Data) => OutputData) : Flow[Data, OutputData, _] = 
  Flow[Data] map firstRunner(firstCall, otherCalls)

This Flow can then be applied to your original Source:

def firstElementFunc(data : Data) : OutputData = ???
def remainingElsFunc(data : Data) : OutputData = ???

val firstSource : Source[OutputData, _] = 
  originalSource via firstRunFlow(firstElementFunc,remainingElseFunc)

"Idiomatic Way"

Answering your question directly requires dictating the "idiomatic way". I answer that part last because it is the least verifiable by the compiler and is therefore closer to opinion. I would never claim to be a valid classifier of idiomatic code.

My personal experience with akka-streams has been that it is best to switch my perspective to imagining an actual stream (I think of a train with boxcars) of Data elements. Do I need to break it up into multiple fixed size trains? Do only certain boxcars make it through? Can I attach another train side-by-side that contains Boolean cars which can signal the front? I would prefer the zipWith method due to my regard of streams (trains). My initial approach is always to use other stream parts connected together.

Also, I find it best to embed as little code in an akka Stream component as possible. firstTrueIterator and processData have no dependency on akka at all. Concurrently, the firstTrueSource and contingentSource definitions have virtually no logic. This allows you to test the logic independent of a clunky ActorSystem and the guts can be used in Futures, or Actors.

Working with streaming IO • Akka Documentation, import akka.NotUsed import akka.actor.ActorSystem import akka.stream.scaladsl. _ final case class Author(handle: String) final case class Hashtag(name: String)� Working with streaming IO Dependency. To use Akka Streams, add the module to your project: sbt val AkkaVersion = "2.6.8" libraryDependencies += "com.typesafe.akka" %% "akka-stream" % AkkaVersion

You can use prepend to prepend a source to flows. Just prepend single item source to the flow, after it is drained, rest of the original source will continue.

https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/prepend.html

 Source(List(1, 2, 3))
  .prepend(Source.single(0))
  .runWith(Sink.foreach(println))

0 1 2 3

Streams Quickstart Guide • Akka Documentation, Executing asynchronous operations in a streaming context, while maintaining backpressure in the stream, is easy to do with the Akka Streams� I'm using the Akka Streams Kafka library to interact with Kafka broker. I have the following stream definition with an UnBounded buffer: def producerStream[T: MessageType](producerProperties: Map

While I prefer the approach with zip, one can also use statefulMapConcat:

source
  .statefulMapConcat { _ =>
        var firstRun = true
        elem => {
          if (firstRun) {
            //first
            firstRun = false
          } else {
            //not first            
          }
        }
      }

Backoff and Retry Error-Handling for Akka Streams, This is a simple example of how the Akka Streams API can be used to handle the processing of unbounded streaming-data, while an actor can� But Akka Streams is very young and will continue to provide new features quickly. Iteratee ++ Very good for this use case (broadcast and channel are very handy, merge is very easy) – Low level / complex cases can be more difficult. Both are very good stream processing libraries, with high level API and good capabilities to handle back pressure.

Integrating Akka Streams and Akka Actors: Part I, One of the most fundamental things when designing a stream is error handling and there are multiple ways to approach it in Akka Streams. As the world is growing, so is data and with that analysis of this data has become important. Akka Stream is built on top of Akka-actors to handle the streaming data. It is used to analyze the

Akka Streams: error handling in event processing pipelines, I have been using Akka Streams for stream processing in several projects. It's a very powerful streaming library that offers you many ways to configure your streams� A consumer subscribes to Kafka topics and passes the messages into an Akka Stream. Settings. When creating a consumer stream you need to pass in ConsumerSettings that define things like: de-serializers for the keys and values; bootstrap servers of the Kafka cluster; group id for the consumer, note that offsets are always committed for a given

Akka streams error handling article � GitHub, Hi, What is the right way to handle exceptions in akka streams? Closing the stream and retrying is not an option as the failure happens due to� For each of these Aeron sub-channels we run a separate Akka stream. The following diagram shows how the stages are composed for the control streams. Compared to the streams for the ordinary messages (see above) the control streams handle more things, e.g. reliable delivery of system messages.

Comments
  • Didn't you mean var instead of val?