Drain Pipeline

Batch events, retry on failure, fan out to multiple destinations, and ship browser logs to your server. The shared pipeline that wraps every drain in production.

In production, sending one HTTP request per emitted event doesn't scale. The drain pipeline buffers events and sends them in batches, retries on transient failures, drops the oldest events when the buffer overflows, and lets a single drain function fan out to several destinations in parallel. The same pipeline powers the HTTP browser drain used for client-side logs.

[Interactive demo: the app emits events without blocking the response; an in-memory buffer fills until the batch size or the 5 s interval is reached, then flushes via HTTP POST. Counters track events emitted, batches sent, and events dropped; controls illustrate onDropped, maxBufferSize, and drain.flush().]
| You want to… | See |
| --- | --- |
| Wrap any drain in batch + retry + buffer | Quick start |
| Send each event to several destinations in parallel | Fanout |
| Ship browser logs to your server endpoint | HTTP drain (browser to server) |
| Tune batch size, retry strategy, buffer size | Configuration |

Add the drain pipeline (batch + retry + fanout)

Quick start

The pipeline wraps any drain. The wiring depends on your framework — pick the tab that matches yours; every other example below uses the same shape.

// server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'

export default defineNitroPlugin((nitroApp) => {
  const pipeline = createDrainPipeline<DrainContext>()
  const drain = pipeline(createAxiomDrain())

  nitroApp.hooks.hook('evlog:drain', drain)
  nitroApp.hooks.hook('close', () => drain.flush())
})
Always flush the pipeline before the process exits (drain.flush()). On Nitro use the close hook; on standalone scripts call it before process.exit; on serverless runtimes use waitUntil(drain.flush()).
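
On serverless, for instance, the shape looks like this (a minimal sketch; ExecutionContext is the Cloudflare Workers type, and the handler body is illustrative):

// assuming `drain` is the pipeline-wrapped drain from the quick start
export default {
  async fetch(req: Request, env: unknown, ctx: ExecutionContext): Promise<Response> {
    const res = new Response('ok') // your real handler goes here
    ctx.waitUntil(drain.flush())   // flush without delaying the response
    return res
  },
}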

How it works

Events are buffered as they arrive on evlog:drain. A batch flushes when either batch.size is reached or batch.intervalMs expires (whichever comes first). On failure, the same batch is retried with the configured backoff; once retry.maxAttempts is exhausted, onDropped is called with the lost events. The buffer is bounded by maxBufferSize — once full, the oldest events are dropped to keep memory flat.
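
Both drop paths surface through the same onDropped callback. A sketch that tells them apart, assuming error is only set on retry exhaustion (the callback's error argument is optional, as the configuration example below suggests):

import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'

const pipeline = createDrainPipeline<DrainContext>({
  onDropped: (events, error) => {
    // assumption: `error` is set on retry exhaustion, absent on buffer overflow
    if (error) console.error(`[evlog] ${events.length} events lost after retries:`, error.message)
    else console.warn(`[evlog] ${events.length} events dropped on buffer overflow`)
  },
})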

Configuration

pipeline-config.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'

const pipeline = createDrainPipeline<DrainContext>({
  batch: {
    size: 50,          // Flush every 50 events
    intervalMs: 5000,  // Or every 5 seconds, whichever comes first
  },
  retry: {
    maxAttempts: 3,
    backoff: 'exponential',
    initialDelayMs: 1000,
    maxDelayMs: 30000,
  },
  maxBufferSize: 1000,
  onDropped: (events, error) => {
    console.error(`[evlog] Dropped ${events.length} events:`, error?.message)
  },
})

export const drain = pipeline(createAxiomDrain())

Options reference

| Option | Default | Description |
| --- | --- | --- |
| batch.size | 50 | Maximum events per batch |
| batch.intervalMs | 5000 | Max time (ms) before flushing a partial batch |
| retry.maxAttempts | 3 | Total attempts including the initial one |
| retry.backoff | 'exponential' | 'exponential' \| 'linear' \| 'fixed' |
| retry.initialDelayMs | 1000 | Base delay for the first retry |
| retry.maxDelayMs | 30000 | Upper bound for any retry delay |
| maxBufferSize | 1000 | Max buffered events before dropping oldest |
| onDropped | - | Callback when events are dropped (overflow or retry exhaustion) |

Backoff strategies

| Strategy | Delay pattern | Use case |
| --- | --- | --- |
| exponential | 1s, 2s, 4s, 8s… | Default. Best for transient failures that may need time to recover |
| linear | 1s, 2s, 3s, 4s… | Predictable delay growth |
| fixed | 1s, 1s, 1s, 1s… | Same delay every time. Useful for rate-limited APIs |
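
As a sketch of the resulting delay math (not the library's exact implementation), each delay derives from initialDelayMs and is capped by maxDelayMs:

type Backoff = 'exponential' | 'linear' | 'fixed'

// Delay before the nth retry (1-based), per the table above.
function delayFor(attempt: number, backoff: Backoff, initialDelayMs = 1000, maxDelayMs = 30000): number {
  const base
    = backoff === 'exponential' ? initialDelayMs * 2 ** (attempt - 1)
    : backoff === 'linear' ? initialDelayMs * attempt
    : initialDelayMs // 'fixed'
  return Math.min(base, maxDelayMs) // cap every strategy at maxDelayMs
}

delayFor(4, 'exponential') // 8000, matching the 1s, 2s, 4s, 8s… pattern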

Returned drain function

The function returned by pipeline(drain) is hook-compatible and exposes:

| Property | Type | Description |
| --- | --- | --- |
| drain(ctx) | (ctx: T) => void | Push a single event into the buffer |
| drain.flush() | () => Promise<void> | Force-flush all buffered events |
| drain.pending | number | Number of events currently buffered |
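
These compose naturally in a shutdown hook. A minimal sketch:

// assuming `drain` is the pipeline-wrapped drain from the quick start
if (drain.pending > 0) {
  console.log(`[evlog] flushing ${drain.pending} buffered event(s)`)
  await drain.flush() // resolves once every buffered batch was sent or dropped
}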

Fanout

[Interactive demo: one wide event ({ level: "info", method: "POST", path: "/checkout", duration: 234, user, cart }) is batched once, then delivered in parallel to five destinations: axiom, otlp, sentry, nuxthub, fs. Fire-and-forget retry with backoff; one failure ≠ blocked response.]

In production, the same wide event often needs to reach more than one destination: a long-term store (Axiom), a metrics tool (Datadog), an error tracker (Sentry), and a local fs drain for incident replay. The pipeline batches once, then your drain fans out each batch to every destination in parallel via Promise.allSettled, so a single slow or failing destination cannot block the others.

Fan out evlog events to multiple destinations

The recipe

import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import { createDatadogDrain } from 'evlog/datadog'
import { createSentryDrain } from 'evlog/sentry'
import { createFsDrain } from 'evlog/fs'
import type { DrainContext } from 'evlog'

const pipeline = createDrainPipeline<DrainContext>({
  batch: { size: 50, intervalMs: 5000 },
  retry: { maxAttempts: 3 },
  maxBufferSize: 1000,
})

const axiom = createAxiomDrain()
const datadog = createDatadogDrain()
const sentry = createSentryDrain({ minLevel: 'error' })
const fs = createFsDrain({ dir: '.evlog/logs', maxFiles: 14 })

export const drain = pipeline(async (batch) => {
  await Promise.allSettled([
    axiom(batch),
    datadog(batch),
    sentry(batch),
    fs(batch),
  ])
})

What you get

  • Parallel dispatch — every destination receives the batch concurrently via Promise.allSettled
  • Tolerant fanout — if Datadog's API throws, Axiom / Sentry / fs still complete; the pipeline only retries the whole batch when the wrapping function rejects (see the sketch after this list)
  • Shared backpressure — the buffer is sized once for the whole pipeline; if the wrapping drain falls behind, the oldest events are dropped consistently for every destination
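
If you want delivery failures to reach the pipeline's retry, reject from the wrapping function. A sketch (reusing the destinations from the recipe above) that only escalates when every destination failed, since retrying on a partial failure would re-send duplicates to the destinations that already succeeded:

export const drain = pipeline(async (batch) => {
  const results = await Promise.allSettled([
    axiom(batch),
    datadog(batch),
    sentry(batch),
    fs(batch),
  ])
  // rejecting makes the pipeline retry the whole batch for every destination,
  // so only throw when nothing got through
  if (results.every(r => r.status === 'rejected')) {
    throw new Error('all destinations failed')
  }
})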

Per-drain filtering

Wrap a destination drain so it only sees events you care about:

import type { DrainContext } from 'evlog'

const sentry = createSentryDrain({ dsn: process.env.SENTRY_DSN! })

async function sentryErrorsOnly(batch: DrainContext[]): Promise<void> {
  const errors = batch.filter(c => c.event?.level === 'error')
  if (errors.length > 0) await sentry(errors)
}

export const drain = pipeline(async (batch) => {
  await Promise.allSettled([
    axiom(batch),
    sentryErrorsOnly(batch),
  ])
})

Most built-in drains expose minLevel directly, so you only need this pattern for non-level filters (path, custom field, etc.).
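
For instance, a hypothetical path filter that keeps health-check noise out of one destination (same shape as the level filter above; the /health path is illustrative):

import type { DrainContext } from 'evlog'

// drop health-check events before they reach Axiom (hypothetical filter)
async function axiomWithoutHealthChecks(batch: DrainContext[]): Promise<void> {
  const relevant = batch.filter(c => c.event?.path !== '/health')
  if (relevant.length > 0) await axiom(relevant)
}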

Custom drain function

You don't need an adapter. Pass any async function that accepts a batch:

pipeline-custom.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'

const pipeline = createDrainPipeline<DrainContext>({ batch: { size: 100 } })

export const drain = pipeline(async (batch) => {
  const res = await fetch('https://your-service.com/logs', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(batch.map(ctx => ctx.event)),
  })
  // fetch only rejects on network errors; throw on HTTP errors so the pipeline retries
  if (!res.ok) throw new Error(`ingest failed: ${res.status}`)
})

For anything more involved (config resolution, retries, identity headers), use defineHttpDrain instead and let the toolkit handle the boilerplate.

HTTP drain (browser to server)

The HTTP drain is a framework-agnostic transport for shipping client-side logs to your server. It composes on top of the same pipeline primitives, with browser-specific defaults (fetch keepalive + sendBeacon on visibilitychange).

The evlog/browser import path is deprecated and re-exports the same API as evlog/http. It will be removed in the next major release. Prefer evlog/http for new code.

Set up the HTTP transport for client logs

Quick start

app.ts
import { initLogger, log } from 'evlog'
import { createHttpLogDrain } from 'evlog/http'

const drain = createHttpLogDrain({
  drain: { endpoint: 'https://logs.example.com/v1/ingest' },
})
initLogger({ drain })

log.info({ action: 'page_view', path: location.pathname })

How it works (browser specifics)

  1. log.info() / log.warn() / log.error() push events into a memory buffer
  2. Events are batched by size (default 25) or time interval (default 2 s)
  3. Batches are sent via fetch with keepalive: true so requests survive page navigation
  4. When the page becomes hidden (tab switch, navigation), buffered events are flushed via navigator.sendBeacon as a fallback
  5. Your server endpoint receives a DrainContext[] JSON array and processes it however you like

Two-tier API

createHttpLogDrain(options)

High-level, pre-composed: creates a pipeline with batching, retry, and auto-flush on visibilitychange. Returns a PipelineDrainFn<DrainContext> directly usable with initLogger({ drain }).

app.ts
import { initLogger, log } from 'evlog'
import { createHttpLogDrain } from 'evlog/http'

const drain = createHttpLogDrain({
  drain: { endpoint: 'https://logs.example.com/v1/ingest' },
  pipeline: { batch: { size: 50, intervalMs: 5000 } },
})

initLogger({ drain })
log.info({ action: 'click', target: 'buy-button' })

createHttpDrain(config)

Low-level transport function. Use this when you want full control over the pipeline configuration:

app.ts
import { createHttpDrain } from 'evlog/http'
import { createDrainPipeline } from 'evlog/pipeline'
import type { DrainContext } from 'evlog'

const transport = createHttpDrain({
  endpoint: 'https://logs.example.com/v1/ingest',
})
const pipeline = createDrainPipeline<DrainContext>({
  batch: { size: 100, intervalMs: 10000 },
  retry: { maxAttempts: 5 },
})

const drain = pipeline(transport)
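
The composed result is the same hook-compatible drain as before, so it plugs into initLogger identically:

import { initLogger } from 'evlog'

initLogger({ drain })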

Configuration reference

HttpDrainConfig

| Option | Default | Description |
| --- | --- | --- |
| endpoint | - | (required) Full URL of the server ingest endpoint |
| headers | - | Custom headers sent with each fetch request (e.g. Authorization, X-API-Key) |
| timeout | 5000 | Request timeout in milliseconds |
| useBeacon | true | Use sendBeacon when the page is hidden |
| credentials | 'same-origin' | Fetch credentials mode ('omit', 'same-origin', 'include'). Set to 'include' for cross-origin endpoints |

HttpLogDrainOptions

| Option | Default | Description |
| --- | --- | --- |
| drain | - | (required) HttpDrainConfig object |
| pipeline | { batch: { size: 25, intervalMs: 2000 }, retry: { maxAttempts: 2 } } | Pipeline configuration overrides |
| autoFlush | true | Auto-register visibilitychange flush listener |

sendBeacon fallback

When useBeacon is enabled (the default) and the page becomes hidden, the drain automatically switches from fetch to navigator.sendBeacon. This ensures logs are delivered even when the user closes the tab or navigates away.
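
Conceptually the transport switch looks like this (a simplified sketch, not the library's exact code):

// choose a transport based on page visibility (simplified)
function send(endpoint: string, payload: string): void {
  if (document.visibilityState === 'hidden' && 'sendBeacon' in navigator) {
    // fire-and-forget: survives tab close and navigation, but no custom headers
    navigator.sendBeacon(endpoint, new Blob([payload], { type: 'application/json' }))
  } else {
    // keepalive lets the request outlive page navigation
    fetch(endpoint, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: payload, keepalive: true })
  }
}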

sendBeacon has a browser-imposed payload limit (~64 KB). If the payload exceeds this, the drain throws an error. Keep batch sizes reasonable (the default of 25 is well within limits).

Authentication

Pass custom headers to protect your ingest endpoint:

app.ts
const drain = createHttpLogDrain({
  drain: {
    endpoint: 'https://logs.example.com/v1/ingest',
    headers: {
      'Authorization': 'Bearer ' + token,
    },
  },
})
headers are applied to fetch requests only. The sendBeacon API does not support custom headers, so when the page is hidden and sendBeacon is used, headers are not sent. If your endpoint requires authentication, validate via a session cookie (set credentials: 'include' for cross-origin endpoints) or disable sendBeacon with useBeacon: false.
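
For example, cookie-based auth that works with both transports (a sketch; the endpoint is illustrative):

import { createHttpLogDrain } from 'evlog/http'

// no custom headers, so the sendBeacon path keeps working when the page is hidden;
// the session cookie authenticates both fetch and beacon requests
const drain = createHttpLogDrain({
  drain: {
    endpoint: 'https://logs.example.com/v1/ingest',
    credentials: 'include', // send the session cookie to a cross-origin endpoint
  },
})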

Server endpoint

Your server needs a POST endpoint that accepts a DrainContext[] JSON body. An Express example:

import express from 'express'

const app = express()

app.post('/v1/ingest', express.json(), (req, res) => {
  for (const entry of req.body) {
    console.log('[BROWSER]', JSON.stringify(entry))
  }
  res.sendStatus(204)
})
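
And the same endpoint in Hono, a sketch matching the linked example's stack:

import { Hono } from 'hono'

const app = new Hono()

app.post('/v1/ingest', async (c) => {
  const entries = await c.req.json() // DrainContext[] from the browser drain
  for (const entry of entries) {
    console.log('[BROWSER]', JSON.stringify(entry))
  }
  return c.body(null, 204)
})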

See the full browser example for a working Hono server + browser page.

Common pitfalls

  • Don't forget drain.flush() on shutdown — buffered events are lost otherwise
  • Tune batch.size to match your provider's recommended payload — too small wastes overhead, too big risks rejection
  • Don't run one pipeline per drain unless you need per-destination retries — sharing one pipeline keeps batching + buffering coherent
  • Don't fan out to a serverless-incompatible target without checking — the stream server, for example, reaches every connected client through the in-process stream; it's not a drain

Next steps