Creating a dispatch queue / thread handler in C++ with pipes: FIFOs overfilling

Threads are resource-heavy to create and use, so often a pool of threads will be reused for asynchronous tasks. A task is packaged up, and then "posted" to a broker that will enqueue the task on the next available thread.

This is the idea behind dispatch queues (e.g. Apple's Grand Central Dispatch) and thread handlers (e.g. Android's Looper mechanism).

Right now, I'm trying to roll my own. In fact, I'm plugging a gap in Android whereby there is an API for posting tasks in Java, but not in the native NDK. However, I'm keeping this question platform independent where I can.

Pipes are the ideal choice for my scenario. I can easily poll the file descriptor of the read-end of a pipe(2) on my worker thread, and enqueue tasks from any other thread by writing to the write-end. Here's what that looks like:

int taskRead, taskWrite;

void setup() {
    // Create the pipe
    int taskPipe[2];
    ::pipe(taskPipe);
    taskRead = taskPipe[0];
    taskWrite = taskPipe[1];

    // Set up a routine that is called when task_r reports new data
    function_that_polls_file_descriptor(taskRead, []() {
        // Read the callback data
        std::function<void(void)>* taskPtr;
        ::read(taskRead, &taskPtr, sizeof(taskPtr));

        // Run the task - this is unsafe! See below.
        (*taskPtr)();

        // Clean up
        delete taskPtr;
    });
}

void post(const std::function<void(void)>& task) {
    // Copy the function onto the heap
    auto* taskPtr = new std::function<void(void)>(task);

    // Write the pointer to the pipe - this may block if the FIFO is full!
    ::write(taskWrite, &taskPtr, sizeof(taskPtr));
}

This code puts a std::function on the heap, and passes the pointer to the pipe. The function_that_polls_file_descriptor then calls the provided expression to read the pipe and execute the function. Note that there are no safety checks in this example.

This works great 99% of the time, but there is one major drawback. Pipes have a limited size, and if the pipe is filled, then calls to post() will hang. This in itself is harmless, until a call to post() is made from within a task.

auto evil = []() {
    // Post a new task back onto the queue
    post([]{});
    // Not enough new tasks, let's make more!
    for (int i = 0; i < 3; i++) {
        post([]{});
    }

    // Now every time this task runs, 4 more tasks are added to the queue.
};

post(evil);
post(evil);
...

If this happens, then the worker thread will be blocked, waiting to write to the pipe. But the pipe's FIFO is full, and the worker thread is not reading anything from it, so the entire system is in deadlock.

What can be done to ensure that calls to post() emanating from the worker thread always succeed, allowing the worker to continue processing the queue in the event it is full?

Thanks to all the comments and other answers in this post, I now have a working solution to this problem.

The trick I've employed is to prioritise worker threads by checking which thread is calling post(). Here is the rough algorithm:

pipe ← NON-BLOCKING-PIPE()
overflow ← Ø
POST(task)
    success ← WRITE(task, pipe)
    IF NOT success THEN
        IF THREAD-IS-WORKER() THEN
            overflow ← overflow ∪ {task}
        ELSE
            WAIT(pipe)
            POST(task)

Then on the worker thread:

LOOP FOREVER
    task ← READ(pipe)
    RUN(task)

    FOR EACH overtask ∈ overflow
        RUN(overtask)

    overflow ← Ø

The wait is performed with pselect(2), adapted from the answer by @Sigismondo.

Here's the algorithm implemented in my original code example that will work for a single worker thread (although I haven't tested it after copy-paste). It can be extended to work for a thread pool by having a separate overflow queue for each thread.

int taskRead, taskWrite;

// These variables are only allowed to be modified by the worker thread
std::thread::id workerId;
std::queue<std::function<void(void)>*> overflow;
bool overflowInUse;

void setup() {
    int taskPipe[2];
    ::pipe(taskPipe);
    taskRead = taskPipe[0];
    taskWrite = taskPipe[1];

    // Make the pipe non-blocking to check pipe overflows manually
    ::fcntl(taskWrite, F_SETFL, ::fcntl(taskWrite, F_GETFL, 0) | O_NONBLOCK);

    // Save the ID of this worker thread to compare later
    workerId = std::this_thread::get_id();
    overflowInUse = false;

    function_that_polls_file_descriptor(taskRead, []() {
        // Read the callback data
        std::function<void(void)>* taskPtr;
        ::read(taskRead, &taskPtr, sizeof(taskPtr));

        // Run the task
        (*taskPtr)();
        delete taskPtr;

        // Run any tasks that were posted to the overflow
        while (!overflow.empty()) {
            taskPtr = overflow.front();
            overflow.pop();

            (*taskPtr)();
            delete taskPtr;
        }

        // Release the overflow mechanism if applicable
        overflowInUse = false;
    });
}

bool write(std::function<void(void)>* taskPtr, bool blocking = true) {
    ssize_t rc = ::write(taskWrite, &taskPtr, sizeof(taskPtr));

    // Failure handling
    if (rc < 0) {
        // If blocking is allowed, wait for pipe to become available
        if ((errno == EAGAIN || errno == EWOULDBLOCK) && blocking) {
            fd_set fds;
            FD_ZERO(&fds);
            FD_SET(taskWrite, &fds);

            ::pselect(taskWrite + 1, nullptr, &fds, nullptr, nullptr, nullptr);

            // Try again
            return write(taskPtr);
        }

        // Otherwise return false
        return false;
    }

    return true;
}

void post(const std::function<void(void)>& task) {
    auto* taskPtr = new std::function<void(void)>(task);

    if (std::this_thread::get_id() == workerId) {
        // The worker thread gets 1st-class treatment.
        // It won't be blocked if the pipe is full, instead
        // using an overflow queue until the overflow has been cleared.
        if (!overflowInUse) {
            bool success = write(taskPtr, false);
            if (!success) {
                overflow.push(taskPtr);
                overflowInUse = true;
            }
        } else {
            overflow.push(taskPtr);
        }
    } else {
        write(taskPtr);
    }
}


Make the pipe write file descriptor non-blocking, so that write fails with EAGAIN when the pipe is full.
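
As a rough sketch of what that looks like (POSIX only; tryWrite and demoFillPipe are illustrative names, not part of the code above):

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>

// Returns true if the payload was written, false if the pipe was full
// (EAGAIN/EWOULDBLOCK) or another error occurred and the caller should
// retry or handle it later.
bool tryWrite(int fd, const void* payload, size_t len) {
    ssize_t rc = ::write(fd, payload, len);
    if (rc == static_cast<ssize_t>(len)) return true;
    return false;
}

// Demo: fill a non-blocking pipe with pointer-sized writes until
// tryWrite reports "full". Returns the number of successful writes.
int demoFillPipe() {
    int fds[2];
    if (::pipe(fds) != 0) return -1;
    ::fcntl(fds[1], F_SETFL, ::fcntl(fds[1], F_GETFL, 0) | O_NONBLOCK);

    int writes = 0;
    void* dummy = nullptr;
    while (tryWrite(fds[1], &dummy, sizeof(dummy))) ++writes;

    ::close(fds[0]);
    ::close(fds[1]);
    return writes;
}
```

Because the write end never blocks, the caller sees the full-FIFO condition immediately and can decide what to do with the task instead of deadlocking.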


One improvement is to increase the pipe buffer size.
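
On Linux this can be done with the F_SETPIPE_SZ fcntl (Linux-specific, not POSIX; the helper names below are mine, and unprivileged processes are capped by /proc/sys/fs/pipe-max-size):

```cpp
#include <fcntl.h>   // F_SETPIPE_SZ / F_GETPIPE_SZ (Linux-specific)
#include <unistd.h>

// Attempt to grow the pipe buffer; returns the resulting capacity in
// bytes, or -1 on failure. The kernel may round the request up.
long growPipe(int writeFd, int bytes) {
#ifdef F_SETPIPE_SZ
    if (::fcntl(writeFd, F_SETPIPE_SZ, bytes) < 0) return -1;
    return ::fcntl(writeFd, F_GETPIPE_SZ);
#else
    (void)writeFd; (void)bytes;
    return -1;  // not available on this platform
#endif
}

// Demo: create a pipe and grow it to at least 128 KiB.
long demoGrowPipe() {
    int fds[2];
    if (::pipe(fds) != 0) return -1;
    long cap = growPipe(fds[1], 1 << 17);
    ::close(fds[0]);
    ::close(fds[1]);
    return cap;
}
```

Note this only delays the problem: a task that posts faster than the worker consumes will still fill any finite buffer eventually.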

Another is to use a UNIX socket/socketpair and increase the socket buffer size.

Yet another solution is to use a UNIX datagram socket which many worker threads can read from, but only one gets the next datagram. In other words, you can use a datagram socket as a thread dispatcher.
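
A minimal sketch of that dispatcher idea, assuming an AF_UNIX datagram socketpair (the names here are illustrative): each pointer is sent as one datagram, and exactly one of the competing workers receives it, so no framing or locking is needed on the read side.

```cpp
#include <atomic>
#include <functional>
#include <sys/socket.h>
#include <thread>
#include <unistd.h>
#include <vector>

std::atomic<int> g_counter{0};

int demoDispatch(int numWorkers, int numTasks) {
    // A connected pair of datagram sockets: fds[1] is the posting end,
    // fds[0] is shared by all workers.
    int fds[2];
    if (::socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) != 0) return -1;

    std::vector<std::thread> workers;
    for (int i = 0; i < numWorkers; ++i) {
        workers.emplace_back([rd = fds[0]] {
            for (;;) {
                std::function<void()>* task = nullptr;
                if (::recv(rd, &task, sizeof(task), 0) !=
                    static_cast<ssize_t>(sizeof(task)))
                    break;
                if (!task) break;  // null pointer acts as a shutdown sentinel
                (*task)();
                delete task;
            }
        });
    }

    // Post the tasks, then one sentinel per worker. Datagrams are
    // delivered whole, so each pointer goes to exactly one worker.
    for (int i = 0; i < numTasks; ++i) {
        auto* task = new std::function<void()>([] { ++g_counter; });
        ::send(fds[1], &task, sizeof(task), 0);
    }
    for (int i = 0; i < numWorkers; ++i) {
        std::function<void()>* sentinel = nullptr;
        ::send(fds[1], &sentinel, sizeof(sentinel), 0);
    }

    for (auto& w : workers) w.join();
    ::close(fds[0]);
    ::close(fds[1]);
    return g_counter.load();
}
```

The same full-buffer caveat applies here, but the socket buffer can be enlarged with SO_SNDBUF, and the datagram boundary guarantees a task is never split between two readers.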


You can use the good old select to determine whether the file descriptors are ready for writing:

The file descriptors in writefds will be watched to see if space is available for write (though a large write may still block).

Since you are only writing a pointer, your write() certainly cannot be classified as large.

Clearly you must be prepared for a post to fail, and be ready to retry it later... otherwise you will face indefinitely growing queues until your system breaks again.

More or less (not tested):

bool post(const std::function<void(void)>& task) {
    bool post_res = false;

    // Copy the function onto the heap
    auto* taskPtr = new std::function<void(void)>(task);

    fd_set wfds;
    struct timeval tv;
    int retval;

    FD_ZERO(&wfds);
    FD_SET(taskWrite, &wfds);

    // Don't wait at all
    tv.tv_sec = 0;
    tv.tv_usec = 0;

    retval = select(taskWrite + 1, NULL, &wfds, NULL, &tv);
    // select() returns 0 when no FD's are ready
    if (retval == -1) {
      // handle error condition
    } else if (retval > 0) {
      // Write the pointer to the pipe. This write will succeed
      ::write(taskWrite, &taskPtr, sizeof(taskPtr));
      post_res = true;
    }
    return post_res;
}


Comments
  • Iirc file descriptors can be marked non-blocking, which would have write fail with EAGAIN in case of a full FIFO. This can be achieved with int flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flags | O_NONBLOCK);
  • @IGarFieldI Thanks, but consider that this could result in posted tasks never being run if the queue is full. This is ok for some applications, but not for making thread handlers, where you want the guarantee that the task will be run eventually. I imagine that the solution will make use of non-blocking writes in some way though.
  • I would avoid to use a pipe to communicate between threads: you are involving OS feature not needed for inter-thread communication. for example a std::queue and a std::mutex would serve the same purpose, with more flexibility and less hassle
  • @AndrewHenle That's roughly what I'm getting at. Not saying pipes are at fault, but I need a way of not writing to the worker's own job queue without that filling up just the same.
  • I think that you cannot design your system without considering that the producer can produce data faster than the consumer. Pipes will block because of OS constraints, and a std::queue will eventually hit system limits. When using messaging, it's said that queues should always be empty: the consumer must be faster than the producer. If the opposite is happening (i.e. an arbitrary limit has been surpassed) you are facing an error condition and you must handle it. If you don't, problems will happen. No architecture can guarantee to pile up messages uncontrollably.
  • A more performant solution would be to use a mutex and a condition variable. This way you would minimize system calls and eliminate copying data between user-space and kernel. Also, you should just use a blocking write and remove the rest of the code in your write function because it effectively tries to do a blocking write with more code.
  • @MaximEgorushkin Could you please explain where would the mutex + condition go? As for the blocking write, this is true except for the flag on the write function that enables or disables blocking.
  • That would be a totally different question not deserving to be treated by a comment.
  • @MaximEgorushkin Could you write it in an answer?
  • When I have some time.
  • It's clear to see that just increasing the size won't work for the test case - at some point, it will fail, it's just a matter of when. The multiple worker threads is more interesting though.
  • @CJxD Yes and no and it depends.
  • Thanks for this answer. Does it have any specific advantage over using ::write with O_NONBLOCK? Also, the real issue lies in how to handle this error condition. If it just put the function in another queue, then that queue will fill up just the same.
  • Using select is the old-fashioned way (select predates threads, for example), but it works just fine. O_NONBLOCK is more modern and doesn't suffer from the drawback that a subsequent write MAY still block if it's writing a large block.