Custom Drains
A drain is the terminal step of evlog's pipeline: a function that receives wide events and ships them somewhere — an HTTP API, a message queue, a database, a webhook, a local file. evlog ships built-in drains for popular providers (Adapters overview). When you need a destination that isn't covered, you write your own.
Two factories cover every case:
| You have… | Use |
|---|---|
An HTTP backend (REST, JSON ingest, vendor /v1/logs endpoint) | defineHttpDrain |
| A non-HTTP transport (gRPC, WebSocket, vendor SDK, queue, raw socket) | defineDrain |
Both come from evlog/toolkit and are the exact factories every built-in adapter uses.
Build a custom evlog drain
defineHttpDrain (the HTTP recipe)
The recipe every built-in adapter follows. Two pure functions: resolve() returns the config (or null to skip), encode() returns the HTTP request payload.
import {
defineHttpDrain,
resolveAdapterConfig,
type ConfigField,
} from 'evlog/toolkit'
interface MyServiceConfig {
apiKey: string
endpoint?: string
timeout?: number
}
const FIELDS: ConfigField<MyServiceConfig>[] = [
{ key: 'apiKey', env: ['MYSERVICE_API_KEY'] },
{ key: 'endpoint', env: ['MYSERVICE_ENDPOINT'] },
{ key: 'timeout' },
]
export function createMyServiceDrain(overrides?: Partial<MyServiceConfig>) {
return defineHttpDrain<MyServiceConfig>({
name: 'myservice',
resolve: async () => {
const cfg = await resolveAdapterConfig<MyServiceConfig>('myservice', FIELDS, overrides)
if (!cfg.apiKey) {
console.error('[evlog/myservice] Missing apiKey')
return null
}
return cfg as MyServiceConfig
},
encode: (events, cfg) => ({
url: `${cfg.endpoint ?? 'https://api.myservice.com'}/v1/ingest`,
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${cfg.apiKey}`,
},
body: JSON.stringify(events),
}),
})
}
That's it. defineHttpDrain handles batching, retries (default 2), timeouts (default 5000ms), error isolation, and the identity headers (User-Agent: evlog/<version> + X-Evlog-Source: <name>). Your app pipeline keeps running even if your destination is down.
A 5-minute example — internal Loki drain
A complete working drain in 25 lines, with no external config helper:
import { defineHttpDrain } from 'evlog/toolkit'
export function createLokiDrain(overrides?: { url?: string, token?: string }) {
return defineHttpDrain<{ url: string, token: string }>({
name: 'loki',
resolve: () => ({
url: overrides?.url ?? process.env.LOKI_URL!,
token: overrides?.token ?? process.env.LOKI_TOKEN!,
}),
encode: (events, config) => ({
url: `${config.url}/loki/api/v1/push`,
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${config.token}`,
},
body: JSON.stringify({
streams: events.map(e => ({
stream: { service: e.service, level: e.level },
values: [[String(Date.parse(e.timestamp) * 1e6), JSON.stringify(e)]],
})),
}),
}),
})
}
Standardized config priority
resolveAdapterConfig(namespace, fields, overrides) walks the standard chain so users get the same configuration UX as built-in adapters:
- Explicit
overridespassed to your factory runtimeConfig.evlog.<namespace>(Nuxt/Nitro)runtimeConfig.<namespace>(legacy Nuxt/Nitro)NUXT_<NS>_<FIELD>env vars<NS>_<FIELD>env vars
Field names should follow the project conventions: apiKey, endpoint, serviceName, timeout. If you're renaming an existing field (e.g. token → apiKey), keep both as ConfigField entries for one major version — see axiom.ts and better-stack.ts for the deprecation pattern.
Wiring the drain into your framework
Once createMyServiceDrain() returns the drain, wire it like any other:
// server/plugins/evlog-drain.ts
import { createMyServiceDrain } from '~/server/utils/my-drain'
const drain = createMyServiceDrain()
export default defineNitroPlugin((nitroApp) => {
nitroApp.hooks.hook('evlog:drain', drain)
})
// lib/evlog.ts
import { createEvlog } from 'evlog/next'
import { createMyServiceDrain } from './my-drain'
export const { withEvlog, useLogger, log, createError } = createEvlog({
service: 'my-app',
drain: createMyServiceDrain(),
})
import { createMyServiceDrain } from './my-drain'
app.use(evlog({ drain: createMyServiceDrain() }))
await app.register(evlog, { drain: createMyServiceDrain() })
EvlogModule.forRoot({ drain: createMyServiceDrain() })
import { initLogger } from 'evlog'
import { createMyServiceDrain } from './my-drain'
initLogger({ drain: createMyServiceDrain() })
For production, wrap it once in createDrainPipeline so events are batched and retried.
Filtering and transforming events
encode() receives the full batch of WideEvent[] plus the resolved config. Filter or transform inline — returning null is a clean opt-out for that batch:
encode: (events, cfg) => {
const filtered = events.filter(e => e.level === 'error' && e.path !== '/health')
if (filtered.length === 0) return null
const payload = filtered.map(e => ({
ts: new Date(e.timestamp).getTime(),
severity: e.level.toUpperCase(),
attributes: { method: e.method, path: e.path, status: e.status, duration: e.duration },
}))
return {
url: `${cfg.endpoint}/v1/push`,
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
}
}
defineDrain (non-HTTP transports)
If your destination requires gRPC, a vendor SDK, a queue client, a WebSocket, or a raw socket, drop one level lower with defineDrain. You own the transport; the toolkit still gives you config resolution, error isolation, and a consistent shape.
import { defineDrain } from 'evlog/toolkit'
export const createCustomTransportDrain = () =>
defineDrain<{ apiKey: string }>({
name: 'custom',
resolve: async () => ({ apiKey: process.env.MY_KEY! }),
send: async (events, cfg) => {
await myVendorSdk.publish(events, { token: cfg.apiKey })
},
})
When you fall back to defineDrain, follow the same rules manually that defineHttpDrain enforces: wrap the transport in try/catch, log with console.error('[evlog/<name>] …'), and never re-throw.
DrainContext reference
When evlog calls your drain through evlog:drain, it passes a DrainContext per event:
interface DrainContext {
/** The complete wide event with all accumulated context */
event: WideEvent
/** Request metadata */
request?: {
method: string
path: string
requestId: string
}
/** Safe HTTP headers (sensitive headers filtered) */
headers?: Record<string, string>
}
interface WideEvent {
timestamp: string
level: 'debug' | 'info' | 'warn' | 'error'
service: string
environment?: string
version?: string
region?: string
commitHash?: string
requestId?: string
// ... plus all fields added via log.set()
[key: string]: unknown
}
In the batched form your encode() / send() receives, you get WideEvent[] directly (the toolkit unwraps event from each context).
Toolkit helpers
evlog/toolkit exposes the same helpers every built-in adapter uses. The ones relevant to drains:
| Export | Purpose |
|---|---|
defineHttpDrain(spec) | The HTTP recipe — auto retries, timeouts, identity headers, error isolation |
defineDrain(spec) | Same contract for non-HTTP transports |
resolveAdapterConfig(ns, fields, overrides) | Standard config priority chain (overrides → runtimeConfig.evlog.<ns> → env) |
httpPost(opts) | The retried POST helper used by every built-in HTTP adapter — handles timeout, retries, redacted error messages |
composeDrains(drains) | Combine multiple drains into one (errors isolated, runs concurrently with Promise.allSettled) |
toTypedAttributeValue(value) | Convert any value to the typed attribute shape used by Axiom / Sentry |
toOtlpAttributeValue(value) | Convert any value to the OTLP AnyValue shape (used by OTLP / HyperDX / PostHog logs) |
OTEL_SEVERITY_NUMBER, OTEL_SEVERITY_TEXT | OTEL log severity tables |
Identity headers
defineHttpDrain automatically tags every request with two headers so receivers can identify the traffic:
| Header | Value |
|---|---|
User-Agent | evlog/<version> (Node / server runtimes only — browsers strip this header) |
X-Evlog-Source | The drain name you provided |
If you build a drain on top of httpPost directly, you can override or suppress them — see Identity headers.
Error handling — already done for you
defineHttpDrain enforces every best practice automatically:
- Never throws — failures are caught and logged with the
[evlog/<name>]prefix. - Retries — defaults to 2 attempts on transient errors (configurable via
retries). - Timeouts — defaults to 5000ms (configurable via
timeout). - Graceful degradation —
resolve()returningnullmakes the drain a no-op.
If you fall back to defineDrain, follow the same rules manually.
Publishing as a community package
Recommended structure for a community drain:
my-evlog-drain/
├─ src/
│ ├─ drain.ts # createMyDrain via defineHttpDrain
│ └─ index.ts # re-exports
├─ test/ # vitest, mock fetch
├─ package.json # peerDependency: "evlog"
└─ README.md
Add evlog as a peerDependency (not a dependency) — your package shouldn't pull in a copy of evlog at install time.
Next steps
- Drain Pipeline — wrap your drain in batch + retry + fanout for production
- Adapters Overview — see how the built-in adapters use
defineHttpDrain - Custom Enrichers — same toolkit shape for derived event fields
- Custom Framework Integration — same toolkit shape for HTTP frameworks
- Best Practices — security and production tips
Identity headers
Every drain request sent by evlog is tagged with User-Agent and X-Evlog-Source headers so receivers can identify and triage the traffic. Override or suppress them when your custom drain needs different identity.
Drain pipeline
Batch events, retry on failure, fan out to multiple destinations, and ship browser logs to your server. The shared pipeline that wraps every drain in production.