Reading only specific messages from a Kafka topic


I am writing JSON data into a Kafka topic, and while reading I want to read only a specific set of messages, based on a value present in the message. I am using the kafka-python library.

sample messages:

{"flow_status": "completed", "value": 1, "active": "yes"}
{"flow_status": "failure", "value": 2, "active": "yes"}

Here I want to read only the messages having flow_status as "completed".

In Kafka it's not possible to do something like that. The consumer consumes messages one by one, one after the other, starting from the last committed offset (or from the beginning, or after seeking to a specific offset). Depending on your use case, you could use a different flow in your scenario: the message describing the work to do goes into one topic, and the application which processes the action then writes the result into two different topics, one for completed and one for failed. That way, all completed messages are separated from the failed ones. Another way is to use a Kafka Streams application for the filtering, but take into account that this is just sugar: in reality the Streams application still reads all the messages, it just lets you filter them easily.
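For completeness, seeking to a specific offset with kafka-python looks roughly like the commented sketch below; it assumes a broker on localhost:9092 and a topic named "flow-events", neither of which comes from the question. The small offset helper (useful for the "last n messages" case) is just an illustration.

```python
def start_offset_for_last_n(beginning: int, end: int, n: int) -> int:
    """Offset to seek to in order to read (at most) the last n records of a partition."""
    return max(end - n, beginning)

# With kafka-python, the seek itself looks roughly like this
# (needs a running broker; topic name and offsets are assumptions):
#
#   from kafka import KafkaConsumer, TopicPartition
#   tp = TopicPartition("flow-events", 0)
#   consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
#                            enable_auto_commit=False)
#   consumer.assign([tp])    # direct assignment, no consumer group
#   consumer.seek(tp, 42)    # the next poll starts at offset 42
```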


You can create two different topics: one for completed and another for failure status. Then read messages from the completed topic to handle them.
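A minimal sketch of the routing side of that idea. The topic names ("flow-completed", "flow-failed") are assumptions, and the producer call is commented out because it needs a running broker:

```python
def topic_for(payload: dict) -> str:
    """Route a flow event to a status-specific topic (topic names are assumed)."""
    if payload.get("flow_status") == "completed":
        return "flow-completed"
    return "flow-failed"

# Producer side with kafka-python (needs a broker):
#
#   import json
#   from kafka import KafkaProducer
#   producer = KafkaProducer(bootstrap_servers="localhost:9092",
#                            value_serializer=lambda v: json.dumps(v).encode("utf-8"))
#   event = {"flow_status": "completed", "value": 1, "active": "yes"}
#   producer.send(topic_for(event), event)
```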

Otherwise, if you want them in a single topic and want to read only the completed ones, you will need to read them all and ignore the failures with a simple if-else condition.
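In kafka-python that if-else filter is straightforward. Here is a sketch, assuming JSON-encoded message values and a topic named "flow-events" (my assumption); the consumer loop is commented out because it needs a broker:

```python
import json

def is_completed(raw_value: bytes) -> bool:
    """True when the JSON payload has flow_status == "completed"; malformed input is ignored."""
    try:
        payload = json.loads(raw_value)
    except (ValueError, TypeError):
        return False
    return payload.get("flow_status") == "completed"

# Consumer loop (needs a broker):
#
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer("flow-events",
#                            bootstrap_servers="localhost:9092",
#                            auto_offset_reset="earliest")
#   for record in consumer:
#       if is_completed(record.value):
#           ...  # handle only the completed events
```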


The Kafka consumer doesn't support this kind of functionality out of the box. You will have to consume all events sequentially, filter out the completed events, and put them somewhere. Alternatively, you can consider a Kafka Streams application, where you read the data as a stream, filter the events where flow_status == "completed", and publish them to an output topic or some other destination.

Example (a StreamsBuilder and the input topic name are assumed; the original snippet left them out):

KStream<String, JsonNode> inputStream = builder.stream("input-topic");
KStream<String, JsonNode> completedFlowStream =
        inputStream.filter((key, value) -> value.get("flow_status").asText().equals("completed"));

P.S. Kafka doesn't have an official Python API for Kafka Streams, but there is an open-source project:


As of today it is not possible to achieve this at the broker end. There is a Jira feature request open against Apache Kafka to get this implemented; you can track it here. I hope they will get it implemented in the near future:

I feel the best way is to use the RecordFilterStrategy interface (Java/Spring) and filter at the consumer end.


  • so I can have 3 topics: 1 for the whole log, 1 for completed status, 1 for failure status... the job will write to topic 1, then filter the data based on status into the other topics.
  • exactly: in this use case the status is effectively the kind of the message, and each kind deserves its own topic (one for completed and one for failure)
  • is it a good approach to have a single topic with two partitions (one for completed, one for failure)? While sending, the producer would keep the logic to send data to the respective partition... at the consumer end, we would create separate consumer groups: one group to read from the failed partition and another to read from the completed partition
  • the producer side could work, yes, but you need to implement a custom partitioner for that. The consumer side is quite the opposite: two consumers need to be in the *same* consumer group in order to have one partition assigned to each. If they are in different consumer groups, each will get all messages from both partitions. In any case it doesn't work well, because if one consumer crashes, the other will be assigned its partition too (receiving both completed and failed messages). You could avoid consumer groups and use direct partition assignment instead.
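That custom partitioner can be sketched with kafka-python, whose KafkaProducer accepts a partitioner callable taking (key_bytes, all_partitions, available_partitions). The partition layout (0 = completed, 1 = everything else) and the convention of putting the status in the record key are my assumptions, not part of the discussion above:

```python
def status_partitioner(key_bytes, all_partitions, available):
    """Send completed events to partition 0, everything else to partition 1 (assumed layout)."""
    return 0 if key_bytes == b"completed" else 1

# Wiring it up (needs a broker; topic name is assumed):
#
#   from kafka import KafkaProducer
#   producer = KafkaProducer(bootstrap_servers="localhost:9092",
#                            partitioner=status_partitioner)
#   producer.send("flow-events", key=b"completed",
#                 value=b'{"flow_status": "completed"}')
#
# On the consumer side, direct assignment avoids the rebalancing problem
# described in the comment above:
#
#   from kafka import KafkaConsumer, TopicPartition
#   consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
#   consumer.assign([TopicPartition("flow-events", 0)])  # only the completed partition
```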