Hot questions for Spring WebSocket

Question:

Is it possible to send a message to specific session?

I have an unauthenticated websocket between clients and a Spring servlet. I need to send an unsolicited message to a specific connection when an async job ends.

@Controller
public class WebsocketTest {


    @Autowired
    public SimpMessageSendingOperations messagingTemplate;

    ExecutorService executor = Executors.newSingleThreadExecutor();

    @MessageMapping("/start")
    public void start(SimpMessageHeaderAccessor accessor) throws Exception {
        String applicantId=accessor.getSessionId();        
        executor.submit(() -> {
            //... slow job
            jobEnd(applicantId);
        });
    }

    public void jobEnd(String sessionId){
        messagingTemplate.convertAndSend("/queue/jobend"); //how to send only to that session?
    }
}

As you can see in this code, the client can start an async job and when it finishes, it needs the end message. Obviously, I need to message only the applicant and not broadcast to everyone. It would be great to have an @SendToSession annotation or messagingTemplate.convertAndSendToSession method.

UPDATE

I tried this:

messagingTemplate.convertAndSend("/queue/jobend", true, Collections.singletonMap(SimpMessageHeaderAccessor.SESSION_ID_HEADER, sessionId));

But this broadcasts to all sessions, not only the one specified.

UPDATE 2

A test with the convertAndSendToUser() method. This test is a hack of the official Spring tutorial: https://spring.io/guides/gs/messaging-stomp-websocket/

This is the server code:

@Controller
public class WebsocketTest {

    @PostConstruct
    public void init(){
        ScheduledExecutorService statusTimerExecutor=Executors.newSingleThreadScheduledExecutor();
        statusTimerExecutor.scheduleAtFixedRate(new Runnable() {                
            @Override
            public void run() {
                messagingTemplate.convertAndSendToUser("1","/queue/test", new Return("test"));
            }
        }, 5000,5000, TimeUnit.MILLISECONDS);
    } 

    @Autowired
    public SimpMessageSendingOperations messagingTemplate;
}

and this is the client code:

function connect() {
            var socket = new WebSocket('ws://localhost:8080/hello');
            stompClient = Stomp.over(socket);
            stompClient.connect({}, function(frame) {
                setConnected(true);
                console.log('Connected: ' + frame);
                stompClient.subscribe('/user/queue/test', function(greeting){
                    console.log(JSON.parse(greeting.body));
                });
            });
        }

Unfortunately, the client doesn't receive its per-session reply every 5000 ms as expected. I'm sure that "1" is a valid sessionId for the second connected client, because I see it in debug mode with SimpMessageHeaderAccessor.getSessionId().

BACKGROUND SCENARIO

I want to create a progress bar for a remote job: the client asks the server to start an async job and tracks its progress through WebSocket messages sent by the server. This is NOT a file upload but a remote computation, so only the server knows the progress of each job. I need to send messages to a specific session because each job is started by a session. The client asks for a remote computation; the server starts the job and, at every job step, replies to the requesting client with the job's progress status. The client receives the messages about its job and builds up a progress/status bar. This is why I need per-session messages. I could also use per-user messages, but Spring does not provide per-user unsolicited messages. (See: Cannot send user message with Spring Websocket)

WORKING SOLUTION

Starting from the UPDATE 2 solution, I had to complete the convertAndSendToUser call with its last parameter (MessageHeaders):

messagingTemplate.convertAndSendToUser("1","/queue/test", new Return("test"), createHeaders("1"));

where createHeaders() is this method:

private MessageHeaders createHeaders(String sessionId) {
    SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
    headerAccessor.setSessionId(sessionId);
    headerAccessor.setLeaveMutable(true);
    return headerAccessor.getMessageHeaders();
}

Answer:

No need to create specific destinations, it's already done out of the box as of Spring 4.1 (see SPR-11309).

As stated in the SimpMessageSendingOperations Javadoc, since your user name is actually a sessionId, you MUST also set it as a header; otherwise the DefaultUserDestinationResolver won't be able to route the message and will drop it.

Given that users subscribe to a /user/queue/something queue, you can send a message to a single session with:

SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor
    .create(SimpMessageType.MESSAGE);
headerAccessor.setSessionId(sessionId);
headerAccessor.setLeaveMutable(true);

messagingTemplate.convertAndSendToUser(sessionId,"/queue/something", payload, 
    headerAccessor.getMessageHeaders());

You don't need users to be authenticated for this.
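To make the routing idea more concrete, here is a minimal plain-Java sketch of how a /user/... subscription can be mapped to a destination that is unique to one session. It is a simplified illustration only, not Spring's DefaultUserDestinationResolver; the class and method names are hypothetical, and the "-user" + sessionId suffix is assumed from the example given in the Spring reference documentation.

```java
// Simplified illustration only; NOT Spring's DefaultUserDestinationResolver.
// The class and method names are hypothetical.
final class UserDestinationSketch {

    private static final String USER_PREFIX = "/user";

    // Maps a subscription such as "/user/queue/something" to a destination
    // unique to one session, e.g. "/queue/something-user<sessionId>".
    static String resolveForSession(String subscribedDestination, String sessionId) {
        if (!subscribedDestination.startsWith(USER_PREFIX + "/")) {
            // Ordinary destinations (e.g. "/topic/...") are left untouched.
            return subscribedDestination;
        }
        String actual = subscribedDestination.substring(USER_PREFIX.length());
        return actual + "-user" + sessionId;
    }
}
```

Because every session ends up with its own concrete destination, a message addressed to one session ID is not delivered to the other subscribers of /user/queue/something.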

Question:

My error shows up in the console of my browser:

"WebSocket connection to 'ws://localhost:32768/DspClusterWebServices/myHandler' failed: Unexpected response code: 200"

I am using Spring Websockets 4.1.5 and Tomcat 8.0.18. My WebSocketConfigurer implementation class looks like:

@Configuration
@Controller
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer
{
   class MyHandler implements WebSocketHandler
   {
      @Override
      public void afterConnectionEstablished(WebSocketSession session) throws Exception
      {
         System.out.println("afterConntectionEstablished called");
      }

      // ... the remaining WebSocketHandler methods each just call System.out.println;
      // supportsPartialMessages() returns false
   }

   @Override
   public void registerWebSocketHandlers(WebSocketHandlerRegistry registry)
   {
      registry.addHandler(myHandler(), "myHandler").withSockJS();
   }

   @Bean
   public WebSocketHandler myHandler()
   {
      return new MyHandler();
   }
}

My testWebsocketClient.js tries to connect with this code, but gets response code 200:

websocket = new WebSocket("ws://localhost:8080/myApp/myHandler");

I cannot figure out what to try next. I thought that this would cause the afterConnectionEstablished(WebSocketSession session) method to fire? Isn't code 200 good?


Answer:

Please check http://procbits.com/connecting-to-a-sockjs-server-from-native-html5-websocket

Because the handler is registered with withSockJS(), a native WebSocket client has to connect to the endpoint URL with /websocket appended. After you append /websocket to your URL, you will get this error instead:

Failed to parse Origin header value [null]

which in turn leads you to that link.

You'll have to add .setAllowedOrigins("*") to your addHandler() call, and then it should finally work.
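The /websocket suffix can be illustrated with a small helper. This is only a sketch: the class and method names are hypothetical, and it assumes the SockJS convention that the raw WebSocket transport is exposed under the endpoint path followed by /websocket.

```java
// Hypothetical helper; assumes the raw WebSocket transport of a SockJS
// endpoint is reachable at "<endpoint>/websocket".
final class SockJsUrlSketch {

    static String rawWebSocketUrl(String sockJsEndpointUrl) {
        // Drop a trailing slash so the result never contains "//websocket".
        String base = sockJsEndpointUrl.endsWith("/")
                ? sockJsEndpointUrl.substring(0, sockJsEndpointUrl.length() - 1)
                : sockJsEndpointUrl;
        return base + "/websocket";
    }
}
```

With the URL from the question, a native client would then connect to ws://localhost:8080/myApp/myHandler/websocket instead of ws://localhost:8080/myApp/myHandler.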

Question:

I'm implementing a version of the stock application in which the server can reject a subscription to certain topics based on the user's rights. Is there a way to do this in spring-websocket?

For example:

In the stock example project we have a price topic for three instruments: Apple, Microsoft and Google. There are two users: User1 and User2.

User1 should have access to Apple and Microsoft. User2 should have access to Google only.

If User1 subscribes to Google, he should get a rejection response, and messages shouldn't be broadcast to him afterwards.


Answer:

Thanks to Rossen Stoyanchev's answer on GitHub, I managed to solve this by adding an interceptor to the inbound channel. The changes needed in the spring-websocket-portfolio demo application are the following:

Change the websocket configuration:

public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.setInterceptors(new TopicSubscriptionInterceptor());
}

And the interceptor was something like this:

public class TopicSubscriptionInterceptor extends ChannelInterceptorAdapter {

    private static Logger logger = org.slf4j.LoggerFactory.getLogger(TopicSubscriptionInterceptor.class);

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(message);
        if (StompCommand.SUBSCRIBE.equals(headerAccessor.getCommand())) {
            Principal userPrincipal = headerAccessor.getUser();
            if (!validateSubscription(userPrincipal, headerAccessor.getDestination())) {
                throw new IllegalArgumentException("No permission for this topic");
            }
        }
        return message;
    }

    private boolean validateSubscription(Principal principal, String topicDestination) {
        if (principal == null) {
            // unauthenticated user
            return false;
        }
        logger.debug("Validate subscription for {} to topic {}", principal.getName(), topicDestination);
        // Additional validation logic goes here
        return true;
    }

}
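The "additional validation logic" placeholder could be filled with a simple lookup of the topics each user may subscribe to. The following is a minimal sketch under stated assumptions: the user names come from the question, while the class name and the topic destinations (/topic/price.Apple and so on) are hypothetical and would have to match the destinations your application actually uses.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical rule table; the topic names are invented for illustration.
final class SubscriptionRulesSketch {

    private static final Map<String, Set<String>> ALLOWED = Map.of(
            "User1", Set.of("/topic/price.Apple", "/topic/price.Microsoft"),
            "User2", Set.of("/topic/price.Google"));

    // Returns true only if the user is known and explicitly allowed
    // to subscribe to the given destination.
    static boolean isAllowed(String userName, String destination) {
        if (userName == null || destination == null) {
            return false; // unauthenticated user or malformed SUBSCRIBE frame
        }
        return ALLOWED.getOrDefault(userName, Set.of()).contains(destination);
    }
}
```

Inside validateSubscription, the final return true could then become something like return SubscriptionRulesSketch.isAllowed(principal.getName(), topicDestination). Denying by default keeps unknown users and unknown topics out.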

Question:

I would like to understand how convertAndSendToUser works in the Spring SockJS + WebSocket framework.

On the client, we would connect with:

stompClient.connect(login, password, callback())

which results in a CONNECT request carrying the "STOMP credentials" login and password. These can be seen, for example, if we handle SessionConnectEvent (see http://www.sergialmar.com/2014/03/detect-websocket-connects-and-disconnects-in-spring-4/).

But it remains unclear to me whether this is the "user" meant by the server-side send operation to a queue:

 simpMessagingTemplate.convertAndSendToUser(username, "/queue/reply", message);

The closest I could get was the thread "Sending message to specific user on Spring Websocket" (answer by Thanh Nguyen Van), but it is still unclear.

Basically, what I need to do is subscribe several clients to the same topic, but have the server send them different data. The client may supply a user identifier.


Answer:

I have been trying to understand WebSockets and came across this question. I was thoroughly disappointed not to find an answer, so here is something for future readers.

I will assume the reader has a basic understanding of Spring WebSockets using STOMP, and that terms like subscription, destination prefix, topic and socket configuration file are understood.

We know we can send messages from a STOMP server to a client using the topic prefixes the client is subscribed to, e.g. /topic/hello. We also know we can send messages to a specific user, because Spring provides the convertAndSendToUser(username, destination, message) API. It accepts a String username, which means that if we somehow have a unique username for every connection, we should be able to send messages to specific users subscribed to a topic.

What's less understood is: where does this username come from?

This username comes from a java.security.Principal. Each StompHeaderAccessor or WebSocketSession object holds an instance of this principal, and you can get the user name from it. However, in my experiments, it is not generated automatically; the server has to generate it for every session.

To use this interface, you first need to implement it.

class StompPrincipal implements Principal {
    String name

    StompPrincipal(String name) {
        this.name = name
    }

    @Override
    String getName() {
        return name
    }
}
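The class above omits semicolons and access modifiers, so it reads as Groovy rather than Java. For a plain Java project, an equivalent class might look like the following sketch; the null check is an addition that is not part of the original answer.

```java
import java.security.Principal;
import java.util.Objects;

// Java counterpart of the listing above: a Principal that only carries a name.
final class StompPrincipal implements Principal {

    private final String name;

    StompPrincipal(String name) {
        // Assumption: a principal without a name is never useful here.
        this.name = Objects.requireNonNull(name, "name must not be null");
    }

    @Override
    public String getName() {
        return name;
    }
}
```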

Then you can generate a unique StompPrincipal for every connection by extending DefaultHandshakeHandler and overriding determineUser. You can use any logic to generate the username. Here is one possible approach, which uses a UUID:

class CustomHandshakeHandler extends DefaultHandshakeHandler {
    // Custom class for storing principal
    @Override
    protected Principal determineUser(ServerHttpRequest request,
                                      WebSocketHandler wsHandler,
                                      Map<String, Object> attributes) {
        // Generate principal with UUID as name
        return new StompPrincipal(UUID.randomUUID().toString())
    }
}

Lastly, you need to configure your websockets to use your custom handshake handler.

@Override
void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
    stompEndpointRegistry
         .addEndpoint("/stomp") // Set websocket endpoint to connect to
         .setHandshakeHandler(new CustomHandshakeHandler()) // Set custom handshake handler
         .withSockJS() // Add Sock JS support
}

That's it. Now your server is configured to generate a unique principal name for every connection. It will pass that principal as part of the StompHeaderAccessor objects that you can access through connection event listeners, @MessageMapping methods, etc.

From event listeners:

@EventListener
void handleSessionConnectedEvent(SessionConnectedEvent event) {
    // Get Accessor
    StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage())
}

From message-mapped methods:

@MessageMapping('/hello')
protected void hello(SimpMessageHeaderAccessor sha, Map message) {
    // sha available in params
}

One last note about using convertAndSendToUser(...). When sending messages to a user, you will use something like this:

convertAndSendToUser(sha.user.name, '/topic/hello', message)

However, to subscribe the client, you will use:

client.subscribe('/user/topic/hello', callback)

If you subscribe the client to /topic/hello, it will only receive broadcast messages.

Question:

In our current application, we use Spring WebSockets over STOMP. We are looking to scale horizontally. Are there any best practices for handling WebSocket traffic over multiple Tomcat instances, and how can we maintain session info across multiple nodes? Is there a working sample that one can refer to?


Answer:

Your requirement can be divided into two subtasks:

  1. Maintain session info across multiple nodes: you can try Spring Session clustering backed by Redis (see: HttpSession with Redis). This is very simple and already supports Spring WebSockets (see: Spring Session & WebSockets).

  2. Handle WebSocket traffic over multiple Tomcat instances: there are several ways to do that.

    • The first way: use a full-featured broker (e.g. ActiveMQ) and try the new "Support multiple WebSocket servers" feature (available from 4.2.0 RC1).
    • The second way: use a full-featured broker and implement a distributed UserSessionRegistry (e.g. using Redis :D ). The default implementation, DefaultUserSessionRegistry, uses in-memory storage.

Updated: I've written a simple implementation using Redis; try it if you are interested.

To configure a full-featured broker (broker relay), you can try:

public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    ...

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic", "/queue")
            .setRelayHost("localhost") // broker host
            .setRelayPort(61613) // broker port
            ;
        config.setApplicationDestinationPrefixes("/app");
    }

    @Bean
    public UserSessionRegistry userSessionRegistry() {
        return new RedisUserSessionRegistry(redisConnectionFactory);
    }

    ...
}

and

import java.util.Collections;
import java.util.Set;

import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundHashOperations;
import org.springframework.data.redis.core.BoundSetOperations;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.messaging.simp.user.UserSessionRegistry;
import org.springframework.util.Assert;

/**
 * An implementation of {@link UserSessionRegistry} backed by Redis.
 * @author thanh
 */
public class RedisUserSessionRegistry implements UserSessionRegistry {

    /**
     * The prefix for each key of the Redis Set representing a user's sessions. The suffix is the unique user id.
     */
    static final String BOUNDED_HASH_KEY_PREFIX = "spring:websockets:users:";

    private final RedisOperations<String, String> sessionRedisOperations;

    @SuppressWarnings("unchecked")
    public RedisUserSessionRegistry(RedisConnectionFactory redisConnectionFactory) {
        this(createDefaultTemplate(redisConnectionFactory));
    }

    public RedisUserSessionRegistry(RedisOperations<String, String> sessionRedisOperations) {
        Assert.notNull(sessionRedisOperations, "sessionRedisOperations cannot be null");
        this.sessionRedisOperations = sessionRedisOperations;
    }

    @Override
    public Set<String> getSessionIds(String user) {
        Set<String> entries = getSessionBoundHashOperations(user).members();
        return (entries != null) ? entries : Collections.<String>emptySet();
    }

    @Override
    public void registerSessionId(String user, String sessionId) {
        getSessionBoundHashOperations(user).add(sessionId);
    }

    @Override
    public void unregisterSessionId(String user, String sessionId) {
        getSessionBoundHashOperations(user).remove(sessionId);
    }

    /**
     * Gets the {@link BoundSetOperations} to operate on a username's set of session ids
     */
    private BoundSetOperations<String, String> getSessionBoundHashOperations(String username) {
        String key = getKey(username);
        return this.sessionRedisOperations.boundSetOps(key);
    }

    /**
     * Gets the Redis key for this user by prefixing it appropriately.
     */
    static String getKey(String username) {
        return BOUNDED_HASH_KEY_PREFIX + username;
    }

    @SuppressWarnings("rawtypes")
    private static RedisTemplate createDefaultTemplate(RedisConnectionFactory connectionFactory) {
        Assert.notNull(connectionFactory, "connectionFactory cannot be null");
        StringRedisTemplate template = new StringRedisTemplate(connectionFactory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        return template;
    }

}
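For comparison, the contract that both registries fulfil (register a session for a user, unregister it, look up all sessions of a user) can be sketched with an in-memory map. This is a hypothetical illustration, not Spring's DefaultUserSessionRegistry; it only shows why such a registry works on a single node and why a shared store such as Redis is needed once several Tomcat instances are involved.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-node registry; its state lives only in this JVM,
// so other application instances cannot see the sessions stored here.
final class InMemoryUserSessionRegistrySketch {

    private final Map<String, Set<String>> sessionsByUser = new ConcurrentHashMap<>();

    Set<String> getSessionIds(String user) {
        Set<String> ids = sessionsByUser.get(user);
        return (ids != null) ? Collections.unmodifiableSet(ids) : Collections.<String>emptySet();
    }

    void registerSessionId(String user, String sessionId) {
        // compute() runs atomically per key, so a concurrent unregister
        // cannot discard the set while a session is being added to it.
        sessionsByUser.compute(user, (u, ids) -> {
            Set<String> set = (ids != null) ? ids : ConcurrentHashMap.<String>newKeySet();
            set.add(sessionId);
            return set;
        });
    }

    void unregisterSessionId(String user, String sessionId) {
        // Returning null removes the user entry once the last session is gone.
        sessionsByUser.computeIfPresent(user, (u, ids) -> {
            ids.remove(sessionId);
            return ids.isEmpty() ? null : ids;
        });
    }
}
```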

Question:

I'm trying to use Spring with WebSocket. I started my investigation with this tutorial.

On the client side, I have something like this to initialize the connection to the server:

function connect() {
    var socket = new SockJS('/myphotos/form');
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function(frame) {
        setConnected(true);
        console.log('Connected: ' + frame);
        stompClient.subscribe('/topic/greetings', function(greeting){
            showGreeting(JSON.parse(greeting.body).content);
        });
    });
}

It works great; in my controller I'm able to do my processing in the following class:

@Controller
@RequestMapping("/")
public class PhotoController {

    @MessageMapping("/form")
    @SendTo("/topic/greetings")
    public Greeting validate(AddPhotosForm addPhotosForm) {
        return new Greeting("Hello world !");
    }
}

Now what I want is to have a thread send a message to the clients listening on "/topic/greetings". I wrote my Runnable class like this:

public class FireGreeting implements Runnable {

    private PhotoController listener;

    public FireGreeting(PhotoController listener) {
        this.listener = listener;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep( 2000 );
                listener.fireGreeting();
            } catch ( InterruptedException e ) {
                e.printStackTrace();
            }
        }   
    }
}

And completed my controller like this:

@Controller
@RequestMapping("/")
public class PhotoController {

    @MessageMapping("/form")
    @SendTo("/topic/greetings")
    public Greeting validate(AddPhotosForm addPhotosForm) {

        // added this part
        FireGreeting r = new FireGreeting( this );
        new Thread(r).start();

        return new Greeting("Hello world !");
    }

    // added this method
    @SendTo("/topic/greetings")
    public Greeting fireGreeting() {
        System.out.println("Fire");
        return new Greeting("Fire");
    }
}

The method PhotoController.fireGreeting is called as I want, but nothing happens on the client side. Any suggestions? Thank you.


Answer:

I was able to solve my problem thanks to @Boris the Spider. The correct solution is to do something like this:

@Controller
@RequestMapping("/")
public class PhotoController {

    @Autowired
    private SimpMessagingTemplate template;

    @MessageMapping("/form")
    @SendTo("/topic/greetings")
    public Greeting validate(AddPhotosForm addPhotosForm) {

        FireGreeting r = new FireGreeting( this );
        new Thread(r).start();

        return new Greeting("Hello world !");
    }

    public void fireGreeting() {
        System.out.println("Fire");
        this.template.convertAndSend("/topic/greetings", new Greeting("Fire"));
    }
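Note that the controller above starts a new, never-ending thread for every message sent to /form. One possible alternative is a single scheduled task that publishes periodically. The sketch below uses only the JDK; the class name is hypothetical, and the Consumer passed to the constructor is a stand-in for a call such as template.convertAndSend("/topic/greetings", new Greeting(text)).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical periodic publisher; 'publisher' stands in for the messaging template.
final class GreetingTickerSketch implements AutoCloseable {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    GreetingTickerSketch(Consumer<String> publisher, long periodMillis) {
        // One shared task instead of one thread per incoming message.
        // If the publisher throws, later runs of the task are suppressed.
        scheduler.scheduleAtFixedRate(
                () -> publisher.accept("Fire"),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        // Stops the background thread so the application can shut down cleanly.
        scheduler.shutdownNow();
    }
}
```

In a Spring application, such an object would typically be created once (for example as a bean) rather than inside the @MessageMapping method.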
}