
Stream history and recovery

A real-time connection is stateful: the client holds a view of the world that it keeps up to date from a stream of messages. So the moment a connection drops, an awkward question appears: "while I was gone, did I miss anything?"

Centrifugo can answer that question itself. When a channel keeps a short history, Centrifugo remembers recent publications and replays exactly the ones a returning client missed, without touching your backend. This chapter is about how that works, why it's designed the way it is, and which configuration knobs control it.

It covers history and recovery for standard stream channels (the default subscription type). If you instead need keyed, synchronized state rather than a flowing message stream, see map subscriptions.

Why recovery matters

The naive answer to "did I miss anything?" is: ask the database. One client doing that on reconnect is nothing. But WebSocket apps don't lose one connection at a time: a balancer reload, a deploy, or a network blip drops everyone at once, and they all reconnect within seconds. If every returning client queries your database to refresh its state, that reconnect storm turns into a thundering herd right when your system is already under stress.

Recovery breaks that link.

Figure: a balancer reloads and every client reconnects at once. With recovery, the missed publications (#10 to #13) are replayed from Centrifugo's history broker, and your backend's database is not queried; the database load from reconnects is ~0.

Because the missed messages are served from Centrifugo's fast history broker, the database load from a mass reconnect drops to roughly zero. For setups with hundreds of thousands or millions of connections, this can be the difference between a smooth redeploy and an outage. This idea (keeping a short, fast event stream per channel so clients can catch up without hitting the database) is explored in depth in Scaling WebSocket (see also Massive reconnect).

The history stream

Recovery is built on a simple structure: when history is enabled, every publication in a channel is appended to a stream, which is a bounded, ordered, sliding window of recent messages.

Figure: the history stream of channel chat:42 (epoch gWuY). Publication #8 has been evicted; #9 to #12 remain, with #12 at the top offset, and each append increments the offset. The sliding window holds the newest history_size publications, kept for history_ttl. If the stream is lost (engine restart, eviction), the epoch changes, marking it as a different stream.

Two values make the stream usable for catch-up:

  • offset: an incremental uint64 stamped on each publication. It's the message's position in the stream, and what a client uses to say "I'm up to here."
  • epoch: an arbitrary string identifying this particular stream. It matters because a stream can be lost and recreated (a Memory-engine node restarts, a broker is cleared). After that, offsets start over from the beginning, so offset 10 in the new stream is a completely different message from offset 10 in the old one. A changed epoch is Centrifugo's way of saying "this is not the stream you were reading", so a stale offset is never trusted.

The window is deliberately bounded by two namespace options:

  • history_size: how many publications the stream keeps (window size).
  • history_ttl: how long publications are kept (window age).

Both must be greater than zero to enable history; setting only one does nothing. The design intent is that streams are ephemeral: they're created on the fly, they can expire, and they can be lost at any moment. That keeps history cheap, and it's why your main database should always remain the ultimate source of truth.
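To make the window concrete, here is a toy in-memory model of a single channel's stream, written in JavaScript like the client examples later in this chapter. It is a sketch under stated assumptions, not Centrifugo's implementation: the class and method names are hypothetical, and history_ttl is modelled as per-publication expiry, whereas real brokers may expire the stream differently. Note that topOffset survives TTL eviction, loosely mirroring how stream metadata outlives the publications themselves.

```javascript
// Toy in-memory model of one channel's history stream (illustrative, not Centrifugo's code).
class HistoryStream {
  constructor(historySize, historyTtlMs, now = () => Date.now()) {
    // Mirrors the rule above: both bounds must be > 0 for history to exist.
    if (historySize <= 0 || historyTtlMs <= 0) {
      throw new Error("history_size and history_ttl must both be > 0");
    }
    this.historySize = historySize;
    this.historyTtlMs = historyTtlMs;
    this.now = now;
    this.epoch = HistoryStream.newEpoch("");
    this.topOffset = 0; // offset of the newest publication ever appended
    this.pubs = []; // retained publications, oldest first: { offset, data, addedAt }
  }

  // Any string that differs between stream incarnations can serve as an epoch.
  static newEpoch(previous) {
    let epoch;
    do {
      epoch = Math.random().toString(36).slice(2, 10);
    } while (epoch === "" || epoch === previous);
    return epoch;
  }

  append(data) {
    this.topOffset += 1; // offset++ for every publication
    this.pubs.push({ offset: this.topOffset, data, addedAt: this.now() });
    this.evict();
    return this.topOffset;
  }

  evict() {
    // Size bound: keep only the newest historySize publications.
    while (this.pubs.length > this.historySize) this.pubs.shift();
    // Age bound (simplified to per-publication expiry): drop entries older than the TTL.
    const cutoff = this.now() - this.historyTtlMs;
    while (this.pubs.length > 0 && this.pubs[0].addedAt <= cutoff) this.pubs.shift();
  }

  // Simulates losing the stream (e.g. a Memory-engine restart): offsets start
  // over, and a new epoch marks it as a different stream.
  reset() {
    this.epoch = HistoryStream.newEpoch(this.epoch);
    this.topOffset = 0;
    this.pubs = [];
  }
}
```

With historySize 3, appending four publications leaves offsets 2, 3 and 4 in the window while topOffset stays at 4.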

Where the data actually lives depends on the broker: the Memory broker keeps the stream in process memory (gone on restart); the Redis broker stores it in a Redis Stream, inheriting Redis' persistence; the PostgreSQL broker stores it in Postgres. All are fast enough to absorb reconnect traffic; the trade-off between them is durability and operational fit, not whether recovery works. A separate history_meta_ttl controls how long the lightweight stream metadata (its epoch and top offset) survives. It is kept longer than the data itself, so a channel's identity outlives any single message.

History is a cache, not your source of truth

Think of a channel's history as a bounded cache of the most recent messages, capped by history_size and aged out by history_ttl. It is deliberately not a durable message queue or an event log you can replay from the beginning of time, and it is never authoritative. It can be empty, truncated, or lost at any moment, so your application database stays the source of truth, and history is the fast shortcut that saves you from hitting it on every reconnect.

tip

History is off by default. Enable it per namespace via channel options. Once on, it's available from both the server API and (with permission) the client API.

How recovery works

With history in place, recovery is a small, automatic protocol on top of it. The SDK does the bookkeeping; for a bidirectional client, you don't write any of this by hand:

  1. On subscribe, the server returns the stream's current epoch and top offset. The SDK stores them.
  2. As publications arrive, each carries the next offset; the SDK advances its saved position.
  3. The connection drops. Messages may be published while the client is away.
  4. On resubscribe, the SDK sends back the last seen epoch and offset.
  5. Centrifugo looks at the stream and decides whether it can fill the gap from that position. If it can, the missed publications come back in the subscribe reply, in order and deduplicated, and the client sees recovered: true. If it can't, it returns recovered: false and no publications.
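The bookkeeping in steps 1, 2 and 4 can be sketched as a small tracker. This illustrates what a bidirectional SDK does internally; it is not the SDK's actual code. The class name is hypothetical, and the request fields (recover, epoch, offset) are only loosely modelled on the client protocol's subscribe request.

```javascript
// Illustrative client-side bookkeeping (a bidirectional SDK does this for you).
class StreamPositionTracker {
  constructor() {
    this.position = null; // { epoch, offset }, unknown until the first subscribe
  }

  // Step 1: the subscribe reply carries the stream's epoch and top offset.
  onSubscribed(reply) {
    this.position = { epoch: reply.epoch, offset: reply.offset };
  }

  // Step 2: each publication carries the next offset; advance the saved position.
  onPublication(pub) {
    if (this.position && pub.offset > this.position.offset) {
      this.position.offset = pub.offset;
    }
  }

  // Step 4: on resubscribe, send back the last seen epoch and offset, if any.
  subscribeRequest(channel) {
    if (!this.position) return { channel };
    return { channel, recover: true, epoch: this.position.epoch, offset: this.position.offset };
  }
}
```

After subscribing at offset 9 and receiving publication 10, the next subscribe request asks to recover from epoch gWuY, offset 10.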

That recovered flag is the important output. It lets the application stay cheap in the common case and fall back to a full state load from the backend only when recovery genuinely couldn't keep continuity.

The position tracking in steps 1–4 is done entirely by bidirectional SDKs: they hold the epoch/offset and send it back on resubscribe, so your application code never deals with offsets at all. Unidirectional transports (SSE, HTTP-streaming, unidirectional WebSocket) can't drive this themselves; for them, Centrifugo can still deliver the latest state on every (re)subscribe via cache recovery mode with auto_cache_recover.

The recovery decision

So when can Centrifugo recover? It comes down to two checks against the position the client sends back: is this the same stream (epoch matches), and is the gap fully present in history (no missing publications between the client's offset and the current top). Both must hold.

The scenario below shows the successful case in practice, and which configuration governs it. Recovery fails, returning recovered: false, when the epoch has changed or when part of the gap is no longer in history.

Scenario: the client returns with last offset 9 and epoch gWuY; the stream still has epoch gWuY and holds #9 to #12.
  • ✓ Same epoch (stream intact)
  • ✓ No gap: all missed publications are in history
Result: recovered: true. Same stream, and every publication the client missed (#10–#12) is still in history. Centrifugo replays them in order, deduplicated.
Config: the gap fits within history_size and history_ttl.

The common thread: Centrifugo never delivers a partial recovery. If it can't prove the client's view can be made whole, it says so with recovered: false rather than handing over a stream with a silent hole, and the application loads fresh state from its own database, exactly as it would on first load.
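Both checks, plus the no-partial-recovery rule, fit in a short function. It is a simplified model of stream-mode recovery over an in-memory stream shaped like { epoch, topOffset, pubs }, not Centrifugo's source. One assumption to flag: the sketch treats a gap larger than client.recovery_max_publication_limit (default 300) as unrecoverable rather than returning a truncated replay. That follows from the rule above but is worth checking against your Centrifugo version.

```javascript
// Simplified model of the stream-mode recovery decision (illustrative only).
// stream: { epoch, topOffset, pubs: [{ offset, data }] } with pubs oldest first.
// since:  { epoch, offset } as sent back by the client on resubscribe.
function recover(stream, since, maxPublications = 300) {
  const notRecovered = { recovered: false, publications: [] };
  // Check 1: same stream? Offsets from another epoch mean nothing here.
  if (since.epoch !== stream.epoch) return notRecovered;
  // Already at the top: recovered, with nothing to replay.
  if (since.offset === stream.topOffset) return { recovered: true, publications: [] };
  // A position beyond the top cannot belong to this stream's history.
  if (since.offset > stream.topOffset) return notRecovered;
  const missed = stream.pubs.filter((p) => p.offset > since.offset);
  // Check 2: no gap. The first missed publication must directly follow the client's offset.
  if (missed.length === 0 || missed[0].offset !== since.offset + 1) return notRecovered;
  // Never a partial recovery: too many missed publications counts as a failure here.
  if (missed.length > maxPublications) return notRecovered;
  return { recovered: true, publications: missed };
}
```

For the scenario above (client at offset 9, stream holding #9 to #12 in epoch gWuY), the function replays #10, #11 and #12; a client at offset 5 gets recovered: false because #6 to #8 are gone.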

Positioning: detecting loss proactively

Recovery handles the reconnect. Its sibling, positioning, handles the subtler case where a client stays connected but quietly falls behind: PUB/SUB brokers deliver at-most-once, so a message can be dropped without the connection noticing.

With force_positioning on, Centrifugo periodically checks each client's position against the stream top. If it detects that a client can no longer be in a valid position (a potential gap), it disconnects the client with the insufficient state code (3010), which is itself a reconnect signal, so the SDK comes back and runs the recovery flow above. Enabling recovery turns positioning on automatically, since the two work together: positioning notices the problem, recovery fixes it.
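The idea can be sketched as two checks that both end in the 3010 disconnect. This is a simplified illustration, not Centrifugo's algorithm: the helper names and the maxLagMs tolerance (allowing for a publication that is still in flight) are assumptions, and the real check differs in timing details.

```javascript
// Simplified positioning sketch (illustrative, not Centrifugo's algorithm).
const INSUFFICIENT_STATE = 3010;

// Live check: each delivered publication should be exactly one past the last seen offset.
function checkDelivered(lastOffset, pubOffset) {
  return pubOffset === lastOffset + 1 ? null : INSUFFICIENT_STATE;
}

// Periodic check: compare the client's position with the current stream top.
// state = { epoch, offset, lastValidAt }; lastValidAt is updated in place.
function checkPosition(state, streamTop, nowMs, maxLagMs) {
  // A different epoch means the stream was lost and recreated.
  if (state.epoch !== streamTop.epoch) return INSUFFICIENT_STATE;
  if (state.offset >= streamTop.offset) {
    state.lastValidAt = nowMs; // caught up with the top
    return null;
  }
  // Behind the top: a publication may still be in flight, so tolerate a short lag.
  return nowMs - state.lastValidAt > maxLagMs ? INSUFFICIENT_STATE : null;
}
```

A non-null result stands for "disconnect with 3010", after which the SDK reconnects and attempts recovery.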

Two recovery modes

Not every channel wants the same thing back. A chat needs every missed message; a "now playing" widget only needs the latest value. Centrifugo supports both via force_recovery_mode:

Figure: the same gap (#10, #11 and #12 missed while away) under both modes. With force_recovery_mode: "stream", all three are delivered on resubscribe, in order (chat, feeds, logs). With force_recovery_mode: "cache", only the latest, #12, is delivered as the current state (now-playing, prices, dashboards).
  • stream (default): replay all missed publications in order. The right choice when each message is an event that matters on its own: chats, feeds, activity logs.
  • cache: deliver only the single latest publication. The right choice when each publication is a complete snapshot of state and only the current one matters: prices, dashboards, presence, "now playing". Typically paired with history_size: 1.
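For the same missed publications, the two modes differ only in what comes back on resubscribe. A minimal sketch; the function is hypothetical and ignores everything except the returned list.

```javascript
// Sketch: what each force_recovery_mode hands back for the same missed publications.
function deliverOnResubscribe(missed, mode) {
  if (mode === "stream") return missed; // every missed publication, in order
  if (mode === "cache") return missed.slice(-1); // only the latest one: the current state
  throw new Error(`unknown mode: ${mode}`);
}
```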

Cache mode effectively turns a channel into a real-time key-value cache and can remove the "fetch initial state" step entirely. It has its own chapter with configuration details and the automatic variant for unidirectional clients: Cache recovery mode.

Using recovery in your app

With a bidirectional SDK, the application's only job is to react to the outcome on each (re)subscribe; the SDK has already replayed any missed publications through the publication handler. The subscribed event carries two flags (protocol fields was_recovering / recovered):

  • wasRecovering: the client asked to recover (it came back with a saved position). It says nothing about success.
  • recovered: recovery actually succeeded, meaning the gap was filled with no loss. This can be true with zero publications replayed: if the client was already at the stream top, there was simply nothing to catch up on.

So wasRecovering: true with recovered: false is the meaningful "I tried to catch up but couldn't, reload from the backend" signal:

sub.on('subscribed', (ctx) => {
  if (ctx.wasRecovering && !ctx.recovered) {
    loadOrdersFromBackend(); // continuity lost: reload full state
  }
});
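The same decision can be pulled out into a plain function that is easy to unit-test. This is a hypothetical helper, assuming the app loads initial state itself when a subscription starts without a saved position; the action names are illustrative.

```javascript
// Hypothetical helper: map the two subscribed-event flags to what the app should do.
function onSubscribedAction({ wasRecovering, recovered }) {
  if (!wasRecovering) return "load-initial-state"; // no saved position: nothing to recover from
  if (recovered) return "nothing"; // gap filled, possibly with zero publications replayed
  return "reload-from-backend"; // tried to catch up but couldn't
}
```

Inside the subscribed handler, this could be called as onSubscribedAction(ctx).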

One subtlety is worth knowing about for the common pattern of loading initial state from your own database and then subscribing: those two steps don't line up perfectly, so an update can slip through the gap between them (or arrive slightly late). There are a few ways to handle it. A version-based reconciliation in your own code is one (walked through in reliable document state sync). If you'd rather not hand-roll it, some SDKs offer an optional getState callback that wires the same idea in for you: read the stream position first, then load your data, and return the position so the SDK subscribes from exactly there and recovers on every reconnect:

const sub = client.newSubscription('orders:42', {
  getState: async () => {
    const pos = await api.getStreamPosition('orders:42'); // 1. capture position FIRST
    renderOrders(await api.getOrders(42));                // 2. then load your data
    return { offset: pos.offset, epoch: pos.epoch };      // 3. SDK recovers from here on
  },
});
sub.on('publication', (ctx) => applyOrderUpdate(ctx.data));
sub.subscribe();

getState is one convenience for this, not a requirement. The broader point holds either way: when your own database is the source of truth and Centrifugo streams only the change events, you can combine the publication cache's reconnect-storm protection with a consistent view in every scenario. See app-owned state with stream subscriptions for more.
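For the hand-rolled route, a version-based reconciliation can be sketched as follows. It assumes, hypothetically, that your backend stamps each update with a monotonically increasing version and that every update carries the full state of the document. Updates arriving during the initial load are buffered, then applied only if they are newer than the loaded snapshot.

```javascript
// Sketch of version-based reconciliation; `version` is a hypothetical backend-assigned field.
class VersionedState {
  constructor() {
    this.version = -1;
    this.data = null;
    this.loading = true;
    this.buffered = []; // updates that arrive while the initial load is in flight
  }

  applyUpdate(update) {
    if (this.loading) {
      this.buffered.push(update);
      return;
    }
    if (update.version <= this.version) return; // stale or repeated: already reflected
    this.version = update.version;
    this.data = update.data;
  }

  finishLoad(snapshot) {
    // Take the database snapshot, then replay anything newer that slipped in meanwhile.
    this.version = snapshot.version;
    this.data = snapshot.data;
    this.loading = false;
    for (const u of this.buffered) this.applyUpdate(u);
    this.buffered = [];
  }
}
```

An update with version 7 that arrives while a version-6 snapshot is still loading ends up applied; one with version 5 is discarded.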

Configuration recap

Everything above maps to a handful of namespace options:

Option                                 Role
history_size                           How many publications the stream keeps (window size)
history_ttl                            How long publications are kept (window age)
history_meta_ttl                       How long the stream's epoch/offset metadata survives
force_recovery                         Make subscriptions recoverable (implies positioning)
force_positioning                      Detect dropped messages on live connections (3010 on loss)
force_recovery_mode                    stream (all messages) or cache (latest only)
client.recovery_max_publication_limit  Cap on publications recovered in one go (default 300)

A minimal recoverable namespace:

config.json
{
  "channel": {
    "namespaces": [
      {
        "name": "chat",
        "history_size": 100,
        "history_ttl": "300s",
        "force_recovery": true
      }
    ]
  }
}

Recovery can also be requested per subscription from the client side instead of being forced namespace-wide; in that case, the client needs permission to access channel history.

Trade-offs and guidance

Recovery is intentionally scoped to be a fast, broker-backed continuity mechanism, not a durable mailbox. Keep that in mind:

  • Keep streams small and short. All missed publications come back in a single subscribe frame, so recovery is built for short disconnects: surviving a reconnect storm, not catching a client up after an hour offline. Size and TTL should reflect that.
  • Always keep a backend fallback. Streams are ephemeral by design; on recovered: false (and on a fresh app load) the application should load full state from its own database. Recovery optimizes the common path โ€” it doesn't replace your source of truth.
  • Tolerate duplicates. Centrifugo currently returns recovered publications in order and without duplicates, but applications using recovery should be designed to tolerate an occasional repeat (e.g. a stable key in the payload).
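Tolerating repeats usually means making the handler idempotent. A minimal sketch, assuming each payload carries a stable id (a hypothetical field) and that remembering a bounded number of recent ids is enough for your traffic:

```javascript
// Sketch: skip repeated payloads using a stable `id` field (hypothetical) in the data.
function makeDedupingHandler(apply, maxRemembered = 1000) {
  const seen = new Set(); // Sets keep insertion order, so the oldest id is evicted first
  return (data) => {
    if (seen.has(data.id)) return false; // repeat: ignore it
    seen.add(data.id);
    if (seen.size > maxRemembered) seen.delete(seen.values().next().value);
    apply(data);
    return true;
  };
}
```

Usage: sub.on('publication', (ctx) => handler(ctx.data)), where handler = makeDedupingHandler(applyOrderUpdate).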

Recovery shines at keeping the continuity of long-lived connections and shielding the backend from reconnect spikes. It's not the right tool for guaranteed, long-term delivery of every message; for that, design around your database.

History iteration API

Automatic recovery is built on top of the same stream, but you can also read that stream yourself, directly, with no subscription involved. Centrifugo exposes a history API from the server API (a history call) and from the client side (with history permission). Use it to page through recent messages, or to read just the current top offset and epoch when building your own positioning logic. It's built on three fields:

  • limit
  • since
  • reverse

Combining them lets you page through a stream in either direction:

history(limit: 0, since: null, reverse: false)     // just the current top offset + epoch
history(limit: -1, since: null, reverse: false) // from the beginning (up to client.history_max_publication_limit, default 300)
history(limit: -1, since: null, reverse: true) // from the end
history(limit: 10, since: null, reverse: false) // first 10
history(limit: 10, since: null, reverse: true) // last 10, newest first
history(limit: 10, since: {offset: 0, epoch: "epoch"}, reverse: false) // 10 after a known position
history(limit: 10, since: {offset: 11, epoch: "epoch"}, reverse: true) // 10 before a known position
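The combinations above can be modelled against an in-memory stream to make the semantics explicit. This is a toy model, not Centrifugo's implementation: maxLimit stands in for client.history_max_publication_limit, and reporting a mismatched epoch as an error is an assumption of the sketch.

```javascript
// Toy model of the history call's limit/since/reverse semantics (not Centrifugo's code).
// stream: { epoch, topOffset, pubs: [{ offset, ... }] } with pubs oldest first.
function history(stream, { limit = 0, since = null, reverse = false } = {}, maxLimit = 300) {
  const top = { offset: stream.topOffset, epoch: stream.epoch };
  if (limit === 0) return { ...top, publications: [] }; // position only
  if (since && since.epoch !== stream.epoch) {
    throw new Error("unrecoverable position: epoch changed"); // assumption of this sketch
  }
  let pubs = stream.pubs;
  if (since) {
    pubs = reverse
      ? pubs.filter((p) => p.offset < since.offset) // strictly before the position
      : pubs.filter((p) => p.offset > since.offset); // strictly after the position
  }
  if (reverse) pubs = [...pubs].reverse(); // newest first
  const n = limit < 0 ? maxLimit : Math.min(limit, maxLimit);
  return { ...top, publications: pubs.slice(0, n) };
}
```

With offsets 1 to 12 in the stream, limit 3 with since {offset: 11} and reverse: true returns offsets 10, 9 and 8.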

Here's a Go program (using the gocent API library) that endlessly walks a stream, flipping direction each time it reaches an end. It's not practical, but it shows the pagination pattern:

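package main

import (
	"context"
	"log"
	"time"

	"github.com/centrifugal/gocent/v3"
)

func main() {
	// Assumed local Centrifugo HTTP API endpoint and API key: adjust for your setup.
	c := gocent.New(gocent.Config{
		Addr: "http://localhost:8000/api",
		Key:  "<API_KEY>",
	})
	ctx := context.Background()
	channel := "chat:42" // example channel with history enabled

	// Iterate by 10.
	limit := 10
	// Paginate in reversed order first, then invert it.
	reverse := true
	// Start with nil StreamPosition, then fill it with value while paginating.
	var sp *gocent.StreamPosition

	for {
		historyResult, err := c.History(
			ctx,
			channel,
			gocent.WithLimit(limit),
			gocent.WithReverse(reverse),
			gocent.WithSince(sp),
		)
		if err != nil {
			log.Fatalf("Error calling history: %v", err)
		}
		for _, pub := range historyResult.Publications {
			log.Println(pub.Offset, "=>", string(pub.Data))
			// Remember the last seen position to continue paginating from it.
			sp = &gocent.StreamPosition{
				Offset: pub.Offset,
				Epoch:  historyResult.Epoch,
			}
		}
		if len(historyResult.Publications) < limit {
			// Got all pubs, invert pagination direction.
			reverse = !reverse
			log.Println("end of stream reached, change iteration direction")
			// Pause so an empty or tiny stream doesn't spin in a tight loop.
			time.Sleep(time.Second)
		}
	}
}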

Rolling your own

Finally, recovery is opt-in convenience, not a cage. You can always bypass it and implement catch-up yourself on top of plain PUB/SUB: query your backend for fresh state after every resubscribe, or iterate the Centrifugo stream manually with the history API above. The automatic mechanism just saves you from writing that for the common case.