Kafka consumer in Java not consuming messages

I am trying to write a Kafka consumer in Java that reads the messages a producer posts to a topic. My consumer is as follows.

consumer.java

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;



public class KafkaConsumer extends  Thread {
    final static String clientId = "SimpleConsumerDemoClient";
    final static String TOPIC = " AATest";
    ConsumerConnector consumerConnector;


    public static void main(String[] argv) throws UnsupportedEncodingException {
        KafkaConsumer KafkaConsumer = new KafkaConsumer();
        KafkaConsumer.start();
    }

    public KafkaConsumer(){
        Properties properties = new Properties();
        properties.put("zookeeper.connect","10.200.208.59:2181");
        properties.put("group.id","test-group");      
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream =  consumerMap.get(TOPIC).get(0);
        System.out.println(stream);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while(it.hasNext()) {
            System.out.println("from it");
            System.out.println(new String(it.next().message()));
        }

    }

    private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
        for(MessageAndOffset messageAndOffset: messageSet) {
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(new String(bytes, "UTF-8"));
        }
    }
}

When I run the above code I get nothing in the console, whereas the Java producer program running in the background is continuously posting data to the 'AATest' topic. Also, the ZooKeeper console shows the following lines when I run the above consumer.java:

[2015-04-30 15:57:31,284] INFO Accepted socket connection from /10.200.208.59:51780 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2015-04-30 15:57:31,284] INFO Client attempting to establish new session at /10.200.208.59:51780 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-04-30 15:57:31,315] INFO Established session 0x14d09cebce30007 with negotiated timeout 6000 for client /10.200.208.59:51780 (org.apache.zookeeper.server.ZooKeeperServer)

Also, when I run a separate console consumer pointing to the AATest topic, I get all the data the producer posts to that topic.

Both the consumer and the broker are on the same machine, whereas the producer is on a different machine. This actually resembles this question, but going through it didn't help me. Please help me.

A different answer, but in my case the cause was the consumer's initial offset (auto.offset.reset). Setting auto.offset.reset=earliest fixed the problem, because I was publishing the event first and then starting the consumer.

By default, a consumer with no committed offsets only consumes events published after it started, because auto.offset.reset=latest by default.

e.g. consumer.properties

bootstrap.servers=localhost:9092
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=30000
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Test
class KafkaEventConsumerSpecs extends FunSuite {

  case class TestEvent(eventOffset: Long, hashValue: Long, created: Date, testField: String) extends BaseEvent

  test("given an event in the event-store, consumes an event") {

    EmbeddedKafka.start()

    //PRODUCE
    val event = TestEvent(0l, 0l, new Date(), "data")
    val config = new Properties() {
      {
        load(this.getClass.getResourceAsStream("/producer.properties"))
      }
    }
    val producer = new KafkaProducer[String, String](config)

    val persistedEvent = producer.send(new ProducerRecord(event.getClass.getSimpleName, event.toString))

    assert(persistedEvent.get().offset() == 0)
    assert(persistedEvent.get().checksum() != 0)

    //CONSUME
    val consumerConfig = new Properties() {
      {
        load(this.getClass.getResourceAsStream("/consumer.properties"))
        put("group.id", "consumers_testEventsGroup")
        put("client.id", "testEventConsumer")
      }
    }

    assert(consumerConfig.getProperty("group.id") == "consumers_testEventsGroup")

    val kafkaConsumer = new KafkaConsumer[String, String](consumerConfig)

    assert(kafkaConsumer.listTopics().asScala.map(_._1).toList == List("TestEvent"))

    kafkaConsumer.subscribe(Collections.singletonList("TestEvent"))

    val events = kafkaConsumer.poll(1000)
    assert(events.count() == 1)

    EmbeddedKafka.stop()
  }
}

But if the consumer is started first and the event is published afterwards, the consumer should be able to consume the event without auto.offset.reset being set to earliest.
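
To make the reasoning above concrete, here is a small model of how the starting position gets picked. It is only an illustration under my own assumptions: the class and method names are made up, it does not use the Kafka client, and it leaves out the "none" policy and the case where a committed offset is out of range.

```java
import java.util.OptionalLong;

/** Illustrative model only; this is not Kafka client code. */
final class OffsetResetSketch {

    /**
     * Picks the offset a consumer group would start reading from.
     *
     * @param committed   offset previously committed by the group, if any
     * @param logStart    first offset still retained in the partition
     * @param logEnd      offset the next produced record will receive
     * @param resetPolicy value of auto.offset.reset ("earliest" or "latest")
     */
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, String resetPolicy) {
        // A committed offset always wins; auto.offset.reset is only a fallback.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (resetPolicy) {
            case "earliest":
                return logStart; // replay everything still in the log
            case "latest":
                return logEnd;   // skip what was produced before the consumer started
            default:
                throw new IllegalArgumentException("policy not modeled here: " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // One record (offset 0) was produced before the consumer started.
        long logStart = 0L;
        long logEnd = 1L;
        System.out.println(startingOffset(OptionalLong.empty(), logStart, logEnd, "latest"));   // 1
        System.out.println(startingOffset(OptionalLong.empty(), logStart, logEnd, "earliest")); // 0
    }
}
```

With "latest" the new group starts at offset 1, so the record at offset 0 is never returned by poll, which matches what I saw in the test before changing the setting.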

References for kafka 0.10

https://kafka.apache.org/documentation/#consumerconfigs

In our case, we solved our problem with the following steps:

The first thing we found is that KafkaProducer has a config called 'retries', and its default value (in the version we used) means 'no retry'. Also, the send method of KafkaProducer is asynchronous unless you call get on the result it returns. So without retries there is no guarantee that produced messages are delivered to the corresponding broker. You have to increase the value a bit, or use the idempotent or transactional mode of KafkaProducer.
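
As a rough sketch of the settings this refers to, the following builds producer properties with retries and idempotence switched on. The config keys are real producer configs; the class name, the value 3 for retries, and the localhost:9092 address are assumptions chosen for illustration, not recommendations.

```java
import java.util.Properties;

/** Sketch of producer settings; the values are assumptions, not recommendations. */
final class ReliableProducerConfig {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Retry transient send failures instead of giving up (3 is an arbitrary example value).
        props.setProperty("retries", "3");
        // Wait until all in-sync replicas have acknowledged each write.
        props.setProperty("acks", "all");
        // Idempotence keeps retries from writing duplicates; it requires acks=all and retries > 0.
        props.setProperty("enable.idempotence", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties would be passed to the KafkaProducer constructor. Calling get() on the Future returned by send blocks until the broker acknowledges the record or the send fails, which is the simplest way to notice a lost message.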

The second issue concerns the Kafka and ZooKeeper versions. We had chosen Kafka 1.0.0 with ZooKeeper 3.4.4, and Kafka 1.0.0 had a problem with its connectivity to ZooKeeper. If Kafka loses its connection to ZooKeeper because of an unexpected exception, it loses leadership of the partitions that have not synced yet. There is a bug report about this issue: https://issues.apache.org/jira/browse/KAFKA-2729 After we found entries in the Kafka log that matched the issue described there, we upgraded our Kafka brokers to 1.1.0.

It is also important to note that a small partition count (around 100 or fewer) increases the throughput of the producer, so if there are not enough consumers, the available consumer threads get stuck and messages arrive late (we measured delays of roughly 10-15 minutes). So you need to balance the partition count and the thread count of your application against your available resources.

There might also be a case where Kafka takes a long time to rebalance the consumer group when a new consumer is added with the same group id. Check the Kafka logs to see whether the group has been rebalanced after starting your consumer.

Comments
  • Did you try adding props.put("auto.offset.reset", "smallest"); ?
  • Yeah, I tried, but got the same result: no data on the consumer end. (Sorry for the delayed response.)
  • Extremely sorry.. the problem was because of a typo before the topic name... now it got solved
  • yep that happens sometimes :)
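
Since the root cause turned out to be a stray space in the topic name (" AATest"), a cheap guard is to check the name before subscribing. Below is a minimal sketch; the class and method are hypothetical helpers, not part of the Kafka API. The rules it applies are the ones I understand Kafka to enforce for topic names: ASCII letters, digits, '.', '_' and '-', at most 249 characters, and not "." or "..".

```java
import java.util.regex.Pattern;

/** Hypothetical helper; not part of the Kafka client API. */
final class TopicNameCheck {

    // Characters allowed in a topic name: ASCII letters, digits, '.', '_' and '-'.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    static boolean isLegalTopicName(String name) {
        if (name == null || name.isEmpty() || name.length() > MAX_LENGTH) {
            return false;
        }
        if (name.equals(".") || name.equals("..")) {
            return false;
        }
        // A leading or trailing space fails here because ' ' is not in the allowed set.
        return LEGAL.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isLegalTopicName(" AATest")); // false
        System.out.println(isLegalTopicName("AATest"));  // true
    }
}
```

Failing fast on a bad name is easier to debug than a consumer that silently waits on a topic nobody writes to.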