Kafka streams - KSQL - Split messages and publish to another topic

Is there a way to split a message into multiple messages using KSQL and publish them to a new topic? To be clear, I am not looking for a Java-based listener that iterates/streams it to a new topic; instead, I am looking for KSQL that does that for me.

For example:

Let's say I need messages in the invoice topic split into item_inventory_delta messages.

invoice topic

key: saleschecknumber

message example:

{
    "total": 12.33,
    "salecounter": 1,
    "items": [
        {
            "itemId": 123,
            "quantity": 1
        },
        {
            "itemId": 345,
            "quantity": 5
        }
    ]
}

item_inventory_delta topic

key: saleschecknumber_itemID

message examples:

1.
{
    "itemId": 123,
    "quantity": 1
}

2.

{
    "itemId": 345,
    "quantity": 5
}

As of ksqlDB 0.6 you can now do this, thanks to the addition of the EXPLODE table function.

Given a topic invoice with JSON payload per your example, first inspect the topic using PRINT to dump its contents:

ksql> PRINT invoice FROM BEGINNING;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","total":12.33,"salecounter":1,"items":[{"itemId":123,"quantity":1},{"itemId":345,"quantity":5}]}

Then declare a schema on top of the topic, which gives us a ksqlDB stream:

CREATE STREAM INVOICE (total DOUBLE, 
                       salecounter INT, 
                       items ARRAY<STRUCT<itemId INT, 
                                          quantity INT>>) 
                WITH (KAFKA_TOPIC='invoice', 
                      VALUE_FORMAT='JSON');

This simply "registers" the existing topic for use with ksqlDB. No new Kafka topics are written, until the next step.
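
Optionally, you can sanity-check the stream with a transient query before going further. A quick sketch, assuming ksqlDB 0.6+ push-query syntax (EMIT CHANGES):

SET 'auto.offset.reset' = 'earliest';

SELECT TOTAL, SALECOUNTER, ITEMS FROM INVOICE EMIT CHANGES;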

Create a new Kafka topic, populated continually from the messages arriving in the source stream:

CREATE STREAM INVENTORY WITH (KAFKA_TOPIC='item_inventory_delta') AS 
  SELECT EXPLODE(ITEMS)->ITEMID AS ITEMID, 
         EXPLODE(ITEMS)->QUANTITY AS QUANTITY 
    FROM INVOICE;

New topic has been created:

ksql> SHOW TOPICS;

 Kafka Topic                     | Partitions | Partition Replicas
-------------------------------------------------------------------
 invoice                         | 1          | 1
 item_inventory_delta            | 1          | 1

Topic has delta messages as requested :)

ksql> PRINT item_inventory_delta;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":123,"QUANTITY":1}
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":345,"QUANTITY":5}

There are many ways to handle this. As I understand it, the requirement is about how we process each incoming message, not about aggregating messages. An easy way is to use the Kafka Streams Processor API, which lets you customize the processing logic.

Kafka Streams Processor API

The Processor API allows developers to define and connect custom processors and to interact with state stores. With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology that represents a customized processing logic

Note: You have not defined what the output value should be, so I am forwarding the item as both key and value; it is your choice, and you can define your own output key and value.

You can define the topology with the Processor API as below:

Topology builder = new Topology();
builder.addSource("Source", "invoice")
       .addProcessor("sourceProcessor", () -> new InvoiceProcessor(), "Source")
       .addSink("sinkDeltaInvoice", "item_inventory_delta", Serdes.String().serializer(), Serdes.String().serializer(),
                "sourceProcessor");

Below is the custom Processor approach; please note it is just an outline, not a full implementation:

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;

class InvoiceProcessor implements Processor<String, String> {
    private final Gson gson = new Gson();
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void close() {
        // Any code for clean-up would go here. This processor instance will not be used
        // again after this call.
    }

    @Override
    public void process(String key, String value) {
        try {
            // Map the JSON message value to a custom Inventory class;
            // List<Item> items is a member of Inventory
            Inventory inventory = gson.fromJson(value, Inventory.class);

            // Iterate over the items and forward each one to the sink node
            for (Item item : inventory.getItems()) {
                context.forward(gson.toJson(item), gson.toJson(item), To.child("sinkDeltaInvoice"));
            }
        } catch (JsonSyntaxException e) {
            // Decide how malformed messages should be handled, e.g. log and skip
        }
    }
}
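
To actually run it, wire the Topology into a KafkaStreams instance. A minimal sketch, assuming a local broker; the application id and bootstrap servers below are placeholders:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "invoice-splitter");  // placeholder id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// Close the application cleanly on shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));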

For a KStream application, you can use flatMap, which accepts a function that takes a record and returns an iterable of zero or more records:

case class Record(total: Double, salecounter: Int, items: List[Item])
case class Item(itemId: Int, quantity: Int)

// initialize the stream
val inputStream: KStream[String, Record] = ???

// split each invoice into one record per item;
// to match the requested key format you could emit s"${key}_${item.itemId}" instead
inputStream.flatMap { case (key, record) =>
  record.items.map(item => (key, item))
}
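
If you are on the Java DSL instead, a sketch of the equivalent, assuming hypothetical Invoice and Item POJOs mirroring the case classes above; it also shows emitting the saleschecknumber_itemID composite key from the question:

import java.util.stream.Collectors;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;

static KStream<String, Item> splitInvoices(KStream<String, Invoice> input) {
    return input.flatMap((key, invoice) ->
        invoice.getItems().stream()
               // one output record per item, keyed saleschecknumber_itemID
               .map(item -> KeyValue.pair(key + "_" + item.getItemId(), item))
               .collect(Collectors.toList()));
}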


Comments
  • Thank you. Upvoted. I should have been more clear on what I was looking for. Sorry. Updated my question
  • Thanks @Brandon. I am looking for more of a non-boilerplate way to achieve this.