Centrifuge – real-time messaging with Go
An introduction to Centrifuge – real-time messaging with Go
In this post I'll try to introduce Centrifuge - the heart of Centrifugo.
Centrifuge is a real-time messaging library for the Go language.
This post is going to be pretty long (looks like I am a huge fan of long reads) – so make sure you also have a drink (probably two) and let's go!
How it's all started
I wrote several blog posts before (for example this one – yep, it's on Medium...) about the original motivation behind the Centrifugo server.
Centrifugo server is not the same as the Centrifuge library for Go. It's a full-featured project built on top of the Centrifuge library. The naming can be confusing, but it's not too hard once you spend some time with the ecosystem.
In short – Centrifugo was implemented to help traditional web frameworks deal with many persistent connections (like WebSocket or SockJS HTTP transports). So frameworks like Django or Ruby on Rails, or frameworks from the PHP world, could be used on the backend and still provide real-time messaging features like chats, multiplayer browser games, etc. for users. With a little help from Centrifugo.
Now there are cases when the Centrifugo server is used even in conjunction with a backend written in Go. While Go mostly has no problems dealing with many concurrent connections, Centrifugo provides some features beyond simple message passing between a client and a server. That makes it useful, especially since its design is pretty unobtrusive and fits the microservices world well. Centrifugo is used in some well-known projects (like ManyChat, Yoola.io, Spot.im, Badoo, etc.).
At the end of 2018, I released Centrifugo v2 based on a real-time messaging library for Go language – Centrifuge – the subject of this post.
It was a pretty hard experience to decouple Centrifuge out of the monolithic Centrifugo server – I was unable to make all the things right immediately, so the Centrifuge library API went through several iterations where I introduced backward-incompatible changes. All those changes were aimed at making Centrifuge a more generic tool and removing opinionated or limiting parts.
So what is Centrifuge?
This is ... well, a framework for building real-time messaging applications with the Go language. If you have ever heard about socket.io, you can think of Centrifuge as an analogue. I think the most popular applications these days are chats of different forms, but I want to emphasize that Centrifuge is not a framework for building chats – it's a generic instrument that can be used to create different sorts of real-time applications, such as real-time charts and multiplayer games.
The obvious choice for real-time messaging transport to achieve fast and cross-platform bidirectional communication these days is WebSocket. Especially if you are targeting a browser environment. You mostly don't need to use WebSocket HTTP polyfills in 2021 (though there are still corner cases so Centrifuge supports SockJS polyfill).
Centrifuge has its own custom protocol on top of plain WebSocket or SockJS frames.
The reason why Centrifuge has its own protocol on top of the underlying transport is that it provides several useful primitives to build real-time applications. The protocol is described as a strict Protobuf schema. It's possible to pass JSON or binary Protobuf-encoded data over the wire with Centrifuge.
GRPC is very handy these days too (and can be used in a browser with the help of additional proxies), and some developers prefer using it for real-time messaging apps – especially when only one-way communication is needed. It can be a bit better from an integration perspective but is more resource-consuming on the server side and a bit trickier to deploy.
Take a look at WebTransport – a brand-new spec for web browsers to allow fast communication between a client and a server on top of QUIC – it may be a good alternative to WebSocket in the future. It is in draft status at the moment, but it's already possible to play with it in Chrome.
Having its own protocol is one of the things that makes Centrifuge a framework rather than just a library. This dictates certain limits (for example, you can't just use an alternative message encoding) and makes developers use custom client connectors on the front-end side to communicate with a Centrifuge-based server (see more about connectors in the ecosystem part).
But protocol solves many practical tasks – and here we are going to look at real-time features it provides for a developer.
Centrifuge Node
To start working with Centrifuge you need to start a Centrifuge server Node. Node is the core of Centrifuge – it has many useful methods for setting event handlers, publishing messages to channels, etc. We will look at some events and the channel concept very soon.
Also, Node abstracts away scalability aspects, so you don't need to think about how to scale WebSocket connections over different server instances and still have a way to deliver published messages to interested clients.
For now, let's start a single instance of Node that will serve connections for us:
node, err := centrifuge.New(centrifuge.DefaultConfig)
if err != nil {
	log.Fatal(err)
}
if err := node.Run(); err != nil {
	log.Fatal(err)
}
It's also required to serve a WebSocket handler – this is possible just by registering centrifuge.WebsocketHandler in HTTP mux:
wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{})
http.Handle("/connection/websocket", wsHandler)
Now it's possible to connect to a server (using Centrifuge connector for a browser called centrifuge-js):
const centrifuge = new Centrifuge('ws://localhost:8000/connection/websocket');
centrifuge.connect();
Though connection will be rejected by the server since we also need to provide authentication details – Centrifuge expects explicitly provided connection Credentials to accept connection.
Authentication
Let's look at how we can tell Centrifuge details about connected user identity, so it could accept an incoming connection.
There are two main ways to authenticate client connection in Centrifuge.
The first one is over the native middleware mechanism. It's possible to wrap centrifuge.WebsocketHandler or centrifuge.SockjsHandler with middleware that checks user authentication and tells Centrifuge current user ID over context.Context:
func auth(h http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		cred := &centrifuge.Credentials{
			UserID: "42",
		}
		newCtx := centrifuge.SetCredentials(r.Context(), cred)
		r = r.WithContext(newCtx)
		h.ServeHTTP(w, r)
	})
}
So WebsocketHandler can be registered this way (note that a handler now wrapped by auth middleware):
wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{})
http.Handle("/connection/websocket", auth(wsHandler))
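To make the mechanics of this approach more visible, here is a minimal stdlib-only sketch of passing credentials through context.Context from a middleware to a wrapped handler. The names credentials, setCredentials, getCredentials and whoami are hypothetical stand-ins invented for this illustration, they are not part of the Centrifuge API, and the real library may store credentials differently.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// credentials is a hypothetical stand-in for centrifuge.Credentials.
type credentials struct {
	UserID string
}

// credKey is an unexported key type, so other packages cannot collide with it.
type credKey struct{}

// setCredentials returns a copy of ctx that carries cred.
func setCredentials(ctx context.Context, cred *credentials) context.Context {
	return context.WithValue(ctx, credKey{}, cred)
}

// getCredentials extracts credentials stored by setCredentials, if any.
func getCredentials(ctx context.Context) (*credentials, bool) {
	cred, ok := ctx.Value(credKey{}).(*credentials)
	return cred, ok
}

// auth resolves the user (hard-coded here) and passes it down the chain.
func auth(h http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := setCredentials(r.Context(), &credentials{UserID: "42"})
		h.ServeHTTP(w, r.WithContext(ctx))
	})
}

// whoami plays the role of the wrapped handler that needs the user ID.
func whoami() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		cred, ok := getCredentials(r.Context())
		if !ok {
			http.Error(w, "no credentials", http.StatusUnauthorized)
			return
		}
		fmt.Fprint(w, cred.UserID)
	})
}

// call runs a handler against an in-memory request and returns status and body.
func call(h http.Handler) (int, string) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/connection/websocket", nil)
	h.ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := call(auth(whoami())) // wrapped: credentials are visible
	fmt.Println(code, body)
	code, body = call(whoami()) // not wrapped: request is rejected
	fmt.Println(code, body)
}
```

In a real application the middleware would resolve the user from a session cookie or a header instead of hard-coding the ID.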
Another way to authenticate is a bit more generic – developers can authenticate a connection based on a custom token sent from the client inside the first WebSocket/SockJS frame. This is called the connect frame in terms of the Centrifuge protocol. Any string token can be set – this opens the way to using JWT, Paseto, or any other kind of authentication token. For example, see an authentication with JWT.
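Since any string can serve as a token, here is a rough sketch of what verifying a custom token could look like with only the standard library. The token format (user ID, a dot, then a hex-encoded HMAC-SHA256 signature) and the functions signToken and verifyToken are assumptions made up for this example. This is not JWT and not something Centrifuge ships, and a production token would also need an expiration time.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

var errBadToken = errors.New("bad token")

// signToken builds "<userID>.<hex HMAC-SHA256 of userID>".
// Hypothetical format for illustration only.
func signToken(userID string, secret []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write([]byte(userID))
	return userID + "." + hex.EncodeToString(mac.Sum(nil))
}

// verifyToken returns the user ID if the signature matches the secret.
func verifyToken(token string, secret []byte) (string, error) {
	i := strings.LastIndex(token, ".")
	if i <= 0 {
		return "", errBadToken
	}
	userID, sig := token[:i], token[i+1:]
	got, err := hex.DecodeString(sig)
	if err != nil {
		return "", errBadToken
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write([]byte(userID))
	// hmac.Equal compares in constant time to avoid timing leaks.
	if !hmac.Equal(got, mac.Sum(nil)) {
		return "", errBadToken
	}
	return userID, nil
}

func main() {
	secret := []byte("demo-secret")
	token := signToken("42", secret)

	userID, err := verifyToken(token, secret)
	fmt.Println(userID, err)

	// Swapping the user ID while keeping the signature must fail.
	_, err = verifyToken("43"+token[2:], secret)
	fmt.Println(err)
}
```

On the server the verified user ID would then be turned into connection credentials, the same way the middleware example does it.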
BTW it's also possible to pass any information from client side with a first connect message from client to server and return custom information about server state to a client. This is out of post scope though.
Nothing prevents you from integrating Centrifuge with OAuth2 or the session mechanism of another framework – like Gin for example.
Channel subscriptions
As soon as a client is connected and successfully authenticated, it can subscribe to channels. A channel (a room or topic in other systems) is a lightweight and ephemeral entity in Centrifuge. Channels can have different features (we will look at some channel features below). Channels are created automatically as soon as the first subscriber joins and destroyed as soon as the last subscriber leaves.
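The ephemeral nature of channels can be illustrated with a toy in-memory hub. This is a simplified sketch under my own assumptions (the hub type and its methods are hypothetical, and there is no locking), not how Centrifuge implements its internals.

```go
package main

import "fmt"

// hub tracks subscribers per channel. A toy for single-goroutine use only.
type hub struct {
	channels map[string]map[string]struct{} // channel -> set of client IDs
}

func newHub() *hub {
	return &hub{channels: map[string]map[string]struct{}{}}
}

// subscribe creates the channel on first use and adds the client to it.
func (h *hub) subscribe(channel, clientID string) {
	subs, ok := h.channels[channel]
	if !ok {
		subs = map[string]struct{}{}
		h.channels[channel] = subs
	}
	subs[clientID] = struct{}{}
}

// unsubscribe removes the client and drops the channel once it is empty.
func (h *hub) unsubscribe(channel, clientID string) {
	subs, ok := h.channels[channel]
	if !ok {
		return
	}
	delete(subs, clientID)
	if len(subs) == 0 {
		delete(h.channels, channel)
	}
}

// numChannels reports how many channels currently exist.
func (h *hub) numChannels() int {
	return len(h.channels)
}

func main() {
	h := newHub()
	h.subscribe("example", "client-1")
	h.subscribe("example", "client-2")
	fmt.Println("channels after subscribe:", h.numChannels())

	h.unsubscribe("example", "client-1")
	h.unsubscribe("example", "client-2")
	fmt.Println("channels after last unsubscribe:", h.numChannels())
}
```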
The application can have many real-time features – even on one app screen. So sometimes a client subscribes to several channels – each related to a specific real-time feature (for example one channel for chat updates, one channel for a likes notification stream, etc).
A channel is just an ASCII string. A developer is responsible for finding the channel naming convention that suits an application best. The channel naming convention is an important aspect since in many cases developers want to authorize subscription to a channel on the server side – so only authorized users can listen to specific channel updates.
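As an illustration of how a naming convention can drive authorization, here is a small sketch. The convention itself ("public:<name>" is open to everyone, "user:<id>" only to the user with that ID) and the canSubscribe function are purely hypothetical choices for this example. Centrifuge does not impose them.

```go
package main

import (
	"fmt"
	"strings"
)

// canSubscribe applies a hypothetical "<namespace>:<name>" convention:
//   - "public:<name>" may be read by any authenticated user,
//   - "user:<id>" may be read only by the user with that ID,
//   - everything else is rejected.
func canSubscribe(userID, channel string) bool {
	namespace, name, found := strings.Cut(channel, ":")
	if !found || name == "" {
		return false
	}
	switch namespace {
	case "public":
		return true
	case "user":
		return name == userID
	default:
		return false
	}
}

func main() {
	for _, ch := range []string{"public:news", "user:42", "user:43", "example"} {
		fmt.Printf("user 42 -> %q: %v\n", ch, canSubscribe("42", ch))
	}
}
```

A check like this naturally belongs inside the subscribe event handler on the server side.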
Let's look at a basic subscription example on the client-side:
centrifuge.subscribe('example', function(msgCtx) {
    console.log(msgCtx)
})
On the server side, you need to define a subscribe event handler. If the subscribe event handler is not set, the connection won't be able to subscribe to channels at all. The subscribe event handler is where a developer may check the permissions of the current connection to read channel updates. Here is a basic example of a subscribe event handler that simply allows subscriptions to the channel example for all authenticated connections and rejects subscriptions to all other channels:
node.OnConnect(func(client *centrifuge.Client) {
	client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
		if e.Channel != "example" {
			cb(centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied)
			return
		}
		cb(centrifuge.SubscribeReply{}, nil)
	})
})
You may notice the callback style of reacting to connection-related things. While not very idiomatic for Go, it's actually very practical. The reason why we use callback style inside client event handlers is that it gives a developer the possibility to control operation concurrency (i.e. process something in separate goroutines or a goroutine pool) and still control the order of events. See an example that demonstrates concurrency control in action.
Now if some event is published to a channel:
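To show why the callback style helps with concurrency control, here is a stdlib-only sketch. The event, reply, callback and handler types are hypothetical stand-ins for the library's types, and dispatch imitates a connection read loop that hands events to the handler strictly in arrival order. The handler decides per event whether to reply inline or to offload the work to a goroutine pool bounded by a semaphore.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for the library's event, reply and callback types.
type event struct{ Name string }
type reply struct{ Data string }
type callback func(reply, error)
type handler func(event, callback)

// dispatch feeds events to the handler one by one, in arrival order, and
// returns the collected replies once every callback has been invoked.
func dispatch(events []event, h handler) map[string]string {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		replies = map[string]string{}
	)
	for _, e := range events {
		e := e
		wg.Add(1)
		h(e, func(r reply, err error) {
			defer wg.Done()
			mu.Lock()
			defer mu.Unlock()
			if err == nil {
				replies[e.Name] = r.Data
			}
		})
	}
	wg.Wait()
	return replies
}

// newHandler answers "fast" inline and offloads everything else to
// goroutines, with at most poolSize of them running at the same time.
func newHandler(poolSize int) handler {
	sem := make(chan struct{}, poolSize)
	return func(e event, cb callback) {
		if e.Name == "fast" {
			cb(reply{Data: "inline"}, nil) // blocks the read loop, keeps strict order
			return
		}
		sem <- struct{}{} // wait for a free pool slot
		go func() {
			defer func() { <-sem }()
			cb(reply{Data: "pooled"}, nil) // read loop is already free again
		}()
	}
}

func main() {
	events := []event{{Name: "fast"}, {Name: "slow-1"}, {Name: "slow-2"}}
	replies := dispatch(events, newHandler(2))
	for _, e := range events {
		fmt.Println(e.Name, "->", replies[e.Name])
	}
}
```

With a plain return-value handler the library would have to pick one concurrency model for everyone. With a callback the choice stays in application code.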
// Here is how we can publish data to a channel.
node.Publish("example", []byte(`{"input": "hello"}`))
– the data will be delivered to the subscribed client, and the message will be printed to the JavaScript console. PUB/SUB in its usual form.
Though the Centrifuge protocol is based on a Protobuf schema, in the example above we published a JSON message into a channel. By default, we can only send JSON to connections since the default protocol format is JSON. But we can switch to the Protobuf-based binary protocol by connecting to the ws://localhost:8000/connection/websocket?format=protobuf endpoint – then it's possible to send binary data to clients.
Async message passing
While Centrifuge mostly shines when you need channel semantics it's also possible to send any data to connection directly – to achieve bidirectional asynchronous communication, just what a native WebSocket provides.
To send a message to a server one can use the send method on the client-side:
centrifuge.send({"input": "hello"});
On the server-side data will be available inside a message handler:
client.OnMessage(func(e centrifuge.MessageEvent) {
	log.Printf("message from client: %s", e.Data)
})
And vice-versa, to send data to a client use Send method of centrifuge.Client:
client.Send([]byte(`{"input": "hello"}`))
To listen to it on the client-side:
centrifuge.on('message', function(data) {
    console.log(data);
});
RPC
RPC is a primitive for sending a request from a client to a server and waiting for a response (in this case all communication still happens via asynchronous message passing internally, but Centrifuge takes care of matching response data to request previously sent).
On client side it's as simple as:
const resp = await centrifuge.rpc('my_method', {});
On server side RPC event handler should be set to make calls available:
client.OnRPC(func(e centrifuge.RPCEvent, cb centrifuge.RPCCallback) {
	if e.Method == "my_method" {
		cb(centrifuge.RPCReply{Data: []byte(`{"result": "42"}`)}, nil)
		return
	}
	cb(centrifuge.RPCReply{}, centrifuge.ErrorMethodNotFound)
})
Note that it's possible to pass the name of the RPC method and, depending on it and on custom request params, return different results to a client – just like a regular HTTP request but over an asynchronous WebSocket (or SockJS) connection.
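The matching of responses to previously sent requests, mentioned at the start of this section, can be sketched roughly like this. It is a simplified illustration under my own assumptions (incremental request IDs, a map of pending calls, a fake echo server in place of a real transport), not the actual Centrifuge protocol implementation, and it has no timeouts.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// request and reply are hypothetical frames; the shared ID links them.
type request struct {
	ID     uint32
	Method string
}

type reply struct {
	ID   uint32
	Data string
}

// rpcClient matches asynchronous replies to pending requests by ID.
type rpcClient struct {
	mu      sync.Mutex
	nextID  uint32
	pending map[uint32]chan reply
	send    func(request) // writes a frame to the transport
}

func newRPCClient(send func(request)) *rpcClient {
	return &rpcClient{pending: map[uint32]chan reply{}, send: send}
}

// call sends a request and blocks until the matching reply is delivered.
func (c *rpcClient) call(method string) string {
	c.mu.Lock()
	c.nextID++
	id := c.nextID
	ch := make(chan reply, 1)
	c.pending[id] = ch
	c.mu.Unlock()

	c.send(request{ID: id, Method: method})
	return (<-ch).Data
}

// handleReply is invoked by the transport read loop for each incoming reply.
func (c *rpcClient) handleReply(r reply) error {
	c.mu.Lock()
	ch, ok := c.pending[r.ID]
	delete(c.pending, r.ID)
	c.mu.Unlock()
	if !ok {
		return errors.New("no pending request for reply ID")
	}
	ch <- r
	return nil
}

// newEchoClient wires the client to a fake server that answers
// asynchronously with "echo:<method>".
func newEchoClient() *rpcClient {
	var c *rpcClient
	c = newRPCClient(func(req request) {
		go c.handleReply(reply{ID: req.ID, Data: "echo:" + req.Method})
	})
	return c
}

func main() {
	c := newEchoClient()
	var wg sync.WaitGroup
	for _, m := range []string{"a", "b", "c"} {
		wg.Add(1)
		go func(method string) {
			defer wg.Done()
			fmt.Println(method, "->", c.call(method))
		}(m)
	}
	wg.Wait()
}
```

Because replies are routed by ID, several calls can be in flight on one connection and may complete in any order.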
Server-side subscriptions
In many cases, the client knows best which channels it wants to subscribe to on a specific application screen. But sometimes you want to control subscriptions to channels on the server side. This is also possible in Centrifuge.
It's possible to provide a set of channels to subscribe the connection to during the connection establishment phase:
node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
	return centrifuge.ConnectReply{
		Subscriptions: map[string]centrifuge.SubscribeOptions{
			"example": {},
		},
	}, nil
})
Note that OnConnecting does not follow the callback style – this is because it can only happen once at the start of each connection – so there is no need to control operation concurrency.
In this case on the client-side you will have access to messages published to channels by listening to on('publish') event:
centrifuge.on('publish', function(msgCtx) {
    console.log(msgCtx);
});
Also, centrifuge.Client has Subscribe and Unsubscribe methods so it's possible to subscribe/unsubscribe client to/from channel somewhere in the middle of its long WebSocket session.
Windowed history in channel
Every time a message is published to a channel it's possible to provide custom history options. For example:
node.Publish(
	"example",
	[]byte(`{"input": "hello"}`),
	centrifuge.WithHistory(300, time.Minute),
)
In this case, Centrifuge will maintain a windowed Publication cache for a channel - or in other words, maintain a publication stream. This stream will have time retention (one minute in the example above) and the maximum size will be limited to the value provided during Publish (300 in the example above).
Every message inside a history stream has an incremental offset field. Also, a stream has a field called epoch – a unique identifier of the stream generation. It lets a client detect situations where the stream was completely removed and recreated: in that case there is no guarantee that no messages were lost in between, even if the offset looks fine.
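To make the windowed stream idea more concrete, here is a toy sketch of a stream limited both by size and by age, with incremental offsets and an epoch. The stream type and its methods are hypothetical, and expiring each publication individually is a simplifying assumption of this sketch, so the actual retention logic in Centrifuge engines may differ.

```go
package main

import (
	"fmt"
	"time"
)

type publication struct {
	Offset uint64
	Data   string
	at     time.Time // publish time, used for retention
}

// stream is a toy windowed history stream for a single channel.
type stream struct {
	epoch string        // identifies this generation of the stream
	top   uint64        // offset of the latest publication
	size  int           // maximum number of publications kept
	ttl   time.Duration // maximum age of a kept publication
	pubs  []publication
}

func newStream(epoch string, size int, ttl time.Duration) *stream {
	return &stream{epoch: epoch, size: size, ttl: ttl}
}

// at is a small helper to build deterministic timestamps for the demo.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

// add appends a publication with the next offset, then trims the window.
func (s *stream) add(now time.Time, data string) uint64 {
	s.top++
	s.pubs = append(s.pubs, publication{Offset: s.top, Data: data, at: now})
	s.trim(now)
	return s.top
}

// trim drops the oldest publications while the window is too large or
// while the oldest publication is past its retention time.
func (s *stream) trim(now time.Time) {
	for len(s.pubs) > 0 && (len(s.pubs) > s.size || now.Sub(s.pubs[0].at) > s.ttl) {
		s.pubs = s.pubs[1:]
	}
}

// since returns up to limit publications with an offset above the given one.
func (s *stream) since(now time.Time, offset uint64, limit int) []publication {
	s.trim(now)
	var out []publication
	for _, p := range s.pubs {
		if p.Offset > offset && len(out) < limit {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	s := newStream("xyz", 3, time.Minute)
	for i, data := range []string{"a", "b", "c", "d"} {
		s.add(at(int64(i*10)), data)
	}
	for _, p := range s.since(at(30), 0, 10) {
		fmt.Println(p.Offset, p.Data)
	}
	fmt.Println("epoch:", s.epoch, "top offset:", s.top)
}
```

Note that offsets keep growing even when old publications fall out of the window, which is what allows a client to notice a gap.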
Client protocol provides a possibility to paginate over a stream from a certain position with a limit:
const streamPosition = {offset: 0, epoch: 'xyz'};
const resp = await sub.history({since: streamPosition, limit: 10});
Iteration over a history stream is a new feature which was just merged into the Centrifuge master branch and can only be used from the JavaScript client at the moment.
Also, Centrifuge has an automatic message recovery feature. Automatic recovery is very useful in scenarios when tons of persistent connections start reconnecting at once. I already described why this is useful in one of my previous posts about WebSocket scalability. In short – since WebSocket connections are stateful, at the moment of a mass reconnect they can create a very big spike in load on your main application database. Such mass reconnects are a usual thing in practice - for example when you reload your load balancers or redeploy the WebSocket server (new code version).
Of course, recovery can also be useful for regular short network disconnects - when a user travels in the subway for example. But you always need a way to load an actual state from the main application database in case of an unsuccessful recovery.
To enable automatic recovery you can provide the Recover flag in subscribe options:
client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
	cb(centrifuge.SubscribeReply{
		Options: centrifuge.SubscribeOptions{
			Recover: true,
		},
	}, nil)
})
Obviously, recovery will work only for channels where a history stream is maintained. A limitation of recovery is that all missed publications are sent to the client in one protocol frame – pagination is not supported during the recovery process. This means that recovery is mostly effective for short offline periods without tons of missed messages.
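The decision a server has to make during recovery can be sketched as follows. This is my own simplified model (the position, streamState and recoverSince names are hypothetical): recovery succeeds only when the epoch matches and every publication between the client's last seen offset and the current top offset is still present in the history window.

```go
package main

import "fmt"

type publication struct {
	Offset uint64
	Data   string
}

// position is what a client remembers about the last message it saw.
type position struct {
	Offset uint64
	Epoch  string
}

// streamState is a snapshot of the server-side history stream.
type streamState struct {
	Epoch string
	Top   uint64        // offset of the latest publication
	Pubs  []publication // publications still kept in the window
}

// recoverSince returns the publications missed by the client and reports
// whether the client state can be considered fully recovered.
func recoverSince(s streamState, client position) ([]publication, bool) {
	// A different epoch means the stream was recreated: offsets are not comparable.
	if client.Epoch != s.Epoch || client.Offset > s.Top {
		return nil, false
	}
	var missed []publication
	for _, p := range s.Pubs {
		if p.Offset > client.Offset {
			missed = append(missed, p)
		}
	}
	// Every offset in (client.Offset, s.Top] must still be in the window.
	if uint64(len(missed)) != s.Top-client.Offset {
		return nil, false
	}
	return missed, true
}

func main() {
	s := streamState{
		Epoch: "xyz",
		Top:   5,
		Pubs:  []publication{{3, "c"}, {4, "d"}, {5, "e"}},
	}
	fmt.Println(recoverSince(s, position{Offset: 4, Epoch: "xyz"})) // short disconnect
	fmt.Println(recoverSince(s, position{Offset: 1, Epoch: "xyz"})) // offset 2 already trimmed
	fmt.Println(recoverSince(s, position{Offset: 4, Epoch: "abc"})) // stream was recreated
}
```

When the second return value is false, the client has to load the actual state from the main application database.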
Online presence and presence stats
Another cool thing Centrifuge exposes to developers is online presence information for channels. Presence information contains a list of active channel subscribers. This is useful to show the online status of players in a game for example.
Also, it's possible to turn on the Join/Leave message feature inside channels: each time a connection subscribes to a channel, all channel subscribers receive a Join message with client information (client ID, user ID). As soon as the client unsubscribes, a Leave message is sent to the remaining channel subscribers with information about who left the channel.
Here is how to enable both online presence and join/leave features for a subscription to channel:
client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
	cb(centrifuge.SubscribeReply{
		Options: centrifuge.SubscribeOptions{
			Presence:  true,
			JoinLeave: true,
		},
	}, nil)
})
On the client side it's then possible to call presence and set event handlers for join/leave messages.
The important thing to be aware of when using Join/Leave messages is that this feature can dramatically increase CPU utilization and overall traffic in channels with a large number of active subscribers – since on every client connect/disconnect event such a Join or Leave message must be sent to all subscribers. The advice here – avoid using Join/Leave messages or be ready to scale (Join/Leave messages scale well when adding more Centrifuge Nodes – more about scalability below).
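A quick back-of-the-envelope sketch shows how fast this grows. It assumes (my simplification) that clients join an initially empty channel one after another and that each Join message is delivered only to the clients already subscribed. The joinMessages function is just an illustration, not a library call.

```go
package main

import "fmt"

// joinMessages counts Join deliveries when n clients subscribe one by one
// to an empty channel: the k-th client triggers k-1 deliveries.
func joinMessages(n int) int {
	total := 0
	for k := 1; k <= n; k++ {
		total += k - 1
	}
	return total
}

func main() {
	for _, n := range []int{10, 1000, 10000} {
		fmt.Printf("%d subscribers -> %d Join deliveries\n", n, joinMessages(n))
	}
}
```

The total is n(n-1)/2, so for 10,000 subscribers a single mass reconnect already means 49,995,000 Join deliveries, before counting any Leave messages.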
One more thing to remember is that online presence information can also be pretty expensive to request in channels with many active subscribers – since it returns information about all connections, the response payload can be large. To help a bit with this situation Centrifuge has a presence stats client API method. Presence stats contain only two counters: the number of active connections in the channel and the number of unique users in the channel.
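The difference between full presence and presence stats can be shown with a tiny sketch. The clientInfo and presenceStats types and the stats function are hypothetical stand-ins written for this example. The point is only that two counters are derived from the full presence map, so the response stays small no matter how many connections there are.

```go
package main

import "fmt"

// clientInfo describes one connection subscribed to a channel.
type clientInfo struct {
	ClientID string
	UserID   string
}

// presenceStats is the compact summary: just two counters.
type presenceStats struct {
	NumClients int // active connections in the channel
	NumUsers   int // unique users behind those connections
}

// stats reduces full presence (client ID -> info) to two counters.
func stats(presence map[string]clientInfo) presenceStats {
	users := map[string]struct{}{}
	for _, info := range presence {
		users[info.UserID] = struct{}{}
	}
	return presenceStats{NumClients: len(presence), NumUsers: len(users)}
}

func main() {
	// User 42 is connected from two devices, user 43 from one.
	presence := map[string]clientInfo{
		"c1": {ClientID: "c1", UserID: "42"},
		"c2": {ClientID: "c2", UserID: "42"},
		"c3": {ClientID: "c3", UserID: "43"},
	}
	s := stats(presence)
	fmt.Println("clients:", s.NumClients, "users:", s.NumUsers)
}
```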
If you still need to somehow process online presence in rooms with a massive number of active subscribers, then I think you'd better do it in near real-time - for example with a fast OLAP database like ClickHouse.