Drain Pipeline
In production, sending one HTTP request per emitted event doesn't scale. The drain pipeline buffers events and sends them in batches, retries on transient failures, drops the oldest events when the buffer overflows, and lets a single drain function fan out to several destinations in parallel. The same pipeline powers the HTTP browser drain used for client-side logs.
| You want to… | See |
|---|---|
| Wrap any drain in batch + retry + buffer | Quick start |
| Send each event to several destinations in parallel | Fanout |
| Ship browser logs to your server endpoint | HTTP drain (browser to server) |
| Tune batch size, retry strategy, buffer size | Configuration |
Add the drain pipeline (batch + retry + fanout)
Quick start
The pipeline wraps any drain. The wiring depends on your framework — pick the tab that matches yours; every other example below uses the same shape.
// server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
export default defineNitroPlugin((nitroApp) => {
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())
nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
// lib/evlog.ts
import type { DrainContext } from 'evlog'
import { createEvlog } from 'evlog/next'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())
export const { withEvlog, useLogger, log, createError } = createEvlog({
service: 'my-app',
drain,
})
export const flushEvlog = () => drain.flush()
// Hono / Express / Elysia / Fastify / NestJS
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())
app.use(evlog({ drain })) // Hono / Express / Elysia
// await app.register(evlog, { drain }) // Fastify
// EvlogModule.forRoot({ drain }) // NestJS
process.on('SIGTERM', () => drain.flush())
// index.ts — plain TypeScript / Bun / Node script
import type { DrainContext } from 'evlog'
import { initLogger } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())
initLogger({ drain })
await drain.flush() // before exit
Always flush on shutdown (drain.flush()) so buffered events are not lost. On Nitro use the close hook; on standalone scripts call it before process.exit; on serverless runtimes use waitUntil(drain.flush()).
How it works
Events are buffered as they arrive on evlog:drain. A batch flushes when either batch.size is reached or batch.intervalMs expires (whichever comes first). On failure, the same batch is retried with the configured backoff; once retry.maxAttempts is exhausted, onDropped is called with the lost events. The buffer is bounded by maxBufferSize — once full, the oldest events are dropped to keep memory flat.
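The lifecycle above can be sketched in a few lines of TypeScript. This is an illustrative model of the rules (size/interval flush, drop-oldest overflow), not evlog's internal implementation; BatchBuffer and its fields are made-up names:

```typescript
// Illustrative sketch of the buffering rules described above, not evlog's
// actual implementation. Events accumulate until the batch size is reached
// or the interval elapses; a full buffer drops the oldest events first.
type LogEvent = Record<string, unknown>

class BatchBuffer {
  private buffer: LogEvent[] = []
  private timer: ReturnType<typeof setTimeout> | null = null
  public dropped: LogEvent[] = []

  constructor(
    private batchSize: number,
    private intervalMs: number,
    private maxBufferSize: number,
    private flushFn: (batch: LogEvent[]) => void,
  ) {}

  push(event: LogEvent): void {
    if (this.buffer.length >= this.maxBufferSize) {
      // Buffer full: drop the oldest event to keep memory flat
      this.dropped.push(this.buffer.shift()!)
    }
    this.buffer.push(event)
    if (this.buffer.length >= this.batchSize) {
      this.flush() // size threshold reached first
    } else if (!this.timer) {
      // Start the interval clock on the first buffered event
      this.timer = setTimeout(() => this.flush(), this.intervalMs)
    }
  }

  flush(): void {
    if (this.timer) { clearTimeout(this.timer); this.timer = null }
    if (this.buffer.length === 0) return
    this.flushFn(this.buffer.splice(0, this.buffer.length))
  }
}
```

A real pipeline additionally retries a failed batch with backoff before handing the lost events to onDropped; the sketch only covers the buffering side.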
Configuration
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>({
batch: {
size: 50, // Flush every 50 events
intervalMs: 5000, // Or every 5 seconds, whichever comes first
},
retry: {
maxAttempts: 3,
backoff: 'exponential',
initialDelayMs: 1000,
maxDelayMs: 30000,
},
maxBufferSize: 1000,
onDropped: (events, error) => {
console.error(`[evlog] Dropped ${events.length} events:`, error?.message)
},
})
export const drain = pipeline(createAxiomDrain())
Options reference
| Option | Default | Description |
|---|---|---|
| batch.size | 50 | Maximum events per batch |
| batch.intervalMs | 5000 | Max time (ms) before flushing a partial batch |
| retry.maxAttempts | 3 | Total attempts, including the initial one |
| retry.backoff | 'exponential' | 'exponential' \| 'linear' \| 'fixed' |
| retry.initialDelayMs | 1000 | Base delay for the first retry |
| retry.maxDelayMs | 30000 | Upper bound for any retry delay |
| maxBufferSize | 1000 | Max buffered events before the oldest are dropped |
| onDropped | - | Callback invoked when events are dropped (buffer overflow or retry exhaustion) |
Backoff strategies
| Strategy | Delay pattern | Use case |
|---|---|---|
| exponential | 1s, 2s, 4s, 8s… | Default. Best for transient failures that may need time to recover |
| linear | 1s, 2s, 3s, 4s… | Predictable delay growth |
| fixed | 1s, 1s, 1s, 1s… | Same delay every time; useful for rate-limited APIs |
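As a worked illustration, the three delay patterns (with the maxDelayMs cap applied) reduce to one small function. retryDelayMs is a hypothetical helper for clarity, not an evlog export:

```typescript
// Sketch of the three backoff strategies from the table above, capped by
// maxDelayMs. Formulas match the listed delay patterns; the function name
// is illustrative, not part of evlog's public API.
type Backoff = 'exponential' | 'linear' | 'fixed'

function retryDelayMs(
  strategy: Backoff,
  attempt: number,       // 1 = first retry
  initialDelayMs = 1000,
  maxDelayMs = 30_000,
): number {
  let delay: number
  switch (strategy) {
    case 'exponential': delay = initialDelayMs * 2 ** (attempt - 1); break // 1s, 2s, 4s…
    case 'linear':      delay = initialDelayMs * attempt; break            // 1s, 2s, 3s…
    case 'fixed':       delay = initialDelayMs; break                      // 1s, 1s, 1s…
  }
  return Math.min(delay, maxDelayMs)
}
```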
Returned drain function
The function returned by pipeline(drain) is hook-compatible and exposes:
| Property | Type | Description |
|---|---|---|
| drain(ctx) | (ctx: T) => void | Push a single event into the buffer |
| drain.flush() | () => Promise<void> | Force-flush all buffered events |
| drain.pending | number | Number of events currently buffered |
Fanout
(Diagram: a single wide event — level, method, path, duration, user, cart — fans out from one pipeline to the axiom, otlp, sentry, nuxthub, and fs drains in parallel.)
In production, the same wide event often needs to reach more than one destination: a long-term store (Axiom), a metrics tool (Datadog), an error tracker (Sentry), and a local fs drain for incident replay. The pipeline batches once, then your drain fans out the batch to every destination in parallel via Promise.allSettled so a single slow / failing destination cannot block the others.
Fan out evlog events to multiple destinations
The recipe
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import { createDatadogDrain } from 'evlog/datadog'
import { createSentryDrain } from 'evlog/sentry'
import { createFsDrain } from 'evlog/fs'
import type { DrainContext } from 'evlog'
const pipeline = createDrainPipeline<DrainContext>({
batch: { size: 50, intervalMs: 5000 },
retry: { maxAttempts: 3 },
maxBufferSize: 1000,
})
const axiom = createAxiomDrain()
const datadog = createDatadogDrain()
const sentry = createSentryDrain({ minLevel: 'error' })
const fs = createFsDrain({ dir: '.evlog/logs', maxFiles: 14 })
export const drain = pipeline(async (batch) => {
await Promise.allSettled([
axiom(batch),
datadog(batch),
sentry(batch),
fs(batch),
])
})
What you get
- Parallel dispatch — every destination receives the batch concurrently via Promise.allSettled
- Tolerant fanout — if Datadog's API throws, Axiom / Sentry / fs still complete; the pipeline only retries the whole batch when the wrapping function rejects
- Shared backpressure — the buffer is sized once for the whole pipeline; if the wrapping drain falls behind, the oldest events are dropped consistently for every destination
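The tolerance guarantee comes straight from Promise.allSettled, which resolves even when some of its promises reject. A self-contained sketch (fanout is a made-up helper, and the destination names are stand-ins) shows how you could also collect the names of failed destinations for logging:

```typescript
// Minimal fanout sketch: one failing destination does not block the others.
// Returns the names of destinations whose dispatch rejected.
type Batch = unknown[]
type Drain = (batch: Batch) => Promise<void>

async function fanout(drains: Record<string, Drain>, batch: Batch): Promise<string[]> {
  const names = Object.keys(drains)
  // Dispatch to every destination concurrently; allSettled never rejects
  const results = await Promise.allSettled(names.map(name => drains[name](batch)))
  // Keep only the destinations that failed, for logging or metrics
  return names.filter((_, i) => results[i].status === 'rejected')
}
```

Swapping Promise.allSettled for Promise.all would make the first rejection abort the whole dispatch, which is exactly what the pipeline avoids.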
Per-drain filtering
Wrap a destination drain so it only sees events you care about:
import type { DrainContext } from 'evlog'
const sentry = createSentryDrain({ dsn: process.env.SENTRY_DSN! })
async function sentryErrorsOnly(batch: DrainContext[]): Promise<void> {
const errors = batch.filter(c => c.event?.level === 'error')
if (errors.length > 0) await sentry(errors)
}
export const drain = pipeline(async (batch) => {
await Promise.allSettled([
axiom(batch),
sentryErrorsOnly(batch),
])
})
Most built-in drains expose minLevel directly, so you only need this pattern for non-level filters (path, a custom field, etc.).
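The same idea generalizes to a reusable wrapper. filterDrain below is a helper you would write yourself, not an evlog export:

```typescript
// Hypothetical helper: wrap any batch drain so it only receives events
// matching a predicate. Skips the downstream call entirely when nothing
// in the batch matches.
type BatchDrain<T> = (batch: T[]) => Promise<void>

function filterDrain<T>(pred: (ctx: T) => boolean, drain: BatchDrain<T>): BatchDrain<T> {
  return async (batch) => {
    const matched = batch.filter(pred)
    if (matched.length > 0) await drain(matched)
  }
}
```

With it, the Sentry example collapses to filterDrain(c => c.event?.level === 'error', sentry) inside the fanout.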
Custom drain function
You don't need an adapter. Pass any async function that accepts a batch:
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
const pipeline = createDrainPipeline<DrainContext>({ batch: { size: 100 } })
export const drain = pipeline(async (batch) => {
await fetch('https://your-service.com/logs', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(batch.map(ctx => ctx.event)),
})
})
For anything more involved (config resolution, retries, identity headers), use defineHttpDrain instead and let the toolkit handle the boilerplate.
HTTP drain (browser to server)
The HTTP drain is a framework-agnostic transport for shipping client-side logs to your server. It composes on top of the same pipeline primitives, with browser-specific defaults (fetch keepalive + sendBeacon on visibilitychange).
The evlog/browser import path is deprecated and re-exports the same API as evlog/http. It will be removed in the next major release; prefer evlog/http for new code.
Set up the HTTP transport for client logs
Quick start
import { initLogger, log } from 'evlog'
import { createHttpLogDrain } from 'evlog/http'
const drain = createHttpLogDrain({
drain: { endpoint: 'https://logs.example.com/v1/ingest' },
})
initLogger({ drain })
log.info({ action: 'page_view', path: location.pathname })
How it works (browser specifics)
- log.info() / log.warn() / log.error() push events into a memory buffer
- Events are batched by size (default 25) or time interval (default 2 s)
- Batches are sent via fetch with keepalive: true so requests survive page navigation
- When the page becomes hidden (tab switch, navigation), buffered events are flushed via navigator.sendBeacon as a fallback
- Your server endpoint receives a DrainContext[] JSON array and processes it however you like
Two-tier API
createHttpLogDrain(options)
High-level, pre-composed: creates a pipeline with batching, retry, and auto-flush on visibilitychange. Returns a PipelineDrainFn<DrainContext> directly usable with initLogger({ drain }).
import { initLogger, log } from 'evlog'
import { createHttpLogDrain } from 'evlog/http'
const drain = createHttpLogDrain({
drain: { endpoint: 'https://logs.example.com/v1/ingest' },
pipeline: { batch: { size: 50, intervalMs: 5000 } },
})
initLogger({ drain })
log.info({ action: 'click', target: 'buy-button' })
createHttpDrain(config)
Low-level transport function. Use this when you want full control over the pipeline configuration:
import { createHttpDrain } from 'evlog/http'
import { createDrainPipeline } from 'evlog/pipeline'
import type { DrainContext } from 'evlog'
const transport = createHttpDrain({
endpoint: 'https://logs.example.com/v1/ingest',
})
const pipeline = createDrainPipeline<DrainContext>({
batch: { size: 100, intervalMs: 10000 },
retry: { maxAttempts: 5 },
})
const drain = pipeline(transport)
Configuration reference
HttpDrainConfig
| Option | Default | Description |
|---|---|---|
| endpoint | - | (required) Full URL of the server ingest endpoint |
| headers | - | Custom headers sent with each fetch request (e.g. Authorization, X-API-Key) |
| timeout | 5000 | Request timeout in milliseconds |
| useBeacon | true | Use sendBeacon when the page is hidden |
| credentials | 'same-origin' | Fetch credentials mode ('omit', 'same-origin', 'include'). Set to 'include' for cross-origin endpoints |
HttpLogDrainOptions
| Option | Default | Description |
|---|---|---|
| drain | - | (required) HttpDrainConfig object |
| pipeline | { batch: { size: 25, intervalMs: 2000 }, retry: { maxAttempts: 2 } } | Pipeline configuration overrides |
| autoFlush | true | Auto-register the visibilitychange flush listener |
sendBeacon fallback
When useBeacon is enabled (the default) and the page becomes hidden, the drain automatically switches from fetch to navigator.sendBeacon. This ensures logs are delivered even when the user closes the tab or navigates away.
sendBeacon has a browser-imposed payload limit (~64 KB). If a payload exceeds it, the drain throws an error. Keep batch sizes reasonable — the default of 25 is well within limits.
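If you want to guard against that limit yourself, you can measure the serialized batch before handing it to sendBeacon. A sketch, assuming the commonly cited ~64 KB ceiling (the exact limit is browser-dependent, and these helper names are illustrative):

```typescript
// Choose sendBeacon only when the serialized batch fits under the commonly
// cited ~64 KB beacon budget; otherwise fall back to fetch with keepalive.
const BEACON_LIMIT_BYTES = 64 * 1024

// Size of the batch as it would go over the wire (UTF-8 JSON)
function payloadBytes(batch: unknown[]): number {
  return new TextEncoder().encode(JSON.stringify(batch)).byteLength
}

function fitsInBeacon(batch: unknown[]): boolean {
  return payloadBytes(batch) < BEACON_LIMIT_BYTES
}
```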
Authentication
Pass custom headers to protect your ingest endpoint:
const drain = createHttpLogDrain({
drain: {
endpoint: 'https://logs.example.com/v1/ingest',
headers: {
'Authorization': 'Bearer ' + token,
},
},
})
headers are applied to fetch requests only. The sendBeacon API does not support custom headers, so when the page is hidden and sendBeacon is used, headers are not sent. If your endpoint requires authentication, validate via a session cookie (set credentials: 'include' for cross-origin endpoints) or disable sendBeacon with useBeacon: false.
Server endpoint
Your server needs a POST endpoint that accepts a DrainContext[] JSON body. Examples for common frameworks:
// Express
app.post('/v1/ingest', express.json(), (req, res) => {
for (const entry of req.body) {
console.log('[BROWSER]', JSON.stringify(entry))
}
res.sendStatus(204)
})
// Hono
app.post('/v1/ingest', async (c) => {
const body = await c.req.json()
for (const entry of body) {
console.log('[BROWSER]', JSON.stringify(entry))
}
return c.body(null, 204)
})
See the full browser example for a working Hono server + browser page.
Common pitfalls
- Don't forget drain.flush() on shutdown — buffered events are lost otherwise
- Tune batch.size to match your provider's recommended payload — too small wastes overhead, too large risks rejection
- Don't run one pipeline per drain unless you need per-destination retries — sharing one pipeline keeps batching and buffering coherent
- Don't fan out to a serverless-incompatible target without checking — the stream server reaches every connected client through the in-process stream; it's not a drain
Next steps
- Custom Drains — build a drain for any backend with defineHttpDrain / defineDrain
- Adapters Overview — built-in adapters that work with the pipeline out of the box
- Best Practices — security and production tips
- Client logging — end-to-end browser → server flow
Custom drains
Build a drain for any backend without a built-in adapter — defineHttpDrain for HTTP destinations, defineDrain for any other transport. Standardized config resolution, retries, timeouts, and identity headers handled for you.
Configuration
Complete reference for all evlog configuration options including global logger settings, middleware options, environment context, and framework-specific overrides.