Hot questions for Apache Kafka

Top 10 Java Open Source / Spring / Apache Kafka

Question:

Edit FYI: working GitHub example


I was searching the internet and couldn't find a simple, working example of an embedded Kafka test. My setup is:

  • Spring Boot
  • Multiple @KafkaListener methods with different topics in one class
  • Embedded Kafka for the test, which starts fine
  • A test that sends to the topic with KafkaTemplate, but the @KafkaListener methods receive nothing, even after a long sleep
  • No warnings or errors are shown, only info spam from Kafka in the logs

Please help me. Most examples out there are over-configured or over-engineered. I am sure it can be done simply. Thanks, guys!

import org.slf4j.Logger;

import static org.slf4j.LoggerFactory.getLogger;

@Controller
public class KafkaController {

    private static final Logger LOG = getLogger(KafkaController.class);

    @KafkaListener(topics = "test.kafka.topic")
    public void receiveDunningHead(final String payload) {
        LOG.debug("Receiving event with payload [{}]", payload);
        // I will do database stuff here which I could check in the DB for testing
    }
}

private static String SENDER_TOPIC = "test.kafka.topic";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

@Test
public void testSend() throws InterruptedException, ExecutionException {

    Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
    producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
    producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
    Thread.sleep(10000);
}

Answer:

Embedded Kafka tests work for me with the configs below.

Annotations on the test class:

@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // Specify the @KafkaListener class if it's not the same class, or not loaded with the test config
@EmbeddedKafka(
    partitions = 1, 
    controlledShutdown = false,
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:3333", 
        "port=3333"
})
public class KafkaConsumerTest {
    @Autowired
    KafkaEmbedded kafkaEmbedded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Before annotation on the setup method:

@Before
public void setUp() throws Exception {
    for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
        ContainerTestUtils.waitForAssignment(messageListenerContainer,
                kafkaEmbedded.getPartitionsPerTopic());
    }
}

Note: I am not using @ClassRule to create the embedded Kafka; instead, the KafkaEmbedded instance is auto-wired with @Autowired.

@Test
public void testReceive() throws Exception {
     kafkaTemplate.send(topic, data);
}

Hope this helps!

Edit: Test configuration class marked with @TestConfiguration

@TestConfiguration
public class TestConfig {

    @Autowired
    KafkaEmbedded kafkaEmbedded;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setDefaultTopic(topic);
        return kafkaTemplate;
    }
}

Now the @Test method can autowire the KafkaTemplate and use it to send a message:

kafkaTemplate.send(topic, data);

Updated the answer's code block with the above line.
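
Putting the pieces together, a minimal end-to-end test might look like the sketch below. The topic name and payload are placeholders taken from the question, and the assertion strategy (checking the listener's side effect) is an assumption, not part of the original answer:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Test
public void testReceive() throws Exception {
    // Send a record to the topic the listener subscribes to (placeholder name)
    // and block until the send succeeds or times out.
    kafkaTemplate.send("test.kafka.topic", "message00").get(10, TimeUnit.SECONDS);
    // Assert on the listener's side effect here, e.g. poll the database the
    // listener writes to, or count down a CountDownLatch in the listener and await it.
}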

Question:

I've been trying to do some POC work for Spring Kafka. Specifically, I wanted to experiment with best practices for handling errors while consuming messages from Kafka.

I am wondering if anyone is able to help with:

  1. Sharing best practices surrounding what Kafka consumers should do when there is a failure
  2. Helping me understand how AckMode.RECORD works, and how to prevent offset commits when an exception is thrown in the listener method.

The code example for 2 is given below:

Given that AckMode is set to RECORD, which according to the documentation:

commit the offset when the listener returns after processing the record.

I would have thought that the offset would not be committed if the listener method threw an exception. However, this was not the case when I tested it using the code/config/command combination below. The offset still gets updated, and the next message continues to be processed.

My config:

private Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    return factory;
}

My code:

@Component
public class KafkaMessageListener {
    @KafkaListener(topicPartitions = {@TopicPartition(topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))})
    public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException {
        throw new RuntimeException("Oops!");
    }
}

Command to verify offset:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group

I'm using kafka_2.12-0.10.2.0 and org.springframework.kafka:spring-kafka:1.1.3.RELEASE


Answer:

The container (via ContainerProperties) has a property, ackOnError, which is true by default...

/**
 * Set whether or not the container should commit offsets (ack messages) where the
 * listener throws exceptions. This works in conjunction with {@link #ackMode} and is
 * effective only when the kafka property {@code enable.auto.commit} is {@code false};
 * it is not applicable to manual ack modes. When this property is set to {@code true}
 * (the default), all messages handled will have their offset committed. When set to
 * {@code false}, offsets will be committed only for successfully handled messages.
 * Manual acks will always be applied. Bear in mind that, if the next message is
 * successfully handled, its offset will be committed, effectively committing the
 * offset of the failed message anyway, so this option has limited applicability.
 * Perhaps useful for a component that starts throwing exceptions consistently;
 * allowing it to resume when restarted from the last successfully processed message.
 * @param ackOnError whether the container should acknowledge messages that throw
 * exceptions.
 */
public void setAckOnError(boolean ackOnError) {
    this.ackOnError = ackOnError;
}

Bear in mind, though, that if the next message is successful, its offset will be committed anyway, which effectively commits the failed offset too.
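
So, to stop the offset of a failed record from being committed, a sketch along these lines should work on the container factory from the question (assuming a spring-kafka version before 2.3, where ackOnError still defaults to true):

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    // Do not commit the offset of a record whose listener threw an exception.
    factory.getContainerProperties().setAckOnError(false);
    return factory;
}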

EDIT

Starting with version 2.3, ackOnError is now false by default.

Question:

I am trying to set up a Kafka consumer using Spring Boot (1.5.9) and Spring Kafka (2.1.0). However, when I start my app I get java.lang.NoSuchMethodError: org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V on Kafka's MessagingMessageListenerAdapter. I tried Spring Kafka 1.2.0 and that error went away. Has anyone else experienced this version incompatibility?

Here is my config class

@EnableKafka
@Configuration
public class ImporterConfigs{

static Logger logger = Logger.getLogger(ImporterConfigs.class);

@Value("${kafka.bootstrap-servers}")
private static String bootstrapServers;

@Bean
public Map<String, Object> consumerKafkaConfigs() {
  Map<String, Object> props = new HashMap<>();
  // list of host:port pairs used for establishing the initial connections to the Kafka cluster
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  // allows a pool of processes to divide the work of consuming and processing records
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");

  return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
  return new DefaultKafkaConsumerFactory<>(consumerKafkaConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

}

Here is the full stacktrace:

java.lang.NoSuchMethodError: org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.determineInferredType(MessagingMessageListenerAdapter.java:396) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.<init>(MessagingMessageListenerAdapter.java:100) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.<init>(RecordMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListenerInstance(MethodKafkaListenerEndpoint.java:172) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListener(MethodKafkaListenerEndpoint.java:132) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupMessageListener(AbstractKafkaListenerEndpoint.java:338) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupListenerContainer(AbstractKafkaListenerEndpoint.java:323) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:227) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:49) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:183) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:155) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:129) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:138) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:132) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(KafkaListenerAnnotationBeanPostProcessor.java:229) ~[spring-kafka-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:781) ~[spring-beans-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
at com.vitechinc.springbootapplication.V3MessageImporterApplication.main(V3MessageImporterApplication.java:25) [classes/:na]

Answer:

Spring Kafka 2.1 is based on Spring Framework 5.0, and that is exactly what this error shows:

java.lang.NoSuchMethodError: org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)

Spring Framework 4.3 (the foundation of Spring Boot 1.5) does not have that Assert.state(boolean, Supplier) variant; it was only introduced in Spring Framework 5.0.

You should consider switching to Spring Boot 2.0, or stick with Spring Kafka 1.3.2, which is compatible with Boot 1.5 and can be configured to work with the Apache Kafka 1.0 client.
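
For the "stick with 1.3.2" route, the dependency would look something like this (a sketch; check the Spring Kafka compatibility matrix for the exact version that matches your Boot release):

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.3.2.RELEASE</version>
</dependency>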

Question:

I am trying to write a unit test for a Kafka listener that I am developing using Spring Boot 2.x. Since this is a unit test, I don't want to start up a full Kafka server and an instance of ZooKeeper. So, I decided to use Spring Embedded Kafka.

The definition of my listener is very basic.

@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}

The test, which verifies that the latch counter reaches zero after a message is received, is also very simple.

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}

Unfortunately, the test fails and I cannot understand why. Is it possible to use an instance of KafkaEmbedded to test a method marked with the annotation @KafkaListener?

All the code is shared in my GitHub repository kafka-listener.

Thanks to all.


Answer:

You are probably sending the message before the consumer has been assigned the topic/partition. Set property...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

...it defaults to latest.

This is like using --from-beginning with the console consumer.
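
That is, something like:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sample-topic --from-beginning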

EDIT

Oh; you're not using Boot's properties.

Add

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

EDIT2

BTW, you should probably also do a get(10L, TimeUnit.SECONDS) on the result of the template.send() (a Future<>) to assert that the send was successful.
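
For example, with the template from the test above:

producer.sendDefault(1, "Hello world").get(10L, TimeUnit.SECONDS); // throws if the send failed or timed out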

EDIT3

To override the offset reset just for the test, you can do the same as what you did for the broker addresses:

@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

and

@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

However, bear in mind that this property only applies the first time a group consumes. To always start at the end each time the app starts, you have to seek to the end during startup.

Also, I would recommend setting enable.auto.commit to false so that the container takes care of committing the offsets rather than just relying on the consumer client doing it on a time schedule.
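
For example, in the consumer configs (a sketch; with this set, the container commits according to its configured AckMode):

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);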

Question:

I am using this docker-compose setup for setting up Kafka locally: https://github.com/wurstmeister/kafka-docker/

docker-compose up works fine, creating topics via shell works fine.

Now I try to connect to Kafka via spring-kafka:2.1.0.RELEASE

When starting up the Spring application it prints the correct version of Kafka:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

I try to send a message like this

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

Sending on client side fails with

UnknownServerException: The server experienced an unexpected error when processing the request

In the server console I get the message Magic v1 does not support record headers

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

Googling suggests a version conflict, but the versions seem to fit (org.apache.kafka:kafka-clients:1.0.0 is on the classpath).

Any clues? Thanks!

Edit: I narrowed down the source of the problem. Sending plain Strings works, but sending Json via JsonSerializer results in the given problem. Here is the content of my producer config:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used for establishing the initial connections to the Kafka cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())

Answer:

I had a similar issue. Spring Kafka's JsonSerializer and JsonSerde add type info headers to records by default. To prevent this issue, we need to disable adding those headers.

If you are fine with the default JSON serialization, use the following (the key point here is ADD_TYPE_INFO_HEADERS):

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);

But if you need a custom JsonSerializer with a specific ObjectMapper (e.g. with PropertyNamingStrategy.SNAKE_CASE), you should disable adding type info headers explicitly on the JsonSerializer, because Spring Kafka ignores DefaultKafkaProducerFactory's ADD_TYPE_INFO_HEADERS property in that case (to me, a questionable design choice in Spring Kafka):

JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);

or if we use JsonSerde, then:

Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false);
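
On the consuming side, note that with the type headers disabled, the consumer can no longer infer the payload type from headers. One sketch (MyClass being the question's placeholder type) is to hand the target class to the deserializer explicitly:

// Hypothetical consumer setup: tell JsonDeserializer the target type up front,
// since it cannot read it from record headers anymore.
JsonDeserializer<MyClass> valueDeserializer = new JsonDeserializer<>(MyClass.class);
ConsumerFactory<String, MyClass> consumerFactory =
        new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), valueDeserializer);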

Question:

Hey guys, I want to do Kafka Streams real-time processing in my Spring Boot project, so I need a Kafka Streams configuration, and I want to use KStream or KTable, but I could not find an example on the internet.

I have a producer and consumer working; now I want to do real-time stream processing.


Answer:

Let me start by saying that if you are new to Kafka Streams, adding Spring Boot on top of it adds another level of complexity, and Kafka Streams has a big learning curve as it is. Here are the basics to get you going. The pom:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.1.10.RELEASE</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>${kafka.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>${kafka.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>connect-api</artifactId>
  <version>${kafka.version}</version>
</dependency>

Now the configuration object. The code below assumes you are creating two stream apps, and keep in mind that each app represents its own processing topology:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaStreamConfig {

  @Value("${delivery-stats.stream.threads:1}")
  private int threads;

  @Value("${delivery-stats.kafka.replication-factor:1}")
  private int replicationFactor;

  @Value("${messaging.kafka-dp.brokers.url:localhost:9092}")
  private String brokersUrl;


  @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  public StreamsConfig kStreamsConfigs() {
    Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
    setDefaults(config);
    return new StreamsConfig(config);
  }


  public void setDefaults(Map<String, Object> config) {
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
  }

  @Bean("app1StreamBuilder")
  public StreamsBuilderFactoryBean app1StreamBuilderFactoryBean() {
    Map<String, Object> config = new HashMap<>();
    setDefaults(config);
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
    config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
    return new StreamsBuilderFactoryBean(config);
  }

  @Bean("app2StreamBuilder")
  public StreamsBuilderFactoryBean app2StreamBuilderFactoryBean() {
    Map<String, Object> config = new HashMap<>();
    setDefaults(config);
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app2");
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
    config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
    return new StreamsBuilderFactoryBean(config);
  }
}

Now comes the fun part: using the StreamsBuilder to build your app (app1 in this example).

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class App1 {
  @SuppressWarnings("unchecked")
  @Bean("app1StreamTopology")
  public KStream<String, Long> startProcessing(@Qualifier("app1StreamBuilder") StreamsBuilder builder) {

    final KStream<String, Long> toSquare = builder.stream("toSquare", Consumed.with(Serdes.String(), Serdes.Long()));
    toSquare.map((key, value) -> { // do something with each msg, square the values in our case
      return KeyValue.pair(key, value * value);
    }).to("squared", Produced.with(Serdes.String(), Serdes.Long())); // send downstream to another topic

    return toSquare;
  }
}

Hope this helps.
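
To exercise the topology, something along these lines should work (a sketch, not part of the original answer; the topic names come from the code above, and the serializers must match the Long values the stream expects):

// Hypothetical smoke test: publish a Long to "toSquare" and expect the
// squared value to arrive on the "squared" topic.
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
KafkaTemplate<String, Long> template =
        new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
template.send("toSquare", "key-1", 4L); // should show up on "squared" as ("key-1", 16L)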