Manual commit messages with consumer group using kafka-node

I want to manually commit messages in my consumer group once all tasks for a message have finished (for example, after writing the message to a database). How can I disable auto commit and commit messages manually?

Use node-rdkafka instead. It works very well. You can commit a message like this:

consumer.commit({ topic: data.topic, partition: data.partition, offset: data.offset + 1 }, function(err, data) {})
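The offset + 1 in the call above reflects how Kafka interprets a committed offset: it is the position of the next message the group should read, not the last one it handled. A minimal sketch of building that payload follows. The helper name toCommitPayload and the sample message are assumptions for illustration, not part of node-rdkafka.

```javascript
// Hypothetical helper (not part of node-rdkafka): builds the commit payload
// for a message that has been fully processed.
function toCommitPayload(message) {
  return {
    topic: message.topic,
    partition: message.partition,
    // Commit the offset of the NEXT message to read, hence + 1.
    offset: message.offset + 1,
  };
}

// Example message shape, assumed to carry topic, partition and a numeric offset.
const payload = toCommitPayload({ topic: 'orders', partition: 0, offset: 41 });
console.log(payload); // { topic: 'orders', partition: 0, offset: 42 }
```

The resulting object has the same shape as the argument passed to consumer.commit in the answer above.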

How to commit manually in consumer group? · Issue #683 · SOHU-Co/kafka-node: I am committing offsets as follows, but after restarting the consumer, old messages are ... names, as the broker stores the offset information using a ConsumerGroup's name. But when switching to autoCommit false and trying to commit manually, I get the message "Commit not needed".

Limiting the number of handlers using the async library: in the code above, instead of directly handling the fetched messages, we define a queue with a max size and a max number of parallel handlers (line 33), and push new messages into that queue (line 26). We then pause the Kafka consumer once we reach the size limit (line 27).
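The queue-and-pause idea described above can be sketched without any library. This is a hand-rolled bounded queue, not the async library's queue that the quoted article uses. createBoundedQueue, its options, and the handler signature are hypothetical names, and the consumer is only assumed to expose pause() and resume().

```javascript
// Hypothetical helper: a bounded work queue that pauses the consumer when the
// backlog reaches maxQueueSize and resumes it once the backlog is empty.
function createBoundedQueue(consumer, handler, { maxQueueSize, maxParallel }) {
  const pending = []; // messages waiting for a free handler slot
  let running = 0;    // handlers currently in flight
  let paused = false;

  function drain() {
    // Start handlers while there is a free slot and waiting work.
    while (running < maxParallel && pending.length > 0) {
      const message = pending.shift();
      running += 1;
      handler(message, () => {
        running -= 1;
        drain();
      });
    }
    // Resume fetching once the backlog has been cleared.
    if (paused && pending.length === 0) {
      paused = false;
      consumer.resume();
    }
  }

  return {
    push(message) {
      pending.push(message);
      drain();
      // Stop fetching when too many messages are waiting.
      if (!paused && pending.length >= maxQueueSize) {
        paused = true;
        consumer.pause();
      }
    },
    size: () => pending.length,
  };
}

// Usage sketch: consumer.on('message', (m) => queue.push(m)), where the handler
// calls done() once the message has been stored and, if desired, committed.
```

Resuming only when the backlog is empty is a simple policy chosen for this sketch; a lower threshold would work as well.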

Set autoCommit to false:

const kafka = require('kafka-node');

const config = require('../config');

const client = new kafka.KafkaClient({
  kafkaHost: config.kafkaHost
});

// Each entry in the topics array is an object with a topic name.
const consumer = new kafka.Consumer(client, [
  { topic: config.kafkaTopic }
], {
  autoCommit: false
});

Then commit manually:

consumer.on('message', (message) => {
  console.log('message', message);
  // feed data into db
  consumer.commit((error, data) => {
    if (error) {
      console.error('Commit failed: ', error);
    } else {
      console.log('Commit success: ', data);
    }
  });
});
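The snippet above commits as soon as the message arrives. To commit only once the database write has finished, the commit can be moved into the write's callback. A minimal sketch, assuming a consumer that exposes commit(callback) as used above; processThenCommit and saveToDb are hypothetical names standing in for your own code.

```javascript
// Hypothetical helper: commit only after the downstream task has succeeded.
// `consumer` is assumed to expose commit(callback) as in the snippet above;
// `saveToDb(message, callback)` stands in for your own persistence code.
function processThenCommit(consumer, message, saveToDb, done) {
  saveToDb(message, (saveError) => {
    if (saveError) {
      // Do not commit: the offset stays uncommitted, so the message can be
      // read again after a restart.
      return done(saveError);
    }
    consumer.commit((commitError, data) => {
      if (commitError) return done(commitError);
      done(null, data);
    });
  });
}

// Usage sketch:
//   consumer.on('message', (message) => {
//     processThenCommit(consumer, message, saveToDb, (err) => {
//       if (err) console.error(err);
//     });
//   });
```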

How can I commit messages by offset with ConsumerGroup? #502: Just curious, what's your use case for manually committing the offsets? ZooKeeper manages the offsets, no? And as I understood it, ConsumerGroup is for Kafka 0.9+. kafka-node: add sendOffsetCommitV2Request to Client #17194.

Hi, when I used the regular consumer, I called offset.commit(groupId, topics, function(){}) to commit specific offsets, where topics is an array of entries such as {topic: test, offset: 5}. Now I want to do the same with ConsumerGroup, but I ne...

Set auto commit to false.


kafka-node: ConsumerGroup does not consume on all partitions; How to throttle messages. Follow the instructions on the Kafka wiki to build Kafka and get a test ... By default, we will consume messages from the last committed offset of ...

{
  groupId: 'kafka-node-group', // consumer group id, default `kafka-node-group`
  // Auto commit config
  autoCommit: true,
  autoCommitIntervalMs: 5000,
  // The max wait time is the maximum amount of ...
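For manual commits, the options quoted above would have autoCommit switched off. A sketch of such an options object follows. The groupId and autoCommit keys come from the snippet above; kafkaHost and fromOffset are taken from the kafka-node README as recalled and should be checked against your installed version, and the host, group id and topic name are placeholders.

```javascript
// Options sketch for manual commits with a kafka-node ConsumerGroup.
// Values are placeholders; verify option names against your kafka-node version.
const consumerGroupOptions = {
  kafkaHost: 'localhost:9092', // placeholder broker address
  groupId: 'kafka-node-group', // offsets are stored per group id
  autoCommit: false,           // disable periodic background commits
  fromOffset: 'latest',        // where to start when no committed offset exists
};

// With kafka-node installed, this would be passed as (topic name is a placeholder):
//   const { ConsumerGroup } = require('kafka-node');
//   const consumerGroup = new ConsumerGroup(consumerGroupOptions, ['my-topic']);
console.log(consumerGroupOptions.autoCommit); // false
```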

Managing Consumer Commits and Back-pressure With Node.js and ...: A detailed walk-through on creating a Node.js based Kafka consumer. Committing offset 5 of partition 1 for some consumer group means all ... We also learned how to prevent data loss by manually committing only handled messages.

KafkaConsumers can commit offsets automatically in the background (configuration parameter = true), which is the default setting. Those auto commits are done within poll() (which is typically called in a loop). How frequently offsets should be committed can be configured via ...
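Because a committed offset marks the position from which the group resumes, everything before it is treated as done. When messages are handled in parallel and may finish out of order, only the contiguous run of finished offsets is safe to commit. A minimal sketch of that rule; nextCommittableOffset is a hypothetical helper, not taken from the quoted article.

```javascript
// Hypothetical helper: given the current committed position (the next offset
// to read) and the set of offsets whose handlers have finished, return the
// furthest position that is safe to commit.
function nextCommittableOffset(committed, handledOffsets) {
  let next = committed;
  // Advance only across offsets with no unfinished gap before them.
  while (handledOffsets.has(next)) {
    next += 1;
  }
  return next;
}

// Offsets 3 and 4 are done, 5 is still in flight, 6 finished early.
const safe = nextCommittableOffset(3, new Set([3, 4, 6]));
console.log(safe); // 5
```

Committing 5 here means offset 5 is read again after a restart, while offset 6 may be processed twice, so handlers should tolerate duplicates.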

Consuming Messages · KafkaJS: fromBeginning. The consumer group will use the latest committed offset when starting to fetch messages. If the offset is invalid or not defined, fromBeginning ...

In the consumer config, set auto.commit.enable to true; this enables the consumer to commit the offset to ZooKeeper for already fetched messages. Also change auto.offset.reset to 'largest' so that messages are not read from the smallest possible offset. Try this out and see if you still get the problem; you can monitor the offset update ...

How Kafka's Consumer Auto Commit Configuration Can Lead to ...: ... to data loss or duplication if you're using Kafka's consumer auto commit configuration. ... of Kafka, including producer and consumer groups, topic partitions, and offsets. Every message your producers send to a Kafka partition has an ... Unless you're manually triggering commits, you're most likely using ...

With these settings in place, you will want to call consumer.StoreOffsets() rather than consumer.CommitAsync() after processing each message. This gives your application more fine-grained control over how it commits the offsets it has processed, without the high cost of calling consumer.CommitAsync() after each message.
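The store-then-commit idea above can be imitated in Node.js by recording processed offsets in memory and committing them in batches. The sketch below is only an analogy: OffsetStore is a hypothetical class, not an API of any Kafka client, and it assumes messages within a partition are processed in order and carry numeric offsets.

```javascript
// Hypothetical in-memory store, loosely modelled on the store-then-commit idea
// described above. It is not an API of any Kafka client.
class OffsetStore {
  constructor() {
    this.latest = new Map(); // "topic:partition" -> next offset to read
  }

  // Record a processed message; cheap, no network call.
  store(message) {
    const key = `${message.topic}:${message.partition}`;
    const next = message.offset + 1;
    if (!this.latest.has(key) || this.latest.get(key) < next) {
      this.latest.set(key, next);
    }
  }

  // Hand all stored positions to a commit function in one call, then reset.
  // Returns the number of partitions included in the batch.
  flush(commitFn) {
    if (this.latest.size === 0) return 0;
    const payloads = [...this.latest].map(([key, offset]) => {
      const idx = key.lastIndexOf(':');
      return {
        topic: key.slice(0, idx),
        partition: Number(key.slice(idx + 1)),
        offset,
      };
    });
    commitFn(payloads);
    this.latest.clear();
    return payloads.length;
  }
}

// Usage sketch: call store(message) after each message is handled, and call
// flush(...) on a timer with a function that performs the actual commit.
```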