Stream
evlog ships a stream primitive so any local consumer can subscribe to wide events without re-implementing a drain. There are two layers, building on each other:
- In-process bus: createStreamDrain(), the canonical pub/sub. Sync listeners, async iterators, ring-buffered replay.
- Network bridge: startStreamServer(), an opt-in HTTP mini-server that exposes the in-process bus over Server-Sent Events for browsers, CLIs, or external devtools.
Both layers work in pnpm dev, on long-lived self-hosted servers, and on VMs and containers (Fly, Railway, Coolify…). They do not work on serverless platforms (Vercel Functions, Cloudflare Workers, AWS Lambda), where each invocation is an isolated process. Use a real broker (Redis Streams, NATS, Pub/Sub) for cross-instance fan-out there.
In-process bus
createStreamDrain() is just a drain. Register it on the evlog drain hook and subscribe to events as they're emitted — no HTTP, no serialization, no extra hops.
Subscribe to wide events in-process
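import { createStreamDrain } from 'evlog/stream'

// Keep the last 200 events available to recent()
const stream = createStreamDrain({ buffer: 200 })
nitroApp.hooks.hook('evlog:drain', stream.drain)

// Sync listener, called as events are drained
const off = stream.subscribe((event) => {
  if (event.level === 'error') notify(event)
})
// Call the returned function to unsubscribe
off()

// Snapshot of the buffered events, oldest first
for (const past of stream.recent()) {
  bootstrap(past)
}

// Async iterator over events as they arrive
for await (const event of stream.events()) {
  bootstrap(event)
}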
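The Options table below says each events() iterator has a bounded queue (perSubscriberQueue) that drops the oldest entries instead of blocking the drain. A minimal sketch of that drop-oldest behaviour, assuming a single consumer per queue; BoundedEventQueue and its members are hypothetical names for illustration, not evlog's implementation:

```typescript
// Hypothetical drop-oldest queue: the producer never waits on the consumer.
class BoundedEventQueue<T> {
  private readonly limit: number
  private items: T[] = []
  private waiter: ((value: T) => void) | null = null
  dropped = 0

  constructor(limit: number) {
    if (limit < 1) throw new RangeError('limit must be at least 1')
    this.limit = limit
  }

  // Producer side: hand the item to a waiting consumer, or queue it.
  push(item: T): void {
    if (this.waiter) {
      const resolve = this.waiter
      this.waiter = null
      resolve(item)
      return
    }
    if (this.items.length >= this.limit) {
      this.items.shift() // queue is full: discard the oldest entry
      this.dropped++
    }
    this.items.push(item)
  }

  // Consumer side: resolves immediately when something is already queued.
  // Assumes one consumer; a second pending next() would replace the first.
  next(): Promise<T> {
    if (this.items.length > 0) return Promise.resolve(this.items.shift() as T)
    return new Promise<T>((resolve) => {
      this.waiter = resolve
    })
  }

  get size(): number {
    return this.items.length
  }
}
```

With a limit of 2, pushing three items before the consumer reads leaves the last two queued and counts one drop, so a slow consumer loses old events but never slows the producer.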
Options
| Option | Type | Description |
|---|---|---|
| buffer | number | Ring-buffer size for recent() snapshots. Set to 0 to disable. Default: 500. |
| perSubscriberQueue | number | Max queued events per events() async iterator before the oldest are dropped. The drain itself is never blocked. Default: 1000. |
| filter | (event) => boolean | Optional predicate run on each drained event. Return false to skip the event entirely (neither buffered nor delivered). |
stream.drain(events) accepts either a single event or a batch. stream.recent() returns a snapshot of the buffered events (oldest first, most recent last) for ad-hoc inspection or to seed a UI panel.
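The buffering semantics described above (fixed-size ring, oldest-first snapshots, a drain that takes one event or a batch, an optional filter) can be sketched as follows. This is an illustrative model only: RingBuffer and makeDrain are hypothetical names, and evlog's internals may differ.

```typescript
// Hypothetical fixed-size ring buffer; a capacity of 0 disables buffering.
class RingBuffer<T> {
  private readonly capacity: number
  private readonly slots: T[]
  private start = 0
  private count = 0

  constructor(capacity: number) {
    this.capacity = Math.max(0, capacity)
    this.slots = new Array<T>(this.capacity)
  }

  push(item: T): void {
    if (this.capacity === 0) return
    const index = (this.start + this.count) % this.capacity
    this.slots[index] = item
    if (this.count < this.capacity) this.count++
    else this.start = (this.start + 1) % this.capacity // overwrote the oldest
  }

  // Copy of the buffered items, oldest first, most recent last.
  snapshot(): T[] {
    const out: T[] = []
    for (let i = 0; i < this.count; i++) {
      out.push(this.slots[(this.start + i) % this.capacity] as T)
    }
    return out
  }
}

// Hypothetical drain factory: accepts a single event or a batch and
// skips events rejected by the optional filter.
function makeDrain<T>(buffer: RingBuffer<T>, filter?: (event: T) => boolean) {
  return (input: T | T[]): void => {
    const batch: T[] = Array.isArray(input) ? input : [input]
    for (const event of batch) {
      if (filter && !filter(event)) continue
      buffer.push(event)
    }
  }
}
```

Draining 1, then the batch [2, 99, 3, 4] through a capacity-3 buffer whose filter rejects 99 leaves a snapshot of 2, 3, 4: the filtered event is never stored and the oldest entry is overwritten.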
Network bridge — stream server
startStreamServer() boots a tiny node:http server in the same process as your app, on its own ephemeral port, and exposes the in-process bus over Server-Sent Events. Any consumer (browser tab, CLI, Tauri/Electron devtool) can subscribe — your application API is untouched.
Expose the evlog stream over SSE
What boots up
When you opt in, evlog calls startStreamServer() and:
- Picks an ephemeral free port (or the one you specify)
- Spins up a tiny node:http server on 127.0.0.1 (or your host)
- Writes the discovered URL to .evlog/stream.url
- Prints a one-line banner in the server console
- Hooks the in-process stream drain into the evlog pipeline
- Exposes GET / for the SSE stream and GET /info for handshake metadata
On shutdown (SIGINT / SIGTERM / process exit) the server cleans up listeners and removes the URL file.
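To make the boot sequence concrete, here is a rough sketch of the same steps using only Node built-ins: listen on port 0 so the OS picks a free port, write the resulting URL to a file, and remove that file on close. startSketchServer, its parameters and the response bodies are assumptions made for illustration; this is not the code behind startStreamServer().

```typescript
import { createServer } from 'node:http'
import { mkdirSync, rmSync, writeFileSync } from 'node:fs'
import { dirname } from 'node:path'
import type { AddressInfo } from 'node:net'

// Hypothetical helper mirroring the steps listed above.
function startSketchServer(
  urlFile: string,
  port = 0, // 0 asks the OS for an ephemeral free port
  host = '127.0.0.1',
): Promise<{ url: string; close: () => Promise<void> }> {
  return new Promise((resolve, reject) => {
    const server = createServer((req, res) => {
      if (req.url === '/info') {
        // Placeholder handshake payload; the real fields are not shown here.
        res.writeHead(200, { 'content-type': 'application/json' })
        res.end(JSON.stringify({ ok: true }))
        return
      }
      // Keep the response open as an SSE stream and send a comment line.
      res.writeHead(200, { 'content-type': 'text/event-stream', 'cache-control': 'no-cache' })
      res.write(': connected\n\n')
    })

    server.once('error', reject)
    server.listen(port, host, () => {
      const { port: actualPort } = server.address() as AddressInfo
      const url = `http://${host}:${actualPort}`

      // Publish the discovered URL for other processes.
      mkdirSync(dirname(urlFile), { recursive: true })
      writeFileSync(urlFile, `${url}\n`)

      // Cleanup: remove the URL file, drop open SSE connections, stop listening.
      const close = () =>
        new Promise<void>((done) => {
          rmSync(urlFile, { force: true })
          server.closeAllConnections()
          server.close(() => done())
        })

      resolve({ url, close })
    })
  })
}
```

A real server would also call its cleanup from SIGINT / SIGTERM handlers; the sketch leaves that to the caller so it does not change the process's default signal behaviour.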
Per-framework opt-in
// Nuxt (nuxt.config.ts)
export default defineNuxtConfig({
  modules: ['evlog/nuxt'],
  evlog: {
    stream: true,
    // or: stream: { port: 4444, token: process.env.EVLOG_STREAM_TOKEN }
  },
})

// Next.js (instrumentation.ts)
import { defineStreamedInstrumentation } from 'evlog/next/stream'
export const { register } = defineStreamedInstrumentation({
  stream: true,
})

// Other frameworks (Hono, Express, Fastify, ...)
import { startStreamServer } from 'evlog/stream'
if (process.env.NODE_ENV !== 'production' && process.env.EVLOG_STREAM === '1') {
  const { drain } = await startStreamServer()
  // Plug `drain` into the evlog drain hook for your framework
}
The Hono / Express / Fastify integrations don't need a "feature PR" — startStreamServer() is orthogonal to your framework. You boot it once and connect its drain to the evlog pipeline like any other drain.
Discovery
The mini-server runs on a random port, so any consumer must discover it. On startup the server writes its URL to .evlog/stream.url, for example:
http://127.0.0.1:53942
Read directly from disk:
import { readFile } from 'node:fs/promises'
const url = (await readFile('.evlog/stream.url', 'utf-8')).trim()
Or — for same-origin browser tabs in a Nuxt app — hit the discovery route:
const { url } = await fetch('/api/_evlog/stream-info').then(r => r.json())
Wire format
Every SSE message has the shape { evlog: '1', type, data }:
| type | When | data |
|---|---|---|
| hello | First frame after connect | { evlogVersion, bufferSize, heartbeatMs } |
| replay | Right after hello, replays buffered events when the consumer passed ?since=<iso> | WideEvent |
| event | Every emitted event after that | WideEvent |
| ping | Heartbeat (default every 15s, configurable via heartbeatMs) | { ts: number } |
const { url } = await fetch('/api/_evlog/stream-info').then(r => r.json())
const es = new EventSource(url)
es.onmessage = (msg) => {
const { type, data } = JSON.parse(msg.data)
if (type === 'event') events.push(data)
}
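For consumers that want stricter handling than a bare JSON.parse, the envelope can be modelled as a discriminated union. The sketch below is one possible typing: the field types inside hello and the shape of WideEvent are assumptions, and parseFrame / collectEvents are hypothetical helper names.

```typescript
// Assumed shapes; only the field names come from the wire-format table.
type WideEvent = Record<string, unknown>

type StreamFrame =
  | { evlog: '1'; type: 'hello'; data: { evlogVersion: string; bufferSize: number; heartbeatMs: number } }
  | { evlog: '1'; type: 'replay' | 'event'; data: WideEvent }
  | { evlog: '1'; type: 'ping'; data: { ts: number } }

// Returns null for anything that is not a version-1 frame of a known type.
function parseFrame(raw: string): StreamFrame | null {
  let parsed: unknown
  try {
    parsed = JSON.parse(raw)
  } catch {
    return null
  }
  if (typeof parsed !== 'object' || parsed === null) return null
  const frame = parsed as { evlog?: unknown; type?: unknown }
  if (frame.evlog !== '1') return null
  switch (frame.type) {
    case 'hello':
    case 'replay':
    case 'event':
    case 'ping':
      return parsed as StreamFrame
    default:
      return null
  }
}

// Keeps replayed and live events, ignoring hello and ping frames.
function collectEvents(rawMessages: string[]): WideEvent[] {
  const events: WideEvent[] = []
  for (const raw of rawMessages) {
    const frame = parseFrame(raw)
    if (frame && (frame.type === 'replay' || frame.type === 'event')) {
      events.push(frame.data)
    }
  }
  return events
}
```

Treating replay and event frames the same way lets a UI seed itself from the replayed buffer and then continue with live events through a single code path.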
Auth + CORS
| Option | Behavior |
|---|---|
| token | When set, the server requires Authorization: Bearer <token> on every request and 401s otherwise. |
| (no token) | Connections are accepted only when there is no Origin header or the origin host is local (127.0.0.1 / localhost / [::1]). Other origins receive 403. |
| host | Default 127.0.0.1, which is never reachable from the LAN. Override to 0.0.0.0 only with a token set. |
| heartbeatMs | Heartbeat interval (default 15000). |
| buffer | Ring buffer kept on the underlying default stream, replayed for late-joining clients via ?since=<iso>. Default 500. |
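The token and origin rules in the table can be read as a small decision function. The sketch below is an interpretation of those two rows, not the server's actual code: checkAccess is a hypothetical name, and it assumes the Origin check is skipped once a token is configured, which the table does not state explicitly.

```typescript
// Hosts treated as local, as listed in the table. URL.hostname keeps the
// brackets around IPv6 literals, hence '[::1]'.
const LOCAL_HOSTS = new Set(['127.0.0.1', 'localhost', '[::1]'])

interface AccessRequest {
  authorization?: string // value of the Authorization header, if any
  origin?: string // value of the Origin header, if any
}

// Returns the HTTP status the request would receive under these rules.
function checkAccess(req: AccessRequest, token?: string): 200 | 401 | 403 {
  if (token) {
    // Token mode: every request must carry the bearer token.
    // A production check would likely use a constant-time comparison.
    return req.authorization === `Bearer ${token}` ? 200 : 401
  }
  // No token: allow requests without an Origin header...
  if (!req.origin) return 200
  // ...or whose origin host is local.
  try {
    return LOCAL_HOSTS.has(new URL(req.origin).hostname) ? 200 : 403
  } catch {
    return 403 // unparseable Origin header
  }
}
```

For example, a request from https://example.com with no token configured maps to 403, while the same request with a configured token and no Authorization header maps to 401.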
Going further
- FS reader — replay or tail historic NDJSON files (cross-process, survives restarts)
- Consumer recipes — build a minimal devtool, pipe to curl + jq, replay history then go live