Stream
evlog ships a stream primitive so any local consumer can subscribe to wide events without re-implementing a drain. There are two layers, building on each other:

- In-process bus — `createStreamDrain()`, the canonical pub/sub: sync listeners, async iterators, ring-buffered replay.
- Network bridge — `startStreamServer()`, an opt-in HTTP mini-server that exposes the in-process bus over Server-Sent Events for browsers, CLIs, or external devtools.

Both layers work in `pnpm dev`, on long-lived self-hosted servers, and on VMs and containers (Fly, Railway, Coolify…). They do not work on serverless platforms (Vercel Functions, Cloudflare Workers, AWS Lambda) — each invocation is an isolated process. Use a real broker (Redis Streams, NATS, Pub/Sub) for cross-instance fan-out there.

In-process bus

`createStreamDrain()` is just a drain. Register it on the evlog drain hook and subscribe to events as they're emitted — no HTTP, no serialization, no extra hops.
Subscribe to wide events in-process:

```ts
import { createStreamDrain } from 'evlog/stream'

const stream = createStreamDrain({ buffer: 200 })

// Register the stream as an evlog drain (Nitro shown here)
nitroApp.hooks.hook('evlog:drain', stream.drain)

// Sync listener — subscribe() returns an unsubscribe function
const off = stream.subscribe((event) => {
  if (event.level === 'error') notify(event)
})
off() // later: stop listening

// Snapshot of the ring buffer, oldest first
for (const past of stream.recent()) {
  bootstrap(past)
}

// Async iterator — yields events as they arrive
for await (const event of stream.events()) {
  bootstrap(event)
}
```
Options
| Option | Type | Description |
|---|---|---|
| `buffer` | `number` | Ring-buffer size for `recent()` snapshots. Set to `0` to disable. Default: `500`. |
| `perSubscriberQueue` | `number` | Max queued events per `events()` async iterator before the oldest are dropped. The drain itself is never blocked. Default: `1000`. |
| `filter` | `(event) => boolean` | Optional predicate run on each drained event — return `false` to skip the event entirely (neither buffered nor delivered). |
`stream.drain(events)` accepts either a single event or a batch. `stream.recent()` returns a snapshot of the buffered events (oldest first, most recent last) for ad-hoc inspection or to seed a UI panel.
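To make the buffer and filter semantics concrete, here is a minimal sketch of a ring-buffered pub/sub in the same shape — `miniStreamDrain` is a stand-in for illustration, not evlog's actual implementation:

```typescript
type Listener<T> = (event: T) => void

// Sketch of the drain semantics: a drained event must pass the filter,
// lands in a fixed-size ring buffer, and is pushed to live subscribers.
function miniStreamDrain<T>(opts: { buffer?: number; filter?: (e: T) => boolean } = {}) {
  const size = opts.buffer ?? 500
  const ring: T[] = []
  const listeners = new Set<Listener<T>>()

  return {
    // Accepts a single event or a batch, like stream.drain()
    drain(input: T | T[]) {
      for (const event of Array.isArray(input) ? input : [input]) {
        if (opts.filter && !opts.filter(event)) continue // skipped entirely
        if (size > 0) {
          ring.push(event)
          if (ring.length > size) ring.shift() // drop the oldest
        }
        for (const fn of listeners) fn(event)
      }
    },
    subscribe(fn: Listener<T>) {
      listeners.add(fn)
      return () => listeners.delete(fn) // unsubscribe handle
    },
    recent: () => [...ring], // oldest first, like stream.recent()
  }
}
```

Note that filtered events never reach the ring buffer, which is why a `filter` also shapes what late joiners see in `recent()`.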
Network bridge — stream server
`startStreamServer()` boots a tiny `node:http` server in the same process as your app, on its own ephemeral port, and exposes the in-process bus over Server-Sent Events. Any consumer (browser tab, CLI, Tauri/Electron devtool) can subscribe — your application API is untouched.
What boots up
When you opt in, evlog calls `startStreamServer()` and:

- Picks an ephemeral free port (or the one you specify)
- Spins up a tiny `node:http` server on `127.0.0.1` (or your host)
- Writes the discovered URL to `.evlog/stream.url`
- Prints a one-line banner in the server console
- Hooks the in-process stream drain into the evlog pipeline
- Exposes `GET /` for the SSE stream and `GET /info` for handshake metadata
On shutdown (SIGINT / SIGTERM / process exit) the server cleans up listeners and removes the URL file.
Per-framework opt-in
Nuxt:

```ts
export default defineNuxtConfig({
  modules: ['evlog/nuxt'],
  evlog: {
    stream: true,
    // or: stream: { port: 4444, token: process.env.EVLOG_STREAM_TOKEN }
  },
})
```

Next.js:

```ts
import { defineStreamedInstrumentation } from 'evlog/next/stream'

export const { register } = defineStreamedInstrumentation({
  stream: true,
})
```

Anywhere else, boot it manually:

```ts
import { startStreamServer } from 'evlog/stream'

if (process.env.NODE_ENV !== 'production' && process.env.EVLOG_STREAM === '1') {
  const { drain } = await startStreamServer()
  // Plug `drain` into the evlog drain hook for your framework
}
```
The Hono / Express / Fastify integrations don't need a dedicated "stream" feature — `startStreamServer()` is orthogonal to your framework. Boot it once and connect its drain to the evlog pipeline like any other drain.
Discovery
The mini-server runs on a random port, so any consumer must discover it. The discovered base URL is written to `.evlog/stream.url`, e.g.:

```
http://127.0.0.1:53942
```
Read directly from disk:
```ts
import { readFile } from 'node:fs/promises'

const url = (await readFile('.evlog/stream.url', 'utf-8')).trim()
```
Or — for same-origin browser tabs in a Nuxt app — hit the discovery route:
```ts
const { url } = await fetch('/api/_evlog/stream-info').then(r => r.json())
```
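For a CLI or external devtool, the file read can be combined with a liveness check against `GET /info`. A hypothetical helper sketch — `discoverStreamUrl` is not part of evlog:

```typescript
import { readFile } from 'node:fs/promises'

// Resolve the stream URL from the discovery file, then hit /info to
// confirm the mini-server is actually up before handing the URL out.
async function discoverStreamUrl(file = '.evlog/stream.url'): Promise<string | null> {
  try {
    const url = (await readFile(file, 'utf-8')).trim()
    const res = await fetch(new URL('/info', url)) // handshake metadata endpoint
    return res.ok ? url : null
  } catch {
    return null // no URL file, or server not running
  }
}
```

Returning `null` instead of throwing lets a devtool poll for the file and attach as soon as the dev server boots.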
Wire format
Every SSE message has the shape `{ evlog: '1', type, data }`:

| `type` | When | `data` |
|---|---|---|
| `hello` | First frame after connect | `{ evlogVersion, bufferSize, heartbeatMs }` |
| `replay` | Right after `hello`, replays buffered events when the consumer passed `?since=<iso>` | `WideEvent` |
| `event` | Every emitted event after that | `WideEvent` |
| `ping` | Heartbeat (default every 15s, configurable via `heartbeatMs`) | `{ ts: number }` |
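In TypeScript the table maps naturally onto a discriminated union — a sketch with field names taken from the table above; `WideEvent` is left opaque here, and `StreamFrame` is an illustrative name, not an exported evlog type:

```typescript
type WideEvent = Record<string, unknown>

// One variant per `type` value; narrowing on `type` narrows `data` too.
type StreamFrame =
  | { evlog: '1'; type: 'hello'; data: { evlogVersion: string; bufferSize: number; heartbeatMs: number } }
  | { evlog: '1'; type: 'replay'; data: WideEvent }
  | { evlog: '1'; type: 'event'; data: WideEvent }
  | { evlog: '1'; type: 'ping'; data: { ts: number } }
```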
Consume the stream from a same-origin browser tab:

```ts
const { url } = await fetch('/api/_evlog/stream-info').then(r => r.json())

const es = new EventSource(url)
es.onmessage = (msg) => {
  const { type, data } = JSON.parse(msg.data)
  if (type === 'event') events.push(data)
}
```
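Replaying history and then going live needs only two small pieces: build the URL with `?since=<iso>`, and treat `replay` and `event` frames identically. A sketch — the helper names are illustrative, not evlog API:

```typescript
// Append ?since=<iso> so the server replays buffered events first.
function withSince(base: string, sinceIso?: string): string {
  const u = new URL(base)
  if (sinceIso) u.searchParams.set('since', sinceIso)
  return u.toString()
}

// Route one parsed frame: replayed and live events are handled the same;
// hello/ping frames carry only metadata and heartbeats.
function dispatch(frame: { type: string; data: unknown }, onEvent: (e: unknown) => void): void {
  if (frame.type === 'replay' || frame.type === 'event') onEvent(frame.data)
}
```

Wire `dispatch` into `es.onmessage`, passing the last timestamp your consumer persisted as `sinceIso`, and the UI never sees a gap between history and live events.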
Auth + CORS
| Option | Behavior |
|---|---|
| `token` | When set, the server requires `Authorization: Bearer <token>` on every request and 401s otherwise. |
| (no token) | Connections are accepted only when there is no `Origin` header or the origin host is local (`127.0.0.1` / `localhost` / `[::1]`). Other origins receive 403. |
| `host` | Default `127.0.0.1` — never reachable from the LAN. Override to `0.0.0.0` only with a token set. |
| `heartbeatMs` | Heartbeat interval in milliseconds (default `15000`). |
| `buffer` | Ring buffer kept on the underlying default stream — replayed for late-joining clients via `?since=<iso>`. Default `500`. |
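Note that the browser `EventSource` API cannot attach an `Authorization` header, so a token-protected stream is easier to consume with `fetch`. A sketch of the consuming side, assuming single-line `data:` frames (the usual shape for SSE JSON payloads); `parseSseData` is an illustrative helper, not evlog API:

```typescript
// Extract the payloads from raw SSE text: one `data:` line per frame.
function parseSseData(text: string): string[] {
  return text
    .split('\n')
    .filter((line) => line.startsWith('data:'))
    .map((line) => line.slice('data:'.length).trim())
}

// Usage sketch with a bearer token (url and token are yours to supply):
//   const res = await fetch(url, { headers: { authorization: `Bearer ${token}` } })
//   for await (const chunk of res.body) { /* decode, buffer, parseSseData */ }
```

A production consumer would also buffer partial chunks across reads; the helper above assumes each chunk ends on a frame boundary.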
Going further
- FS reader — replay or tail historic NDJSON files (cross-process, survives restarts)
- Consumer recipes — build a minimal devtool, pipe to `curl` + `jq`, replay history then go live