NamedPipeServerStream.ReadAsync() does not exit when CancellationToken requests cancellation

When a NamedPipeServerStream is reading data from the pipe, it does not react to CancellationTokenSource.Cancel().

Why is that?

How can I limit the time I'm waiting in the server for data from the client?

Code to reproduce:

static void Main(string[] args)
{
    Server();
    Clinet();
    Console.WriteLine("press [enter] to exit");
    Console.ReadLine();
}

private static async Task Server()
{
    using (var cancellationTokenSource = new CancellationTokenSource(1000))
    using (var server = new NamedPipeServerStream("test",
        PipeDirection.InOut,
        1,
        PipeTransmissionMode.Byte,
        PipeOptions.Asynchronous))
    {
        var cancellationToken = cancellationTokenSource.Token;
        await server.WaitForConnectionAsync(cancellationToken);
        await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
        var buffer = new byte[4];
        await server.ReadAsync(buffer, 0, 4, cancellationToken);
        Console.WriteLine("exit server");
    }
}

private static async Task Clinet()
{
    using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
    {
        var buffer = new byte[4];
        client.Connect();
        client.Read(buffer, 0, 4);
        await Task.Delay(5000);
        await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
        Console.WriteLine("client exit");
    }
}

Expected result:

exit server
<client throws exception cuz server closed pipe>

Actual result:

client exit
exit server

EDIT

The answer with CancelIo seems promising, and it does allow the server to end communication when the cancellation token is canceled. However, I don't understand why my "base scenario" stopped working when using ReadPipeAsync.

Here is the code, it includes 2 client functions:

  1. Clinet_ShouldWorkFine - a good client that reads/writes in time
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - a client that is too slow, so the server should end the communication

Expected:

  1. Clinet_ShouldWorkFine - execution ends without any exception
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - server closes the pipe, client throws exception

Actual:

  1. Clinet_ShouldWorkFine - server stops at the first call to ReadPipeAsync, the pipe is closed after 1s, and the client throws an exception
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - server closes the pipe, client throws exception

Why is Clinet_ShouldWorkFine not working when the server uses ReadPipeAsync?

class Program
{
    static void Main(string[] args) {
        // in this case server should close the pipe cuz client is too slow
        try {
            var tasks = new Task[3];
            tasks[0] = Server();
            tasks[1] = tasks[0].ContinueWith(c => {
                Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
            });
            tasks[2] = Clinet_ServerShouldEndCommunication_CuzClientIsSlow();
            Task.WhenAll(tasks).Wait();
        }
        catch (Exception ex) {
            Console.WriteLine(ex);
        }

        // in this case server should exchange data with client fine
        try {
            var tasks = new Task[3];
            tasks[0] = Server();
            tasks[1] = tasks[0].ContinueWith(c => {
                Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
            });
            tasks[2] = Clinet_ShouldWorkFine();
            Task.WhenAll(tasks).Wait();
        }
        catch (Exception ex) {
            Console.WriteLine(ex);
        }

        Console.WriteLine("press [enter] to exit");
        Console.ReadLine();
    }

    private static async Task Server()
    {
        using (var cancellationTokenSource = new CancellationTokenSource(1000))
        using (var server = new NamedPipeServerStream("test",
            PipeDirection.InOut,
            1,
            PipeTransmissionMode.Byte,
            PipeOptions.Asynchronous))
        {
            var cancellationToken = cancellationTokenSource.Token;
            await server.WaitForConnectionAsync(cancellationToken);
            await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
            await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
            var buffer = new byte[4];
            var bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
            var bytes2 = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
            Console.WriteLine("exit server");
        }
    }

    private static async Task Clinet_ShouldWorkFine()
    {
        using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
        {
            var buffer = new byte[4];
            client.Connect();
            client.Read(buffer, 0, 4);
            client.Read(buffer, 0, 4);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            Console.WriteLine("client exit");
        }
    }

    private static async Task Clinet_ServerShouldEndCommunication_CuzClientIsSlow()
    {
        using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
        {
            var buffer = new byte[4];
            client.Connect();
            client.Read(buffer, 0, 4);
            client.Read(buffer, 0, 4);
            await Task.Delay(5000);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            Console.WriteLine("client exit");
        }
    }
}

public static class AsyncPipeFixer {

    public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
        if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
        var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
        var async = pipe.BeginRead(buffer, offset, count, null, null);
        return new Task<int>(() => {
            try { return pipe.EndRead(async); }
            finally { registration.Dispose(); }
        }, cancellationToken);
    }

    private static void CancelPipeIo(PipeStream pipe) {
        // Note: no PipeStream.IsDisposed, we'll have to swallow
        try {
            CancelIo(pipe.SafePipeHandle);
        }
        catch (ObjectDisposedException) { }
    }
    [DllImport("kernel32.dll")]
    private static extern bool CancelIo(SafePipeHandle handle);

}

.NET programmers get into terrible trouble with async/await when they write little test programs like this. It composes poorly; it is turtles all the way up. This program is missing the final turtle, so the tasks deadlock. Nothing takes care of letting the task continuations execute, as would normally happen in (say) a GUI app. It is exceedingly hard to debug as well.

First make a minor change so the deadlock is completely visible:

   int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

This takes a nasty little corner case away: the Server method making it all the way to the "Server exited" message. A chronic problem with the Task class is that when a task completes, or an awaited method finishes synchronously, it tries to run the continuation directly. That happens to work in this program. Forcing it to obtain the async result makes the deadlock obvious.


Next step is to fix Main() so these tasks can't deadlock anymore. That could look like this:

static void Main(string[] args) {
    try {
        var tasks = new Task[3];
        tasks[0] = Server();
        tasks[1] = tasks[0].ContinueWith(c => {
            Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
        });
        tasks[2] = Clinet();
        Task.WhenAll(tasks).Wait();
    }
    catch (Exception ex) {
        Console.WriteLine(ex);
    }
    Console.WriteLine("press [enter] to exit");
    Console.ReadLine();
}

Now we have a shot at getting ahead and actually fixing the cancellation problem. The NamedPipeServerStream class does not implement ReadAsync itself; it inherits the method from one of its base classes, Stream. That method has a ratty little detail that is completely under-documented; you can only see it when you stare at the framework source code. It can only detect cancellation when the cancel occurred before you call ReadAsync(). Once the read has started, it can no longer see a cancellation. That is the ultimate problem you are trying to fix.
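
To see that behavior in isolation, here is a minimal sketch that does not involve pipes at all. BlockingStream and IgnoredTokenDemo are hypothetical names made up for this illustration; the stream deliberately does not override ReadAsync, so it falls back to the Stream base implementation described above. The assumption is that the base implementation only looks at the token before the read starts, so the read task should still be pending after Cancel().

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stream whose Read() blocks until Release() is called.
// ReadAsync is NOT overridden, so the Stream base implementation is used.
sealed class BlockingStream : Stream
{
    private readonly ManualResetEventSlim _gate = new ManualResetEventSlim(false);

    public void Release() => _gate.Set();

    public override int Read(byte[] buffer, int offset, int count)
    {
        _gate.Wait();   // simulate a read that never gets data
        return 0;
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}

static class IgnoredTokenDemo
{
    // Returns true when the read was still pending some time after Cancel().
    public static async Task<bool> CancelAfterStartIsIgnoredAsync()
    {
        using (var stream = new BlockingStream())
        using (var cts = new CancellationTokenSource())
        {
            Task<int> read = stream.ReadAsync(new byte[4], 0, 4, cts.Token);
            cts.Cancel();                        // cancel AFTER the read has started
            await Task.Delay(200);               // give the cancellation time to have an effect, if any
            bool stillPending = !read.IsCompleted;
            stream.Release();                    // unblock Read() so the task can finish
            await read;
            return stillPending;
        }
    }
}
```

If the token is cancelled before ReadAsync() is called instead, the same call should return an already-cancelled task without ever reaching Read().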

It is a fixable problem; I have but a murky idea why Microsoft did not do this for PipeStreams. The normal way to force a BeginRead() method to complete early is to Dispose() the object, which is also the only way that Stream.ReadAsync() can be interrupted. But there is another way: on Windows it is possible to interrupt an I/O operation with CancelIo(), or with CancelIoEx() when the cancel request comes from a different thread than the one that started the I/O. Let's make it an extension method:

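using System;
using System.IO.Pipes;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Win32.SafeHandles;

public static class AsyncPipeFixer {

    public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
        if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
        var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
        var async = pipe.BeginRead(buffer, offset, count, null, null);
        // Task.Run returns a started task. A task created with "new Task<int>(...)" is never
        // scheduled unless Start() is called, so awaiting it would hang until the token fired.
        return Task.Run(() => {
            try { return pipe.EndRead(async); }
            // Report an interrupted read as a cancellation, whatever EndRead() threw
            catch (Exception) when (cancellationToken.IsCancellationRequested) {
                throw new OperationCanceledException(cancellationToken);
            }
            finally { registration.Dispose(); }
        });
    }

    private static void CancelPipeIo(PipeStream pipe) {
        // Note: no PipeStream.IsDisposed, we'll have to swallow
        try {
            // The token callback typically runs on a different thread than the one that
            // called BeginRead(). CancelIo() only cancels I/O issued by the calling thread,
            // so use CancelIoEx(); IntPtr.Zero cancels all pending I/O on the handle.
            CancelIoEx(pipe.SafePipeHandle, IntPtr.Zero);
        }
        catch (ObjectDisposedException) { }
    }
    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern bool CancelIoEx(SafePipeHandle handle, IntPtr lpOverlapped);

}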

And finally tweak the server to use it:

    int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

Beware that this workaround is specific to Windows, so it can't work in a .NET Core program that targets a Unix flavor. In that case consider the heavier hammer: call pipe.Close() in the CancelPipeIo() method.
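
A minimal sketch of that heavier hammer, without any P/Invoke, could look like the following. ClosingPipeReader and ReadOrCloseAsync are hypothetical names, not part of the framework. The sketch assumes that disposing the pipe makes the pending read complete with some exception; which exception that is can differ between runtimes, so anything thrown after the token has fired is reported as a cancellation. The pipe is unusable afterwards.

```csharp
using System;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;

public static class ClosingPipeReader
{
    // Hypothetical helper: closes the pipe when the token fires during a read.
    public static async Task<int> ReadOrCloseAsync(this PipeStream pipe, byte[] buffer,
        int offset, int count, CancellationToken token)
    {
        // The registration only lives for the duration of this one read.
        using (token.Register(pipe.Dispose))
        {
            try
            {
                return await pipe.ReadAsync(buffer, offset, count, token).ConfigureAwait(false);
            }
            catch (Exception) when (token.IsCancellationRequested)
            {
                // Assumption: whatever the interrupted read threw, the cause was the cancellation.
                throw new OperationCanceledException(token);
            }
        }
    }
}
```

The server would then call server.ReadOrCloseAsync(buffer, 0, 4, cancellationToken) instead of ReadAsync and must not touch the pipe again once the call has been cancelled.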


ReadAsync first checks for cancellation and then starts reading. If the token is canceled after the read has started, it has no effect.

Add the following line:

cancellationToken.Register(server.Disconnect);

using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
    PipeDirection.InOut,
    1,
    PipeTransmissionMode.Byte,
    PipeOptions.Asynchronous))
{
    var cancellationToken = cancellationTokenSource.Token;
    cancellationToken.Register(server.Disconnect);
    await server.WaitForConnectionAsync(cancellationToken);
    await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
    var buffer = new byte[4];
    await server.ReadAsync(buffer, 0, 4, cancellationToken);
    Console.WriteLine("exit server");
}
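
If disconnecting the pipe is too drastic, a different way to bound the time spent waiting is to stop awaiting the read without stopping the read itself. The sketch below is not part of this answer; TaskTimeoutSketch and WithCancellation are hypothetical names. It only abandons the wait: the underlying read stays pending until the pipe is closed or data arrives, so the caller is still expected to dispose the pipe afterwards.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskTimeoutSketch
{
    // Hypothetical helper: completes when the task completes or the token fires,
    // whichever happens first. It does NOT cancel the underlying operation.
    public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken token)
    {
        var cancelSignal = new TaskCompletionSource<bool>();
        using (token.Register(() => cancelSignal.TrySetResult(true)))
        {
            Task first = await Task.WhenAny(task, cancelSignal.Task).ConfigureAwait(false);
            if (first != task)
                throw new OperationCanceledException(token);
        }
        // The task has already completed here; await it to get the result or its exception.
        return await task.ConfigureAwait(false);
    }
}
```

In the server this would be used as await server.ReadAsync(buffer, 0, 4).WithCancellation(cancellationToken); leaving the using block then closes the pipe and ends the abandoned read.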


I'm just looking at your code; maybe a fresh pair of eyes on it will help...

As far as I can tell, in both your original scenario and the more complex one, you are passing an already cancelled cancellation token. How a given method reacts to that (whether it throws an exception, if at all) is pretty unpredictable.

Use the IsCancellationRequested property to check whether the token is already cancelled, and don't pass cancelled tokens.

Here is a sample of adding this to the code from your original question (you can do the same for your later ReadPipeAsync method).

var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);

if(!cancellationToken.IsCancellationRequested)
{
    await server.WriteAsync(new byte[] { 1, 2, 3, 4 }, 0, 4, cancellationToken);
}

if(!cancellationToken.IsCancellationRequested)
{
    var buffer = new byte[4];
    await server.ReadAsync(buffer, 0, 4, cancellationToken);
}

Console.WriteLine("exit server");

The above code will result in:

exit server
client exit

which I think was your very original question too...


Comments
  • @MrinalKamboj I'm using new CancellationTokenSource(1000) which calls .Cancel() after the specified time passes - in this case after 1000ms
  • @MrinalKamboj where should I add the Task.Delay(1000)? Sorry, I don't get it. A side note: the code above demonstrates exactly the real situation where I faced this problem. I run a Python script from C# and talk to it via the pipe. I can't just add delays here or there, since I know that C# gets stuck exactly at the ReadAsync().
  • This is really interesting. I now understand why the server was deaf to cancellation messages. However, I don't know why using ReadPipeAsync does not work in my happy path. I added code to my original post to demo the issue with a "good" client.
  • Please be more clear about what "does not work" and "happy path" mean. As written the client should not work, you should get the "Pipe is broken" exception message.
  • I have added an edit to my original post. It includes 2 "clients". 1. Clinet_ServerShouldEndCommunication_CuzClientIsSlow() represents a slow client; it has a Task.Delay(5000), and in this case the server should end communication because the client is too slow. 2. Clinet_ShouldWorkFine() represents the happy path, i.e. a well-behaved client; the server should be able to exchange data with Clinet_ShouldWorkFine() without any errors/exceptions.
  • Should there be an AsyncCallback here instead of null? Almost all similar named pipe code I see has the callback in place. Just wondering why it's omitted here?
  • Great suggestion Hans. I've used CancelIo in my unmanaged applications, not sure why it didn't occur to me to use it in my managed application.
  • How does server.Disconnect help here when cancellation is invoked? The OP is looking for the exception, which is not happening.
  • If the server's pipe is closed, the Clinet() method will throw an exception, which is the expected result.
  • A genuine exception should come from the server; the client should not manufacture an exception based on a simple event like Disconnect.
  • @Milad, thanks for that. I'm actually currently using cancellationToken.Register(server.Disconnect); as a workaround, and it does handle the initial case I posted correctly. However, it does not work when the client does not read the data from the pipe. In that case the server waits on the WriteAsync, and when server.Disconnect happens, WriteAsync throws PipeClosedException or similar.