websocket asynchronous feedback during a long process


I am trying to implement feedback in a web page that lets the user start a long process from an Excel sheet (sigh, yes...). Each line of data takes about 1 second to process, and a typical data set has between 40 and 100 items, so the overall processing time can exceed a minute.

I am displaying a preview of the data in the page, starting the process through a websocket and would like to show a progression from the same websocket.

The processing itself is done by an external package and the page complexity is minimal, so I have wrapped it in a single-file Mojolicious::Lite application.

My problem is that the long processing started in the websocket route blocks the feedback until it has finished, and all the progress events are sent at once at the end. From what I understand, this is related to the Mojolicious event loop, and I should start the processing separately to avoid freezing the websocket handling.

Note that I have also tried a separate feedback channel with an EventSource to push progress to the client during the processing, but it shows the same behaviour: everything arrives at once at the end.

Here is my simplified code; I am using sleep() to simulate the long process. I start it with

perl mojo_notify_ws.pl daemon

Could you suggest how to modify the websocket route to allow real-time feedback?

use Mojolicious::Lite;
use Mojo::JSON qw(encode_json decode_json j);

use Data::Dumper;

$|++;

any '/' => sub {
    my $c = shift;
    $c->render('index');
};

my $peer;
websocket '/go' => sub {
    use Carp::Always;
    my $ws = shift;

    $peer = $ws->tx;
    app->log->debug(sprintf 'Client connected: %s', Dumper $peer->remote_address);

    # do not subscribe to 'text' else 'json' won't work
    #$ws->on(text => sub {
    #    my ($ws, $msg) = @_;
    #    app->log->debug("Received text from websocket: `$msg`");
    #        });

    # $peer->send('{"type": "test"}');
    # say 'default inactivity timeout='. (p $ws->inactivity_timeout());
    $ws->inactivity_timeout(120);

    $ws->on(json => sub {
        my ($ws, $msg) = @_;
        app->log->debug('Received from websocket:', Dumper(\$msg));
        unless($msg){
            app->log->debug('Received empty message? WTF?!');
            return;
        }
        my $prompt = $msg->{cmd};
        return unless $prompt;
        app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');

        # simulate
        my $loop = Mojo::IOLoop->singleton;

#        $loop->subprocess( sub {
#            my $sp = shift;

        for my $cell (1..3) {
            # $loop->delay( sub {
                app->log->debug("sending cell $cell");
                my $payload = {
                        type => 'ticket',
                        cell => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                };
                $ws->send( { json => $payload } );
                sleep(2);
                # $loop->timer(2, sub {say 'we have waited 2 secs!';})->wait;
            # });
        };

#        }, sub {} );#subprocess

        app->log->debug('sending end of process ->websocket');
        $ws->send({json => { type => 'end' } });
    });

    $ws->on(finish => sub {
        my ($ws, $code, $reason) = @_;
        $reason = '' unless defined $reason;
        app->log->debug("Client disconnected: $code ($reason)");
    });

    app->log->debug('Reached end of ws route definition');
};

app->start;

__DATA__

@@ index.html.ep
<html>
    <head>
    <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.js"></script>
    <script>
var timerId = 0; // same name as used in keepAlive() and cancelKeepAlive()
function keepAlive(ws) { 
    var timeout = 20000;  
    if (ws.readyState == ws.OPEN) {  
        ws.send('ping');  
    }  
    timerId = setTimeout(function(){keepAlive(ws);}, timeout);  
}  
function cancelKeepAlive() {  
    if (timerId) {  
        clearTimeout(timerId);  
    }  
}

function flagCell(cell, result){
    var id='#CELL_' + cell;
    var cell = $(id);
    if(cell) {
        if (result=='OK') {
            cell.css('color', 'green');
            cell.text('⯲');
        } else {
            cell.css('color','red');
            cell.text('✘');
        }
    }
}

function process(){
    //debugger;
    console.log('Opening WebSocket');
    var ws = new WebSocket('<%= url_for('go')->to_abs %>');

    ws.onopen = function (){
        console.log('Websocket Open');
        //keepAlive(ws);
        ws.send(JSON.stringify({cmd: "let's go Perl"}));
    };
    //incoming
    ws.onmessage = function(evt){
        var data = JSON.parse(evt.data);
        console.log('WS received '+JSON.stringify(data));
        if (data.type == 'ticket') {
            console.log('Server has send a status');
            console.log('Cell:'+data.cell + ' res:' + data.result);

            flagCell(data.cell, data.result);
        } else if (data.type == 'end') {
            console.log('Server has finished.');
            //cancelKeepAlive();
            ws.close();
        } else {
            console.log('Unknown message:' + evt.data);
        }
    };
    ws.onerror = function (evt) {
        console.log('ws error:', evt.data);
    }
    ws.onclose = function (evt) {
        if(evt.wasClean) {
            console.log('Connection closed cleanly');
        } else {
            console.log('Connection reseted');
        }
        console.log('Code:'+ evt.code + ' Reason:' + evt.reason);
    }
}

    </script>
    </head>
    <body>
        <button type=button id='upload' onclick="process();">Go</button><br>
        <div style='font-family:sans;'>
            <table border="1px">
              <tr><td id="CELL_1">&nbsp;</td><td>Foo</td></tr>
              <tr><td id="CELL_2">&nbsp;</td><td>Bar</td></tr>
              <tr><td id="CELL_3">&nbsp;</td><td>Baz</td></tr>
            </table>
        </div>
    </body>
</html>

EDIT:

Grinnz has provided a suitable solution, but for the record, here is my attempt with a Mojo::IOLoop::Subprocess callback, which gave me no feedback at all. I am running on Linux, where Subprocess forks, and at first I thought the parent process was terminating the websocket immediately. That was wrong: I eventually found that the $ws->send() is in the wrong place. It should be in the second sub{}, which runs in the parent, and not in the first, which runs in the child process. This code should be refactored to have one subprocess per loop iteration and a final step to notify the end of processing.

Here is the modified on(json)

$ws->on(json => sub {
    my ($ws, $msg) = @_;
    app->log->debug('Received from websocket:', Dumper(\$msg));
    unless($msg){
        app->log->debug('Received empty message? WTF?!');
        return;
    }
    my $prompt = $msg->{cmd};
    return unless $prompt;
    app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');

    # my $loop = Mojo::IOLoop->singleton;
    my $subprocess = Mojo::IOLoop::Subprocess->new;
    app->log->debug("we are pid $$");
    $subprocess->run( 
        sub {
            my $sp = shift;
            for my $cell (1..3) {
                app->log->debug("starting process for cell $cell in pid $$");     
                sleep(2);
                app->log->debug("sending cell $cell to ws");
                my $payload = {
                    type => 'ticket',
                    cell => $cell,
                    result => $cell % 2 ? 'OK' : 'NOK'
                };
                $ws->send( { json => $payload } ); # FIXME: actually this line is in the wrong place
                                                   # and should be in the second sub{}
            };
        },
        sub {
            my ($sp, $err, @results) = @_; 
            $ws->reply->exception($err) and return if $err;
            app->log->debug('sending end of process ->websocket');
            $ws->send({json => { type => 'end' } });
        });  
    # Start event loop if necessary
    $subprocess->ioloop->start unless $subprocess->ioloop->is_running;       
});

And the corresponding log:

[Wed Oct  3 19:51:58 2018] [debug] Received: `let's go Perl`
[Wed Oct  3 19:51:58 2018] [debug] we are pid 8898
[Wed Oct  3 19:51:58 2018] [debug] Client disconnected: 1006 ()
[Wed Oct  3 19:51:58 2018] [debug] starting process for cell 1 in pid 8915
[Wed Oct  3 19:52:00 2018] [debug] sending cell 1 to ws
[Wed Oct  3 19:52:00 2018] [debug] starting process for cell 2 in pid 8915
[Wed Oct  3 19:52:02 2018] [debug] sending cell 2 to ws
[Wed Oct  3 19:52:02 2018] [debug] starting process for cell 3 in pid 8915
[Wed Oct  3 19:52:04 2018] [debug] sending cell 3 to ws
[Wed Oct  3 19:52:04 2018] [debug] sending end of process ->websocket
[Wed Oct  3 19:52:04 2018] [debug] Client disconnected: 1005 ()

I also experimented with Mojo::IOLoop->delay to build a sequence of steps, similar to the Promise solution, but this one also sends all the notifications at once at the end:

$ws->on(json => sub {
    my ($ws, $msg) = @_;
    app->log->debug('Received from websocket:', Dumper(\$msg));
    unless($msg){
        app->log->debug('Received empty message? WTF?!');
        return;
    }
    my $prompt = $msg->{cmd};
    return unless $prompt;
    app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');

    app->log->debug("we are pid $$");

    my @steps;
    for my $cell (1..3) {
        push @steps, 
            sub {
                app->log->debug("subprocess for cell pid $cell");
                # my $sp = shift;
                my $delay = shift;
                sleep(2);
                app->log->debug("end of sleep for cell $cell");
                $delay->pass($cell % 2 ? 'OK' : 'NOK');
            },
            sub {
                my $delay = shift;
                my $result = shift;

                app->log->debug("sending cell $cell from pid $$ - result was $result");
                my $payload = {
                    type => 'ticket',
                    cell => $cell,
                    result => $result
                };
                $ws->send( { json => $payload } );
                $delay->pass;
            };
    }

    # add final step to notify end of processing
    push @steps, sub {
        my $delay = shift;
        app->log->debug('sending end of process ->websocket');
        $ws->send({json => { type => 'end' } });
        $delay->pass;
    };

    my $delay = Mojo::IOLoop::Delay->new;
    app->log->debug("Starting delay...");
    $delay->steps( @steps );
    app->log->debug("After the delay");

});

It is not possible to magically make Perl code non-blocking. That's why your blocking operation is holding up the websocket responses and event loop.

A single subprocess will not work for this, because only the original worker process that handled the request can respond to the websocket, and subprocesses can only return once. You can, however, use a subprocess to prepare each response you want to send. Your use of subprocesses is not quite correct however.

The first subroutine passed to the subprocess executes in a fork and thus doesn't block the main process. The second subroutine executes in the parent once the subprocess completes, and receives the return value of the first subroutine. This is where you need to send your responses.
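
As a rough mental model of that split, the same two-callback shape can be sketched with core Perl only. This is not how Mojo::IOLoop::Subprocess is implemented, and unlike Mojo it blocks the parent while it waits. `run_in_fork` is a hypothetical helper name, and JSON::PP is an assumed choice for serializing the return value:

```perl
use strict;
use warnings;
use JSON::PP qw(encode_json decode_json);
use POSIX ();

# Hypothetical helper: run $work in a forked child, then call $done in the
# parent with ($err, $result). Blocking; for illustration only.
sub run_in_fork {
    my ($work, $done) = @_;
    pipe(my $reader, my $writer) or die "pipe failed: $!";
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;

    if ($pid == 0) {
        # Child: it cannot talk to the websocket, it can only hand back data.
        close $reader;
        my $result = eval { $work->() };
        my $err    = $@ ? "$@" : undef;
        print {$writer} encode_json({ err => $err, result => $result });
        close $writer;
        POSIX::_exit(0);    # leave without running the parent's END blocks
    }

    # Parent: collect the serialized return value, then run the second callback.
    close $writer;
    my $json = do { local $/; <$reader> };
    close $reader;
    waitpid($pid, 0);
    my $data = decode_json($json);
    return $done->($data->{err}, $data->{result});
}

my $summary = run_in_fork(
    sub { return { type => 'ticket', cell => 1, result => 'OK' } },
    sub {
        my ($err, $payload) = @_;
        die "child failed: $err" if $err;
        # In the real handler, $ws->send({json => $payload}) would go here.
        return "cell $payload->{cell}: $payload->{result}";
    },
);
print "$summary\n";
```

The point of the sketch is only the division of labour: the first callback produces data in the child, and the second is the only place where the parent's connection is available.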

Any code outside of that will be executed before the subprocess is even started. Because this is asynchronous code, you need to sequence the logic via callbacks. You can use promises to make complicated sequencing simpler.

use Mojo::Promise;

$ws->on(json => sub {
    my ($ws, $msg) = @_;
    app->log->debug('Received from websocket:', Dumper(\$msg));
    unless($msg){
        app->log->debug('Received empty message? WTF?!');
        return;
    }
    my $prompt = $msg->{cmd};
    return unless $prompt;
    app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');

    my $promise = Mojo::Promise->new->resolve; # starting point
    # attach follow-up code for each cell, returning a new promise representing the whole chain so far
    for my $cell (1..3) {
        $promise = $promise->then(sub {
            my $promise = Mojo::Promise->new;
            Mojo::IOLoop->subprocess(sub {
                app->log->debug("sending cell $cell");
                sleep(2);
                my $payload = {
                        type => 'ticket',
                        cell => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                };
                return $payload;
            }, sub {
                my ($sp, $err, $payload) = @_;
                return $promise->reject($err) if $err; # indicates subprocess died
                $ws->send( { json => $payload }, sub { $promise->resolve } );
            });

            # here, the subprocess has not been started yet
            # it will be started when this handler returns to the event loop
            # then the second callback will run once the subprocess exits
            return $promise;
        });
    }

    # chain from last promise
    $promise->then(sub {
        app->log->debug('sending end of process ->websocket');
        $ws->send({json => { type => 'end' } });
    })->catch(sub {
        my $err = shift;
        # you can send or log something here to indicate an error occurred in one of the subprocesses
    });
});

Some other options, which I can go into in more detail if they would be appropriate: Mojo::IOLoop::ReadWriteFork, which would let you start just one subprocess and continuously receive STDOUT from it (you would need to serialize your payload yourself to send it on STDOUT, for example with Mojo::JSON); or a regular subprocess that sends status information back to the parent over an external pub/sub broker that both processes can connect to, like Postgres, Redis, or Mercury (this would also require serialization).
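
To illustrate the "serialize it yourself" part of the single-subprocess option, here is a minimal sketch that uses only core Perl and not the Mojo::IOLoop::ReadWriteFork API: the child prints one JSON document per line and the parent decodes each line as it arrives. `stream_statuses` is a hypothetical helper, JSON::PP stands in for Mojo::JSON, and the blocking read loop stands in for the non-blocking read callback that an event-loop-based tool would provide:

```perl
use strict;
use warnings;
use IO::Handle;
use JSON::PP ();
use POSIX ();

# Hypothetical helper: fork once, let the child emit one JSON line per cell,
# and call $on_status in the parent for each line as soon as it is read.
sub stream_statuses {
    my ($cells, $on_status) = @_;
    my $json = JSON::PP->new->utf8;
    pipe(my $reader, my $writer) or die "pipe failed: $!";
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;

    if ($pid == 0) {
        close $reader;
        $writer->autoflush(1);    # push each line out as soon as it is ready
        for my $cell (@$cells) {
            # the slow per-item work would happen here
            my $status = {
                type   => 'ticket',
                cell   => $cell,
                result => $cell % 2 ? 'OK' : 'NOK',
            };
            print {$writer} $json->encode($status), "\n";
        }
        close $writer;
        POSIX::_exit(0);
    }

    close $writer;
    while (defined(my $line = <$reader>)) {
        chomp $line;
        $on_status->($json->decode($line));    # e.g. forward to the websocket
    }
    close $reader;
    waitpid($pid, 0);
    return;
}

my @seen;
stream_statuses([1 .. 3], sub { my ($status) = @_; push @seen, $status });
print "cell $_->{cell}: $_->{result}\n" for @seen;
```

One JSON document per line keeps the framing trivial: the parent never has to guess where one status ends and the next begins.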


You can use a thread instead of a subprocess to do the work. After creation of the thread you need a loop that updates the progress via websocket.

If you are dealing with critical workloads that really have to finish under all circumstances (websocket is gone, network is down, etc.), you should delegate them to another daemon that persists and communicates its state via a file or a socket.

If it is a non-critical workload that you can easily restart, this may serve as a template for you.

# These belong at the top of the module
use threads;
use Thread::Queue;

my $queue  = Thread::Queue->new();
my $worker = threads->create(sub {
  # dummy workload. do your work here
  my $count = 60;
  for (1..$count) {
    sleep 1;
    $queue->enqueue($_/$count);
  }

  # undef to signal end of work
  $queue->enqueue(undef);

  return;
});

# blocking dequeuing ends when retrieving an undef'd value
while(defined(my $item = $queue->dequeue)) {
  # update progress via websocket
  printf("%f %%\n", $item);
}

# join thread
$worker->join;


I have made a small change to your updated example to make it work as expected. You can use the progress feature of the Subprocess module to make sure the data is sent over the websocket asynchronously while the long subprocess is still running.

The code now works as expected for me: the table state is updated on the client side every time the subprocess goes through an iteration.

The relevant part of the source code then looks like this:

$ws->on(
    json => sub {
        my ( $ws, $msg ) = @_;
        app->log->debug( 'Received from websocket:', Dumper( \$msg ) );
        unless ($msg) {
            app->log->debug('Received empty message? WTF?!');
            return;
        }
        my $prompt = $msg->{cmd};
        return unless $prompt;
        app->log->debug( sprintf 'Received: `%s`', $prompt // '<empty??>' );

        # my $loop = Mojo::IOLoop->singleton;
        my $subprocess = Mojo::IOLoop::Subprocess->new;
        app->log->debug("we are pid $$");
        $subprocess->run(
            sub {
                my $sp = shift;
                for my $cell ( 1 .. 3 ) {
                    app->log->debug(
                        "starting process for cell $cell in pid $$");
                    sleep(2);
                    app->log->debug("sending cell $cell to ws");
                    my $payload = {
                        type   => 'ticket',
                        cell   => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                    };
                    $sp->progress($payload);
                }
            },
            sub {
                my ( $sp, $err, @results ) = @_;

                #$ws->send( { json => $payload } );
                $ws->reply->exception($err) and return if $err;
                app->log->debug('sending end of process ->websocket');
                $ws->send( { json => { type => 'end' } } );
            }
        );

        # Forward each progress event from the child to the websocket
        $subprocess->on(
            progress => sub {
                my ( $subprocess, $payload ) = @_;
                $ws->send( { json => $payload } );
            }
        );

        # Start event loop if necessary
        $subprocess->ioloop->start unless $subprocess->ioloop->is_running;
    }
);


Comments
  • What was your "another attempt" at using Subprocess? Did you try the method I mentioned since the subprocess cannot communicate with the websocket?
  • @Grinnz: I just updated my question to show my different attempts with Subprocess and delay(). But your Promise solution is close to the expected result. At least it provides a kind of asynchronous feedback. See my additional question in your answer comments.
  • Delays aren't really suggested anymore as they've been mostly replaced with Promise-based tools in Mojo, and historically have been a more difficult concept for people to grasp. The reason it still blocks websocket communication is that delays are similar to promises in that they only organize when code is run. The code will still block while it runs; that's what the subprocesses are for.
  • Great, this is very close to what I want! One last thing is the sequencing of promises. I thought at first that all the results were sent at the same time, but actually all the promises seem to start at once, since using sleep(2 * $cell) produces the expected delayed feedback. The actual processing consists of creating tasks in a Jira server. Can you think of a way to process in sequence instead of in parallel, to avoid a DoS from sending all requests at once? I feel that using sleep($cell) is a bit ugly. :)
  • Do you mean that you want to wait for each subprocess to complete before starting the next one?
  • To wait for each promise to resolve before the next one, you want to attach a ->then callback that creates the next promise to wait for, and the final ->then/->catch is just attached to the last promise in this chain. I've updated the example code with this logic.
  • Perfect! This is what I was searching for. You managed to make me understand Mojo::Promise, which I had found during my research but did not understand. I had been looking for a decent solution for 2 weeks... Many thanks! Mojo::IOLoop::ReadWriteFork looks interesting too; I may have a use for it in other cases and will evaluate it later.
  • Interesting, but in my case it would replace a blocking processing by a blocking loop on statuses returned by the queue.
  • I accept it: blocking in this scenario is bad. Another approach to avoid the blocking is to create a second thread that "observes" the first one (by dequeuing) and does the progress updates. Both threads must be detached rather than joined to avoid blocking the main process. The big caveat of using threads here is that all packages loaded at that point will be cloned into a new interpreter. If you have limited resources (memory), this may cause problems for your system.
  • I only recently noticed your answer. This feature was only added in November 2018, after my question. It is now very convenient. Thanks!