# Drains
A drain is the terminal step of evlog's pipeline: a function that receives wide events and ships them somewhere — an HTTP API, a message queue, a database, a webhook, a local file. evlog ships built-in drains for popular providers (Adapters overview). This page covers the three things you do on top of them in production:
| You want to… | Use |
|---|---|
| Write a drain for a backend without a built-in adapter | Custom drains |
| Batch + retry every drain instead of making one HTTP request per event | Drain pipeline |
| Send each event to several destinations in parallel | Fanout |
## Custom drains
When you need a destination that isn't covered by a built-in adapter, write your own with `defineDrain` (any transport) or `defineHttpDrain` (HTTP backends with automatic timeouts, retries, and identity headers).
### A 5-minute example — internal Loki drain
```ts
import { defineHttpDrain } from 'evlog/toolkit'

export function createLokiDrain(overrides?: { url?: string; token?: string }) {
  return defineHttpDrain<{ url: string; token: string }>({
    name: 'loki',
    resolve: () => ({
      url: overrides?.url ?? process.env.LOKI_URL!,
      token: overrides?.token ?? process.env.LOKI_TOKEN!,
    }),
    encode: (events, config) => ({
      url: `${config.url}/loki/api/v1/push`,
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${config.token}`,
      },
      body: JSON.stringify({
        streams: events.map(e => ({
          stream: { service: e.service, level: e.level },
          // Loki expects nanosecond timestamps as strings; convert via BigInt,
          // since multiplying milliseconds by 1e6 as a float loses precision
          values: [[(BigInt(Date.parse(e.timestamp)) * 1_000_000n).toString(), JSON.stringify(e)]],
        })),
      }),
    }),
  })
}
```
`defineHttpDrain` handles abort timeouts, exponential backoff on 5xx / network errors, and automatic identity headers — you only describe how to encode events into the request.

The full reference — `defineDrain` for arbitrary transports, configuration resolution patterns, error handling — lives at Custom adapters.
## Drain pipeline
Every drain in production should be wrapped in `createDrainPipeline()`. It batches events, retries on transient failures, and drops the oldest events when the buffer overflows. Without it, you make one HTTP request per emitted event, which doesn't scale beyond local dev.
### Quick example
```ts
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import type { DrainContext } from 'evlog'

const pipeline = createDrainPipeline<DrainContext>({
  batch: { size: 50, intervalMs: 5000 },
  retry: { maxAttempts: 3 },
  maxBufferSize: 1000,
})

const axiom = createAxiomDrain()

const drain = pipeline(async (batch) => {
  await axiom(batch)
})

nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
```
The full reference — pipeline options (`batch`, `retry`, `maxBufferSize`, `onDropped`), lifecycle (`flush()` and `pending`) — lives at Drain pipeline.
### Common pitfalls
- Don't forget `drain.flush()` on shutdown — buffered events are lost otherwise
- Tune `batch.size` to match your provider's recommended payload — too small wastes overhead, too big risks rejection
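The first pitfall can be wired up once at startup. Below is a sketch for a plain Node process, assuming a drain that exposes `flush(): Promise<void>`; `flushOnShutdown` is a hypothetical helper, not an evlog API.

```ts
// Sketch: flush buffered events before the process exits.
// Assumes a drain whose flush() returns a promise that resolves
// once the remaining buffer has been shipped.
function flushOnShutdown(flush: () => Promise<void>) {
  for (const signal of ['SIGINT', 'SIGTERM'] as const) {
    process.once(signal, () => {
      // ship whatever is still buffered, then exit either way
      flush().finally(() => process.exit(0))
    })
  }
}

// e.g.: flushOnShutdown(() => drain.flush())
```

Framework integrations with a lifecycle hook (like Nitro's `close` hook in the quick example) make this unnecessary; the manual version is for standalone processes and workers.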
## Fanout
One wide event, fanned out in parallel to every registered destination (axiom, otlp, sentry, nuxthub, fs):

```ts
{
  level: 'info',
  method: 'POST',
  path: '/checkout',
  duration: 234,
  user: { ... },
  cart: { ... },
}
```
In production, the same wide event often needs to reach more than one destination: a long-term store (Axiom), a metrics tool (Datadog), an error tracker (Sentry), and a local fs drain for incident replay. The pipeline batches once, then your drain fans out the batch to every destination in parallel via Promise.allSettled so a single slow / failing destination cannot block the others.
### The recipe
```ts
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import { createDatadogDrain } from 'evlog/datadog'
import { createSentryDrain } from 'evlog/sentry'
import { createFsDrain } from 'evlog/fs'
import type { DrainContext } from 'evlog'

const pipeline = createDrainPipeline<DrainContext>({
  batch: { size: 50, intervalMs: 5000 },
  retry: { maxAttempts: 3 },
  maxBufferSize: 1000,
})

const axiom = createAxiomDrain()
const datadog = createDatadogDrain()
const sentry = createSentryDrain({ minLevel: 'error' })
const fs = createFsDrain({ dir: '.evlog/logs', maxFiles: 14 })

export const drain = pipeline(async (batch) => {
  await Promise.allSettled([
    axiom(batch),
    datadog(batch),
    sentry(batch),
    fs(batch),
  ])
})
```
Then register `drain` wherever your framework integration takes a drain — `nitroApp.hooks.hook('evlog:drain', drain)` for Nitro, `initLogger({ drain })` for Next.js / standalone, etc.
### What you get
- Parallel dispatch — every destination receives the batch concurrently via `Promise.allSettled`
- Tolerant fanout — if Datadog's API throws, Axiom / Sentry / fs still complete; the pipeline only retries the whole batch when the wrapping function rejects
- Shared backpressure — the buffer is sized once for the whole pipeline; if the wrapping drain falls behind, the oldest events are dropped consistently for every destination
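Because `Promise.allSettled` never rejects, the recipe above never triggers the pipeline's retry on its own. If you want the batch retried when destinations fail, inspect the settled results and rethrow deliberately. Here is a sketch with a hypothetical `fanout` helper; rejecting only when every destination failed is one possible policy, not something evlog prescribes.

```ts
type Drain<T> = (batch: T[]) => Promise<void>

// Hypothetical helper: dispatch to all drains concurrently, and reject
// (so the wrapping pipeline retries the batch) only when every single
// destination failed. Rejecting on any failure is an equally valid
// policy, but it re-sends the batch to destinations that succeeded.
function fanout<T>(...drains: Drain<T>[]): Drain<T> {
  return async (batch) => {
    const results = await Promise.allSettled(drains.map(d => d(batch)))
    const failures = results.filter(
      (r): r is PromiseRejectedResult => r.status === 'rejected',
    )
    if (drains.length > 0 && failures.length === drains.length) {
      throw new Error(
        'all drains failed: ' + failures.map(f => String(f.reason)).join('; '),
      )
    }
  }
}

// usage: export const drain = pipeline(fanout(axiom, datadog, sentry, fs))
```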
### Per-drain filtering
Wrap a destination drain so it only sees events you care about:
```ts
import { createSentryDrain } from 'evlog/sentry'
import type { DrainContext } from 'evlog'

// `pipeline` and `axiom` come from the recipe above
const sentry = createSentryDrain({ dsn: process.env.SENTRY_DSN! })

async function sentryErrorsOnly(batch: DrainContext[]): Promise<void> {
  const errors = batch.filter(c => c.event?.level === 'error')
  if (errors.length > 0) await sentry(errors)
}

export const drain = pipeline(async (batch) => {
  await Promise.allSettled([
    axiom(batch),
    sentryErrorsOnly(batch),
  ])
})
```
Most built-in drains expose `minLevel` directly, so you only need this pattern for non-level filters (path, custom field, etc.).
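The level filter above generalizes to any predicate. Here is a hypothetical `filtered` helper (not part of evlog's API) that wraps any drain:

```ts
type Drain<T> = (batch: T[]) => Promise<void>

// Hypothetical helper: wrap a drain so it only ever sees events
// matching `keep`, and skip the call entirely when nothing matches.
function filtered<T>(drain: Drain<T>, keep: (event: T) => boolean): Drain<T> {
  return async (batch) => {
    const kept = batch.filter(keep)
    if (kept.length > 0) await drain(kept)
  }
}

// e.g. (illustrative field access, matching the wide event shown earlier):
// const checkoutOnly = filtered(datadog, c => c.event?.path === '/checkout')
```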
### What not to do
- Don't run one pipeline per drain unless you need per-destination retries — sharing one pipeline keeps batching + buffering coherent
- Don't forget `drain.flush()` on shutdown — events buffered for fanout are lost on abrupt exit
- Don't fan out to a serverless-incompatible target without checking — the stream server reaches every connected client through the in-process stream; it's not a drain
## Pipeline extension
Hook into the evlog pipeline — plugins for cross-cutting concerns, enrichers for derived context, tail sampling for outcome-aware keep decisions, identity headers on every drain request.
## Framework integration
Build evlog support for an HTTP framework that doesn't ship a built-in integration — Medusa, AdonisJS, h3 directly, custom dispatchers, queue workers.