libbus: A concurrent message passing library

Earlier this week, over the course of an evening, I wrote libbus, a very simple library that implements basic shared message passing. In this blog post I want to dig a bit deeper into the rationale and inner workings of the library, and review some hypothetical use cases.

Problem to solve

In multithreaded environments, message passing is bound to happen. Programming languages with built-in concurrency offer different mechanisms for it; a common example is the concurrent queue, which any number of threads can write to or read from without race conditions. Usually, a thread owns a job queue from which it reads tasks in a loop. Concurrent queues are a deeply studied subject; I explored this field briefly when I wrote tlock-queue, a two-lock queue based on the paper Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.

C did not have built-in support for threads until the C11 standard, and even then support is optional (an implementation that does not provide it defines the macro constant __STDC_NO_THREADS__). On top of that, only basic synchronization primitives are provided, such as mutexes and condition variables; there is no concurrent queue. You can use my queue or any other implementation to get over this issue. However, there are times when the number of threads is large and/or each thread needs its own job queue.
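As a small aside, the feature check mentioned above can be sketched like this. The function names are made up for illustration, and the macro is only as reliable as the toolchain that sets it, so treat the result as a hint rather than a guarantee that `<threads.h>` is actually usable.

```c
#include <stdio.h>

/* Returns 1 when the compiler advertises C11 threads, 0 otherwise.
 * The check has to happen before any attempt to include <threads.h>,
 * which is why this sketch does not include that header at all. */
int has_c11_threads(void)
{
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L && \
    !defined(__STDC_NO_THREADS__)
    return 1;
#else
    return 0;
#endif
}

/* Prints what the toolchain claims; handy in a configure-style probe. */
void report_thread_support(void)
{
    printf("C11 <threads.h> advertised: %s\n",
           has_c11_threads() ? "yes" : "no");
}
```

In practice many C projects skip this check and rely on POSIX threads directly, which is also what the other sketches in this post assume.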

Imagine a program that implements some sort of pipeline processing, where Thread 1 reads some bytes from a file or a socket, does some processing and then sends the data to Thread 2, which does some processing as well, then sends the data to Thread 3 and so on. In this case, each thread needs a reference to the next thread’s job queue to send data to it, which is not an ideal system to manage.

Take a different example, where a single thread might need to send a message to all of the other threads, like a shutdown signal. In this case, each thread needs a reference to every other thread’s message queue. Even then, what if one of the threads needs to shut down prematurely? It would need to send a message to all other threads, saying “I’m shutting down, do not send more messages to my queue”. Or how about a scenario where a new thread needs to be spawned? How can it start receiving messages from the already running threads? Soon enough you end up managing a shared variable that tracks the number of running threads, and sending that many messages to a common queue to ensure that every thread receives the signal.

libbus solves these issues by implementing a concurrent data structure, which I call a bus. Independent clients can register their callbacks against this bus to receive messages directed to them. Any user with a reference to the bus can send messages to a registered client by just knowing the ID of the destination client. Users can also communicate with all of these clients at once through broadcast messaging.

Using this model, each thread in the first example above would just need to know the ID of the next thread in the pipeline (which might very well be its own ID plus 1). The program in the second example is simplified too, since threads can send broadcast messages. Clients can also register and unregister their callbacks at any point, which solves the issues with asynchronous thread startup and shutdown.
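To make the idea more concrete, here is a minimal sketch of such a callback table. It is not libbus' code: every `toy_` name is hypothetical, the table has a fixed size, and it uses a pthread mutex for synchronization purely to keep the example short.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define TOY_MAX_CLIENTS 8   /* hypothetical fixed capacity */

typedef void (*toy_callback)(void *ctx, void *msg);

typedef struct {
    pthread_mutex_t lock;
    toy_callback    cb[TOY_MAX_CLIENTS];   /* NULL means the slot is free */
    void           *ctx[TOY_MAX_CLIENTS];
} toy_bus;

void toy_bus_init(toy_bus *bus)
{
    assert(bus != NULL);
    pthread_mutex_init(&bus->lock, NULL);
    for (size_t i = 0; i < TOY_MAX_CLIENTS; i++) {
        bus->cb[i] = NULL;
        bus->ctx[i] = NULL;
    }
}

/* Returns 1 on success, 0 if the ID is out of range or already taken. */
int toy_bus_register(toy_bus *bus, size_t id, toy_callback cb, void *ctx)
{
    int ok = 0;
    if (id >= TOY_MAX_CLIENTS || cb == NULL) return 0;
    pthread_mutex_lock(&bus->lock);
    if (bus->cb[id] == NULL) {
        bus->cb[id] = cb;
        bus->ctx[id] = ctx;
        ok = 1;
    }
    pthread_mutex_unlock(&bus->lock);
    return ok;
}

/* Returns 1 if a client was registered under this ID, 0 otherwise. */
int toy_bus_unregister(toy_bus *bus, size_t id)
{
    int ok = 0;
    if (id >= TOY_MAX_CLIENTS) return 0;
    pthread_mutex_lock(&bus->lock);
    if (bus->cb[id] != NULL) {
        bus->cb[id] = NULL;
        bus->ctx[id] = NULL;
        ok = 1;
    }
    pthread_mutex_unlock(&bus->lock);
    return ok;
}

/* Runs the destination's callback in the caller's thread. The entry is
 * copied under the lock and invoked outside it, so a client that
 * unregisters concurrently may still receive one last message. */
int toy_bus_send(toy_bus *bus, size_t id, void *msg)
{
    toy_callback cb;
    void *ctx;
    if (id >= TOY_MAX_CLIENTS) return 0;
    pthread_mutex_lock(&bus->lock);
    cb = bus->cb[id];
    ctx = bus->ctx[id];
    pthread_mutex_unlock(&bus->lock);
    if (cb == NULL) return 0;
    cb(ctx, msg);
    return 1;
}

/* Sends to every registered client; returns how many were reached. */
int toy_bus_broadcast(toy_bus *bus, void *msg)
{
    int delivered = 0;
    for (size_t id = 0; id < TOY_MAX_CLIENTS; id++)
        delivered += toy_bus_send(bus, id, msg);
    return delivered;
}

/* Demo callback: adds the integer message to the client's running total. */
void toy_count(void *ctx, void *msg) { *(int *)ctx += *(int *)msg; }

/* Registers three clients, then mixes direct and broadcast sends. */
int toy_bus_demo(void)
{
    toy_bus bus;
    int totals[3] = {0, 0, 0};
    int one = 1;
    toy_bus_init(&bus);
    for (size_t id = 0; id < 3; id++)
        toy_bus_register(&bus, id, toy_count, &totals[id]);
    toy_bus_send(&bus, 1, &one);     /* reaches client 1 only */
    toy_bus_broadcast(&bus, &one);   /* reaches clients 0, 1 and 2 */
    toy_bus_unregister(&bus, 2);
    toy_bus_broadcast(&bus, &one);   /* reaches clients 0 and 1 */
    return totals[0] * 100 + totals[1] * 10 + totals[2];
}
```

A pipeline stage with ID `n` would simply call `toy_bus_send(&bus, n + 1, data)`, and a shutdown signal becomes a single `toy_bus_broadcast` call, with no thread holding references to any other thread's queue.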

Message model

One thing to keep in mind is that, even though separate threads can register their own callbacks independently, when Thread 1 sends Thread 2 a message, Thread 2’s callback is executed in Thread 1’s context. The only way to run the callback inside Thread 2 itself would be some sort of low-level interrupt, which might break Thread 2’s execution flow. This is why the callback you register has to handle the actual hand-off of the message to the intended thread. The following diagram illustrates this concept. Note that all register, unregister and send operations use GCC’s atomic builtins to synchronize access to the underlying data structure.

                      +----------+    +------------+
                      | Thread 1 |    | t2callback +---> queue_push(t2ctx->queue, msg)
                      +----+-----+    +------------+              
                           |                ^
  bus_send(bus, t2id, msg) |                |
                           |       +--------+  t2callback(t2ctx, msg)
                           v       |
                   +---------------+---------------------+
                   |                libbus               |
                   +-------------------------------------+
                      ^
                      | bus_register(bus, t2id,
                      |    t2callback, t2ctx)
                      |
                +-----+------+
                |  Thread 2  +--------------> queue_pop(t2ctx->queue)
                +------------+  loops over

                                     Time
  ------------------------------------------------------------------------------>

First, Thread 2 registers its callback with bus_register. This callback is of course user-defined, meaning that you control what gets done when a message is sent to Thread 2. After some time, Thread 1 needs to send a message to Thread 2, which is done through bus_send. This redirects Thread 1’s execution flow to Thread 2’s callback; as mentioned above, this gets executed in Thread 1’s context, meaning that the callback must handle the actual message passing to Thread 2. The simplest case is to push this message to Thread 2’s task queue, but you could do something else, like changing a shared variable to signal a condition.
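The hand-off described above could look roughly like the sketch below. The bus itself is left out: the demo calls the callback directly where a `bus_send` would end up calling it. The one-slot "queue" and all type and function names are assumptions made for this example, and pthreads is used for the threads and the condition variable.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical one-slot "queue" owned by Thread 2. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  ready;
    int             has_msg;
    int             msg;
    pthread_t       pushed_by;   /* thread that executed the callback */
} t2_context;

/* Thread 2's callback. It runs in the sender's thread, so all it does
 * is store the message and wake Thread 2 up. */
void t2_callback(void *ctx, void *msg)
{
    t2_context *t2 = ctx;
    assert(t2 != NULL && msg != NULL);
    pthread_mutex_lock(&t2->lock);
    t2->msg = *(int *)msg;
    t2->has_msg = 1;
    t2->pushed_by = pthread_self();
    pthread_cond_signal(&t2->ready);
    pthread_mutex_unlock(&t2->lock);
}

/* Thread 2's body: block until a message arrives, then process it. */
void *t2_main(void *arg)
{
    t2_context *t2 = arg;
    pthread_mutex_lock(&t2->lock);
    while (!t2->has_msg)
        pthread_cond_wait(&t2->ready, &t2->lock);
    t2->msg *= 2;                /* the real work happens in Thread 2 */
    pthread_mutex_unlock(&t2->lock);
    return NULL;
}

/* Plays the role of Thread 1. Returns the processed value, or -1 if
 * the callback did not run in the calling thread. */
int message_model_demo(void)
{
    t2_context t2;
    pthread_t thread2;
    int payload = 21;
    int same_thread;

    pthread_mutex_init(&t2.lock, NULL);
    pthread_cond_init(&t2.ready, NULL);
    t2.has_msg = 0;
    t2.msg = 0;
    if (pthread_create(&thread2, NULL, t2_main, &t2) != 0)
        return -1;

    t2_callback(&t2, &payload);  /* what a bus send would invoke */
    pthread_join(thread2, NULL);

    same_thread = pthread_equal(t2.pushed_by, pthread_self());
    pthread_cond_destroy(&t2.ready);
    pthread_mutex_destroy(&t2.lock);
    return same_thread ? t2.msg : -1;
}
```

The callback only touches the mailbox and returns immediately; the doubling stands in for whatever processing Thread 2 does in its own context.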

Inner workings and other ideas

As you can probably tell by this point, libbus essentially works as a synchronized callback table. This allows for a very simple implementation (around 100 SLOC) that is still easy to extend. By providing a generic callback API, clients can define specific behavior to be invoked when a message is sent to them.

As previously explained, callbacks are executed in the calling thread, not in the callee. This means that if you care about performance, you should avoid performing long-running tasks in these callbacks, as they “hijack” the calling thread. So, how would you implement a true bus, where message passing is delegated? Since libbus is easy to build upon, for this use case you would spawn a thread that handles all of the bus operations. Regular threads would register their callbacks normally, but instead of sending messages through bus_send, they would communicate with the bus thread. This thread would take (destination, message) tuples from the other threads and call bus_send(bus, destination, message) with this information.

            +-------------------------------+
            |            Thread 1           |
            +---------------------+---------+
                                  |   queue_push(
            +------------+        |      busctx->queue,
            |  Thread 2  |        |      {t2id, msg}
            +-----+------+        |   )
                  |               v
                  |        +------------------+
    bus_register..|        |    Bus Thread    |
                  |        +--------+---------+
                  |                 |              +---------------+
                  |  bus_send(bus,  |  +---------->|  t2callback   |
                  |    t2id, msg),  |  |           +---------------+
                  v                 v  |
            +--------------------------+-------+
            |            libbus                |
            +----------------------------------+

                          Time
 ------------------------------------------------------------>

In the figure above, Thread 2 registers its callback against the bus data structure directly. In order for Thread 1 to communicate with Thread 2, it must delegate the actual message passing to the Bus Thread, which handles requests from its task queue (busctx->queue).
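A bus thread along those lines might be sketched as follows. To stay self-contained, the example replaces libbus with a plain callback array that is filled in before the bus thread starts and never changes afterwards; the bounded request queue, the `STOP_ID` sentinel and every name in it are assumptions for illustration only.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define N_CLIENTS 4
#define QUEUE_CAP 16
#define STOP_ID   ((size_t)-1)   /* hypothetical "shut down" destination */

typedef void (*client_cb)(void *ctx, int msg);
typedef struct { size_t dest; int msg; } request;

typedef struct {
    client_cb       cb[N_CLIENTS];    /* stand-in for the real bus */
    void           *ctx[N_CLIENTS];
    pthread_mutex_t lock;
    pthread_cond_t  nonempty;
    request         queue[QUEUE_CAP]; /* ring buffer of pending requests */
    size_t          head, count;
} bus_thread_ctx;

/* Called by regular threads instead of sending directly.
 * Returns 1 on success, 0 if the request queue is full. */
int bus_thread_post(bus_thread_ctx *b, size_t dest, int msg)
{
    int ok = 0;
    assert(b != NULL);
    pthread_mutex_lock(&b->lock);
    if (b->count < QUEUE_CAP) {
        b->queue[(b->head + b->count) % QUEUE_CAP] = (request){dest, msg};
        b->count++;
        ok = 1;
        pthread_cond_signal(&b->nonempty);
    }
    pthread_mutex_unlock(&b->lock);
    return ok;
}

/* The bus thread: pops (destination, message) pairs and dispatches them. */
void *bus_thread_main(void *arg)
{
    bus_thread_ctx *b = arg;
    for (;;) {
        pthread_mutex_lock(&b->lock);
        while (b->count == 0)
            pthread_cond_wait(&b->nonempty, &b->lock);
        request r = b->queue[b->head];
        b->head = (b->head + 1) % QUEUE_CAP;
        b->count--;
        pthread_mutex_unlock(&b->lock);

        if (r.dest == STOP_ID)
            return NULL;
        if (r.dest < N_CLIENTS && b->cb[r.dest] != NULL)
            b->cb[r.dest](b->ctx[r.dest], r.msg);  /* runs in the bus thread */
    }
}

/* Demo client: sums its messages and remembers which thread called it. */
typedef struct { int sum; pthread_t ran_on; } sink;

void sink_cb(void *ctx, int msg)
{
    sink *s = ctx;
    s->sum += msg;
    s->ran_on = pthread_self();
}

/* Returns the sum received by client 2, or -1 if the callback ran in
 * the posting thread or the bus thread could not be started. */
int bus_thread_demo(void)
{
    bus_thread_ctx b = {0};
    sink t2 = {0};
    pthread_t bus_thread;

    pthread_mutex_init(&b.lock, NULL);
    pthread_cond_init(&b.nonempty, NULL);
    b.cb[2] = sink_cb;               /* "Thread 2" registers up front */
    b.ctx[2] = &t2;
    if (pthread_create(&bus_thread, NULL, bus_thread_main, &b) != 0)
        return -1;

    bus_thread_post(&b, 2, 40);      /* "Thread 1" delegates its sends */
    bus_thread_post(&b, 2, 2);
    bus_thread_post(&b, STOP_ID, 0);
    pthread_join(bus_thread, NULL);

    return pthread_equal(t2.ran_on, pthread_self()) ? -1 : t2.sum;
}
```

Posting a request only costs a short critical section, so a slow callback now stalls the bus thread instead of the sender. The trade-off is that one slow client delays delivery to everyone else queued behind it.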

Final remarks

I hope this post serves as an explanation of the rationale behind libbus and some of its use cases. If you have any suggestions or improvements for libbus, do not hesitate to open a pull request on GitHub.

This post is licensed under CC BY 4.0 by the author.