Built-in API command async consumers
In the server API chapter we showed how to execute various Centrifugo server API commands (publish, broadcast, etc.) over HTTP or GRPC. In many cases you will call those APIs from your application's business logic synchronously. But to deal with temporary network and availability issues, and to achieve reliable execution of API commands upon changes in your primary application database, you may want to use queuing techniques and call the Centrifugo API asynchronously.
Asynchronous delivery of real-time events upon changes in the primary database may be done in several ways. Some companies use the transactional outbox pattern, others use techniques like Kafka Connect with the CDC (Change Data Capture) approach. Since Centrifugo provides an API, users can implement any of those techniques and build a worker which reliably sends API commands to Centrifugo.
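For example, with the transactional outbox pattern the real-time event is written in the same database transaction as the business change. A minimal sketch, assuming a hypothetical `posts` table from your own schema and an outbox table with `method`, `payload` and `partition` columns (see the PostgreSQL consumer section for the exact table structure Centrifugo expects):

```sql
BEGIN;
-- Apply the business change first.
UPDATE posts SET published = true WHERE id = 42;
-- Enqueue a Centrifugo API command in the same transaction, so the
-- real-time event is only delivered if the business change commits.
INSERT INTO centrifugo_outbox (method, payload, partition)
VALUES ('publish', '{"channel": "posts", "data": {"id": 42}}', 0);
COMMIT;
```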
But Centrifugo also provides some built-in asynchronous consumers to simplify the integration process.
Supported consumers
The following built-in async consumers are available at this point:
- Consumer from PostgreSQL outbox table
- Consumer from Kafka topics
How it works
By default, consumers expect to consume messages which represent Centrifugo server API commands. That is, while with the synchronous server API you use HTTP or GRPC to send commands, with asynchronous consumers you insert an API command into a PostgreSQL outbox table or deliver it to a Kafka topic, and it will soon be consumed and processed asynchronously by Centrifugo.
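For example, a publish command, whether inserted into the outbox table or produced to a Kafka topic, is a JSON object carrying the command method and its payload. The channel name and data below are illustrative:

```json
{
  "method": "publish",
  "payload": {
    "channel": "chat:index",
    "data": {"text": "hello"}
  }
}
```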
Async consumers only process commands which modify state, such as publish, broadcast, unsubscribe, disconnect, etc. Sending read commands for async execution simply does not make sense, so they are ignored. The batch method is not supported either.
Some consumers also provide a way to listen for raw publications, i.e. when the payload already contains data ready to be published into channels. This can be useful when you have a system that already produces messages in such a format. For example, the Kafka consumer has a special mode for this.
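Schematically, in such a mode the Kafka message value is the publication payload itself, while metadata like the target channels travels in record headers. The representation and header name below are purely illustrative, see the Kafka consumer reference for the actual configuration:

```json
{
  "headers": {"centrifugo-channels": "chat:index"},
  "value": {"text": "hello"}
}
```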
For now, Centrifugo only supports JSON payloads for asynchronous commands coming through consumers. If you need a binary format, reach out with your use case.
If Centrifugo encounters an error while processing consumed messages, internal errors are retried, while all other errors are logged at the error level and the message is marked as processed. The processing logic for the broadcast API is special: if any of the publications to any channel from the broadcast channels array failed, the entire broadcast command is retried. To prevent duplicate messages being published during such retries, consider using idempotency_key in the broadcast command.
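For example, a broadcast command carrying an idempotency key (channel names, data and the key value here are illustrative):

```json
{
  "method": "broadcast",
  "payload": {
    "channels": ["chat:room1", "chat:room2"],
    "data": {"text": "hello"},
    "idempotency_key": "msg-7f3a2c"
  }
}
```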
Our Chat/Messenger tutorial shows the PostgreSQL outbox and Kafka consumers in action. It also shows techniques to avoid duplicate messages (idempotent publications) and to deal with late message delivery (idempotent processing on the client side). Whether you need those techniques depends on the nature of the app. Various real-time features may require different ways of sending real-time events. Both synchronous and asynchronous API calls have their own advantages and trade-offs. We also talk about this in the Asynchronous message streaming to Centrifugo with Benthos blog post.
How to enable
Consumers can be set in the configuration using the `consumers` array:
```json
{
  "consumers": [
    {
      "enabled": true,
      "name": "xxx",
      "type": "postgresql",
      "postgresql": {...}
    },
    {
      "enabled": true,
      "name": "yyy",
      "type": "kafka",
      "kafka": {...}
    }
  ]
}
```
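For illustration, a more complete Kafka consumer entry could look like the sketch below. The broker address, topic and consumer group are placeholders, and the exact set of options for each consumer type is described in its consumer-specific documentation:

```json
{
  "consumers": [
    {
      "enabled": true,
      "name": "my_kafka_consumer",
      "type": "kafka",
      "kafka": {
        "brokers": ["localhost:9092"],
        "topics": ["centrifugo_commands"],
        "consumer_group": "centrifugo"
      }
    }
  ]
}
```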