Hot questions for Spring Cloud Stream

Question:

Test class:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { WebsocketSourceConfiguration.class,
        WebSocketSourceIntegrationTests.class }, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = {
                "websocket.path=/some_websocket_path", "websocket.allowedOrigins=*",
                "spring.cloud.stream.default-binder=kafka" })
public class WebSocketSourceIntegrationTests {

    private String port = "8080";

    @Test
    public void testWebSocketStreamSource() throws IOException, InterruptedException {
        StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
        ClientWebSocketContainer clientWebSocketContainer = new ClientWebSocketContainer(webSocketClient,
                "ws://localhost:" + port + "/some_websocket_path");
        clientWebSocketContainer.start();
        WebSocketSession session = clientWebSocketContainer.getSession(null);
        session.sendMessage(new TextMessage("foo"));
        System.out.println("Done****************************************************");
    }

}

I have seen the same issue reported here, but none of the suggestions helped. What am I missing?

I have spring-boot-starter-tomcat as a compile-time dependency in the dependency hierarchy.


Answer:

The message says that you need to configure at least one ServletWebServerFactory bean in the ApplicationContext. Having spring-boot-starter-tomcat on the classpath is not enough on its own: the bean must either be auto-configured or declared manually.

The test loads only two configuration classes into the application context, { WebsocketSourceConfiguration.class, WebSocketSourceIntegrationTests.class }, so at least one of them must contain a @Bean method that returns an instance of the desired ServletWebServerFactory.

* SOLUTION *

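Declare the missing bean in your configuration class:

@Configuration
public class WebsocketSourceConfiguration {

    // ... existing beans ...

    // Provides the embedded servlet container that the web environment requires.
    @Bean
    public ServletWebServerFactory servletWebServerFactory() {
        return new TomcatServletWebServerFactory();
    }
}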

Alternatively, enable auto-configuration so that Spring Boot registers these beans based on what is on the classpath:

@EnableAutoConfiguration
public class WebsocketSourceConfiguration {
    // ...
}

The annotation can also be placed on the integration test class:

@EnableAutoConfiguration
public class WebSocketSourceIntegrationTests {
    // ...
}

For more information, see the SpringBootTest annotation documentation: https://docs.spring.io/spring-boot/docs/current/api/org/springframework/boot/test/context/SpringBootTest.html

Question:

Using the RabbitMQ web UI, I created a topic exchange TX and bound two queues to it, TX.Q1 and TX.Q2, with routing keys rk1 and rk2 respectively, and published a few messages to the exchange.

Now I want to create a consumer using Spring Cloud Stream that takes messages from Q1 only. I tried the following configuration:

spring.cloud.stream.bindings.input.destination=TX
spring.cloud.stream.bindings.input.group=Q1

and the annotation @StreamListener(Sink.INPUT) on the method that consumes messages.

As a result, I can see that the consumer created a queue (or binding) with the same name, TX.Q1, but the routing key of the new queue/binding is #. How can I configure a Spring Cloud Stream consumer to consume from the predefined queue (only messages routed with rk1)?


Answer:

For now, the workaround that Gary Russell suggested has solved the issue for me.

I used @RabbitListener instead of @StreamListener, like this: @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "TX.Q1", durable = "true"), exchange = @Exchange(value = "TX", type = "topic", durable = "true"), key = "rk1")).

As a result, the predefined queue TX.Q1 is bound to the exchange TX with binding key rk1.

I am waiting for updates on the Spring Cloud Stream issue.
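
For background on why the binder-created binding received everything: on a topic exchange, a binding key of # matches every routing key, whereas a literal key such as rk1 matches only itself. The following is a minimal sketch of that matching rule in plain Java, assuming the standard topic-exchange semantics (words separated by dots, * matches exactly one word, # matches zero or more words). TopicBindingMatcher is a hypothetical helper written for illustration; it is not part of RabbitMQ or Spring.

```java
final class TopicBindingMatcher {

    private TopicBindingMatcher() {
    }

    /** Returns true if routingKey matches bindingKey under topic-exchange rules. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.", -1);
        String[] words = routingKey.split("\\.", -1);
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: a match only if every word was consumed too.
            return w == words.length;
        }
        String token = pattern[p];
        if (token.equals("#")) {
            // '#' may absorb zero or more words, so try every possible split.
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            // A literal word or '*' needs one word, but none are left.
            return false;
        }
        if (token.equals("*") || token.equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        // Binding key '#' (what the consumer created) versus 'rk1' (what was wanted).
        System.out.println(matches("#", "rk2"));
        System.out.println(matches("rk1", "rk2"));
    }
}
```

Under this rule, matches("#", "rk2") is true, which is why a queue bound with # also receives messages published with rk2, while matches("rk1", "rk2") is false.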

Question:

I am trying to create the simplest possible hello world with Spring Cloud + Kafka Streams + Spring Boot 2.

I realize I am missing some basic concepts. Basically, I understand that:

1 - I need to define an outbound stream to write messages to a Kafka topic, and an inbound stream to read messages from a Kafka topic

public interface LoansStreams {

    String INPUT = "loans-in";
    String OUTPUT = "loans-out";

    @Input(INPUT)
    SubscribableChannel inboundLoans();

    @Output(OUTPUT)
    MessageChannel outboundLoans();

}

2 - configure Spring Cloud Stream to bind to my streams

@EnableBinding(LoansStreams.class)
public class StreamsConfig {
}

3 - configure Kafka properties

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        loans-in:
          destination: loans
          contentType: application/json
        loans-out:
          destination: loans
          contentType: application/json

4 - create a model for the exchanged messages

@Getter @Setter @ToString @Builder
public class Loans {
    private long timestamp;
    private String result;
}

5 - write to Kafka

@Service
@Slf4j
public class LoansService {
    private final LoansStreams loansStreams;
    public LoansService(LoansStreams loansStreams) {
        this.loansStreams = loansStreams;
    }
    public void sendLoan(final Loans loans) {
        log.info("Sending loans {}", loans);
        MessageChannel messageChannel = loansStreams.outboundLoans();
        messageChannel.send(MessageBuilder
                .withPayload(loans)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }
}

6 - listen to Kafka topic

@Component
@Slf4j
public class LoansListener {

    @StreamListener(LoansStreams.INPUT)
    public void handleLoans(@Payload Loans loans) {
        log.info("Received results: {}", loans);

    }
}

I spent a whole day reading a few blogs, and I assume the above code is at least workable. I am not sure I am really following the best possible approach. In any case, I get the error mentioned in the title:

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 18:33:05.619 ERROR 14784 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.

Searching for a solution, I found someone suggesting that the StreamListener method should return the model, so I replaced it with:

@StreamListener(LoansStreams.INPUT)
@SendTo("loans-out")
public KStream<?, Loans> process(KStream<?, Loans> l) {
    log.info("Received: {}", l);
    return l;
}

and then I get an error that is even less clear, at least to me (the previous error clearly mentioned a binder issue):

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 19:01:06.016 ERROR 13276 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalArgumentException: Method must be declarative
        at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
        at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:510) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
        at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:168) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
        at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:226) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]

In case it helps, I want to evolve this idea to apply sagas, but that is not the focus of this question. First, I need to get the basics up and running.

*edited

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.mybank</groupId>
    <artifactId>kafka-cloud-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-cloud-stream</name>
    <description>Spring Cloud Stream With Kafka</description>

    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <!-- version>5.1.5.RELEASE</version-->
        </dependency>

    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

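Answer:

The error "A default binder has been requested, but there are no binders available ..." means that no binder on the classpath can handle the message-channel bindings declared in LoansStreams; the spring-cloud-stream-binder-kafka-streams binder only binds Kafka Streams types such as KStream. Add the message-channel Kafka binder as a dependency: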

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

Question:

Sorry if this question is too generic, but does anyone have a tutorial or guide on how to test a producer and consumer with embedded Kafka? I've tried several, but they use different dependency versions and none of them actually works.

I'm using Spring Cloud Stream with Kafka.


Answer:

We generally recommend using the Test Binder in tests, but if you want to use an embedded Kafka server, it can be done...

Add this to your POM...

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

Test app...

@SpringBootApplication
@EnableBinding(Processor.class)
public class So43330544Application {

    public static void main(String[] args) {
        SpringApplication.run(So43330544Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public byte[] handle(byte[] in){
        return new String(in).toUpperCase().getBytes();
    }

}

application.properties...

spring.cloud.stream.bindings.output.destination=so0544out
spring.cloud.stream.bindings.input.destination=so0544in
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.group=so0544

Test case...

@RunWith(SpringRunner.class)
@SpringBootTest
public class So43330544ApplicationTests {

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

    @Autowired
    private KafkaTemplate<byte[], byte[]> template;

    @Autowired
    private KafkaProperties properties;

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void testSendReceive() {
        template.send("so0544in", "foo".getBytes());
        Map<String, Object> configs = properties.buildConsumerProperties();
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);
        Consumer<byte[], byte[]> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton("so0544out"));
        ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
        consumer.commitSync();
        assertThat(records.count()).isEqualTo(1);
        assertThat(new String(records.iterator().next().value())).isEqualTo("FOO");
    }

}
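
One detail in the handle method above: new String(in) and getBytes() use the platform default charset, so the result can differ between machines when payloads contain non-ASCII text. A minimal sketch of the same transformation with an explicit charset follows. UpperCaseBytes is a hypothetical helper, and UTF-8 is an assumption about the payload encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

final class UpperCaseBytes {

    private UpperCaseBytes() {
    }

    /** Decodes the payload as UTF-8, upper-cases it, and encodes it as UTF-8 again. */
    static byte[] toUpperCase(byte[] in) {
        String text = new String(in, StandardCharsets.UTF_8);
        // Locale.ROOT keeps the result independent of the JVM's default locale.
        return text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] out = toUpperCase("foo".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(out, StandardCharsets.UTF_8));
    }
}
```

The body of handle could delegate to such a helper without changing the bindings or the test.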

Question:

I am using Spring Cloud Stream, with RabbitMQ binder. It works great with byte[] payload and Java native serialization, but I need to work with JSON payload.

Here's my processor class.

@EnableBinding(Processor.class)
public class MessageProcessor {
    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public OutputDto handleIncomingMessage(InputDto inputDto) {
        // Run some job.
        return new OutputDto();
    }
}

InputDto and OutputDto are POJOs with Jackson annotations.

  • How do I configure the JSON conversion strategy?
  • What should the message headers look like to be accepted and processed?

Answer:

In your consumer you can add a content type configuration, e.g.

spring.cloud.stream.bindings.input.content-type: application/x-java-object;type=my.package.InputDto

You could also add

spring.cloud.stream.bindings.output.content-type: application/json

to force the outgoing message payload to be JSON (for interop etc.).

Note that "input" and "output" are the binder channel names (i.e. as defined in Processor in your app).
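
The consumer content type above carries the target class in a type parameter. As a rough illustration of how such a value breaks down into a base type and parameters, here is a minimal sketch in plain Java. ContentTypeParts is a hypothetical helper, not the parsing code Spring Cloud Stream uses, and it ignores the quoting rules that a full MIME parser would handle.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ContentTypeParts {

    private ContentTypeParts() {
    }

    /** Returns the part before the first ';', e.g. "application/x-java-object". */
    static String baseType(String contentType) {
        int semi = contentType.indexOf(';');
        return (semi < 0 ? contentType : contentType.substring(0, semi)).trim();
    }

    /** Returns the name=value parameters that follow the base type, in order. */
    static Map<String, String> parameters(String contentType) {
        Map<String, String> params = new LinkedHashMap<>();
        String[] parts = contentType.split(";");
        for (int i = 1; i < parts.length; i++) {
            int eq = parts[i].indexOf('=');
            if (eq > 0) {
                String name = parts[i].substring(0, eq).trim();
                String value = parts[i].substring(eq + 1).trim();
                params.put(name, value);
            }
        }
        return params;
    }

    public static void main(String[] args) {
        String value = "application/x-java-object;type=my.package.InputDto";
        System.out.println(baseType(value));
        System.out.println(parameters(value).get("type"));
    }
}
```

For the value used above, the base type is application/x-java-object and the type parameter names the class the payload should be converted to.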

I think there is a good chance this could be made easier or more automatic, but some engineering effort is required to make that happen in Spring Cloud. There's an issue on GitHub if you want to follow it: https://github.com/spring-cloud/spring-cloud-stream/issues/156.

To send a message to a Spring Cloud Stream app manually, you can set the headers up yourself (but it's easier to use a Stream). A JSON message looks like this in the Rabbit admin UI:

priority:   0
delivery_mode:  2
headers:    
    contentType:    text/plain
    originalContentType:    application/json
content_type:   text/plain