
Sinks

Build, batch, and fan out drains — write a custom drain for any backend, wrap it in createDrainPipeline for batch + retry, and compose multiple destinations through one pipeline.

A drain is the terminal step of evlog's pipeline: a function that receives wide events and ships them somewhere — an HTTP API, a message queue, a database, a webhook, a local file. evlog ships built-in drains for popular providers (Adapters overview). This page covers the three things you do on top of them in production:
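Concretely, a drain is just an async function over a batch of events. A minimal hand-rolled NDJSON file drain (illustrative only; the built-in fs adapter is more capable, and `WideEvent` here stands in for evlog's real event type) shows the shape:

```typescript
import { appendFile } from 'node:fs/promises'

// Stand-in for evlog's real event type (illustrative).
type WideEvent = Record<string, unknown>

// The whole drain contract: receive a batch of events, ship them, return a promise.
// This one appends the batch to a local NDJSON file in a single write.
export function createNdjsonFileDrain(path: string) {
  return async (events: WideEvent[]): Promise<void> => {
    const lines = events.map(e => JSON.stringify(e)).join('\n') + '\n'
    await appendFile(path, lines, 'utf8')
  }
}
```

Everything else on this page (timeouts, retries, batching, fanout) is layered on top of functions with exactly this shape.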

| You want to… | Use |
| --- | --- |
| Write a drain for a backend without a built-in adapter | Custom drains |
| Batch + retry every drain so you never issue one HTTP call per event | Drain pipeline |
| Send each event to several destinations in parallel | Fanout |

Custom drains

When you need a destination that isn't covered by a built-in adapter, write your own with defineDrain (any transport) or defineHttpDrain (HTTP backends with auto timeouts + retries + identity headers).


A 5-minute example — internal Loki drain

import { defineHttpDrain } from 'evlog/toolkit'

export function createLokiDrain(overrides?: { url?: string; token?: string }) {
  return defineHttpDrain<{ url: string; token: string }>({
    name: 'loki',
    resolve: () => ({
      url: overrides?.url ?? process.env.LOKI_URL!,
      token: overrides?.token ?? process.env.LOKI_TOKEN!,
    }),
    encode: (events, config) => ({
      url: `${config.url}/loki/api/v1/push`,
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${config.token}`,
      },
      body: JSON.stringify({
        streams: events.map(e => ({
          stream: { service: e.service, level: e.level },
          values: [[String(Date.parse(e.timestamp) * 1e6), JSON.stringify(e)]],
        })),
      }),
    }),
  })
}

defineHttpDrain handles abort timeouts, exponential backoff on 5xx / network errors, and automatic identity headers — you only describe how to encode events into the request.
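For intuition, the retry behavior defineHttpDrain gives you is roughly equivalent to a loop like the following. This is a simplified sketch, not the actual implementation; the real helper also attaches the identity headers and handles error reporting:

```typescript
// Simplified sketch of what defineHttpDrain does for you: an abort timeout per
// attempt, retry on 5xx and network errors, exponential backoff in between.
async function postWithBackoff(
  url: string,
  init: RequestInit,
  maxAttempts = 3,
): Promise<Response> {
  for (let attempt = 1; ; attempt++) {
    try {
      const res = await fetch(url, { ...init, signal: AbortSignal.timeout(10_000) })
      if (res.status < 500) return res          // success or client error: never retry 4xx
      if (attempt === maxAttempts) return res   // give up after the final 5xx
    } catch (err) {
      if (attempt === maxAttempts) throw err    // timeout / network error on last attempt
    }
    await new Promise(r => setTimeout(r, 2 ** attempt * 250)) // 500ms, 1s, 2s, …
  }
}
```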

The full reference — defineDrain for arbitrary transports, configuration resolution patterns, error handling — lives at Custom adapters.

Drain pipeline

[Interactive demo: events emitted by the app never block the response; they land in an in-memory buffer that fills until batch size or the 5s interval, then flush as a single HTTP POST. Overflow is governed by maxBufferSize / onDropped, and drain.flush() forces a final flush. Counters track events emitted, batches sent, and events dropped.]

Every drain in production should be wrapped in createDrainPipeline(). It batches events, retries on transient failures, and drops the oldest events when the buffer overflows. Without it, you make one HTTP request per emitted event, which doesn't scale beyond local dev.


Quick example

import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import type { DrainContext } from 'evlog'

const pipeline = createDrainPipeline<DrainContext>({
  batch: { size: 50, intervalMs: 5000 },
  retry: { maxAttempts: 3 },
  maxBufferSize: 1000,
})

const axiom = createAxiomDrain()
const drain = pipeline(async (batch) => {
  await axiom(batch)
})

nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())

The full reference — pipeline options (batch, retry, maxBufferSize, onDropped), lifecycle (flush() and pending) — lives at Drain pipeline.

Common pitfalls

  • Don't forget drain.flush() on shutdown — buffered events are lost otherwise
  • Tune batch.size to match your provider's recommended payload — too small wastes overhead, too big risks rejection
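A small helper can make the shutdown flush hard to forget outside of Nitro's close hook. `flushOnShutdown` below is illustrative, not part of evlog; it works on anything exposing `flush()`:

```typescript
type Flushable = { flush(): Promise<unknown> }

// Illustrative helper (not part of evlog): flush a drain exactly once when the
// process receives SIGINT/SIGTERM or is about to exit normally.
export function flushOnShutdown(drain: Flushable): void {
  let flushed = false
  const flushOnce = async () => {
    if (flushed) return
    flushed = true
    try { await drain.flush() } catch { /* never throw during shutdown */ }
  }
  process.once('SIGINT', () => { void flushOnce().then(() => process.exit(130)) })
  process.once('SIGTERM', () => { void flushOnce().then(() => process.exit(143)) })
  process.once('beforeExit', () => { void flushOnce() })
}
```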

Fanout

[Interactive demo: a single wide event ({ level: "info", method: "POST", path: "/checkout", duration: 234, user: {...}, cart: {...} }) is batched once, then fanned out to five destinations (axiom, otlp, sentry, nuxthub, fs) in parallel, with fire-and-forget retry + backoff; one failing destination never blocks the response.]

In production, the same wide event often needs to reach more than one destination: a long-term store (Axiom), a metrics tool (Datadog), an error tracker (Sentry), and a local fs drain for incident replay. The pipeline batches once, then your drain fans out the batch to every destination in parallel via Promise.allSettled so a single slow / failing destination cannot block the others.


The recipe

import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import { createDatadogDrain } from 'evlog/datadog'
import { createSentryDrain } from 'evlog/sentry'
import { createFsDrain } from 'evlog/fs'
import type { DrainContext } from 'evlog'

const pipeline = createDrainPipeline<DrainContext>({
  batch: { size: 50, intervalMs: 5000 },
  retry: { maxAttempts: 3 },
  maxBufferSize: 1000,
})

const axiom = createAxiomDrain()
const datadog = createDatadogDrain()
const sentry = createSentryDrain({ minLevel: 'error' })
const fs = createFsDrain({ dir: '.evlog/logs', maxFiles: 14 })

export const drain = pipeline(async (batch) => {
  await Promise.allSettled([
    axiom(batch),
    datadog(batch),
    sentry(batch),
    fs(batch),
  ])
})

Then register drain wherever your framework integration takes a drain — nitroApp.hooks.hook('evlog:drain', drain) for Nitro, initLogger({ drain }) for Next.js / standalone, etc.

What you get

  • Parallel dispatch — every destination receives the batch concurrently via Promise.allSettled
  • Tolerant fanout — if Datadog's API throws, Axiom / Sentry / fs still complete; the pipeline only retries the whole batch when the wrapping function rejects
  • Shared backpressure — the buffer is sized once for the whole pipeline; if the wrapping drain falls behind, the oldest events are dropped consistently for every destination
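If one destination matters more than the rest (losing long-term Axiom data is unacceptable, say, while a Datadog hiccup is tolerable), you can make only that destination's failure reject the batch, so only it triggers the pipeline's retry. `criticalFanout` below is an illustrative pattern, not an evlog API:

```typescript
type Drain<T> = (batch: T[]) => Promise<void>

// Illustrative pattern (not an evlog API): only the critical destination's
// failure rejects the wrapped drain, so only it triggers the pipeline's retry.
export function criticalFanout<T>(critical: Drain<T>, bestEffort: Drain<T>[]): Drain<T> {
  return async (batch) => {
    // Best-effort destinations run concurrently; allSettled never rejects.
    const others = Promise.allSettled(bestEffort.map(d => d(batch)))
    await critical(batch) // a rejection here propagates and the batch is retried
    await others          // wait for the rest, ignoring their failures
  }
}
```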

Per-drain filtering

Wrap a destination drain so it only sees events you care about:

import type { DrainContext } from 'evlog'

const sentry = createSentryDrain({ dsn: process.env.SENTRY_DSN! })

async function sentryErrorsOnly(batch: DrainContext[]): Promise<void> {
  const errors = batch.filter(c => c.event?.level === 'error')
  if (errors.length > 0) await sentry(errors)
}

export const drain = pipeline(async (batch) => {
  await Promise.allSettled([
    axiom(batch),
    sentryErrorsOnly(batch),
  ])
})

Most built-in drains expose minLevel directly, so you only need this pattern for non-level filters (path, custom field, etc.).
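The wrapper generalizes to any predicate. `filterDrain` below is an illustrative combinator (not an evlog API) that skips the destination entirely when nothing in the batch matches:

```typescript
type Drain<T> = (batch: T[]) => Promise<void>

// Illustrative combinator (not an evlog API): hand a destination only the
// events matching the predicate, and skip the call when nothing matches.
export function filterDrain<T>(predicate: (item: T) => boolean, drain: Drain<T>): Drain<T> {
  return async (batch) => {
    const kept = batch.filter(predicate)
    if (kept.length > 0) await drain(kept)
  }
}
```

For example, `filterDrain(c => c.event?.path?.startsWith('/checkout') ?? false, axiom)` would ship only checkout traffic (field names assumed from the examples above).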

What not to do

  • Don't run one pipeline per drain unless you need per-destination retries — sharing one pipeline keeps batching + buffering coherent
  • Don't forget drain.flush() on shutdown — events buffered for fanout are lost on abrupt exit
  • Don't fan out to a target without checking it works in your runtime (serverless in particular); and don't treat the stream server as a destination: it reaches every connected client through the in-process stream, not through a drain