
Analytics with ClickHouse

This feature exports information about connections, subscriptions and client operations to ClickHouse, providing integration with real-time (seconds of delay) analytics storage. ClickHouse is very fast, simple to operate, and allows keeping data efficiently for a window of time.

This unlocks observability and the ability to run various analytics queries: understanding user behavior, checking application correctness, building trends and reports, and so on.

Configuration#

To enable integration with ClickHouse, add the following section to a configuration file:

config.json
{
    ...
    "clickhouse_analytics": {
        "enabled": true,
        "clickhouse_dsn": [
            "tcp://127.0.0.1:9000",
            "tcp://127.0.0.1:9001",
            "tcp://127.0.0.1:9002",
            "tcp://127.0.0.1:9003"
        ],
        "clickhouse_database": "centrifugo",
        "clickhouse_cluster": "centrifugo_cluster",
        "export_connections": true,
        "export_operations": true,
        "export_http_headers": [
            "User-Agent",
            "Origin",
            "X-Real-Ip"
        ]
    }
}

All ClickHouse analytics options are scoped to the clickhouse_analytics section of the configuration.

Toggle this feature using the enabled boolean option.

tip

While the configuration here is nested, it's still possible to set options using environment variables. For example, use the CENTRIFUGO_CLICKHOUSE_ANALYTICS_ENABLED env var to configure the enabled option mentioned above. In other words, nesting is expressed with _ in Centrifugo.

Centrifugo can export data to several ClickHouse instances; their addresses are set with the clickhouse_dsn option.

You also need to set a ClickHouse cluster name (clickhouse_cluster) and a database name (clickhouse_database).

export_connections tells Centrifugo to export connection information snapshots. Information about a connection is exported once the connection is established and then periodically while it stays alive. See the table structure below for the available fields.

export_operations tells Centrifugo to export information about individual client operations. See the table structure below for the available fields.

export_http_headers is a list of HTTP headers to export for connection information.

export_grpc_metadata is a list of metadata keys to export for connection information for GRPC unidirectional transport.
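
Exported headers end up in the nested headers.key / headers.value columns of the connections table shown below (GRPC metadata goes to metadata.key / metadata.value in the same way). As a minimal sketch, assuming User-Agent is present in export_http_headers, connected clients can be grouped by an exported header like this:

-- Distinct clients per exported User-Agent header over the last 5 minutes.
-- indexOf returns 0 when the header was not exported, which maps to an empty string.
SELECT
    `headers.value`[indexOf(`headers.key`, 'User-Agent')] AS user_agent,
    COUNT(DISTINCT client) AS clients
FROM centrifugo.connections_distributed
WHERE time >= (now() - toIntervalMinute(5))
GROUP BY user_agent
ORDER BY clients DESC;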

Connections table#

SHOW CREATE TABLE centrifugo.connections;
CREATE TABLE centrifugo.connections
(
    `client` FixedString(36),
    `user` String,
    `name` String,
    `version` String,
    `transport` String,
    `channels` Array(String),
    `headers.key` Array(String),
    `headers.value` Array(String),
    `metadata.key` Array(String),
    `metadata.value` Array(String),
    `time` DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/connections', '{replica}')
PARTITION BY toYYYYMMDD(time)
ORDER BY time
TTL time + toIntervalDay(1)
SETTINGS index_granularity = 8192

And distributed one:

SHOW CREATE TABLE centrifugo.connections_distributed;
CREATE TABLE centrifugo.connections_distributed
(
    `client` FixedString(36),
    `user` String,
    `name` String,
    `version` String,
    `transport` String,
    `channels` Array(String),
    `headers.key` Array(String),
    `headers.value` Array(String),
    `metadata.key` Array(String),
    `metadata.value` Array(String),
    `time` DateTime
)
ENGINE = Distributed('centrifugo_cluster', 'centrifugo', 'connections', murmurHash3_64(client))

Operations table#

SHOW CREATE TABLE centrifugo.operations;
CREATE TABLE centrifugo.operations
(
    `client` FixedString(36),
    `user` String,
    `op` String,
    `channel` String,
    `method` String,
    `error` UInt32,
    `disconnect` UInt32,
    `duration` UInt64,
    `time` DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/operations', '{replica}')
PARTITION BY toYYYYMMDD(time)
ORDER BY time
TTL time + toIntervalDay(1)
SETTINGS index_granularity = 8192

And distributed one:

SHOW CREATE TABLE centrifugo.operations_distributed;
CREATE TABLE centrifugo.operations_distributed
(
    `client` FixedString(36),
    `user` String,
    `op` String,
    `channel` String,
    `method` String,
    `error` UInt32,
    `disconnect` UInt32,
    `duration` UInt64,
    `time` DateTime
)
ENGINE = Distributed('centrifugo_cluster', 'centrifugo', 'operations', murmurHash3_64(client))

Query examples#

Show unique users which were connected:

SELECT DISTINCT user
FROM centrifugo.connections_distributed;
β”Œβ”€user─────┐
β”‚ user_1   β”‚
β”‚ user_2   β”‚
β”‚ user_3   β”‚
β”‚ user_4   β”‚
β”‚ user_5   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Show the total number of publication attempts which were throttled by Centrifugo (i.e. received the Too many requests error with code 111):

SELECT COUNT(*)
FROM centrifugo.operations_distributed
WHERE (error = 111) AND (op = 'publish');
β”Œβ”€count()─┐
β”‚    4502 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

The same for a specific user:

SELECT COUNT(*)
FROM centrifugo.operations_distributed
WHERE (error = 111) AND (op = 'publish') AND (user = 'user_200');
β”Œβ”€count()─┐
β”‚    1214 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Show the number of unique users subscribed to a specific channel in the last 5 minutes (this is approximate since the connections table contains periodic snapshot entries and clients could subscribe/unsubscribe between snapshots – such events are reflected in the operations table, see the sketch after this example):

SELECT COUNT(Distinct(user))
FROM centrifugo.connections_distributed
WHERE arrayExists(x -> (x = 'chat:index'), channels) AND (time >= (now() - toIntervalMinute(5)));
β”Œβ”€uniqExact(user)─┐
β”‚             101 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
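
For an exact view of subscribe activity between snapshots, the operations table can be queried directly. A minimal sketch along the same lines (the channel name and time window are illustrative):

-- Count subscribe operations to the channel over the last 5 minutes.
SELECT COUNT(*)
FROM centrifugo.operations_distributed
WHERE (op = 'subscribe') AND (channel = 'chat:index') AND (time >= (now() - toIntervalMinute(5)));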

Show the top 10 users who called the publish operation during the last minute:

SELECT
    COUNT(op) AS num_ops,
    user
FROM centrifugo.operations_distributed
WHERE (op = 'publish') AND (time >= (now() - toIntervalMinute(1)))
GROUP BY user
ORDER BY num_ops DESC
LIMIT 10;
β”Œβ”€num_ops─┬─user─────┐
β”‚      56 β”‚ user_200 β”‚
β”‚      11 β”‚ user_75  β”‚
β”‚       6 β”‚ user_87  β”‚
β”‚       6 β”‚ user_65  β”‚
β”‚       6 β”‚ user_39  β”‚
β”‚       5 β”‚ user_28  β”‚
β”‚       5 β”‚ user_63  β”‚
β”‚       5 β”‚ user_89  β”‚
β”‚       3 β”‚ user_32  β”‚
β”‚       3 β”‚ user_52  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
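
Each operation row also carries a duration, so per-operation latency can be aggregated too. A minimal sketch (the duration value is shown as exported by Centrifugo, without unit conversion):

-- Operation counts and average duration per operation type over the last 5 minutes.
SELECT
    op,
    COUNT(*) AS num_ops,
    round(avg(duration)) AS avg_duration
FROM centrifugo.operations_distributed
WHERE time >= (now() - toIntervalMinute(5))
GROUP BY op
ORDER BY num_ops DESC;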

Development#

The recommended way to run ClickHouse in production is with a cluster. But during development you may want to run Centrifugo with a single-instance ClickHouse.

To do this, set only one ClickHouse DSN and do not set a cluster name:

config.json
{
    ...
    "clickhouse_analytics": {
        "enabled": true,
        "clickhouse_dsn": [
            "tcp://127.0.0.1:9000"
        ],
        "clickhouse_database": "centrifugo",
        "clickhouse_cluster": "",
        "export_connections": true,
        "export_operations": true,
        "export_http_headers": [
            "Origin",
            "User-Agent"
        ]
    }
}

Run ClickHouse locally:

docker run -it --rm -v /tmp/clickhouse:/var/lib/clickhouse -p 9000:9000 --name click yandex/clickhouse-server

Run ClickHouse client:

docker run -it --rm --link click:clickhouse-server yandex/clickhouse-client --host clickhouse-server

Issue queries:

:) SELECT * FROM centrifugo.operations
β”Œβ”€client───────────────────────────────┬─user─┬─op──────────┬─channel─────┬─method─┬─error─┬─disconnect─┬─duration─┬────────────────time─┐
β”‚ bd55ae3a-dd44-47cb-a4cc-c41f8e33803b β”‚ 2694 β”‚ connecting  β”‚             β”‚        β”‚     0 β”‚          0 β”‚   217894 β”‚ 2021-07-31 08:15:09 β”‚
β”‚ bd55ae3a-dd44-47cb-a4cc-c41f8e33803b β”‚ 2694 β”‚ connect     β”‚             β”‚        β”‚     0 β”‚          0 β”‚        0 β”‚ 2021-07-31 08:15:09 β”‚
β”‚ bd55ae3a-dd44-47cb-a4cc-c41f8e33803b β”‚ 2694 β”‚ subscribe   β”‚ $chat:index β”‚        β”‚     0 β”‚          0 β”‚    92714 β”‚ 2021-07-31 08:15:09 β”‚
β”‚ bd55ae3a-dd44-47cb-a4cc-c41f8e33803b β”‚ 2694 β”‚ presence    β”‚ $chat:index β”‚        β”‚     0 β”‚          0 β”‚     3539 β”‚ 2021-07-31 08:15:09 β”‚
β”‚ bd55ae3a-dd44-47cb-a4cc-c41f8e33803b β”‚ 2694 β”‚ subscribe   β”‚ test1       β”‚        β”‚     0 β”‚          0 β”‚     2402 β”‚ 2021-07-31 08:15:12 β”‚
β”‚ bd55ae3a-dd44-47cb-a4cc-c41f8e33803b β”‚ 2694 β”‚ subscribe   β”‚ test2       β”‚        β”‚     0 β”‚          0 β”‚      634 β”‚ 2021-07-31 08:15:12 β”‚
β”‚ bd55ae3a-dd44-47cb-a4cc-c41f8e33803b β”‚ 2694 β”‚ subscribe   β”‚ test3       β”‚        β”‚     0 β”‚          0 β”‚      412 β”‚ 2021-07-31 08:15:12 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
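
To quickly check that the analytics tables exist in the local setup, they can be listed (a simple sanity check, assuming the centrifugo database name from the config above):

SHOW TABLES FROM centrifugo;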