Hot questions for Spring Integration

Question:

I am getting the exception below:

org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

Configuration: RabbitMQ 3.3.5 on Windows

In the config file %APPDATA%\RabbitMQ\rabbit.config I made the change below, as per https://www.rabbitmq.com/access-control.html:

[{rabbit, [{loopback_users, []}]}].

I also tried creating a user/password (test/test), but that does not seem to make it work either.

I also tried the steps from this post.

Other configuration details are below:

Tomcat hosted Spring Application Context:

<!-- Rabbit MQ configuration Start -->
    <!-- Connection Factory -->
    <rabbit:connection-factory id="rabbitConnFactory" virtual-host="/" username="guest" password="guest" port="5672"/>

    <!-- Spring AMQP Template -->
    <rabbit:template id="rabbitTemplate" connection-factory="rabbitConnFactory" routing-key="ecl.down.queue" queue="ecl.down.queue" />

    <!-- Spring AMQP Admin -->
    <rabbit:admin id="admin" connection-factory="rabbitConnFactory"/>

    <rabbit:queue id="ecl.down.queue" name="ecl.down.queue" />

    <rabbit:direct-exchange name="ecl.down.exchange">
        <rabbit:bindings>
            <rabbit:binding key="ecl.down.key" queue="ecl.down.queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

In my Controller Class

@Autowired
RmqMessageSender rmqMessageSender;

//Inside a method
rmqMessageSender.submitToECLDown(orderInSession.getOrderNo());

In My Message sender:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component("messageSender")
public class RmqMessageSender  {

    @Autowired
    AmqpTemplate                rabbitTemplate;

    public void submitToRMQ(String orderId){
        try{
            rabbitTemplate.convertAndSend("Hello World");
        } catch (Exception e){
            LOGGER.error(e.getMessage());
        }
    }       
}

The catch block above logs the following exception:


org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.


Error Log

  =ERROR REPORT==== 7-Nov-2014::18:04:37 ===
closing AMQP connection <0.489.0> (10.1.XX.2XX:52298 -> 10.1.XX.2XX:5672):
    {handshake_error,starting,0,
                     {amqp_error,access_refused,
                                 "PLAIN login refused: user 'guest' can only connect via localhost",
                                 'connection.start_ok'}}

Please find the pom.xml entries below:

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.3.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
            <version>4.0.4.RELEASE</version>
        </dependency>

Please let me know if you have any thoughts or suggestions.


Answer:

What Artem Bilan has explained here might be one of the reasons for this error:

Caused by: com.rabbitmq.client.AuthenticationFailureException: 
ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. 
For details see the

but the solution for me was to log in to the RabbitMQ admin page (http://localhost:15672/#/users) with the default user name and password (guest/guest), add a new user, grant that user permission to access the virtual host, and then use the new user name and password instead of the default guest. That cleared the error.

Question:

I have a web application with Spring Integration logic running alongside it in a separate thread. The problem is that at some point my Spring Integration logic tries to use a request-scoped bean, and then I get the following errors:

Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.tenantContext': Scope 'request' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.


Caused by: java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.

I have the RequestContextListener set:

<listener>
    <listener-class>org.springframework.web.context.request.RequestContextListener</listener-class>
</listener>

My scoped bean is annotated like this (since I heard that proxying my bean would help):

@Component
@Scope(value = "request", proxyMode = ScopedProxyMode.TARGET_CLASS)  
public class TenantContext  implements Serializable {

Is what I'm doing possible? If yes, what am I missing here? If no, any other suggestions on how I can achieve that?


Answer:

For Spring Framework 4, add servletContext.addListener(new RequestContextListener()); in the onStartup method of your initializer:

public class WebApplicationInitializer extends AbstractAnnotationConfigDispatcherServletInitializer {
    @Override
    protected Class<?>[] getRootConfigClasses() {
        return new Class[] { RootConfiguration.class };
    }

    @Override
    protected Class<?>[] getServletConfigClasses() {
        return new Class[] { WebMvcConfiguration.class };
    }

    @Override
    protected String[] getServletMappings() {
        return new String[] { "/" };
    }

    @Override
    protected Filter[] getServletFilters() {
        return new Filter[] { new HiddenHttpMethodFilter() };
    }

    @Override
    public void onStartup(ServletContext servletContext) throws ServletException {
        super.onStartup(servletContext);
        servletContext.addListener(new RequestContextListener());
    }
}
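
As background on the "No thread-bound request found" message: the current request attributes are bound to the thread that handles the request, so code running on another thread does not see them. The sketch below illustrates only that thread-binding behaviour in plain Java. ThreadBoundContextDemo and its methods are hypothetical names, not Spring classes, and handing the value over explicitly is one possible workaround, not the fix described in this answer.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a thread-bound request context; not a Spring class.
final class ThreadBoundContextDemo {

    // Plays the role of the per-thread holder that request scope relies on.
    private static final ThreadLocal<String> CURRENT_TENANT = new ThreadLocal<>();

    static void bind(String tenant) {
        CURRENT_TENANT.set(tenant);
    }

    static void unbind() {
        CURRENT_TENANT.remove();
    }

    static String current() {
        return CURRENT_TENANT.get();
    }

    // Runs the task on a new thread and returns what it produced there.
    static String runOnWorker(Supplier<String> task) {
        String[] result = new String[1];
        Thread worker = new Thread(() -> result[0] = task.get());
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        bind("tenant-a");
        // The worker thread has no binding of its own, so the lookup finds nothing.
        System.out.println(runOnWorker(ThreadBoundContextDemo::current));
        // Capture the value on the bound thread and hand it over explicitly.
        String captured = current();
        System.out.println(runOnWorker(() -> captured));
        unbind();
    }
}
```

In a messaging flow, the equivalent of "handing it over" would be copying the needed value (for example the tenant id) into the message before it crosses to another thread.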

Question:

I have been through this and understood that I need to create a TcpReceivingChannelAdapter to accept connections. But I don't know how to proceed with that.

Could someone guide me through this?


Answer:

See the tcp-client-server sample for some pointers using XML configuration.

For Java configuration, here's a simple Spring Boot app...

package com.example;

import java.net.Socket;

import javax.net.SocketFactory;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.integration.transformer.ObjectToStringTransformer;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class So39290834Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39290834Application.class, args);
        Socket socket = SocketFactory.getDefault().createSocket("localhost", 9999);
        socket.getOutputStream().write("foo\r\n".getBytes());
        socket.close();
        Thread.sleep(1000);
        context.close();
    }

    @Bean
    public TcpNetServerConnectionFactory cf() {
        return new TcpNetServerConnectionFactory(9999);
    }

    @Bean
    public TcpReceivingChannelAdapter inbound(AbstractServerConnectionFactory cf) {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(cf);
        adapter.setOutputChannel(tcpIn());
        return adapter;
    }

    @Bean
    public MessageChannel tcpIn() {
        return new DirectChannel();
    }

    @Transformer(inputChannel = "tcpIn", outputChannel = "serviceChannel")
    @Bean
    public ObjectToStringTransformer transformer() {
        return new ObjectToStringTransformer();
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    public void service(String in) {
        System.out.println(in);
    }

}
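
One detail in the app above: the test client writes "foo\r\n". As far as I know, the connection factory's default (de)serializer frames each message with a trailing CRLF, which is why the terminator is there. The sketch below shows only that framing idea in plain Java; CrLfFramingSketch and readFrame are hypothetical names, not Spring Integration classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of CRLF-delimited framing; not a Spring Integration class.
final class CrLfFramingSketch {

    // Returns the bytes before the next CRLF as a string,
    // or null if the stream ends before a complete frame is read.
    static String readFrame(InputStream in) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int previous = -1;
        int current;
        try {
            while ((current = in.read()) != -1) {
                if (previous == '\r' && current == '\n') {
                    byte[] bytes = buffer.toByteArray();
                    // The '\r' was already buffered, so leave it out of the payload.
                    return new String(bytes, 0, bytes.length - 1, StandardCharsets.UTF_8);
                }
                buffer.write(current);
                previous = current;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return null;
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream(
                "foo\r\nbar\r\n".getBytes(StandardCharsets.UTF_8));
        String frame;
        while ((frame = readFrame(in)) != null) {
            System.out.println(frame);
        }
    }
}
```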

Question:

What is the fundamental difference between inbound and outbound channel adapters?

Any examples would be very helpful.

I have reviewed the Spring docs and this "directional" distinction is not clear to me. I support an application that has an outbound-channel-adapter configured, but I find the behavior counterintuitive given the outbound label. This adapter gets an external file, then brings it into the application, where we parse the file and persist the data.

This is similar to this question, but I wanted to focus more generally on channel adapters, and hopefully get more feedback!

Thanks!


Answer:

Channel adapters are for one-way integration (gateways are bidirectional).

Concretely, inbound adapters are at the beginning of a flow; outbound adapters terminate a flow. Flows are typically rendered (and conceptually thought of) as flowing from left to right...

inbound-c-a->someComponent->someOtherComponent->outbound-c-a

(where -> represents a channel).

There are two types of inbound channel adapters:

  • MessageProducers
  • MessageSources

MessageProducers are termed "message-driven" i.e. they unilaterally produce messages in a completely asynchronous manner, as soon as they are started; examples are JMS message-driven adapter, TCP inbound channel adapter, IMAP Idle (mail) channel adapter, etc.

MessageSources, on the other hand, are polled: a poller with some trigger causes the framework to ask the source for a message; the trigger can be a fixed rate, a cron expression, etc. Examples are the (S)FTP adapters and the Mail inbound adapter (POP3, IMAP).
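
The difference between the two inbound styles can be sketched in plain Java. This is only an illustration of "polled" versus "message-driven"; InboundStylesSketch, pollUntilEmpty and PushingProducer are hypothetical names, not Spring Integration classes, and a real poller would ask the source on a trigger rather than in a tight loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java illustration only; these types are hypothetical, not Spring Integration classes.
final class InboundStylesSketch {

    // Polled style: the caller asks the source for the next message
    // and stops when the source has nothing to offer (null).
    static List<String> pollUntilEmpty(Supplier<String> source) {
        List<String> received = new ArrayList<>();
        String next;
        while ((next = source.get()) != null) {
            received.add(next);
        }
        return received;
    }

    // Message-driven style: the producer pushes each message downstream
    // as soon as the external system delivers it.
    static final class PushingProducer {
        private final Consumer<String> downstream;

        PushingProducer(Consumer<String> downstream) {
            this.downstream = downstream;
        }

        void onExternalEvent(String payload) {
            downstream.accept(payload);
        }
    }

    public static void main(String[] args) {
        // Polled: pretend these are files waiting on a remote server.
        Queue<String> remoteFiles = new ArrayDeque<>(Arrays.asList("a.csv", "b.csv"));
        System.out.println(pollUntilEmpty(remoteFiles::poll));

        // Message-driven: the external system decides when a message arrives.
        List<String> flow = new ArrayList<>();
        PushingProducer producer = new PushingProducer(flow::add);
        producer.onExternalEvent("message-1");
        System.out.println(flow);
    }
}
```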

An example of an outbound adapter is the Mail outbound adapter (SMTP).

Gateways are two-way (request/reply).

Inbound gateways are where some external system sends a request and Spring Integration replies.

Outbound gateways are where Spring Integration makes the request and some external system replies.

I hope that clears things up.

Question:

I'm trying to create a server in Spring that listens on a TCP port and accepts connections. I know how to route incoming requests to my service, and it can respond to those. However, I would like to send messages to certain clients without having received any request. For example, sometimes I have to inform a client that it has received a message.

To do this, I think I need a way to identify the clients, e.g. by letting them log in. Is there a way to have a "session" object for each active connection in which I can store login data?

How could I send a message to a client which has logged in with username X?

Is this possible in Spring at all?


Answer:

Starting with version 3.0, the framework emits connection events when there are connection state changes. You can capture these events using an ApplicationListener, or using an <event:inbound-channel-adapter/>.

The TcpConnectionOpenEvent contains a connectionId; you can send arbitrary messages to any connection once you know its id, by populating the IpHeaders.connectionId header (ip_connectionId) in a message and sending it to a <tcp:outbound-channel-adapter/>.

If you need to support request/reply as well as sending arbitrary messages, you need to use a collaborating pair of channel adapters for all communication, not a gateway.
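
To address clients by user name, something has to remember which connection id belongs to which logged-in user. A minimal sketch of such a lookup in plain Java follows; ConnectionRegistry and its method names are hypothetical, not part of Spring Integration, and it assumes you call login when a login message arrives on a connection and connectionClosed when that connection goes away.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Spring Integration:
// remembers which connection id belongs to which logged-in user.
final class ConnectionRegistry {

    private final Map<String, String> connectionIdByUser = new ConcurrentHashMap<>();

    // Record the connection on which this user logged in.
    void login(String username, String connectionId) {
        connectionIdByUser.put(username, connectionId);
    }

    // Forget every user that was bound to a connection that has closed.
    void connectionClosed(String connectionId) {
        connectionIdByUser.values().removeIf(connectionId::equals);
    }

    // Look up the connection id to put in the connection id header, if the user is online.
    Optional<String> connectionIdFor(String username) {
        return Optional.ofNullable(connectionIdByUser.get(username));
    }

    public static void main(String[] args) {
        ConnectionRegistry registry = new ConnectionRegistry();
        registry.login("X", "localhost:52298:9999:example-id");
        System.out.println(registry.connectionIdFor("X").isPresent());
        registry.connectionClosed("localhost:52298:9999:example-id");
        System.out.println(registry.connectionIdFor("X").isPresent());
    }
}
```

The id returned by connectionIdFor is what would go into the connection id header of the message sent to the outbound adapter.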

EDIT

Here's a simple Boot app...

package com.example;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;

import javax.net.SocketFactory;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpServerConnectionFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class So25102101Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = new SpringApplicationBuilder(So25102101Application.class)
                .web(false)
                .run(args);
        int port = context.getBean(TcpServerConnectionFactory.class).getPort();
        Socket socket = SocketFactory.getDefault().createSocket("localhost", port);
        BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        String line = reader.readLine();
        System.out.println(line);
        context.close();
    }

    @Bean
    public TcpReceivingChannelAdapter server(TcpNetServerConnectionFactory cf) {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(cf);
        adapter.setOutputChannel(inputChannel());
        return adapter;
    }

    @Bean
    public MessageChannel inputChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean
    public TcpNetServerConnectionFactory cf() {
        return new TcpNetServerConnectionFactory(0);
    }

    @Bean
    public IntegrationFlow outbound() {
        return IntegrationFlows.from(outputChannel())
                .handle(sender())
                .get();
    }

    @Bean
    public MessageHandler sender() {
        TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
        tcpSendingMessageHandler.setConnectionFactory(cf());
        return tcpSendingMessageHandler;
    }

    @Bean
    public ApplicationListener<TcpConnectionOpenEvent> listener() {
        return new ApplicationListener<TcpConnectionOpenEvent>() {

            @Override
            public void onApplicationEvent(TcpConnectionOpenEvent event) {
                outputChannel().send(MessageBuilder.withPayload("foo")
                        .setHeader(IpHeaders.CONNECTION_ID, event.getConnectionId())
                        .build());
            }

        };
    }

}

pom deps:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-ip</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Question:

I am new to Spring Integration and am working with the Spring Integration HTTP module for my project. I am sending a request from an outbound gateway acting as an HTTP client. I want to initiate a request to the server, and the server should return the message payload with the values I set.

I send the request from the client (HttpClientDemo), shown below, to an inbound gateway on the server side. For that purpose, I convert my object to JSON, convert the JSON string back to an object on the server side, perform a simple operation there, and send the result back to the client (HttpClientDemo). Before this completes, I get an exception related to HttpMessageConverter, as below:

Exception in thread "main" org.springframework.web.client.RestClientException: Could not extract response: no suitable HttpMessageConverter found for response type [class com.mycompany.MyChannel.model.FFSampleResponseHttp] and content type [text/plain;charset=UTF-8]
    at org.springframework.web.client.HttpMessageConverterExtractor.extractData(HttpMessageConverterExtractor.java:108)
    at org.springframework.web.client.RestTemplate$ResponseEntityResponseExtractor.extractData(RestTemplate.java:784)
    at org.springframework.web.client.RestTemplate$ResponseEntityResponseExtractor.extractData(RestTemplate.java:769)
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:549)
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:517)
    at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:462)
    at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.handleRequestMessage(HttpRequestExecutingMessageHandler.java:421)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:170)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:255)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:223)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:93)

Please find the related code below:

Client side code: HttpClientDemo.java

public class HttpClientDemo {

    private static Logger logger = Logger.getLogger(HttpClientDemo.class);

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("/META-INF/spring/integration/http-outbound-config.xml");
        RequestGateway requestGateway = context.getBean("requestGateway", RequestGateway.class);
        FFSampleRequestHttp FFSampleRequesthttp = new FFSampleRequestHttp();
        FFSampleRequesthttp.setMyChannelID("1");
        FFSampleRequesthttp.setMyNumber("88");
        FFSampleRequesthttp.setReferenceID("9I");
        FFSampleRequesthttp.setTemplateType(1);
        FFSampleRequesthttp.setTimestamp("today");
        FFSampleResponseHttp reply = requestGateway.FFSampleResponsegatway(FFSampleRequesthttp);
        logger.info("Replied with: " + reply);
    }

}

My Request Gateway is as follows: RequestGateway.java

public interface RequestGateway {


    FFSampleResponseHttp FFSampleResponsegatway(FFSampleRequestHttp request);

}

http-outbound-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-http="http://www.springframework.org/schema/integration/http"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/http http://www.springframework.org/schema/integration/http/spring-integration-http.xsd">

    <int:gateway id="requestGateway" 
                 service-interface="com.mycompany.MyChannel.Common.RequestGateway"
                 default-request-channel="requestChannel"/>

    <int:channel id="requestChannel"/>
    <int:channel id="requestChannel1"/>


<!--    com.mycompany.MyChannel.model.FFSampleResponseHttp -->

    <int-http:outbound-gateway request-channel="requestChannel1" reply-channel="replyChannel1" url="http://localhost:8080/MyChannel_prj-1.0.0.BUILD-SNAPSHOT/receiveGateway"  http-method="POST"  extract-request-payload="true" expected-response-type="com.mycompany.MyChannel.model.FFSampleResponseHttp"/>

   <int:object-to-json-transformer  input-channel="requestChannel" output-channel="requestChannel1" content-type="application/json"  result-type="STRING"/>



   <bean id="FFSampleRequestHttp" class="com.mycompany.MyChannel.model.FFSampleRequestHttp"></bean>


</beans>

Web.xml

<?xml version="1.0" encoding="ISO-8859-1" standalone="no"?>
<web-app xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="2.4" xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd">


<servlet>
    <servlet-name>MyChannel-http</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
      <param-name>contextConfigLocation</param-name>
      <param-value>/WEB-INF/servlet-config.xml</param-value>
    </init-param>
    <load-on-startup>2</load-on-startup>
  </servlet>
  <servlet-mapping>
    <servlet-name>MyChannel-http</servlet-name>
    <url-pattern>/receiveGateway</url-pattern>
   </servlet-mapping>


    <welcome-file-list>
        <welcome-file>index.html</welcome-file>
    </welcome-file-list>

</web-app>

servlet-config.xml

    <int:channel id="receivMyChannel"/>

<int-http:inbound-gateway request-channel="receivMyChannel" path="/receiveGateway" supported-methods="POST"/>

    <int:service-activator input-channel="receivMyChannel">
        <bean class="com.mycompany.MyChannel.serviceImpl.FFSampleHttpImpl">
        <constructor-arg ref = "FFSampleRequestHttp"></constructor-arg>
        </bean>
        </int:service-activator>


     <bean id="FFSampleRequestHttp" class="com.mycompany.MyChannel.model.FFSampleRequestHttp"></bean>
     <bean id="FFSampleResponseHttp" class="com.mycompany.MyChannel.model.FFSampleResponseHttp"></bean>

    </beans> 



public class FFSampleHttpImpl{

    private static org.apache.log4j.Logger log = Logger
            .getLogger(FFSampleImpl.class);

    @Autowired
    FFSampleRequestHttp request;
    public FFSampleHttpImpl() {
    }

    public FFSampleHttpImpl(FFSampleRequestHttp request) {
        super();
        this.request = request;
    }





    public String issueResponseFor(String str) throws JsonParseException, JsonMappingException, IOException {
        ObjectMapper mapper = new ObjectMapper();

        // Parse the incoming JSON string into the request object.
        FFSampleRequestHttp parsed = mapper.readValue(str, FFSampleRequestHttp.class);
        parsed.setReferenceID("Hi My Number");

        // Serialize the modified object back to a JSON string for the reply.
        return mapper.writeValueAsString(parsed);
    }

}

FFSampleRequestHttp.java

public class FFSampleRequestHttp {
    protected String MyNumber;  
    protected String referenceID;   
    protected String myChannelID;
    protected String timestamp;
    protected int templateType;
    public String getMyNumber() {
        return MyNumber;
    }
    public void setMyNumber(String MyNumber) {
        this.MyNumber = MyNumber;
    }
    public String getReferenceID() {
        return referenceID;
    }
    public void setReferenceID(String referenceID) {
        this.referenceID = referenceID;
    }
    public String getMyChannelID() {
        return myChannelID;
    }
    public void setMyChannelID(String myChannelID) {
        this.myChannelID = myChannelID;
    }
    public String getTimestamp() {
        return timestamp;
    }
    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }
    public int getTemplateType() {
        return templateType;
    }
    public void setTemplateType(int templateType) {
        this.templateType = templateType;
    }
    }

FFSampleResponseHttp.java

public class FFSampleResponseHttp {
    protected String MyNumber;
    protected String referenceID;
    protected String myChannelID;
    protected String timestamp;
    protected int templateType;

    public String getMyNumber() {
        return MyNumber;
    }
    public void setMyNumber(String MyNumber) {
        this.MyNumber = MyNumber;
    }
    public String getReferenceID() {
        return referenceID;
    }
    public void setReferenceID(String referenceID) {
        this.referenceID = referenceID;
    }
    public String getMyChannelID() {
        return myChannelID;
    }
    public void setMyChannelID(String myChannelID) {
        this.myChannelID = myChannelID;
    }
    public String getTimestamp() {
        return timestamp;
    }
    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }
    public int getTemplateType() {
        return templateType;
    }
    public void setTemplateType(int templateType) {
        this.templateType = templateType;
    }
    }

When I run the above code I get the following error:

16:55:46.843 [main] DEBUG o.s.web.client.RestTemplate - Writing [{"MyNumber":"88","referenceID":"9I","myChannelID":"1","timestamp":"today","templateType":1}] as "text/plain;charset=UTF-8" using [org.springframework.http.converter.StringHttpMessageConverter@7d31a3e2]
16:55:46.988 [main] DEBUG o.s.web.client.RestTemplate - POST request for "http://localhost:8080/MyChannel_prj-1.0.0.BUILD-SNAPSHOT/receiveGateway" resulted in 200 (OK)
Exception in thread "main" org.springframework.web.client.RestClientException: Could not extract response: no suitable HttpMessageConverter found for response type [class com.mycompany.MyChannel.model.FFSampleResponseHttp] and content type [text/plain;charset=UTF-8]
    at org.springframework.web.client.HttpMessageConverterExtractor. 

I have used the Spring Integration basic sample code for reference. Please provide your input. I also tried using the Spring object mapper in the configuration files with a JSON-to-object transformer, but I still get similar HttpMessageConverter issues. Please help me with your inputs/suggestions, and let me know if there is any limitation with the Spring Integration HTTP object mapper.


Hi Artem, thanks for your reply. I am still facing the challenges described below. I have changed my configuration files as per your suggestion, but I am facing an issue when using Jackson2JsonObjectMapper and need your further help. Please find the issue description below.

I have made changes to my files, and they now look like this. My Servlet-Config.xml file content is as below:

<int:channel id="channel1" /> 
<int:channel id="channel2" /> 
<int:channel id="channel3" /> 
<int-http:inbound-gateway request-channel="channel1" supported-methods="POST" path="/receiveGateway" /> 
<int:service-activator input-channel="channel2"> 
<bean class="com.myCompany.myChannel.serviceImpl.FFSampleHttpImpl"> 
<constructor-arg ref="ffSampleRequestHttp" /> 
</bean> 
</int:service-activator> 

<int:json-to-object-transformer input-channel="channel1" output-channel="channel2" type="com.myCompany.myChannel.model.FFSampleRequestHttp" object-mapper="jackson2JsonObjectMapper" /> 

<bean id="jackson2JsonObjectMapper" class="org.springframework.integration.support.json.Jackson2JsonObjectMapper" /> 
<bean id="ffSampleRequestHttp" class="com.myCompany.myChannel.model.FFSampleRequestHttp" /> 
<bean id="ffSampleResponseHttp" class="com.myCompany.myChannel.model.FFSampleResponseHttp" /> 
</beans>

Outbound config file (the file responsible for sending messages to the server):

<int:gateway id="requestGateway" service-interface="com.myCompany.myChannel.Common.RequestGateway" default-request-channel="requestChannel" /> 
  <int:channel id="requestChannel" /> 
  <int:channel id="requestChannel1" /> 
  <int:object-to-json-transformer input-channel="requestChannel" output-channel="requestChannel1" content-type="application/json" /> 
  <int-http:outbound-gateway request-channel="requestChannel1" reply-channel="channel4" url="http://localhost:8080/myChannel_prj-1.0.0.BUILD-SNAPSHOT/http/receiveGateway" http-method="POST" /> 
  <bean id="FFSampleRequestHttp" class="com.myCompany.myChannel.model.FFSampleRequestHttp" /> 
  <int:json-to-object-transformer input-channel="channel4" output-channel="requestChannel" type="com.myCompany.myChannel.model.FFSampleResponseHttp" object-mapper="jackson2JsonObjectMapper" /> 
  <bean id="jackson2JsonObjectMapper" class="org.springframework.integration.support.json.Jackson2JsonObjectMapper" /> 
  </beans>

My impl class method is as below:

public  FfSampleResponseHttp issueResponseFor(FfSampleRequestHttp request) {

        FfSampleResponseHttp ffSampleResponse2 = new FfSampleResponseHttp();

        ffSampleResponse2.setCifNumber("Yappi I am in the method");
        log.info("issueResponseFor(FfSampleRequesthttp request)");

        return ffSampleResponse2;

    }

I am able to call my service method issueResponseFor on the server side from the client, but as processing continues I get:

Caused by: java.lang.IllegalArgumentException: 'json' argument must be an instance of: [class java.lang.String, class [B, class java.io.File, class java.net.URL, class java.io.InputStream, class java.io.Reader]
       at org.springframework.integration.support.json.Jackson2JsonObjectMapper.fromJson(Jackson2JsonObjectMapper.java:93)
       at org.springframework.integration.support.json.Jackson2JsonObjectMapper.fromJson(Jackson2JsonObjectMapper.java:44)
       at org.springframework.integration.support.json.AbstractJacksonJsonObjectMapper.fromJson(AbstractJacksonJsonObjectMapper.java:55)
       at org.springframework.integration.json.JsonToObjectTransformer.doTransform(JsonToObjectTransformer.java:78)
       at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:33)
       ... 54 more

While debugging, I verified that the response payload arrives blank in the json parameter of Jackson2JsonObjectMapper.fromJson(…) after passing through my service method successfully. I cannot work out where I am making the mistake. Please provide your help/input, and let me know if I am again missing something in my config files. Thank you very much for your support.


Answer:

As Artem Bilan said, this problem occurs because MappingJackson2HttpMessageConverter only supports responses with the application/json content type. If you can't change the server code but can change the client code (as in my case), you can change the Content-Type header with an interceptor:

restTemplate.getInterceptors().add((request, body, execution) -> {
            ClientHttpResponse response = execution.execute(request,body);
            response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
            return response;
        });

Question:

I did some searching but couldn't find any sample/example.

I have a requirement where geo coordinates are read from one table (input) and processed to generate the POIs associated with each coordinate. So one geo coordinate will result in one or more POIs that need to be inserted into another table (output).

I'm currently using a JdbcCursorItemReader and JdbcBatchItemWriter to read one item/record and write one item/record. There is also an ItemProcessor that generates the POIs for a given geo coordinate.

Does a custom JdbcBatchItemWriter help me achieve this?

Any ideas? TIA.


Answer:

What you are really looking for is called a Splitter pattern:

Here is how it is defined in Spring Integration:

A Splitter is a type of Message Endpoint whose responsibility is to accept a Message from its input channel, split that Message into multiple Messages, and then send each of those to its output channel. This is typically used for dividing a "composite" payload object into a group of Messages containing the sub-divided payloads.

Configuration is extremely simple:

<channel id="inputChannel"/>

<splitter id="splitter" 
  ref="splitterBean" 
  method="split" 
  input-channel="inputChannel" 
  output-channel="outputChannel" />

<channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>

Or you can use annotations:

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems();
}

You can of course write your own JdbcBatchItemWriter if that feels simpler. However, Spring Integration already does this for you.

You can use Spring Integration JDBC Support (jdbc:inbound-channel-adapter / jdbc:outbound-channel-adapter) together with the above splitter to achieve what you want while keeping things simple.
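
To make the one-to-many step concrete, here is a minimal plain-Java sketch of the kind of method a splitter bean might expose. GeoCoordinate, Poi, and PoiSplitter are hypothetical names that do not come from the question, and the POI generation is only a placeholder. In a real flow the method would be referenced from the <splitter> element (or annotated with @Splitter), and each returned Poi would travel on as its own message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical input row: one geo coordinate read from the input table.
final class GeoCoordinate {
    final double lat;
    final double lon;

    GeoCoordinate(double lat, double lon) {
        this.lat = lat;
        this.lon = lon;
    }
}

// Hypothetical output row: one POI to insert into the output table.
final class Poi {
    final String name;
    final double lat;
    final double lon;

    Poi(String name, double lat, double lon) {
        this.name = name;
        this.lat = lat;
        this.lon = lon;
    }
}

// Plain class with the shape a splitter bean would have: one payload in, a collection out.
final class PoiSplitter {

    // Placeholder logic (an assumption): always emits two POIs per coordinate.
    List<Poi> split(GeoCoordinate coordinate) {
        List<Poi> pois = new ArrayList<>();
        for (int i = 1; i <= 2; i++) {
            pois.add(new Poi("poi-" + i, coordinate.lat, coordinate.lon));
        }
        return pois;
    }
}
```

Because the method returns a collection, a splitter endpoint would emit one message per element, so the downstream writer still handles a single Poi at a time.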

Question:

According to the documentation provided here, I am working on a POC to get messages into a listener as described in that documentation. Below is how I have written the configuration.

@Configuration
public class KafkaConsumerConfig {

    public static final String TEST_TOPIC_ID = "record-stream";

    @Value("${kafka.topic:" + TEST_TOPIC_ID + "}")
    private String topic;

    @Value("${kafka.address:localhost:9092}")
    private String brokerAddress;


    /*
      @Bean public KafkaMessageDrivenChannelAdapter<String, String> adapter(
      KafkaMessageListenerContainer<String, String> container) {
      KafkaMessageDrivenChannelAdapter<String, String>
      kafkaMessageDrivenChannelAdapter = new
      KafkaMessageDrivenChannelAdapter<>( container, ListenerMode.record);
      kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); return
      kafkaMessageDrivenChannelAdapter; }

      @Bean public QueueChannel received() { return new QueueChannel(); }
     */

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(30000);
        return factory;

    }

    /*
     * @Bean public KafkaMessageListenerContainer<String, String> container()
     * throws Exception { ContainerProperties properties = new
     * ContainerProperties(this.topic); // set more properties return new
     * KafkaMessageListenerContainer<>(consumerFactory(), properties); }
     */

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        // props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest
                                                                        // smallest
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

}

and Listener is as below,

@Service
public class Listener {

    private Logger log = Logger.getLogger(Listener.class);


    @KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID, containerFactory = "kafkaListenerContainerFactory")
    public void process(String message/* , Acknowledgment ack */) {
        Gson gson = new Gson();
        Record record = gson.fromJson(message, Record.class);

        log.info(record.getId() + " " + record.getName());
        // ack.acknowledge();
    }

}

Even though I am producing messages to the same topic and this consumer is working on the same topic, Listener is not executing.

I am running Kafka 0.10.0.1, and here is my current pom. This consumer is working as a spring boot web application unlike many command line samples.

   <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>

        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>

        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

I have spent a good amount of time trying to figure out why this listener is not getting hit when the topic has messages. What am I doing wrong?

I know that I can receive the messages using a channel (I have commented out that part of the configuration in the code), but here the concurrency is handled cleanly.

Is this kind of implementation possible with asynchronous message consumption?


Answer:

You have to add @EnableKafka alongside @Configuration.

Will add some description soon.

Meanwhile:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

Question:

I am experimenting with the Spring Reactor 3 components and Spring Integration to create a reactive stream (Flux) from a JMS queue.

I am attempting to create a reactive stream (Spring Reactor 3 Flux) from a JMS queue (ActiveMQ using Spring Integration) for clients to get the JMS messages asynchronously. I believe that I have everything hooked up correctly, but the client does not receive any of the JMS messages until the server is stopped. Then all of the messages get "pushed" to the client at once.

Any help would be appreciated.

Here is the configuration file that I am using to configure the JMS, Integration components and the reactive publisher:

@Configuration
@EnableJms
@EnableIntegration
public class JmsConfiguration {

    @Value("${spring.activemq.broker-url:tcp://localhost:61616}")
    private String defaultBrokerUrl;

    @Value("${queues.patient:patient}")
    private String patientQueue;

    @Autowired
    MessageListenerAdapter messageListenerAdapter;

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, jmsConnectionFactory());
        return factory;
    }

    @Bean
    public Queue patientQueue() {
        return new ActiveMQQueue(patientQueue);

    }

    @Bean
    public ActiveMQConnectionFactory jmsConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(defaultBrokerUrl);
        connectionFactory.setTrustedPackages(Arrays.asList("com.sapinero"));
        return connectionFactory;
    }

    // Set the jackson message converter
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(jmsConnectionFactory());
        template.setDefaultDestinationName(patientQueue);
        template.setMessageConverter(jacksonJmsMessageConverter());
        return template;
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        messageListenerAdapter.setMessageConverter(jacksonJmsMessageConverter());
        return messageListenerAdapter;
    }

    @Bean
    public AbstractMessageListenerContainer messageListenerContainer() {
        DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setMessageConverter(jacksonJmsMessageConverter());
        defaultMessageListenerContainer.setConnectionFactory(jmsConnectionFactory());
        defaultMessageListenerContainer.setDestinationName(patientQueue);
        defaultMessageListenerContainer.setMessageListener(messageListenerAdapter());
        defaultMessageListenerContainer.setCacheLevel(100);
        defaultMessageListenerContainer.setErrorHandler(new ErrorHandler() {
            @Override
            public void handleError(Throwable t) {
                t.printStackTrace();
            }
        });

        return defaultMessageListenerContainer;
    }

    @Bean // Serialize message content to json using TextMessage
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }


    @Bean
    public MessageChannel jmsOutboundInboundReplyChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    public Publisher<Message<String>> pollableReactiveFlow() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(messageListenerContainer()).get())
                .channel(MessageChannels.queue())
                .log(LoggingHandler.Level.DEBUG)
                .log()
                .toReactivePublisher();
    }

    @Bean
    public MessageChannel jmsChannel() {
        return new DirectChannel();
    }
}

The controller that creates the Flux is:

@RestController
@RequestMapping("patients")
public class PatientChangePushController {
    private LocalDateTime lastTimePatientDataRetrieved = LocalDateTime.now();
    private int durationInSeconds = 30;
    private Patient patient;
    AtomicReference<SignalType> checkFinally = new AtomicReference<>();

    @Autowired
    PatientService patientService;

    @Autowired
    @Qualifier("pollableReactiveFlow")
    private
    Publisher<Message<String>> pollableReactiveFlow;

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Queue patientQueue;

    /**
     * Subscribe to a Flux of a patient that has been updated.
     *
     * @param id
     * @return
     */
    @GetMapping(value = "/{id}/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Message<String>> getPatientAlerts(@PathVariable Long id) {

        Flux<Message<String>> messageFlux = Flux.from(pollableReactiveFlow);
        return messageFlux;
    }

    @GetMapping(value = "/generate")
    public void generateJmsMessage() {
        for (long i = 0L; i < 100; i++) {
            Patient patient = new Patient();
            patient.setId(i);
            send(patient);
            System.out.println("Message was sent to the Queue");
        }

    }

    void send(Patient patient) {
        this.jmsTemplate.convertAndSend(this.patientQueue, patient);
    }

}

If anyone can tell me why the messages do not get sent to the client until after the server is killed, I would appreciate it.


Answer:

Works well for me:

@SpringBootApplication
@RestController
public class SpringIntegrationSseDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
    }

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private JmsTemplate jmsTemplate;

    @Bean
    public Publisher<Message<String>> jmsReactiveSource() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                        .destination("testQueue"))
                .channel(MessageChannels.queue())
                .log(LoggingHandler.Level.DEBUG)
                .log()
                .toReactivePublisher();
    }

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> getPatientAlerts() {
        return Flux.from(jmsReactiveSource())
                .map(Message::getPayload);
    }

    @GetMapping(value = "/generate")
    public void generateJmsMessage() {
        for (int i = 0; i < 100; i++) {
            this.jmsTemplate.convertAndSend("testQueue", "testMessage #" + (i + 1));
        }
    }

}

In one terminal I have curl http://localhost:8080/events which waits for SSEs from that Flux.

In other terminal I perform curl http://localhost:8080/generate and see in the first one:

data:testMessage #1

data:testMessage #2

data:testMessage #3

data:testMessage #4

I use Spring Boot 2.0.0.BUILD-SNAPSHOT.

Also see here: https://spring.io/blog/2017/03/08/spring-tips-server-sent-events-sse

Question:

I am getting an exception Dispatcher has no subscribers on the outboundChannel and can't figure out why. I am sure it's something simple. I have stripped back my code to a very simple sample below:

My context is:

<bean id="requestService"
    class="com.sandpit.RequestService" />

<integration:channel id="inboundChannel" />

<integration:service-activator id="service"
    input-channel="inboundChannel"
    output-channel="outboundChannel"
    ref="requestService"
    method="handleRequest" />

<integration:channel id="outboundChannel" />

<integration:gateway id="gateway"
    service-interface="com.sandpit.Gateway"
    default-request-channel="inboundChannel"
    default-reply-channel="outboundChannel" />

<bean class="com.sandpit.GatewayTester">
    <property name="gateway"
        ref="gateway" />
</bean>

My Java code is:

public interface Gateway {

    String receive();
    void send(String message);
}

public class RequestService {

    public String handleRequest(String request) {

        return "Request received: " + request;
    }
}

public class GatewayTester implements ApplicationListener<ContextRefreshedEvent> {

    private Gateway gateway;

    public void setGateway(Gateway gateway) {

        this.gateway = gateway;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {

        gateway.send("Hello world!");
        System.out.println("FROM SERVICE: " + gateway.receive());
    }
}

Note: A breakpoint does tell me that the RequestService is actually handling the request.


Answer:

receive() with no args needs the reply channel to be a PollableChannel. See the documentation.

Add <queue/> to the outboundChannel.

Alternatively, you could change your gateway method to String sendAndReceive(String in) and all will work as expected (you can even remove the outboundChannel altogether).
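
The difference between the two channel types can be illustrated outside Spring with a plain-Java analogy. The classes below are hypothetical stand-ins, not Spring Integration's actual channel implementations: DirectLikeChannel hands a message straight to a subscriber and fails when there is none, while QueueLikeChannel buffers messages so that a later receive() can poll them, which is what a no-arg gateway receive() relies on.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for a subscribable channel (not Spring's DirectChannel).
final class DirectLikeChannel {
    private Consumer<String> subscriber;

    void subscribe(Consumer<String> subscriber) {
        this.subscriber = subscriber;
    }

    void send(String message) {
        if (subscriber == null) {
            // Mirrors the "Dispatcher has no subscribers" failure mode.
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        subscriber.accept(message);
    }
}

// Hypothetical stand-in for a pollable, queue-backed channel.
final class QueueLikeChannel {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();

    void send(String message) {
        queue.add(message);
    }

    // Returns the next buffered message, or null if none is waiting.
    String receive() {
        return queue.poll();
    }
}
```

In the question's setup, the service activator's reply lands on a channel of the first kind with nobody subscribed, so the send fails before receive() is ever called. A queue-backed channel holds the reply until it is polled.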

Question:

I'd like use Spring Integration to implement a content based router that uses a default output channel if the expression value doesn't match any of the mappings. Here's my bean definition:

<int:router input-channel="channel_in" default-output-channel="channel_default" expression="payload.name">
    <int:mapping value="foo" channel="channel_one" />
    <int:mapping value="bar" channel="channel_two" />
</int:router>

However, it seems the default output channel is never used. If the expression evaluates to e.g. 'baz', the router seems to be looking for a channel named 'baz', instead of routing to the 'channel_default' channel:

org.springframework.integration.MessagingException: failed to resolve channel name 'baz'
  Caused by: org.springframework.integration.support.channel.ChannelResolutionException: 
    failed to look up MessageChannel bean with name 'baz'
  Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: 
    No bean named 'baz' is defined

Is what I want at all possible using the XML namespace, or do I need to code up my own implementation?


Answer:

Turns out that all I had to do to make this work was to set the router's ignore-channel-name-resolution-failures attribute to true:

<int:router input-channel="channel_in" default-output-channel="channel_default" 
  expression="payload.name" ignore-channel-name-resolution-failures="true">
    <int:mapping value="foo" channel="channel_one" />
    <int:mapping value="bar" channel="channel_two" />
</int:router>

I thought I had tried that before, but it seems I didn't.
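
The behaviour described in this question and answer can be modelled in a few lines of plain Java. This is only a sketch of the observed logic under stated assumptions, not Spring Integration's router code, and RouterSketch and its members are hypothetical names: an unmapped value is treated as a channel name, and only when resolution failures are ignored does the default channel get used.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the router behaviour described above (not Spring's classes).
final class RouterSketch {
    private final Map<String, String> mappings = new HashMap<>();
    private final Set<String> knownChannels = new HashSet<>();
    private final String defaultChannel;
    private final boolean ignoreResolutionFailures;

    RouterSketch(String defaultChannel, boolean ignoreResolutionFailures) {
        this.defaultChannel = defaultChannel;
        this.ignoreResolutionFailures = ignoreResolutionFailures;
        knownChannels.add(defaultChannel);
    }

    void map(String value, String channel) {
        mappings.put(value, channel);
        knownChannels.add(channel);
    }

    String resolve(String value) {
        // An unmapped value is tried as a channel name in its own right.
        String candidate = mappings.getOrDefault(value, value);
        if (knownChannels.contains(candidate)) {
            return candidate;
        }
        if (ignoreResolutionFailures) {
            // Resolution failed quietly, so fall back to the default channel.
            return defaultChannel;
        }
        throw new IllegalStateException("failed to resolve channel name '" + candidate + "'");
    }
}
```

With the flag set to false, resolving 'baz' throws, which corresponds to the stack trace in the question. With it set to true, 'baz' ends up on channel_default.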

Question:

Is there any way to manually start/init a channel adapter?

I have two pairs of inbound/outbound adapters in my context.xml and would like to decide at runtime which one of them I want to get started.

EDIT:

The concrete scenario: I have a client, that can be configured at runtime to be an mqtt publisher or subscriber. My context.xml looks like this:

<int-mqtt:message-driven-channel-adapter 
    client-id="foo"
    auto-startup="true"
    url="tcp://192.168.97.164:1883"
    topics="testtopic/#"
    channel="writeToFile" />

<file:outbound-channel-adapter
    id="writeToFile"
    auto-startup="true"
    directory="./test/out"
    delete-source-files="false"/>

<int:transformer id="Transformer"
    ref="MessageTransformer"
    input-channel="readFromFile"
    output-channel="mqttOut"
    method="bytesFromFile" />

<bean id="MessageTransformer" class="MessageTransformer"/>

<int-mqtt:outbound-channel-adapter 
    id="mqttOut"
    client-id="foo"
    url="tcp://192.168.97.164:1883"
    auto-startup="false"
    default-qos="1"
    default-retained="true"
    default-topic="testtopic/bla"
    />

    <file:inbound-channel-adapter
    auto-startup="false" 
    id="readFromFile"
    directory="./test/in"
    filename-pattern="myFile*">
    <int:poller id="poller"
        fixed-rate="5000" />     
</file:inbound-channel-adapter>

As you can see, I have two settings:

1. Subscriber case: Read mqtt message -> Write to file
2. Publisher case: Poll a file from directory -> Send via mqtt

I decide at runtime what setting is to be applied.

So can you kindly tell me how this control-bus thing would fit here exactly?


Answer:

Set autoStartup="false" and either directly start()/stop() them, or use a <control-bus/> (send @myAdapter.start()).

Getting a direct reference (autowire etc), depends on the endpoint type. If it's a polled endpoint, inject a SourcePollingChannelAdapter; message-driven adapters vary, but generally are a MessageProducerSupport or MessagingGatewaySupport.

EDIT:

Read about the control-bus here.

Give the inbound adapter an id attribute.

Add <control-bus input-channel="control"/>

Add <int:gateway service-interface="foo.Controller" default-request-channel="control"/>

Create a gateway interface

public interface Controller {

    void control(String command);

}

@Autowired the gateway (or use context.getBean(Controller.class)).

Then, when you are ready to start the adapter, call, e.g. gateway.control("@mqttOut.start()").

You don't need auto-startup="false" on the outbound adapters.

However, for a simple use case like this, you might want to investigate using Spring profiles instead (put the adapters in a profile and enable the profile at runtime).

Question:

I am currently using Spring Integration Kafka to compute real-time statistics. However, because of the consumer group name, Kafka replays all the earlier values that the listener has not read yet.

@Value("${kafka.consumer.group.id}")
private String consumerGroupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}

public Map<String, Object> getDefaultProperties() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaMessageListener listener() {
    return new KafkaMessageListener();
}

I would like to begin at the latest offset and not be bothered by old values. Is it possible to reset the offset of the group?


Answer:

Because I didn't see any example of this, I'm going to explain how I did it here.

The class of your @KafkaListener must implement the ConsumerSeekAware interface, which lets the listener control offset seeking when partitions are assigned. (source: https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek )

public class KafkaMessageListener implements ConsumerSeekAware {
    @KafkaListener(topics = "your.topic")
    public void listen(byte[] payload) {
        // ...
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {

    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {


    }
}

Here, on a rebalance, we use the given callback to seek to the end of each assigned partition. Thanks to Artem Bilan ( https://stackoverflow.com/users/2756547/artem-bilan ) for guiding me to the answer.