Stream

Subscribe to wide events flowing through evlog — in-process with createStreamDrain, or over the network with the local SSE stream server.

evlog ships a stream primitive so any local consumer can subscribe to wide events without re-implementing a drain. There are two layers, building on each other:

  • In-process bus — createStreamDrain(), the canonical pub/sub. Sync listeners, async iterators, ring-buffered replay.
  • Network bridge — startStreamServer(), an opt-in HTTP mini-server that exposes the in-process bus over Server-Sent Events for browsers, CLIs, or external devtools.
Local-only by design. Both layers live inside a single Node / Bun / Deno process. They work in pnpm dev, on long-lived self-hosted servers, on VMs and containers (Fly, Railway, Coolify…). They do not work on serverless platforms (Vercel Functions, Cloudflare Workers, AWS Lambda) — each invocation is an isolated process. Use a real broker (Redis Streams, NATS, Pub/Sub) for cross-instance fan-out there.

In-process bus

[Interactive demo: one emit fans out to every subscriber (subscribe() callbacks and events() iterators), while the ring buffer retains events so replay() can serve late joiners.]

createStreamDrain() is just a drain. Register it on the evlog drain hook and subscribe to events as they're emitted — no HTTP, no serialization, no extra hops.

Subscribe to wide events in-process

import { createStreamDrain } from 'evlog/stream'

// Keep the last 200 events for replay
const stream = createStreamDrain({ buffer: 200 })

// Register it like any other drain (Nitro/Nuxt hook shown here)
nitroApp.hooks.hook('evlog:drain', stream.drain)

// Sync listener — subscribe() returns an unsubscribe function
const off = stream.subscribe((event) => {
  if (event.level === 'error') notify(event)
})
// later, when done listening:
off()

// Replay the ring buffer (oldest first), then go live
for (const past of stream.recent()) {
  bootstrap(past)
}
for await (const event of stream.events()) {
  bootstrap(event)
}

Options

  • buffer (number) — Ring-buffer size for recent() snapshots. Set to 0 to disable. Default: 500.
  • perSubscriberQueue (number) — Max queued events per events() async iterator before the oldest are dropped. The drain itself is never blocked. Default: 1000.
  • filter ((event) => boolean) — Optional predicate run on each drained event; return false to skip the event entirely (neither buffered nor delivered).
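To make the filter and back-pressure semantics concrete, here is a toy model (our own illustration, not evlog's implementation) of how `filter` and `perSubscriberQueue` interact for a single subscriber queue:

```typescript
// Toy model of the filter + perSubscriberQueue semantics: a rejected event
// is neither buffered nor queued; a full queue drops its oldest entry
// instead of blocking the producer.
type Event = { level: string }

export function drainStep(
  event: Event,
  queue: Event[],
  opts: { perSubscriberQueue: number; filter?: (e: Event) => boolean },
): Event[] {
  if (opts.filter && !opts.filter(event)) return queue // skipped entirely
  queue.push(event)
  if (queue.length > opts.perSubscriberQueue) queue.shift() // drop oldest, never block
  return queue
}
```

The key design point mirrored here: the producer (the drain) always returns immediately; only slow consumers lose events.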

stream.drain(events) accepts either a single event or a batch. stream.recent() returns a snapshot of the buffered events (oldest first, most recent last) for ad-hoc inspection or to seed a UI panel.
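The batch handling and oldest-first ordering can be sketched with a toy ring buffer (illustrative only, not evlog's source):

```typescript
// Toy sketch of the behavior described above: drain() accepts one event or
// an array; recent() returns a snapshot ordered oldest → newest.
export function createRing<T>(size: number) {
  const buf: T[] = []
  return {
    drain(events: T | T[]) {
      for (const e of Array.isArray(events) ? events : [events]) {
        buf.push(e)
        if (buf.length > size) buf.shift() // evict the oldest entry
      }
    },
    recent(): T[] {
      return [...buf] // snapshot copy, oldest first
    },
  }
}
```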

Network bridge — stream server

[Interactive demo: a GET /sse connection receives a hello frame, a replay of ring-buffered events for late joiners, live event frames as requests complete, and periodic ping heartbeats.]

startStreamServer() boots a tiny node:http server in the same process as your app, on its own ephemeral port, and exposes the in-process bus over Server-Sent Events. Any consumer (browser tab, CLI, Tauri/Electron devtool) can subscribe — your application API is untouched.

Strict opt-in. Nothing starts unless you set the option explicitly. There is no auto-enable in dev — the server only boots when you ask for it.

Expose the evlog stream over SSE

What boots up

When you opt in, evlog calls startStreamServer() and:

  1. Picks an ephemeral free port (or the one you specify)
  2. Spins up a tiny node:http server on 127.0.0.1 (or your host)
  3. Writes the discovered URL to .evlog/stream.url
  4. Prints a one-line banner in the server console
  5. Hooks the in-process stream drain into the evlog pipeline
  6. Exposes GET / for the SSE stream and GET /info for handshake metadata

On shutdown (SIGINT / SIGTERM / process exit) the server cleans up listeners and removes the URL file.

Per-framework opt-in

export default defineNuxtConfig({
  modules: ['evlog/nuxt'],
  evlog: {
    stream: true,
    // or: stream: { port: 4444, token: process.env.EVLOG_STREAM_TOKEN }
  },
})

The Hono / Express / Fastify integrations don't need framework-specific support — startStreamServer() is orthogonal to your framework. You boot it once and connect its drain to the evlog pipeline like any other drain.

Discovery

The mini-server runs on a random port, so any consumer must discover it.

.evlog/stream.url
http://127.0.0.1:53942

Read directly from disk:

import { readFile } from 'node:fs/promises'

const url = (await readFile('.evlog/stream.url', 'utf-8')).trim()

Or — for same-origin browser tabs in a Nuxt app — hit the discovery route:

const { url } = await fetch('/api/_evlog/stream-info').then(r => r.json())
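When the server isn't running, the URL file won't exist. A small helper (the name `discoverStreamUrl` is ours) that treats a missing file as "stream off":

```typescript
import { readFile } from 'node:fs/promises'

// Read the discovered stream URL from disk; returns null when the file is
// absent (i.e. the stream server was never started or has shut down).
export async function discoverStreamUrl(
  path = '.evlog/stream.url',
): Promise<string | null> {
  try {
    return (await readFile(path, 'utf-8')).trim()
  } catch {
    return null
  }
}
```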

Wire format

Every SSE message has the shape { evlog: '1', type, data }:

  • hello — first frame after connect. data: { evlogVersion, bufferSize, heartbeatMs }
  • replay — right after hello; replays buffered events when the consumer passed ?since=<iso>. data: WideEvent
  • event — every emitted event after that. data: WideEvent
  • ping — heartbeat (default every 15s, configurable via heartbeatMs). data: { ts: number }
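Consumers can type and validate the envelope before trusting it. A hedged sketch reconstructed from the table above (the replay/event payload is left as unknown, since WideEvent's fields are evlog's own):

```typescript
// Wire-format envelope as described in the table above (our reconstruction).
export type StreamMessage =
  | { evlog: '1'; type: 'hello'; data: { evlogVersion: string; bufferSize: number; heartbeatMs: number } }
  | { evlog: '1'; type: 'replay' | 'event'; data: unknown }
  | { evlog: '1'; type: 'ping'; data: { ts: number } }

export function parseMessage(raw: string): StreamMessage | null {
  try {
    const msg = JSON.parse(raw)
    // Ignore anything that doesn't carry the evlog: '1' envelope
    return msg && msg.evlog === '1' && typeof msg.type === 'string'
      ? (msg as StreamMessage)
      : null
  } catch {
    return null // not JSON — not ours
  }
}
```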
Browser consumer
const { url } = await fetch('/api/_evlog/stream-info').then(r => r.json())
const es = new EventSource(url)

const events = []
es.onmessage = (msg) => {
  const { type, data } = JSON.parse(msg.data)
  if (type === 'event') events.push(data)
}
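Outside the browser, a Node CLI can read the same stream with fetch and parse the `data:` lines itself. A minimal hedged sketch — simplified in that it assumes frames don't straddle chunk boundaries, which a real consumer should handle by buffering partial frames:

```typescript
// Extract the payload of each `data:` line from a chunk of SSE text.
// Simplification: assumes each chunk contains only whole frames.
export function parseSseChunk(chunk: string): string[] {
  return chunk
    .split('\n\n') // frames are separated by a blank line
    .flatMap((frame) => frame.split('\n'))
    .filter((line) => line.startsWith('data:'))
    .map((line) => line.slice(5).trim())
}

// Usage sketch (Node 18+): stream the response body and decode chunks.
// const res = await fetch(url)
// const decoder = new TextDecoder()
// for await (const chunk of res.body) {
//   for (const payload of parseSseChunk(decoder.decode(chunk))) {
//     console.log(JSON.parse(payload))
//   }
// }
```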

Auth + CORS

  • token — when set, the server requires Authorization: Bearer <token> on every request and responds 401 otherwise.
  • (no token) — connections are accepted only when there is no Origin header or the origin host is local (127.0.0.1 / localhost / [::1]). Other origins receive 403.
  • host — default 127.0.0.1, never reachable from the LAN. Override to 0.0.0.0 only with a token set.
  • heartbeatMs — heartbeat interval in milliseconds (default 15000).
  • buffer — ring buffer kept on the underlying default stream, replayed for late-joining clients via ?since=<iso>. Default 500.
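To make the no-token rule concrete, here is a sketch of the local-origin check as described above (our own re-implementation for illustration, not evlog's source):

```typescript
// Accept requests with no Origin header (curl, Node, same-process tools)
// or an Origin whose host is local; reject everything else.
const LOCAL_HOSTS = new Set(['127.0.0.1', 'localhost', '[::1]'])

export function isAllowedOrigin(origin: string | undefined): boolean {
  if (!origin) return true // non-browser clients send no Origin header
  try {
    return LOCAL_HOSTS.has(new URL(origin).hostname)
  } catch {
    return false // malformed Origin header → reject
  }
}
```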

Going further

  • FS reader — replay or tail historic NDJSON files (cross-process, survives restarts)
  • Consumer recipes — build a minimal devtool, pipe to curl + jq, replay history then go live