
Built-in API command async consumers

In the server API chapter we've shown how to execute various Centrifugo server API commands (publish, broadcast, etc.) over HTTP or GRPC. In many cases you will call those APIs from your application business logic synchronously. But to deal with temporary network and availability issues, and to achieve reliable execution of API commands upon changes in your primary application database, you may want to use queuing techniques and call the Centrifugo API asynchronously.

Asynchronous delivery of real-time events upon changes in the primary database may be done in several ways. Some companies use the transactional outbox pattern, others use techniques like Kafka Connect with the CDC (Change Data Capture) approach. The fact that Centrifugo provides a server API allows users to implement any of those techniques and build a worker which will send API commands to Centrifugo reliably.

But Centrifugo also provides some built-in asynchronous consumers to simplify the integration process.

Supported consumers

The following built-in async consumers are available at this point:

  • postgresql – consumes a PostgreSQL transactional outbox table
  • kafka – consumes Kafka topics
  • nats_jetstream – consumes NATS JetStream streams
  • redis_stream – consumes Redis Streams
  • google_pub_sub – consumes Google Cloud Pub/Sub subscriptions
  • aws_sqs – consumes AWS SQS queues
  • azure_service_bus – consumes Azure Service Bus queues

Again, while built-in consumers can simplify integration, you can still use whatever queue system you need and integrate your own consumer which sends requests to the Centrifugo server API.

We also recommend looking at the Pitfalls of async publishing part of our previous blog post – while in many cases you get reliable at-least-once processing, you may come across some pitfalls along the way, so being prepared for and understanding them is important. Then, depending on the real-time feature, you can decide which approach is better – synchronous publishing or asynchronous integration.

How consumers work

By default, consumers expect to consume messages which represent Centrifugo server API commands. I.e. while with the synchronous server API you use HTTP or GRPC to send commands, with asynchronous consumers you insert an API command into a PostgreSQL outbox table or deliver it to a Kafka topic – and it will soon be consumed and processed asynchronously by Centrifugo.

Async consumers only process commands which modify state – such as publish, broadcast, unsubscribe, disconnect, etc. Sending read commands for asynchronous execution simply does not make sense, and they will be ignored. The batch method is not supported either.

Centrifugo currently supports only JSON payloads for asynchronous commands coming to consumers. If you need a binary format – reach out with your use case.

If Centrifugo encounters an error while processing a consumed message, internal errors will be retried, while all other errors are logged at the error level and the message is marked as processed. The processing logic for the broadcast API is special: if publishing to any channel from the broadcast channels array fails, the entire broadcast command will be retried. To prevent duplicate messages being published during such retries – consider using idempotency_key in the broadcast command.
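For example, with consumers that accept commands as JSON objects (like the Kafka consumer shown below), a broadcast command with an idempotency key could look like this – channel names, data and the key value here are purely illustrative:

{
  "method": "broadcast",
  "payload": {
    "channels": ["updates:1", "updates:2"],
    "data": {"text": "Hello, world!"},
    "idempotency_key": "d7b8c9e0-unique-operation-id"
  }
}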

tip

Our Chat/Messenger tutorial shows the PostgreSQL outbox and Kafka consumers in action. It also shows techniques to avoid duplicate messages (idempotent publications) and to deal with late message delivery (idempotent processing on the client side). Whether you need those techniques depends on the nature of the app. Various real-time features may require different ways of sending real-time events. Both synchronous API calls and async calls have their own advantages and trade-offs. We also talk about this in the Asynchronous message streaming to Centrifugo with Benthos blog post.

Publication data mode

As mentioned, Centrifugo expects server API commands in the received message content. Once the command is consumed, it's processed in the same way as the HTTP or GRPC server APIs process the request.

Sometimes though, you may have a system that already produces messages in a format ready to be published into Centrifugo channels. Most Centrifugo async consumers have a special mode for consuming publications – called Publication Data Mode. In that case the message payload must contain data ready to be published into Centrifugo channels. Centrifugo-specific publication fields, like the list of channels to publish into, are provided in message headers/attributes. See the documentation of each specific consumer for the exact option names. For example, the Kafka consumer has a publication data mode, and the same applies to other consumers.
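For instance, with publication data mode enabled the message body may carry just the publication data, while target channels travel in a header or attribute – the header name is configurable per consumer (x-centrifugo-channels is used in the Kafka example below), and the values here are illustrative:

message value:
{"text": "Hello, world!"}

x-centrifugo-channels header value:
chat:index,chat:42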

Note, since you can configure many consumers in the Centrifugo configuration, it's totally possible to have consumers working in different modes.

Ordering guarantees

Carefully read the documentation of the specific consumer to understand its message processing ordering properties – ordered processing can be achieved with some of them, but not with others.

How to enable

Consumers can be set in the configuration using the consumers array:

config.json
{
  "consumers": [
    {
      "enabled": true,
      "name": "xxx",
      "type": "postgresql",
      "postgresql": {...}
    },
    {
      "enabled": true,
      "name": "yyy",
      "type": "kafka",
      "kafka": {...}
    }
  ]
}

consumers[]

Consumers are configured using the consumers array at the top level of the configuration. Each consumer object in the consumers array has the following top-level configuration options.

consumers[].enabled

Boolean. Default: false.

When set to true, enables the configured consumer.

consumers[].name

String. Default: "". Required.

The name of the consumer. Must be unique for each consumer and match the regex ^[a-zA-Z0-9_]{2,} – i.e. Latin letters, digits and underscores, at least 2 characters long. This name is used for logging and metrics, and also to override some options with environment variables.

consumers[].type

String. Default: "". Required.

Type of consumer. At this point can be one of:

  • postgresql
  • kafka
  • nats_jetstream
  • redis_stream
  • google_pub_sub
  • aws_sqs
  • azure_service_bus

Configure via env vars

To provide consumers over environment variables, set the CENTRIFUGO_CONSUMERS variable to the consumers JSON array serialized to a string.

It's also possible to override individual consumer options over environment variables by using the consumer name. For example:

CENTRIFUGO_CONSUMERS_<CONSUMER_NAME>_<OPTION_NAME>="???"

Or for type-specific configuration:

CENTRIFUGO_CONSUMERS_<CONSUMER_NAME>_POSTGRESQL_<OPTION_NAME2>="???"
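For example, for a consumer named my_postgresql_consumer (configured in the next section), the PostgreSQL DSN could presumably be overridden like this – the consumer name is uppercased in the variable name and the DSN value is illustrative:

CENTRIFUGO_CONSUMERS_MY_POSTGRESQL_CONSUMER_POSTGRESQL_DSN="postgresql://user:password@localhost:5432/db"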

PostgreSQL outbox consumer

Centrifugo can natively integrate with a PostgreSQL table for the transactional outbox pattern. The table in PostgreSQL must have the predefined format Centrifugo expects:

CREATE TABLE IF NOT EXISTS centrifugo_outbox (
  id BIGSERIAL PRIMARY KEY,
  method text NOT NULL,
  payload JSONB NOT NULL,
  partition INTEGER NOT NULL default 0,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
);

Then configure a consumer of postgresql type in the Centrifugo config:

{
  ...
  "consumers": [
    {
      "enabled": true,
      "name": "my_postgresql_consumer",
      "type": "postgresql",
      "postgresql": {
        "dsn": "postgresql://user:password@localhost:5432/db",
        "outbox_table_name": "centrifugo_outbox",
        "num_partitions": 1,
        "partition_select_limit": 100,
        "partition_poll_interval": "300ms"
      }
    }
  ]
}

Here is how you can insert a row into the outbox table to publish into a Centrifugo channel:

INSERT INTO centrifugo_outbox (method, payload, partition)
VALUES ('publish', '{"channel": "updates", "data": {"text": "Hello, world!"}}', 0);
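The main benefit of the outbox approach comes from inserting the outbox row in the same transaction as your business data change, so the real-time event is produced only if that change commits. A minimal sketch, assuming a hypothetical chat_messages application table:

BEGIN;
-- application data change (chat_messages is a hypothetical table).
INSERT INTO chat_messages (room_id, text) VALUES (42, 'Hello, world!');
-- enqueue the corresponding real-time event for Centrifugo in the same transaction.
INSERT INTO centrifugo_outbox (method, payload, partition)
VALUES ('publish', '{"channel": "room:42", "data": {"text": "Hello, world!"}}', 0);
COMMIT;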

Centrifugo supports the LISTEN/NOTIFY mechanism of PostgreSQL to be notified about new data in the outbox table. To enable it, you first need to create a trigger in PostgreSQL:

CREATE OR REPLACE FUNCTION centrifugo_notify_partition_change()
RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify('centrifugo_partition_change', NEW.partition::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER centrifugo_notify_partition_trigger
AFTER INSERT ON centrifugo_outbox
FOR EACH ROW
EXECUTE FUNCTION centrifugo_notify_partition_change();

And then update the consumer config – add the "partition_notification_channel" option to it:

{
  ...
  "consumers": [
    {
      "enabled": true,
      "name": "my_postgresql_consumer",
      "type": "postgresql",
      "postgresql": {
        ...
        "partition_notification_channel": "centrifugo_partition_change"
      }
    }
  ]
}

consumers[].postgresql

Options for consumer of postgresql type.

consumers[].postgresql.dsn

String. Default: "". Required.

DSN to PostgreSQL database, ex. "postgresql://user:password@localhost:5432/db". To override dsn over environment variables use CENTRIFUGO_CONSUMERS_<CONSUMER_NAME>_POSTGRESQL_DSN.

consumers[].postgresql.outbox_table_name

String. Default: "". Required.

The name of outbox table in selected database, ex. "centrifugo_outbox".

consumers[].postgresql.num_partitions

Integer. Default: 1.

The number of partitions to use. Centrifugo keeps strict order of commands per partition by default. This option provides a way to create concurrent consumers, each consuming from a different partition of the outbox table. Note that partition numbers start with 0, so when using 1 as num_partitions, insert data with partition == 0 into the outbox table.

consumers[].postgresql.partition_select_limit

Integer. Default: 100.

Max number of commands to select in one query to outbox table.

consumers[].postgresql.partition_poll_interval

Duration. Default: "300ms".

Polling interval for each partition.

consumers[].postgresql.partition_notification_channel

String. Default: "".

Optional name of LISTEN/NOTIFY channel to trigger consuming upon data added to outbox partition.

consumers[].postgresql.tls

TLS object. By default, no TLS is used.

Client TLS configuration for PostgreSQL connection.

Kafka consumer

Another built-in consumer is the Kafka topic consumer. To configure Centrifugo to consume a Kafka topic:

config.json
{
  "consumers": [
    {
      "enabled": true,
      "name": "my_kafka_consumer",
      "type": "kafka",
      "kafka": {
        "brokers": ["localhost:9092"],
        "topics": ["postgres.public.chat_cdc"],
        "consumer_group": "centrifugo"
      }
    }
  ]
}

Then simply put a message in the following format into the Kafka topic:

{
  "method": "publish",
  "payload": {
    "channel": "mychannel",
    "data": {}
  }
}

– and it will be consumed by Centrifugo and reliably processed.

Centrifugo preserves processing order within Kafka partitions.

consumers[].kafka

Options for consumer of kafka type.

consumers[].kafka.brokers

Array of string. Required.

Points Centrifugo to Kafka brokers. To override brokers over environment variables use CENTRIFUGO_CONSUMERS_<CONSUMER_NAME>_KAFKA_BROKERS – string with broker addresses separated by space.

consumers[].kafka.topics

Array of string. Required.

Tells which topics to consume.

consumers[].kafka.consumer_group

String. Required.

Sets the name of consumer group to use.

consumers[].kafka.max_poll_records

Integer. Default: 100.

Sets the maximum number of records to fetch from Kafka during a single poll operation.

consumers[].kafka.fetch_max_bytes

Integer. Default: 52428800 (50MB).

Sets the maximum number of bytes to fetch from Kafka in a single request. In many cases setting this to a lower value can help reduce aggressive Kafka client memory usage under load.

consumers[].kafka.sasl_mechanism

String. Default: "".

SASL mechanism to use: "plain", "scram-sha-256", "scram-sha-512", "aws-msk-iam" are supported. Note, in case of "aws-msk-iam" Centrifugo uses sasl_user and sasl_password options as access key and secret key when configuring AWS auth.

consumers[].kafka.sasl_user

String. Default: "".

User for plain SASL auth. To override sasl_user over environment variables use CENTRIFUGO_CONSUMERS_<CONSUMER_NAME>_KAFKA_SASL_USER.

consumers[].kafka.sasl_password

String. Default: "".

Password for plain SASL auth. To override sasl_password over environment variables use CENTRIFUGO_CONSUMERS_<CONSUMER_NAME>_KAFKA_SASL_PASSWORD.

consumers[].kafka.tls

TLSConfig to configure Kafka client TLS.

consumers[].kafka.publication_data_mode

Publication data mode for the Kafka consumer simplifies integrating Centrifugo with existing Kafka topics. By default, Centrifugo can integrate with Kafka topics but requires a special payload format, where each message in the topic represents a Centrifugo API command. This approach works well for Kafka topics specifically set up for Centrifugo.

When Publication Data Mode is enabled, Centrifugo expects messages in Kafka topics to contain data ready for direct publication, rather than server API commands. It is also possible to use special Kafka headers to specify the channels to which the data should be published.

The primary goal of this mode is to simplify Centrifugo's integration with existing Kafka topics, making it easier to deliver real-time messages to clients without needing to restructure the topic's payload format.

BTW, don't forget that since Centrifugo allows configuring an array of asynchronous consumers, it is possible to use Kafka consumers in different modes simultaneously.

To enable publication data mode:

config.json
{
  "consumers": [
    {
      "enabled": true,
      "name": "my_kafka_consumer",
      "type": "kafka",
      "kafka": {
        "brokers": ["localhost:9092"],
        "topics": ["my_topic"],
        "consumer_group": "centrifugo",
        "publication_data_mode": {
          "enabled": true,
          "channels_header": "x-centrifugo-channels",
          "idempotency_key_header": "x-centrifugo-idempotency-key"
        }
      }
    }
  ]
}

As you can see, the channels to forward the publication to may be provided as a value of the configured header. So you don't need to change payloads in the topic to turn them into real-time messages with Centrifugo.

consumers[].kafka.publication_data_mode.enabled

Boolean. Default: false.

Enables Kafka publication data mode for the Kafka consumer.

consumers[].kafka.publication_data_mode.channels_header

String. Default: "".

Header name to extract channels to publish data into (channels must be comma-separated). Example value: "channel1,channel2".

consumers[].kafka.publication_data_mode.idempotency_key_header

String. Default: "".

Header name to extract Publication idempotency key from Kafka message. See PublishRequest.

consumers[].kafka.publication_data_mode.delta_header

String. Default: "".

Header name to extract Publication delta flag from Kafka message which tells Centrifugo whether to use delta compression for message or not. See delta compression and PublishRequest.

Compatibility with Redpanda

Our local test suite for the Kafka consumer passed with Redpanda v24.3.6, so it's generally compatible.

NATS JetStream

Consumer from NATS JetStream.

Note, message processing is unordered if you have multiple Centrifugo instances consuming from NATS JetStream (ordered processing is possible if only one Centrifugo instance consumes).

warning

Keep in mind that Centrifugo does not create NATS JetStream streams – it consumes from pre-created streams. Pay attention to situations when NATS JetStream streams are created in-memory or inside a temporary directory of the operating system. In such cases streams may be unexpectedly lost at some point – this is a common mistake when working with NATS JetStream.
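A minimal configuration sketch for a consumer of nats_jetstream type, built from the options documented below – stream, subject and durable consumer names here are illustrative:

config.json
{
  "consumers": [
    {
      "enabled": true,
      "name": "my_nats_consumer",
      "type": "nats_jetstream",
      "nats_jetstream": {
        "url": "nats://127.0.0.1:4222",
        "stream_name": "centrifugo_commands",
        "subjects": ["centrifugo.api"],
        "durable_consumer_name": "centrifugo"
      }
    }
  ]
}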

consumers[].nats_jetstream

Type: NatsJetStreamConsumerConfig object

nats_jetstream allows defining options for consumer of nats_jetstream type.

consumers[].nats_jetstream.url

Type: string. Default: nats://127.0.0.1:4222

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_URL

url is the address of the NATS server.

consumers[].nats_jetstream.credentials_file

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_CREDENTIALS_FILE

credentials_file is the path to a NATS credentials file used for authentication (nats.UserCredentials). If provided, it overrides username/password and token.

consumers[].nats_jetstream.username

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_USERNAME

username is used for basic authentication (along with password) if credentials_file is not provided.

consumers[].nats_jetstream.password

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_PASSWORD

password is used with username for basic authentication.

consumers[].nats_jetstream.token

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_TOKEN

token is an alternative authentication mechanism if credentials_file and username are not provided.

consumers[].nats_jetstream.stream_name

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_STREAM_NAME

stream_name is the name of the NATS JetStream stream to use.

consumers[].nats_jetstream.subjects

Type: []string

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_SUBJECTS

subjects is the list of NATS subjects (topics) to filter.

consumers[].nats_jetstream.durable_consumer_name

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_DURABLE_CONSUMER_NAME

durable_consumer_name sets the name of the durable JetStream consumer to use.

consumers[].nats_jetstream.deliver_policy

Type: string. Default: new

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_DELIVER_POLICY

deliver_policy is the NATS JetStream delivery policy for the consumer. By default, it is set to "new". Possible values: new, all.

consumers[].nats_jetstream.max_ack_pending

Type: int. Default: 100

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_MAX_ACK_PENDING

max_ack_pending is the maximum number of unacknowledged messages that can be pending for the consumer.

consumers[].nats_jetstream.method_header

Type: string. Default: centrifugo-method

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_METHOD_HEADER

method_header is the NATS message header used to extract the method name for dispatching commands. If provided in message, then payload must be just a serialized API request object.

consumers[].nats_jetstream.publication_data_mode

Type: NatsJetStreamPublicationDataModeConfig object

publication_data_mode configures extraction of pre-formatted publication data from message headers.

consumers[].nats_jetstream.publication_data_mode.enabled

Type: bool

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_PUBLICATION_DATA_MODE_ENABLED

enabled toggles publication data mode.

consumers[].nats_jetstream.publication_data_mode.channels_header

Type: string. Default: centrifugo-channels

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_PUBLICATION_DATA_MODE_CHANNELS_HEADER

channels_header is the name of the header that contains comma-separated channel names.

consumers[].nats_jetstream.publication_data_mode.idempotency_key_header

Type: string. Default: centrifugo-idempotency-key

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_PUBLICATION_DATA_MODE_IDEMPOTENCY_KEY_HEADER

idempotency_key_header is the name of the header that contains an idempotency key for deduplication.

consumers[].nats_jetstream.publication_data_mode.delta_header

Type: string. Default: centrifugo-delta

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_PUBLICATION_DATA_MODE_DELTA_HEADER

delta_header is the name of the header indicating whether the message represents a delta (partial update).

consumers[].nats_jetstream.publication_data_mode.version_header

Type: string. Default: centrifugo-version

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_PUBLICATION_DATA_MODE_VERSION_HEADER

version_header is the name of the header that contains the version of the message.

consumers[].nats_jetstream.publication_data_mode.version_epoch_header

Type: string. Default: centrifugo-version-epoch

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_PUBLICATION_DATA_MODE_VERSION_EPOCH_HEADER

version_epoch_header is the name of the header that contains the version epoch of the message.

consumers[].nats_jetstream.publication_data_mode.tags_header_prefix

Type: string. Default: centrifugo-tag-

Env: CENTRIFUGO_CONSUMERS_<NAME>_NATS_JETSTREAM_PUBLICATION_DATA_MODE_TAGS_HEADER_PREFIX

tags_header_prefix is the prefix used to extract dynamic tags from message headers.

consumers[].nats_jetstream.tls

Type: TLSConfig object

TLS object. By default, no TLS is used.

Redis Stream

Note, message processing is unordered if you have multiple Centrifugo instances consuming from a Redis Stream (ordered processing is possible if only one Centrifugo instance consumes and num_workers is 1).
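A minimal configuration sketch for a consumer of redis_stream type, built from the options documented below – stream and consumer group names here are illustrative:

config.json
{
  "consumers": [
    {
      "enabled": true,
      "name": "my_redis_consumer",
      "type": "redis_stream",
      "redis_stream": {
        "address": ["redis://127.0.0.1:6379"],
        "streams": ["centrifugo_commands"],
        "consumer_group": "centrifugo"
      }
    }
  ]
}

A command may then be added to the stream with fields matching the default method_value and payload_value option values:

XADD centrifugo_commands * method publish payload '{"channel": "updates", "data": {"text": "Hello, world!"}}'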

consumers[].redis_stream

Type: RedisStreamConsumerConfig object

redis_stream allows defining options for consumer of redis_stream type.

consumers[].redis_stream.address

Type: []string. Default: redis://127.0.0.1:6379

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_ADDRESS

address is a list of Redis shard addresses. In most cases a single shard is used. But when multiple addresses are provided, Centrifugo will distribute keys between shards using consistent hashing.

consumers[].redis_stream.connect_timeout

Type: Duration. Default: 1s

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_CONNECT_TIMEOUT

connect_timeout is a timeout for establishing connection to Redis.

consumers[].redis_stream.io_timeout

Type: Duration. Default: 4s

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_IO_TIMEOUT

io_timeout is a timeout for all read/write operations against Redis (can be considered as a request timeout).

consumers[].redis_stream.db

Type: int. Default: 0

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_DB

db is a Redis database to use. Generally it's not recommended to use a non-zero DB. Note that Redis PUB/SUB is global for all databases in a single Redis instance, so when using a non-zero DB make sure that different Centrifugo setups use different prefixes.

consumers[].redis_stream.user

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_USER

user is a Redis user.

consumers[].redis_stream.password

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_PASSWORD

password is a Redis password.

consumers[].redis_stream.client_name

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_CLIENT_NAME

client_name allows changing a Redis client name used when connecting.

consumers[].redis_stream.force_resp2

Type: bool

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_FORCE_RESP2

force_resp2 forces use of the Redis RESP2 protocol for communication.

consumers[].redis_stream.cluster_address

Type: []string

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_CLUSTER_ADDRESS

cluster_address is a list of Redis cluster addresses. When several are provided, data will be sharded between them using consistent hashing. Several cluster addresses within one shard may be passed comma-separated.

consumers[].redis_stream.tls

Type: TLSConfig object

TLS object. By default, no TLS is used.

consumers[].redis_stream.sentinel_address

Type: []string

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_SENTINEL_ADDRESS

sentinel_address allows setting Redis Sentinel addresses. When provided, Sentinel will be used. When multiple addresses are provided, data will be sharded between them using consistent hashing. Several Sentinel addresses within one shard may be passed comma-separated.

consumers[].redis_stream.sentinel_user

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_SENTINEL_USER

sentinel_user is a Redis Sentinel user.

consumers[].redis_stream.sentinel_password

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_SENTINEL_PASSWORD

sentinel_password is a Redis Sentinel password.

consumers[].redis_stream.sentinel_master_name

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_SENTINEL_MASTER_NAME

sentinel_master_name is a Redis master name in Sentinel setup.

consumers[].redis_stream.sentinel_client_name

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_SENTINEL_CLIENT_NAME

sentinel_client_name is a Redis Sentinel client name used when connecting.

consumers[].redis_stream.sentinel_tls

Type: TLSConfig object

TLS object. By default, no TLS is used.

consumers[].redis_stream.streams

Type: []string

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_STREAMS

streams is the list of Redis streams to consume from.

consumers[].redis_stream.consumer_group

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_CONSUMER_GROUP

consumer_group is the name of the consumer group to use.

consumers[].redis_stream.visibility_timeout

Type: Duration. Default: 30s

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_VISIBILITY_TIMEOUT

visibility_timeout is the time to wait for a message to be processed before it is re-queued.

consumers[].redis_stream.num_workers

Type: int. Default: 1

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_NUM_WORKERS

num_workers is the number of message workers to use for processing each stream.

consumers[].redis_stream.payload_value

Type: string. Default: payload

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_PAYLOAD_VALUE

payload_value is used to extract data from Redis Stream message.

consumers[].redis_stream.method_value

Type: string. Default: method

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_METHOD_VALUE

method_value is used to extract a method for command messages. If provided in message, then payload must be just a serialized API request object.

consumers[].redis_stream.publication_data_mode

Type: RedisStreamPublicationDataModeConfig object

publication_data_mode configures publication data mode.

consumers[].redis_stream.publication_data_mode.enabled

Type: bool

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_PUBLICATION_DATA_MODE_ENABLED

enabled toggles publication data mode.

consumers[].redis_stream.publication_data_mode.channels_value

Type: string. Default: centrifugo-channels

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_PUBLICATION_DATA_MODE_CHANNELS_VALUE

channels_value is used to extract channels to publish data into (channels must be comma-separated).

consumers[].redis_stream.publication_data_mode.idempotency_key_value

Type: string. Default: centrifugo-idempotency-key

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_PUBLICATION_DATA_MODE_IDEMPOTENCY_KEY_VALUE

idempotency_key_value is used to extract Publication idempotency key from Redis Stream message.

consumers[].redis_stream.publication_data_mode.delta_value

Type: string. Default: centrifugo-delta

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_PUBLICATION_DATA_MODE_DELTA_VALUE

delta_value is used to extract Publication delta flag from Redis Stream message.

consumers[].redis_stream.publication_data_mode.version_value

Type: string. Default: centrifugo-version

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_PUBLICATION_DATA_MODE_VERSION_VALUE

version_value is used to extract Publication version from Redis Stream message.

consumers[].redis_stream.publication_data_mode.version_epoch_value

Type: string. Default: centrifugo-version-epoch

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_PUBLICATION_DATA_MODE_VERSION_EPOCH_VALUE

version_epoch_value is used to extract Publication version epoch from Redis Stream message.

consumers[].redis_stream.publication_data_mode.tags_value_prefix

Type: string. Default: centrifugo-tag-

Env: CENTRIFUGO_CONSUMERS_<NAME>_REDIS_STREAM_PUBLICATION_DATA_MODE_TAGS_VALUE_PREFIX

tags_value_prefix is used to extract Publication tags from Redis Stream message.

Google Cloud PUB/SUB

Consumer from Google Cloud PUB/SUB.

Ordered processing is possible if OrderingKey is used and the subscription is created with ordering enabled. See the Google Cloud PUB/SUB docs.
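A minimal configuration sketch for a consumer of google_pub_sub type, built from the options documented below – project and subscription ids here are illustrative:

config.json
{
  "consumers": [
    {
      "enabled": true,
      "name": "my_pubsub_consumer",
      "type": "google_pub_sub",
      "google_pub_sub": {
        "project_id": "my-project",
        "subscriptions": ["centrifugo-commands-sub"],
        "auth_mechanism": "default"
      }
    }
  ]
}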

consumers[].google_pub_sub

Type: GooglePubSubConsumerConfig object

google_pub_sub allows defining options for consumer of google_pub_sub type.

consumers[].google_pub_sub.project_id

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_PROJECT_ID

Google Cloud project ID.

consumers[].google_pub_sub.subscriptions

Type: []string

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_SUBSCRIPTIONS

subscriptions is the list of Pub/Sub subscription ids to consume from.

consumers[].google_pub_sub.max_outstanding_messages

Type: int. Default: 100

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_MAX_OUTSTANDING_MESSAGES

max_outstanding_messages controls the maximum number of unprocessed messages.

consumers[].google_pub_sub.max_outstanding_bytes

Type: int. Default: 1000000

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_MAX_OUTSTANDING_BYTES

max_outstanding_bytes controls the maximum number of unprocessed bytes.

consumers[].google_pub_sub.auth_mechanism

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_AUTH_MECHANISM

auth_mechanism specifies which authentication mechanism to use: "default", "service_account".

consumers[].google_pub_sub.credentials_file

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_CREDENTIALS_FILE

credentials_file is the path to the service account JSON file if required.

consumers[].google_pub_sub.method_attribute

Type: string. Default: centrifugo-method

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_METHOD_ATTRIBUTE

method_attribute is an attribute name to extract a method name from the message. If provided in message, then payload must be just a serialized API request object.

consumers[].google_pub_sub.publication_data_mode

Type: GooglePubSubPublicationDataModeConfig object

publication_data_mode holds settings for the mode where message payload already contains data ready to publish into channels.

consumers[].google_pub_sub.publication_data_mode.enabled

Type: bool

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_PUBLICATION_DATA_MODE_ENABLED

enabled enables publication data mode.

consumers[].google_pub_sub.publication_data_mode.channels_attribute

Type: string. Default: centrifugo-channels

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_PUBLICATION_DATA_MODE_CHANNELS_ATTRIBUTE

channels_attribute is the attribute name containing comma-separated channel names.

consumers[].google_pub_sub.publication_data_mode.idempotency_key_attribute

Type: string. Default: centrifugo-idempotency-key

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_PUBLICATION_DATA_MODE_IDEMPOTENCY_KEY_ATTRIBUTE

idempotency_key_attribute is the attribute name for an idempotency key.

consumers[].google_pub_sub.publication_data_mode.delta_attribute

Type: string. Default: centrifugo-delta

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_PUBLICATION_DATA_MODE_DELTA_ATTRIBUTE

delta_attribute is the attribute name for a delta flag.

consumers[].google_pub_sub.publication_data_mode.version_attribute

Type: string. Default: centrifugo-version

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_PUBLICATION_DATA_MODE_VERSION_ATTRIBUTE

version_attribute is the attribute name for a version.

consumers[].google_pub_sub.publication_data_mode.version_epoch_attribute

Type: string. Default: centrifugo-version-epoch

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_PUBLICATION_DATA_MODE_VERSION_EPOCH_ATTRIBUTE

version_epoch_attribute is the attribute name for a version epoch.

consumers[].google_pub_sub.publication_data_mode.tags_attribute_prefix

Type: string. Default: centrifugo-tag-

Env: CENTRIFUGO_CONSUMERS_<NAME>_GOOGLE_PUB_SUB_PUBLICATION_DATA_MODE_TAGS_ATTRIBUTE_PREFIX

tags_attribute_prefix is the prefix for attributes containing tags.

AWS SQS

Consumer from Amazon Simple Queue Service.

Ordered processing is possible with a FIFO queue and when using message group IDs. See the AWS docs.
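A minimal configuration sketch for a consumer of aws_sqs type, built from the options documented below – queue URL and region here are illustrative:

config.json
{
  "consumers": [
    {
      "enabled": true,
      "name": "my_sqs_consumer",
      "type": "aws_sqs",
      "aws_sqs": {
        "queues": ["https://sqs.us-east-1.amazonaws.com/123456789012/centrifugo-commands"],
        "region": "us-east-1"
      }
    }
  ]
}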

consumers[].aws_sqs

Type: AwsSqsConsumerConfig object

aws_sqs allows defining options for consumer of aws_sqs type.

consumers[].aws_sqs.queues

Type: []string

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_QUEUES

queues is a list of SQS queue URLs to consume.

consumers[].aws_sqs.sns_envelope

Type: bool

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_SNS_ENVELOPE

sns_envelope, when true, expects messages to be wrapped in an SNS envelope – this is required when consuming from SNS topics with SQS subscriptions.

consumers[].aws_sqs.region

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_REGION

region is the AWS region.

consumers[].aws_sqs.max_number_of_messages

Type: int32. Default: 10

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_MAX_NUMBER_OF_MESSAGES

max_number_of_messages is the maximum number of messages to receive per poll.

consumers[].aws_sqs.wait_time_time

Type: Duration. Default: 20s

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_WAIT_TIME_TIME

wait_time_time is the long-poll wait time. Rounded to seconds internally.

consumers[].aws_sqs.visibility_timeout

Type: Duration. Default: 30s

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_VISIBILITY_TIMEOUT

visibility_timeout is the time a message is hidden from other consumers. Rounded to seconds internally.

consumers[].aws_sqs.max_concurrency

Type: int. Default: 1

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_MAX_CONCURRENCY

max_concurrency defines max concurrency during message batch processing.

consumers[].aws_sqs.credentials_profile

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_CREDENTIALS_PROFILE

credentials_profile is a shared credentials profile to use.

consumers[].aws_sqs.assume_role_arn

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_ASSUME_ROLE_ARN

assume_role_arn, if provided, will cause the consumer to assume the given IAM role.

consumers[].aws_sqs.method_attribute

Type: string. Default: centrifugo-method

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_METHOD_ATTRIBUTE

method_attribute is the attribute name to extract a method for command messages. If provided in message, then payload must be just a serialized API request object.

consumers[].aws_sqs.localstack_endpoint

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_LOCALSTACK_ENDPOINT

localstack_endpoint, if set, enables using LocalStack with the provided URL.

consumers[].aws_sqs.publication_data_mode

Type: AWSPublicationDataModeConfig object

publication_data_mode holds settings for the mode where message payload already contains data ready to publish into channels.

consumers[].aws_sqs.publication_data_mode.enabled

Type: bool

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_PUBLICATION_DATA_MODE_ENABLED

enabled enables publication data mode.

consumers[].aws_sqs.publication_data_mode.channels_attribute

Type: string. Default: centrifugo-channels

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_PUBLICATION_DATA_MODE_CHANNELS_ATTRIBUTE

channels_attribute is the attribute name containing comma-separated channel names.

consumers[].aws_sqs.publication_data_mode.idempotency_key_attribute

Type: string. Default: centrifugo-idempotency-key

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_PUBLICATION_DATA_MODE_IDEMPOTENCY_KEY_ATTRIBUTE

idempotency_key_attribute is the attribute name for an idempotency key.

consumers[].aws_sqs.publication_data_mode.delta_attribute

Type: string. Default: centrifugo-delta

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_PUBLICATION_DATA_MODE_DELTA_ATTRIBUTE

delta_attribute is the attribute name for a delta flag.

consumers[].aws_sqs.publication_data_mode.version_attribute

Type: string. Default: centrifugo-version

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_PUBLICATION_DATA_MODE_VERSION_ATTRIBUTE

version_attribute is the attribute name for a version of publication.

consumers[].aws_sqs.publication_data_mode.version_epoch_attribute

Type: string. Default: centrifugo-version-epoch

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_PUBLICATION_DATA_MODE_VERSION_EPOCH_ATTRIBUTE

version_epoch_attribute is the attribute name for a version epoch of publication.

consumers[].aws_sqs.publication_data_mode.tags_attribute_prefix

Type: string. Default: centrifugo-tag-

Env: CENTRIFUGO_CONSUMERS_<NAME>_AWS_SQS_PUBLICATION_DATA_MODE_TAGS_ATTRIBUTE_PREFIX

tags_attribute_prefix is the prefix for attributes containing tags.

Azure Service Bus

Consumer from Azure Service Bus.

Ordered processing is possible when using message sessions.
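A minimal configuration sketch for a consumer of azure_service_bus type, built from the options documented below – connection string and queue name here are illustrative:

config.json
{
  "consumers": [
    {
      "enabled": true,
      "name": "my_service_bus_consumer",
      "type": "azure_service_bus",
      "azure_service_bus": {
        "connection_string": "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...",
        "queues": ["centrifugo-commands"]
      }
    }
  ]
}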

consumers[].azure_service_bus

Type: AzureServiceBusConsumerConfig object

azure_service_bus allows defining options for consumer of azure_service_bus type.

consumers[].azure_service_bus.connection_string

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_CONNECTION_STRING

connection_string is the full connection string used for connection-string–based authentication.

consumers[].azure_service_bus.use_azure_identity

Type: bool

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_USE_AZURE_IDENTITY

use_azure_identity toggles Azure Identity (AAD) authentication instead of connection strings.

consumers[].azure_service_bus.fully_qualified_namespace

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE

fully_qualified_namespace is the Service Bus namespace, e.g. "your-namespace.servicebus.windows.net".

consumers[].azure_service_bus.tenant_id

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_TENANT_ID

tenant_id is the Azure Active Directory tenant ID used with Azure Identity.

consumers[].azure_service_bus.client_id

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_CLIENT_ID

client_id is the Azure AD application (client) ID used for authentication.

consumers[].azure_service_bus.client_secret

Type: string

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_CLIENT_SECRET

client_secret is the secret associated with the Azure AD application.

consumers[].azure_service_bus.queues

Type: []string

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_QUEUES

queues is the list of the Azure Service Bus queues to consume from.

consumers[].azure_service_bus.use_sessions

Type: bool

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_USE_SESSIONS

use_sessions enables session-aware message handling. All messages must include a SessionID; messages within the same session will be processed in order.

consumers[].azure_service_bus.max_concurrent_calls

Type: int. Default: 1

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_MAX_CONCURRENT_CALLS

max_concurrent_calls controls the maximum number of messages processed concurrently.

consumers[].azure_service_bus.max_receive_messages

Type: int. Default: 1

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_MAX_RECEIVE_MESSAGES

max_receive_messages sets the batch size when receiving messages from the queue.

consumers[].azure_service_bus.method_property

Type: string. Default: centrifugo-method

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_METHOD_PROPERTY

method_property is the name of the message property used to extract the method (for API command). If provided in message, then payload must be just a serialized API request object.

consumers[].azure_service_bus.publication_data_mode

Type: AzureServiceBusPublicationDataModeConfig object

publication_data_mode configures how structured publication-ready data is extracted from the message.

consumers[].azure_service_bus.publication_data_mode.enabled

Type: bool

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_PUBLICATION_DATA_MODE_ENABLED

enabled toggles the publication data mode.

consumers[].azure_service_bus.publication_data_mode.channels_property

Type: string. Default: centrifugo-channels

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_PUBLICATION_DATA_MODE_CHANNELS_PROPERTY

channels_property is the name of the message property that contains the list of target channels.

consumers[].azure_service_bus.publication_data_mode.idempotency_key_property

Type: string. Default: centrifugo-idempotency-key

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_PUBLICATION_DATA_MODE_IDEMPOTENCY_KEY_PROPERTY

idempotency_key_property is the property that holds an idempotency key for deduplication.

consumers[].azure_service_bus.publication_data_mode.delta_property

Type: string. Default: centrifugo-delta

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_PUBLICATION_DATA_MODE_DELTA_PROPERTY

delta_property is the property that represents changes or deltas in the payload.

consumers[].azure_service_bus.publication_data_mode.version_property

Type: string. Default: centrifugo-version

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_PUBLICATION_DATA_MODE_VERSION_PROPERTY

version_property is the property that holds the version of the message.

consumers[].azure_service_bus.publication_data_mode.version_epoch_property

Type: string. Default: centrifugo-version-epoch

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_PUBLICATION_DATA_MODE_VERSION_EPOCH_PROPERTY

version_epoch_property is the property that holds the version epoch of the message.

consumers[].azure_service_bus.publication_data_mode.tags_property_prefix

Type: string. Default: centrifugo-tag-

Env: CENTRIFUGO_CONSUMERS_<NAME>_AZURE_SERVICE_BUS_PUBLICATION_DATA_MODE_TAGS_PROPERTY_PREFIX

tags_property_prefix defines the prefix used to extract dynamic tags from message properties.