diff --git a/ingestion/.env.example b/ingestion/.env.example index 1362ae5..e3bc6b4 100644 --- a/ingestion/.env.example +++ b/ingestion/.env.example @@ -25,6 +25,8 @@ INGEST_WRITE_KEY_CACHE_TTL_SECONDS=45s INGEST_LOG_PAYLOAD_ON_SUCCESS=false INGEST_LOG_PAYLOAD_ON_ERROR=true INGEST_SHUTDOWN_TIMEOUT_SECONDS=30s +# 0 = unlimited (load testing only). Production: keep at 100 or per-tier. +INGEST_RATE_LIMIT_RPS=500 # Kafka topics KAFKA_TOPIC_INGEST=events.ingest diff --git a/ingestion/Makefile b/ingestion/Makefile index 3f5bace..16fe3cf 100644 --- a/ingestion/Makefile +++ b/ingestion/Makefile @@ -37,6 +37,7 @@ export CLICKHOUSE_ADDR CLICKHOUSE_DB CLICKHOUSE_USER CLICKHOUSE_PASSWORD CLICKHO export INGEST_HTTP_ADDR INGEST_LOG_LEVEL INGEST_PAYLOAD_LIMIT_KB INGEST_BATCH_LIMIT_KB export INGEST_LATE_EVENT_HOURS INGEST_DEDUP_TTL_HOURS INGEST_WRITE_KEY_CACHE_TTL_SECONDS export INGEST_LOG_PAYLOAD_ON_SUCCESS INGEST_LOG_PAYLOAD_ON_ERROR INGEST_SHUTDOWN_TIMEOUT_SECONDS +export INGEST_RATE_LIMIT_RPS export KAFKA_TOPIC_INGEST KAFKA_TOPIC_DLQ KAFKA_TOPIC_RETRY export BULKER_HTTP_ADDR BULKER_LOG_LEVEL BULKER_KAFKA_GROUP BULKER_BATCH_SIZE export BULKER_BATCH_INTERVAL_SECONDS BULKER_SHUTDOWN_TIMEOUT_SECONDS diff --git a/ingestion/console/src/pages/Live.tsx b/ingestion/console/src/pages/Live.tsx index b1932f8..bfefc99 100644 --- a/ingestion/console/src/pages/Live.tsx +++ b/ingestion/console/src/pages/Live.tsx @@ -1,21 +1,70 @@ -import { useState } from 'react'; +import { useEffect, useRef, useState } from 'react'; import { useMutation } from '@tanstack/react-query'; import { Button } from '@/components/ui/button'; import { Input } from '@/components/ui/input'; +import { Badge } from '@/components/ui/badge'; import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card'; import { ApiError, ingest } from '@/api/client'; -interface LogEntry { - ts: string; - ok: boolean; - message: string; +interface LiveEvent { + topic: string; + partition: number; + offset: number; + timestamp: string; + event: { + workspace_id: string; + source_id: string; + message_id: string; + type: string; + user_id?: string; + anonymous_id?: string; + event?: string; + properties?: Record; + traits?: Record; + received_at: string; + }; } +const STREAM_URL = (import.meta.env.VITE_API_BASE_URL ?? '/api/ingest') + '/live/events'; +const MAX_ROWS = 200; + export function LivePage() { const [writeKey, setWriteKey] = useState('cdp_dev_writekey_1234567890'); - const [logs, setLogs] = useState([]); + const [filterType, setFilterType] = useState(''); + const [connected, setConnected] = useState(false); + const [events, setEvents] = useState([]); + const esRef = useRef(null); - const send = useMutation({ + useEffect(() => { + // Tear down any previous connection before opening a new one (handles + // filter changes via the dropdown below). + esRef.current?.close(); + + const params = new URLSearchParams(); + if (filterType) params.set('type', filterType); + const url = STREAM_URL + (params.toString() ? `?${params}` : ''); + + const es = new EventSource(url); + esRef.current = es; + + es.addEventListener('ingest', (e) => { + try { + const payload = JSON.parse((e as MessageEvent).data) as LiveEvent; + setEvents((prev) => [payload, ...prev].slice(0, MAX_ROWS)); + } catch { + /* drop */ + } + }); + es.onopen = () => setConnected(true); + es.onerror = () => setConnected(false); + + return () => { + es.close(); + setConnected(false); + }; + }, [filterType]); + + const sendTest = useMutation({ mutationFn: async () => ingest.track(writeKey, { type: 'track', @@ -24,49 +73,119 @@ export function LivePage() { event: 'Console Test', properties: { source: 'console', at: new Date().toISOString() }, }), - onSuccess: () => - setLogs((prev) => [{ ts: new Date().toLocaleTimeString(), ok: true, message: 'event accepted' }, ...prev].slice(0, 50)), onError: (err: ApiError) => - setLogs((prev) => [{ ts: new Date().toLocaleTimeString(), ok: false, message: `${err.status} ${err.message}` }, ...prev].slice(0, 50)), + // Surface as a fake live row so the user sees feedback even before + // the SSE round-trip lands. + setEvents((prev) => [ + { + topic: '(error)', + partition: -1, + offset: -1, + timestamp: new Date().toISOString(), + event: { + workspace_id: '', + source_id: '', + message_id: '', + type: 'error', + event: `${err.status} ${err.message}`, + received_at: new Date().toISOString(), + }, + }, + ...prev, + ].slice(0, MAX_ROWS)), }); return (
-
-

Live events

-

Send a synthetic event and watch the response.

+
+
+

Live events

+

+ Streaming from Kafka events.ingest via SSE. +

+
+ + {connected ? 'connected' : 'disconnected'} +
Send test event - Uses the dev write key by default. + Posts to /v1/track; the resulting event will appear below.
setWriteKey(e.target.value)} placeholder="write key" /> -
- - Log - - -
- {logs.length === 0 &&
— no events yet —
} - {logs.map((l, i) => ( -
- {l.ts} - {l.ok ? 'OK' : 'ERR'} - {l.message} -
- ))} + +
+ Stream + Newest first. Buffer max {MAX_ROWS} rows.
+
+ + +
+
+ + {events.length === 0 ? ( +
— waiting for events —
+ ) : ( + + + + + + + + + + + + + {events.map((row, i) => ( + + + + + + + + + ))} + +
timetypeeventuser / anonpartitionmessage_id
+ {new Date(row.timestamp).toLocaleTimeString()} + + + {row.event.type} + + {row.event.event ?? ''} + {row.event.user_id ?? row.event.anonymous_id ?? ''} + {row.partition === -1 ? '-' : `${row.partition}@${row.offset}`}{row.event.message_id}
+ )}
diff --git a/ingestion/ingest/cmd/server/main.go b/ingestion/ingest/cmd/server/main.go index c55c531..0bf916a 100644 --- a/ingestion/ingest/cmd/server/main.go +++ b/ingestion/ingest/cmd/server/main.go @@ -19,6 +19,7 @@ import ( "github.com/dbiz/cdp/ingestion/ingest/internal/dedup" "github.com/dbiz/cdp/ingestion/ingest/internal/handler" "github.com/dbiz/cdp/ingestion/ingest/internal/kafka" + "github.com/dbiz/cdp/ingestion/ingest/internal/live" mw "github.com/dbiz/cdp/ingestion/ingest/internal/middleware" "github.com/dbiz/cdp/ingestion/ingest/internal/ratelimit" "github.com/dbiz/cdp/ingestion/ingest/internal/repo" @@ -74,15 +75,19 @@ func run() error { authSvc := service.NewAuthService(writeKeyRepo, redisClient, cfg.WriteKeyCacheTTL, logger) ingestSvc := service.NewIngestService(service.IngestDeps{ - Producer: producer, - Limiter: ratelimit.New(redisClient), - Dedup: dedup.New(redisClient, time.Duration(cfg.DedupTTLHours)*time.Hour), - Schema: schemaRepo, - Log: logger, - LateAfter: time.Duration(cfg.LateEventHours) * time.Hour, + Producer: producer, + Limiter: ratelimit.New(redisClient), + Dedup: dedup.New(redisClient, time.Duration(cfg.DedupTTLHours)*time.Hour), + Schema: schemaRepo, + Log: logger, + LateAfter: time.Duration(cfg.LateEventHours) * time.Hour, + RateLimitRPS: cfg.RateLimitRPS, }) evHandler := handler.NewEventHandler(ingestSvc, logger) + liveStreamer := live.New(cfg.KafkaBrokers, cfg.KafkaTopicIngest, logger) + liveHandler := handler.NewLiveHandler(liveStreamer, logger) + // ---- HTTP router ------------------------------------------------------ r := chi.NewRouter() r.Use(mw.RequestID) @@ -95,6 +100,11 @@ func run() error { r.Get("/health", evHandler.Health) r.Get("/ready", evHandler.Ready) + // SSE stream of events flowing through Kafka. Intentionally outside the + // auth group so the console can subscribe without forwarding the write + // key; lock this down before production. + r.Get("/live/events", liveHandler.Stream) + // authenticated routes r.Group(func(rr chi.Router) { rr.Use(mw.Auth(authSvc)) @@ -121,8 +131,10 @@ func run() error { Handler: r, ReadHeaderTimeout: 5 * time.Second, ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - IdleTimeout: 120 * time.Second, + // WriteTimeout intentionally 0 (no deadline) -- /live/events is a + // long-lived SSE stream. Per-handler deadlines apply via ctx. + WriteTimeout: 0, + IdleTimeout: 120 * time.Second, } // ---- graceful shutdown ------------------------------------------------ diff --git a/ingestion/ingest/internal/config/config.go b/ingestion/ingest/internal/config/config.go index 7d8d8e4..f2bcbce 100644 --- a/ingestion/ingest/internal/config/config.go +++ b/ingestion/ingest/internal/config/config.go @@ -23,6 +23,10 @@ type Config struct { LogPayloadOnSuccess bool `env:"INGEST_LOG_PAYLOAD_ON_SUCCESS" envDefault:"false"` LogPayloadOnError bool `env:"INGEST_LOG_PAYLOAD_ON_ERROR" envDefault:"true"` + // RateLimitRPS caps requests per workspace per second. 0 disables the + // limiter entirely (use for load tests; never in production). + RateLimitRPS int `env:"INGEST_RATE_LIMIT_RPS" envDefault:"100"` + PostgresDSN string `env:"POSTGRES_DSN,required"` RedisAddr string `env:"REDIS_ADDR" envDefault:"localhost:6379"` diff --git a/ingestion/ingest/internal/handler/live.go b/ingestion/ingest/internal/handler/live.go new file mode 100644 index 0000000..a33ae9a --- /dev/null +++ b/ingestion/ingest/internal/handler/live.go @@ -0,0 +1,39 @@ +package handler + +import ( + "net/http" + + "go.uber.org/zap" + + "github.com/dbiz/cdp/ingestion/ingest/internal/live" +) + +// LiveHandler streams Kafka events over SSE for the console's Live page. +// +// Note: this endpoint is unauthenticated in the scaffold. Wire it behind the +// console session / a workspace token before exposing it publicly. +type LiveHandler struct { + stream *live.Streamer + log *zap.Logger +} + +func NewLiveHandler(s *live.Streamer, log *zap.Logger) *LiveHandler { + return &LiveHandler{stream: s, log: log} +} + +// Stream handles GET /live/events. Optional query params: +// +// ?workspace_id=... filter by workspace +// ?source_id=... filter by source +// ?type=track filter by event type (track|identify|page|group|...) +func (h *LiveHandler) Stream(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + flt := live.Filter{ + WorkspaceID: q.Get("workspace_id"), + SourceID: q.Get("source_id"), + EventType: q.Get("type"), + } + if err := h.stream.Stream(r.Context(), w, flt); err != nil { + h.log.Warn("live stream ended", zap.Error(err)) + } +} diff --git a/ingestion/ingest/internal/live/stream.go b/ingestion/ingest/internal/live/stream.go new file mode 100644 index 0000000..caac670 --- /dev/null +++ b/ingestion/ingest/internal/live/stream.go @@ -0,0 +1,176 @@ +// Package live streams events from the Kafka ingest topic over Server-Sent +// Events so the console can show what is flowing through the pipeline in +// real time. Each SSE connection spins up its own consumer group so the +// bulker's offsets are untouched. +package live + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "github.com/google/uuid" + "github.com/twmb/franz-go/pkg/kgo" + "go.uber.org/zap" +) + +type Streamer struct { + brokers []string + topic string + log *zap.Logger +} + +func New(brokers []string, topic string, log *zap.Logger) *Streamer { + return &Streamer{brokers: brokers, topic: topic, log: log} +} + +// Filter narrows which records are forwarded. Empty values mean "no filter". +type Filter struct { + WorkspaceID string + SourceID string + EventType string // track | identify | page | group +} + +// Stream writes SSE frames to w until the request context is cancelled. It +// joins a fresh consumer group seeded at the latest offset so the client +// sees events that arrive *after* subscription (no replay of history). +func (s *Streamer) Stream(ctx context.Context, w http.ResponseWriter, flt Filter) error { + flusher, ok := w.(http.Flusher) + if !ok { + return fmt.Errorf("response writer does not support flushing") + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") // disable nginx proxy buffering + w.WriteHeader(http.StatusOK) + + // Tell the client we're alive. + fmt.Fprintf(w, ": connected\n\n") + flusher.Flush() + + groupID := "live-" + uuid.NewString() + cl, err := kgo.NewClient( + kgo.SeedBrokers(s.brokers...), + kgo.ConsumerGroup(groupID), + kgo.ConsumeTopics(s.topic), + kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()), + kgo.DisableAutoCommit(), // tail mode -- never commit + kgo.ClientID("cdp-live"), + ) + if err != nil { + return fmt.Errorf("kafka client: %w", err) + } + defer cl.Close() + + // keep-alive comments every 25s so proxies don't time out the connection. + keepAlive := time.NewTicker(25 * time.Second) + defer keepAlive.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-keepAlive.C: + fmt.Fprintf(w, ": keepalive\n\n") + flusher.Flush() + default: + } + + // Short poll so we react quickly to ctx cancel. + pollCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + fetches := cl.PollFetches(pollCtx) + cancel() + + if ctx.Err() != nil { + return nil + } + if errs := fetches.Errors(); len(errs) > 0 { + for _, e := range errs { + if e.Err == context.DeadlineExceeded || e.Err == context.Canceled { + continue + } + s.log.Warn("live fetch error", zap.Error(e.Err)) + } + } + + var stopped bool + fetches.EachRecord(func(r *kgo.Record) { + if stopped { + return + } + if !matches(r, flt) { + return + } + frame, err := sseFrame(r) + if err != nil { + return + } + if _, werr := w.Write(frame); werr != nil { + stopped = true + return + } + flusher.Flush() + }) + if stopped { + return nil + } + } +} + +// matches returns true when the record passes the filter. +func matches(r *kgo.Record, flt Filter) bool { + if flt.WorkspaceID == "" && flt.SourceID == "" && flt.EventType == "" { + return true + } + get := func(key string) string { + for _, h := range r.Headers { + if h.Key == key { + return string(h.Value) + } + } + return "" + } + if flt.WorkspaceID != "" && get("workspace_id") != flt.WorkspaceID { + return false + } + if flt.SourceID != "" && get("source_id") != flt.SourceID { + return false + } + if flt.EventType != "" && !strings.EqualFold(get("type"), flt.EventType) { + return false + } + return true +} + +// sseFrame builds an `event: ...\ndata: ...\n\n` block from a Kafka record. +func sseFrame(r *kgo.Record) ([]byte, error) { + // We pass the raw event value through; the console decodes it. + // Each frame also carries Kafka metadata under `meta`. + envelope := struct { + Topic string `json:"topic"` + Partition int32 `json:"partition"` + Offset int64 `json:"offset"` + Timestamp time.Time `json:"timestamp"` + Event json.RawMessage `json:"event"` + }{ + Topic: r.Topic, + Partition: r.Partition, + Offset: r.Offset, + Timestamp: r.Timestamp, + Event: r.Value, + } + body, err := json.Marshal(envelope) + if err != nil { + return nil, err + } + out := make([]byte, 0, len(body)+16) + out = append(out, "event: ingest\ndata: "...) + out = append(out, body...) + out = append(out, '\n', '\n') + return out, nil +} diff --git a/ingestion/ingest/internal/middleware/middleware.go b/ingestion/ingest/internal/middleware/middleware.go index 20e1ed9..a211720 100644 --- a/ingestion/ingest/internal/middleware/middleware.go +++ b/ingestion/ingest/internal/middleware/middleware.go @@ -191,3 +191,11 @@ func (s *statusRecorder) WriteHeader(code int) { s.ResponseWriter.WriteHeader(code) } +// Flush delegates so SSE handlers can still call w.(http.Flusher).Flush() +// after the Logger middleware wraps the original ResponseWriter. +func (s *statusRecorder) Flush() { + if f, ok := s.ResponseWriter.(http.Flusher); ok { + f.Flush() + } +} + diff --git a/ingestion/ingest/internal/repo/schema_repo.go b/ingestion/ingest/internal/repo/schema_repo.go index 5452be6..cbc3134 100644 --- a/ingestion/ingest/internal/repo/schema_repo.go +++ b/ingestion/ingest/internal/repo/schema_repo.go @@ -2,19 +2,25 @@ package repo import ( "context" + "errors" "fmt" + "sync" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/dbiz/cdp/ingestion/ingest/internal/apperr" ) // SchemaRepo records the data type observed for each (workspace, event_type, field) -// triple. The bulker / analytics layer uses this to detect type conflicts. +// triple. The ingest hot path calls GetType per field to detect type conflicts, +// so we wrap PG with an in-memory cache. Cache misses fall through to PG; the +// resolved type (including the "not seen yet" empty string) is memoised. // -// In the ingest hot path we only *check* for conflict via UpsertField; the -// rebuild of the cached map is left to a background loader. We do not block -// the request waiting for upsert -- it is fire-and-forget. +// Cache invalidation: UpsertField writes through, so the writer also refreshes. +// Other ingest instances are eventually consistent -- a tier-1 PG conflict will +// surface on the next request that re-fetches. Acceptable for an append-only +// schema registry. type SchemaRepo interface { // GetType returns the recorded type, or "" if the field has never been seen. GetType(ctx context.Context, workspaceID, eventType, field string) (string, error) @@ -23,26 +29,71 @@ type SchemaRepo interface { } type schemaRepo struct { - db *pgxpool.Pool + db *pgxpool.Pool + cache *schemaCache } func NewSchemaRepo(db *pgxpool.Pool) SchemaRepo { - return &schemaRepo{db: db} + return &schemaRepo{ + db: db, + cache: newSchemaCache(), + } } +// --------------------------------------------------------------------------- +// cache +// --------------------------------------------------------------------------- + +type schemaCache struct { + mu sync.RWMutex + // "" means "looked up, never seen" -- distinct from "absent from cache". + data map[string]string +} + +func newSchemaCache() *schemaCache { + return &schemaCache{data: make(map[string]string, 256)} +} + +func (c *schemaCache) key(ws, et, field string) string { + return ws + "|" + et + "|" + field +} + +func (c *schemaCache) get(ws, et, field string) (string, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + v, ok := c.data[c.key(ws, et, field)] + return v, ok +} + +func (c *schemaCache) set(ws, et, field, dataType string) { + c.mu.Lock() + defer c.mu.Unlock() + c.data[c.key(ws, et, field)] = dataType +} + +// --------------------------------------------------------------------------- +// repo methods +// --------------------------------------------------------------------------- + func (r *schemaRepo) GetType(ctx context.Context, workspaceID, eventType, field string) (string, error) { + if v, ok := r.cache.get(workspaceID, eventType, field); ok { + return v, nil + } + const q = ` SELECT data_type FROM schema_fields WHERE workspace_id = $1::uuid AND event_type = $2 AND field = $3` var t string err := r.db.QueryRow(ctx, q, workspaceID, eventType, field).Scan(&t) + if errors.Is(err, pgx.ErrNoRows) { + // negative cache: avoid hammering PG for fields that don't exist yet. + r.cache.set(workspaceID, eventType, field, "") + return "", nil + } if err != nil { - // pgx.ErrNoRows → return "" with nil error so caller treats as new field - if err.Error() == "no rows in result set" { - return "", nil - } return "", apperr.Internal(fmt.Errorf("schema get: %w", err)) } + r.cache.set(workspaceID, eventType, field, t) return t, nil } @@ -53,9 +104,10 @@ func (r *schemaRepo) UpsertField(ctx context.Context, workspaceID, eventType, fi ON CONFLICT (workspace_id, event_type, field) DO UPDATE SET last_seen_at = now(), sample_count = schema_fields.sample_count + 1` - _, err := r.db.Exec(ctx, q, workspaceID, eventType, field, dataType) - if err != nil { + if _, err := r.db.Exec(ctx, q, workspaceID, eventType, field, dataType); err != nil { return apperr.Internal(fmt.Errorf("schema upsert: %w", err)) } + // Write-through: keep the local cache consistent with what we just stored. + r.cache.set(workspaceID, eventType, field, dataType) return nil } diff --git a/ingestion/ingest/internal/service/ingest.go b/ingestion/ingest/internal/service/ingest.go index cbaf839..c658ca3 100644 --- a/ingestion/ingest/internal/service/ingest.go +++ b/ingestion/ingest/internal/service/ingest.go @@ -25,32 +25,39 @@ type Producer interface { // IngestService is the core pipeline: validate → ratelimit → timestamp normalize // → late-check → dedup → flatten → schema-conflict → push Kafka. type IngestService struct { - producer Producer - limiter ratelimit.Limiter - dedup dedup.Dedup - schema repo.SchemaRepo - log *zap.Logger - lateAfter time.Duration + producer Producer + limiter ratelimit.Limiter + dedup dedup.Dedup + schema repo.SchemaRepo + log *zap.Logger + lateAfter time.Duration + rateLimitRPS int // 0 = unlimited } // IngestDeps groups dependencies for cleaner construction. type IngestDeps struct { - Producer Producer - Limiter ratelimit.Limiter - Dedup dedup.Dedup - Schema repo.SchemaRepo - Log *zap.Logger - LateAfter time.Duration + Producer Producer + Limiter ratelimit.Limiter + Dedup dedup.Dedup + Schema repo.SchemaRepo + Log *zap.Logger + LateAfter time.Duration + RateLimitRPS int } func NewIngestService(d IngestDeps) *IngestService { + rps := d.RateLimitRPS + if rps < 0 { + rps = 0 + } return &IngestService{ - producer: d.Producer, - limiter: d.Limiter, - dedup: d.Dedup, - schema: d.Schema, - log: d.Log, - lateAfter: d.LateAfter, + producer: d.Producer, + limiter: d.Limiter, + dedup: d.Dedup, + schema: d.Schema, + log: d.Log, + lateAfter: d.LateAfter, + rateLimitRPS: rps, } } @@ -67,14 +74,17 @@ type IngestContext struct { func (s *IngestService) Ingest(ctx context.Context, ictx IngestContext, raw *model.RawEvent) error { now := time.Now().UTC() - // 3. rate limit per workspace - dec, err := s.limiter.Allow(ctx, ictx.WorkspaceID, defaultTierLimit, time.Second) - if err != nil { - return apperr.Internal(err) - } - if !dec.Allowed { - retry := (dec.RetryAfterMS / 1000) + 1 - return apperr.TooManyRequests(retry) + // 3. rate limit per workspace (skip when RateLimitRPS == 0 -- intended + // for load testing only; do not run unbounded in production). + if s.rateLimitRPS > 0 { + dec, err := s.limiter.Allow(ctx, ictx.WorkspaceID, s.rateLimitRPS, time.Second) + if err != nil { + return apperr.Internal(err) + } + if !dec.Allowed { + retry := (dec.RetryAfterMS / 1000) + 1 + return apperr.TooManyRequests(retry) + } } // 4-5. timestamps + late-event check @@ -163,8 +173,6 @@ func (s *IngestService) IngestBatch(ctx context.Context, ictx IngestContext, bat // helpers // --------------------------------------------------------------------------- -const defaultTierLimit = 100 // rps; per-tier override comes from workspace.tier later - func derefTime(p *time.Time, fallback time.Time) time.Time { if p == nil || p.IsZero() { return fallback diff --git a/ingestion/rotor/package-lock.json b/ingestion/rotor/package-lock.json new file mode 100644 index 0000000..b289349 --- /dev/null +++ b/ingestion/rotor/package-lock.json @@ -0,0 +1,953 @@ +{ + "name": "cdp-rotor", + "version": "0.1.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "cdp-rotor", + "version": "0.1.0", + "dependencies": { + "fastify": "^4.28.1", + "isolated-vm": "^5.0.1", + "pino": "^9.4.0", + "zod": "^3.23.8" + }, + "devDependencies": {}, + "engines": { + "node": ">=20" + } + }, + "node_modules/@fastify/ajv-compiler": { + "version": "3.6.0", + "resolved": "https://registry.npmjs.org/@fastify/ajv-compiler/-/ajv-compiler-3.6.0.tgz", + "integrity": "sha512-LwdXQJjmMD+GwLOkP7TVC68qa+pSSogeWWmznRJ/coyTcfe9qA05AHFSe1eZFwK6q+xVRpChnvFUkf1iYaSZsQ==", + "license": "MIT", + "dependencies": { + "ajv": "^8.11.0", + "ajv-formats": "^2.1.1", + "fast-uri": "^2.0.0" + } + }, + "node_modules/@fastify/error": { + "version": "3.4.1", + "resolved": "https://registry.npmjs.org/@fastify/error/-/error-3.4.1.tgz", + "integrity": "sha512-wWSvph+29GR783IhmvdwWnN4bUxTD01Vm5Xad4i7i1VuAOItLvbPAb69sb0IQ2N57yprvhNIwAP5B6xfKTmjmQ==", + "license": "MIT" + }, + "node_modules/@fastify/fast-json-stringify-compiler": { + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/@fastify/fast-json-stringify-compiler/-/fast-json-stringify-compiler-4.3.0.tgz", + "integrity": "sha512-aZAXGYo6m22Fk1zZzEUKBvut/CIIQe/BapEORnxiD5Qr0kPHqqI69NtEMCme74h+at72sPhbkb4ZrLd1W3KRLA==", + "license": "MIT", + "dependencies": { + "fast-json-stringify": "^5.7.0" + } + }, + "node_modules/@fastify/merge-json-schemas": { + "version": "0.1.1", + "resolved": "https://registry.npmjs.org/@fastify/merge-json-schemas/-/merge-json-schemas-0.1.1.tgz", + "integrity": "sha512-fERDVz7topgNjtXsJTTW1JKLy0rhuLRcquYqNR9rF7OcVpCa2OVW49ZPDIhaRRCaUuvVxI+N416xUoF76HNSXA==", + "license": "MIT", + "dependencies": { + "fast-deep-equal": "^3.1.3" + } + }, + "node_modules/@pinojs/redact": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/@pinojs/redact/-/redact-0.4.0.tgz", + "integrity": "sha512-k2ENnmBugE/rzQfEcdWHcCY+/FM3VLzH9cYEsbdsoqrvzAKRhUZeRNhAZvB8OitQJ1TBed3yqWtdjzS6wJKBwg==", + "license": "MIT" + }, + "node_modules/abstract-logging": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/abstract-logging/-/abstract-logging-2.0.1.tgz", + "integrity": "sha512-2BjRTZxTPvheOvGbBslFSYOUkr+SjPtOnrLP33f+VIWLzezQpZcqVg7ja3L4dBXmzzgwT+a029jRx5PCi3JuiA==", + "license": "MIT" + }, + "node_modules/ajv": { + "version": "8.20.0", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.20.0.tgz", + "integrity": "sha512-Thbli+OlOj+iMPYFBVBfJ3OmCAnaSyNn4M1vz9T6Gka5Jt9ba/HIR56joy65tY6kx/FCF5VXNB819Y7/GUrBGA==", + "license": "MIT", + "dependencies": { + "fast-deep-equal": "^3.1.3", + "fast-uri": "^3.0.1", + "json-schema-traverse": "^1.0.0", + "require-from-string": "^2.0.2" + }, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/epoberezkin" + } + }, + "node_modules/ajv-formats": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/ajv-formats/-/ajv-formats-2.1.1.tgz", + "integrity": "sha512-Wx0Kx52hxE7C18hkMEggYlEifqWZtYaRgouJor+WMdPnQyEK13vgEWyVNup7SoeeoLMsr4kf5h6dOW11I15MUA==", + "license": "MIT", + "dependencies": { + "ajv": "^8.0.0" + }, + "peerDependencies": { + "ajv": "^8.0.0" + }, + "peerDependenciesMeta": { + "ajv": { + "optional": true + } + } + }, + "node_modules/ajv/node_modules/fast-uri": { + "version": "3.1.2", + "resolved": "https://registry.npmjs.org/fast-uri/-/fast-uri-3.1.2.tgz", + "integrity": "sha512-rVjf7ArG3LTk+FS6Yw81V1DLuZl1bRbNrev6Tmd/9RaroeeRRJhAt7jg/6YFxbvAQXUCavSoZhPPj6oOx+5KjQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], + "license": "BSD-3-Clause" + }, + "node_modules/atomic-sleep": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/atomic-sleep/-/atomic-sleep-1.0.0.tgz", + "integrity": "sha512-kNOjDqAh7px0XWNI+4QbzoiR/nTkHAWNud2uvnJquD1/x5a7EQZMJT0AczqK0Qn67oY/TTQ1LbUKajZpp3I9tQ==", + "license": "MIT", + "engines": { + "node": ">=8.0.0" + } + }, + "node_modules/avvio": { + "version": "8.4.0", + "resolved": "https://registry.npmjs.org/avvio/-/avvio-8.4.0.tgz", + "integrity": "sha512-CDSwaxINFy59iNwhYnkvALBwZiTydGkOecZyPkqBpABYR1KqGEsET0VOOYDwtleZSUIdeY36DC2bSZ24CO1igA==", + "license": "MIT", + "dependencies": { + "@fastify/error": "^3.3.0", + "fastq": "^1.17.1" + } + }, + "node_modules/base64-js": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", + "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT" + }, + "node_modules/bl": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/bl/-/bl-4.1.0.tgz", + "integrity": "sha512-1W07cM9gS6DcLperZfFSj+bWLtaPGSOHWhPiGzXmvVJbRLdG82sH/Kn8EtW1VqWVA54AKf2h5k5BbnIbwF3h6w==", + "license": "MIT", + "dependencies": { + "buffer": "^5.5.0", + "inherits": "^2.0.4", + "readable-stream": "^3.4.0" + } + }, + "node_modules/buffer": { + "version": "5.7.1", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", + "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT", + "dependencies": { + "base64-js": "^1.3.1", + "ieee754": "^1.1.13" + } + }, + "node_modules/chownr": { + "version": "1.1.4", + "resolved": "https://registry.npmjs.org/chownr/-/chownr-1.1.4.tgz", + "integrity": "sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg==", + "license": "ISC" + }, + "node_modules/cookie": { + "version": "0.7.2", + "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.7.2.tgz", + "integrity": "sha512-yki5XnKuf750l50uGTllt6kKILY4nQ1eNIQatoXEByZ5dWgnKqbnqmTrBE5B4N7lrMJKQ2ytWMiTO2o0v6Ew/w==", + "license": "MIT", + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/decompress-response": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/decompress-response/-/decompress-response-6.0.0.tgz", + "integrity": "sha512-aW35yZM6Bb/4oJlZncMH2LCoZtJXTRxES17vE3hoRiowU2kWHaJKFkSBDnDR+cm9J+9QhXmREyIfv0pji9ejCQ==", + "license": "MIT", + "dependencies": { + "mimic-response": "^3.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/deep-extend": { + "version": "0.6.0", + "resolved": "https://registry.npmjs.org/deep-extend/-/deep-extend-0.6.0.tgz", + "integrity": "sha512-LOHxIOaPYdHlJRtCQfDIVZtfw/ufM8+rVj649RIHzcm/vGwQRXFt6OPqIFWsm2XEMrNIEtWR64sY1LEKD2vAOA==", + "license": "MIT", + "engines": { + "node": ">=4.0.0" + } + }, + "node_modules/detect-libc": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/detect-libc/-/detect-libc-2.1.2.tgz", + "integrity": "sha512-Btj2BOOO83o3WyH59e8MgXsxEQVcarkUOpEYrubB0urwnN10yQ364rsiByU11nZlqWYZm05i/of7io4mzihBtQ==", + "license": "Apache-2.0", + "engines": { + "node": ">=8" + } + }, + "node_modules/end-of-stream": { + "version": "1.4.5", + "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.5.tgz", + "integrity": "sha512-ooEGc6HP26xXq/N+GCGOT0JKCLDGrq2bQUZrQ7gyrJiZANJ/8YDTxTpQBXGMn+WbIQXNVpyWymm7KYVICQnyOg==", + "license": "MIT", + "dependencies": { + "once": "^1.4.0" + } + }, + "node_modules/expand-template": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/expand-template/-/expand-template-2.0.3.tgz", + "integrity": "sha512-XYfuKMvj4O35f/pOXLObndIRvyQ+/+6AhODh+OKWj9S9498pHHn/IMszH+gt0fBCRWMNfk1ZSp5x3AifmnI2vg==", + "license": "(MIT OR WTFPL)", + "engines": { + "node": ">=6" + } + }, + "node_modules/fast-content-type-parse": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/fast-content-type-parse/-/fast-content-type-parse-1.1.0.tgz", + "integrity": "sha512-fBHHqSTFLVnR61C+gltJuE5GkVQMV0S2nqUO8TJ+5Z3qAKG8vAx4FKai1s5jq/inV1+sREynIWSuQ6HgoSXpDQ==", + "license": "MIT" + }, + "node_modules/fast-decode-uri-component": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/fast-decode-uri-component/-/fast-decode-uri-component-1.0.1.tgz", + "integrity": "sha512-WKgKWg5eUxvRZGwW8FvfbaH7AXSh2cL+3j5fMGzUMCxWBJ3dV3a7Wz8y2f/uQ0e3B6WmodD3oS54jTQ9HVTIIg==", + "license": "MIT" + }, + "node_modules/fast-deep-equal": { + "version": "3.1.3", + "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz", + "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==", + "license": "MIT" + }, + "node_modules/fast-json-stringify": { + "version": "5.16.1", + "resolved": "https://registry.npmjs.org/fast-json-stringify/-/fast-json-stringify-5.16.1.tgz", + "integrity": "sha512-KAdnLvy1yu/XrRtP+LJnxbBGrhN+xXu+gt3EUvZhYGKCr3lFHq/7UFJHHFgmJKoqlh6B40bZLEv7w46B0mqn1g==", + "license": "MIT", + "dependencies": { + "@fastify/merge-json-schemas": "^0.1.0", + "ajv": "^8.10.0", + "ajv-formats": "^3.0.1", + "fast-deep-equal": "^3.1.3", + "fast-uri": "^2.1.0", + "json-schema-ref-resolver": "^1.0.1", + "rfdc": "^1.2.0" + } + }, + "node_modules/fast-json-stringify/node_modules/ajv-formats": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/ajv-formats/-/ajv-formats-3.0.1.tgz", + "integrity": "sha512-8iUql50EUR+uUcdRQ3HDqa6EVyo3docL8g5WJ3FNcWmu62IbkGUue/pEyLBW8VGKKucTPgqeks4fIU1DA4yowQ==", + "license": "MIT", + "dependencies": { + "ajv": "^8.0.0" + }, + "peerDependencies": { + "ajv": "^8.0.0" + }, + "peerDependenciesMeta": { + "ajv": { + "optional": true + } + } + }, + "node_modules/fast-querystring": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/fast-querystring/-/fast-querystring-1.1.2.tgz", + "integrity": "sha512-g6KuKWmFXc0fID8WWH0jit4g0AGBoJhCkJMb1RmbsSEUNvQ+ZC8D6CUZ+GtF8nMzSPXnhiePyyqqipzNNEnHjg==", + "license": "MIT", + "dependencies": { + "fast-decode-uri-component": "^1.0.1" + } + }, + "node_modules/fast-uri": { + "version": "2.4.0", + "resolved": "https://registry.npmjs.org/fast-uri/-/fast-uri-2.4.0.tgz", + "integrity": "sha512-ypuAmmMKInk5q7XcepxlnUWDLWv4GFtaJqAzWKqn62IpQ3pejtr5dTVbt3vwqVaMKmkNR55sTT+CqUKIaT21BA==", + "license": "MIT" + }, + "node_modules/fastify": { + "version": "4.29.1", + "resolved": "https://registry.npmjs.org/fastify/-/fastify-4.29.1.tgz", + "integrity": "sha512-m2kMNHIG92tSNWv+Z3UeTR9AWLLuo7KctC7mlFPtMEVrfjIhmQhkQnT9v15qA/BfVq3vvj134Y0jl9SBje3jXQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], + "license": "MIT", + "dependencies": { + "@fastify/ajv-compiler": "^3.5.0", + "@fastify/error": "^3.4.0", + "@fastify/fast-json-stringify-compiler": "^4.3.0", + "abstract-logging": "^2.0.1", + "avvio": "^8.3.0", + "fast-content-type-parse": "^1.1.0", + "fast-json-stringify": "^5.8.0", + "find-my-way": "^8.0.0", + "light-my-request": "^5.11.0", + "pino": "^9.0.0", + "process-warning": "^3.0.0", + "proxy-addr": "^2.0.7", + "rfdc": "^1.3.0", + "secure-json-parse": "^2.7.0", + "semver": "^7.5.4", + "toad-cache": "^3.3.0" + } + }, + "node_modules/fastq": { + "version": "1.20.1", + "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.20.1.tgz", + "integrity": "sha512-GGToxJ/w1x32s/D2EKND7kTil4n8OVk/9mycTc4VDza13lOvpUZTGX3mFSCtV9ksdGBVzvsyAVLM6mHFThxXxw==", + "license": "ISC", + "dependencies": { + "reusify": "^1.0.4" + } + }, + "node_modules/find-my-way": { + "version": "8.2.2", + "resolved": "https://registry.npmjs.org/find-my-way/-/find-my-way-8.2.2.tgz", + "integrity": "sha512-Dobi7gcTEq8yszimcfp/R7+owiT4WncAJ7VTTgFH1jYJ5GaG1FbhjwDG820hptN0QDFvzVY3RfCzdInvGPGzjA==", + "license": "MIT", + "dependencies": { + "fast-deep-equal": "^3.1.3", + "fast-querystring": "^1.0.0", + "safe-regex2": "^3.1.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/forwarded": { + "version": "0.2.0", + "resolved": "https://registry.npmjs.org/forwarded/-/forwarded-0.2.0.tgz", + "integrity": "sha512-buRG0fpBtRHSTCOASe6hD258tEubFoRLb4ZNA6NxMVHNw2gOcwHo9wyablzMzOA5z9xA9L1KNjk/Nt6MT9aYow==", + "license": "MIT", + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/fs-constants": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/fs-constants/-/fs-constants-1.0.0.tgz", + "integrity": "sha512-y6OAwoSIf7FyjMIv94u+b5rdheZEjzR63GTyZJm5qh4Bi+2YgwLCcI/fPFZkL5PSixOt6ZNKm+w+Hfp/Bciwow==", + "license": "MIT" + }, + "node_modules/github-from-package": { + "version": "0.0.0", + "resolved": "https://registry.npmjs.org/github-from-package/-/github-from-package-0.0.0.tgz", + "integrity": "sha512-SyHy3T1v2NUXn29OsWdxmK6RwHD+vkj3v8en8AOBZ1wBQ/hCAQ5bAQTD02kW4W9tUp/3Qh6J8r9EvntiyCmOOw==", + "license": "MIT" + }, + "node_modules/ieee754": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", + "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "BSD-3-Clause" + }, + "node_modules/inherits": { + "version": "2.0.4", + "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", + "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", + "license": "ISC" + }, + "node_modules/ini": { + "version": "1.3.8", + "resolved": "https://registry.npmjs.org/ini/-/ini-1.3.8.tgz", + "integrity": "sha512-JV/yugV2uzW5iMRSiZAyDtQd+nxtUnjeLt0acNdw98kKLrvuRVyB80tsREOE7yvGVgalhZ6RNXCmEHkUKBKxew==", + "license": "ISC" + }, + "node_modules/ipaddr.js": { + "version": "1.9.1", + "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", + "integrity": "sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==", + "license": "MIT", + "engines": { + "node": ">= 0.10" + } + }, + "node_modules/isolated-vm": { + "version": "5.0.4", + "resolved": "https://registry.npmjs.org/isolated-vm/-/isolated-vm-5.0.4.tgz", + "integrity": "sha512-RYUf/JC4ldWz/oi2BVs8a1XIprQ71q6eQPBwySaF5Apu0KMyf2gIpElbCyPh2OEmRT+FYw1GOKSdkv7jw2KLxw==", + "hasInstallScript": true, + "license": "ISC", + "dependencies": { + "prebuild-install": "^7.1.2" + }, + "engines": { + "node": ">=18.0.0" + } + }, + "node_modules/json-schema-ref-resolver": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/json-schema-ref-resolver/-/json-schema-ref-resolver-1.0.1.tgz", + "integrity": "sha512-EJAj1pgHc1hxF6vo2Z3s69fMjO1INq6eGHXZ8Z6wCQeldCuwxGK9Sxf4/cScGn3FZubCVUehfWtcDM/PLteCQw==", + "license": "MIT", + "dependencies": { + "fast-deep-equal": "^3.1.3" + } + }, + "node_modules/json-schema-traverse": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz", + "integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==", + "license": "MIT" + }, + "node_modules/light-my-request": { + "version": "5.14.0", + "resolved": "https://registry.npmjs.org/light-my-request/-/light-my-request-5.14.0.tgz", + "integrity": "sha512-aORPWntbpH5esaYpGOOmri0OHDOe3wC5M2MQxZ9dvMLZm6DnaAn0kJlcbU9hwsQgLzmZyReKwFwwPkR+nHu5kA==", + "license": "BSD-3-Clause", + "dependencies": { + "cookie": "^0.7.0", + "process-warning": "^3.0.0", + "set-cookie-parser": "^2.4.1" + } + }, + "node_modules/mimic-response": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/mimic-response/-/mimic-response-3.1.0.tgz", + "integrity": "sha512-z0yWI+4FDrrweS8Zmt4Ej5HdJmky15+L2e6Wgn3+iK5fWzb6T3fhNFq2+MeTRb064c6Wr4N/wv0DzQTjNzHNGQ==", + "license": "MIT", + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/minimist": { + "version": "1.2.8", + "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.8.tgz", + "integrity": "sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA==", + "license": "MIT", + "funding": { + "url": "https://github.com/sponsors/ljharb" + } + }, + "node_modules/mkdirp-classic": { + "version": "0.5.3", + "resolved": "https://registry.npmjs.org/mkdirp-classic/-/mkdirp-classic-0.5.3.tgz", + "integrity": "sha512-gKLcREMhtuZRwRAfqP3RFW+TK4JqApVBtOIftVgjuABpAtpxhPGaDcfvbhNvD0B8iD1oUr/txX35NjcaY6Ns/A==", + "license": "MIT" + }, + "node_modules/napi-build-utils": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/napi-build-utils/-/napi-build-utils-2.0.0.tgz", + "integrity": "sha512-GEbrYkbfF7MoNaoh2iGG84Mnf/WZfB0GdGEsM8wz7Expx/LlWf5U8t9nvJKXSp3qr5IsEbK04cBGhol/KwOsWA==", + "license": "MIT" + }, + "node_modules/node-abi": { + "version": "3.92.0", + "resolved": "https://registry.npmjs.org/node-abi/-/node-abi-3.92.0.tgz", + "integrity": "sha512-KdHvFWZjEKDf0cakgFjebl371GPsISX2oZHcuyKqM7DtogIsHrqKeLTo8wBHxaXRAQlY2PsPlZmfo+9ZCxEREQ==", + "license": "MIT", + "dependencies": { + "semver": "^7.3.5" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/on-exit-leak-free": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/on-exit-leak-free/-/on-exit-leak-free-2.1.2.tgz", + "integrity": "sha512-0eJJY6hXLGf1udHwfNftBqH+g73EU4B504nZeKpz1sYRKafAghwxEJunB2O7rDZkL4PGfsMVnTXZ2EjibbqcsA==", + "license": "MIT", + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/once": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", + "integrity": "sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==", + "license": "ISC", + "dependencies": { + "wrappy": "1" + } + }, + "node_modules/pino": { + "version": "9.14.0", + "resolved": "https://registry.npmjs.org/pino/-/pino-9.14.0.tgz", + "integrity": "sha512-8OEwKp5juEvb/MjpIc4hjqfgCNysrS94RIOMXYvpYCdm/jglrKEiAYmiumbmGhCvs+IcInsphYDFwqrjr7398w==", + "license": "MIT", + "dependencies": { + "@pinojs/redact": "^0.4.0", + "atomic-sleep": "^1.0.0", + "on-exit-leak-free": "^2.1.0", + "pino-abstract-transport": "^2.0.0", + "pino-std-serializers": "^7.0.0", + "process-warning": "^5.0.0", + "quick-format-unescaped": "^4.0.3", + "real-require": "^0.2.0", + "safe-stable-stringify": "^2.3.1", + "sonic-boom": "^4.0.1", + "thread-stream": "^3.0.0" + }, + "bin": { + "pino": "bin.js" + } + }, + "node_modules/pino-abstract-transport": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/pino-abstract-transport/-/pino-abstract-transport-2.0.0.tgz", + "integrity": "sha512-F63x5tizV6WCh4R6RHyi2Ml+M70DNRXt/+HANowMflpgGFMAym/VKm6G7ZOQRjqN7XbGxK1Lg9t6ZrtzOaivMw==", + "license": "MIT", + "dependencies": { + "split2": "^4.0.0" + } + }, + "node_modules/pino-std-serializers": { + "version": "7.1.0", + "resolved": "https://registry.npmjs.org/pino-std-serializers/-/pino-std-serializers-7.1.0.tgz", + "integrity": "sha512-BndPH67/JxGExRgiX1dX0w1FvZck5Wa4aal9198SrRhZjH3GxKQUKIBnYJTdj2HDN3UQAS06HlfcSbQj2OHmaw==", + "license": "MIT" + }, + "node_modules/pino/node_modules/process-warning": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/process-warning/-/process-warning-5.0.0.tgz", + "integrity": "sha512-a39t9ApHNx2L4+HBnQKqxxHNs1r7KF+Intd8Q/g1bUh6q0WIp9voPXJ/x0j+ZL45KF1pJd9+q2jLIRMfvEshkA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], + "license": "MIT" + }, + "node_modules/prebuild-install": { + "version": "7.1.3", + "resolved": "https://registry.npmjs.org/prebuild-install/-/prebuild-install-7.1.3.tgz", + "integrity": "sha512-8Mf2cbV7x1cXPUILADGI3wuhfqWvtiLA1iclTDbFRZkgRQS0NqsPZphna9V+HyTEadheuPmjaJMsbzKQFOzLug==", + "deprecated": "No longer maintained. Please contact the author of the relevant native addon; alternatives are available.", + "license": "MIT", + "dependencies": { + "detect-libc": "^2.0.0", + "expand-template": "^2.0.3", + "github-from-package": "0.0.0", + "minimist": "^1.2.3", + "mkdirp-classic": "^0.5.3", + "napi-build-utils": "^2.0.0", + "node-abi": "^3.3.0", + "pump": "^3.0.0", + "rc": "^1.2.7", + "simple-get": "^4.0.0", + "tar-fs": "^2.0.0", + "tunnel-agent": "^0.6.0" + }, + "bin": { + "prebuild-install": "bin.js" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/process-warning": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/process-warning/-/process-warning-3.0.0.tgz", + "integrity": "sha512-mqn0kFRl0EoqhnL0GQ0veqFHyIN1yig9RHh/InzORTUiZHFRAur+aMtRkELNwGs9aNwKS6tg/An4NYBPGwvtzQ==", + "license": "MIT" + }, + "node_modules/proxy-addr": { + "version": "2.0.7", + "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", + "integrity": "sha512-llQsMLSUDUPT44jdrU/O37qlnifitDP+ZwrmmZcoSKyLKvtZxpyV0n2/bD/N4tBAAZ/gJEdZU7KMraoK1+XYAg==", + "license": "MIT", + "dependencies": { + "forwarded": "0.2.0", + "ipaddr.js": "1.9.1" + }, + "engines": { + "node": ">= 0.10" + } + }, + "node_modules/pump": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.4.tgz", + "integrity": "sha512-VS7sjc6KR7e1ukRFhQSY5LM2uBWAUPiOPa/A3mkKmiMwSmRFUITt0xuj+/lesgnCv+dPIEYlkzrcyXgquIHMcA==", + "license": "MIT", + "dependencies": { + "end-of-stream": "^1.1.0", + "once": "^1.3.1" + } + }, + "node_modules/quick-format-unescaped": { + "version": "4.0.4", + "resolved": "https://registry.npmjs.org/quick-format-unescaped/-/quick-format-unescaped-4.0.4.tgz", + "integrity": "sha512-tYC1Q1hgyRuHgloV/YXs2w15unPVh8qfu/qCTfhTYamaw7fyhumKa2yGpdSo87vY32rIclj+4fWYQXUMs9EHvg==", + "license": "MIT" + }, + "node_modules/rc": { + "version": "1.2.8", + "resolved": "https://registry.npmjs.org/rc/-/rc-1.2.8.tgz", + "integrity": "sha512-y3bGgqKj3QBdxLbLkomlohkvsA8gdAiUQlSBJnBhfn+BPxg4bc62d8TcBW15wavDfgexCgccckhcZvywyQYPOw==", + "license": "(BSD-2-Clause OR MIT OR Apache-2.0)", + "dependencies": { + "deep-extend": "^0.6.0", + "ini": "~1.3.0", + "minimist": "^1.2.0", + "strip-json-comments": "~2.0.1" + }, + "bin": { + "rc": "cli.js" + } + }, + "node_modules/readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "license": "MIT", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, + "node_modules/real-require": { + "version": "0.2.0", + "resolved": "https://registry.npmjs.org/real-require/-/real-require-0.2.0.tgz", + "integrity": "sha512-57frrGM/OCTLqLOAh0mhVA9VBMHd+9U7Zb2THMGdBUoZVOtGbJzjxsYGDJ3A9AYYCP4hn6y1TVbaOfzWtm5GFg==", + "license": "MIT", + "engines": { + "node": ">= 12.13.0" + } + }, + "node_modules/require-from-string": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/require-from-string/-/require-from-string-2.0.2.tgz", + "integrity": "sha512-Xf0nWe6RseziFMu+Ap9biiUbmplq6S9/p+7w7YXP/JBHhrUDDUhwa+vANyubuqfZWTveU//DYVGsDG7RKL/vEw==", + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/ret": { + "version": "0.4.3", + "resolved": "https://registry.npmjs.org/ret/-/ret-0.4.3.tgz", + "integrity": "sha512-0f4Memo5QP7WQyUEAYUO3esD/XjOc3Zjjg5CPsAq1p8sIu0XPeMbHJemKA0BO7tV0X7+A0FoEpbmHXWxPyD3wQ==", + "license": "MIT", + "engines": { + "node": ">=10" + } + }, + "node_modules/reusify": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.1.0.tgz", + "integrity": "sha512-g6QUff04oZpHs0eG5p83rFLhHeV00ug/Yf9nZM6fLeUrPguBTkTQOdpAWWspMh55TZfVQDPaN3NQJfbVRAxdIw==", + "license": "MIT", + "engines": { + "iojs": ">=1.0.0", + "node": ">=0.10.0" + } + }, + "node_modules/rfdc": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/rfdc/-/rfdc-1.4.1.tgz", + "integrity": "sha512-q1b3N5QkRUWUl7iyylaaj3kOpIT0N2i9MqIEQXP73GVsN9cw3fdx8X63cEmWhJGi2PPCF23Ijp7ktmd39rawIA==", + "license": "MIT" + }, + "node_modules/safe-buffer": { + "version": "5.2.1", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", + "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT" + }, + "node_modules/safe-regex2": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/safe-regex2/-/safe-regex2-3.1.0.tgz", + "integrity": "sha512-RAAZAGbap2kBfbVhvmnTFv73NWLMvDGOITFYTZBAaY8eR+Ir4ef7Up/e7amo+y1+AH+3PtLkrt9mvcTsG9LXug==", + "license": "MIT", + "dependencies": { + "ret": "~0.4.0" + } + }, + "node_modules/safe-stable-stringify": { + "version": "2.5.0", + "resolved": "https://registry.npmjs.org/safe-stable-stringify/-/safe-stable-stringify-2.5.0.tgz", + "integrity": "sha512-b3rppTKm9T+PsVCBEOUR46GWI7fdOs00VKZ1+9c1EWDaDMvjQc6tUwuFyIprgGgTcWoVHSKrU8H31ZHA2e0RHA==", + "license": "MIT", + "engines": { + "node": ">=10" + } + }, + "node_modules/secure-json-parse": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/secure-json-parse/-/secure-json-parse-2.7.0.tgz", + "integrity": "sha512-6aU+Rwsezw7VR8/nyvKTx8QpWH9FrcYiXXlqC4z5d5XQBDRqtbfsRjnwGyqbi3gddNtWHuEk9OANUotL26qKUw==", + "license": "BSD-3-Clause" + }, + "node_modules/semver": { + "version": "7.8.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.8.1.tgz", + "integrity": "sha512-rkVq3IXh+4FDGch+KwzX3aV9W3kO54GyEgpvBzSyctDA6Xtd7RJQV1xmXbeQp5v7+VzLOfVqiutSE6GICgPFvg==", + "license": "ISC", + "bin": { + "semver": "bin/semver.js" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/set-cookie-parser": { + "version": "2.7.2", + "resolved": "https://registry.npmjs.org/set-cookie-parser/-/set-cookie-parser-2.7.2.tgz", + "integrity": "sha512-oeM1lpU/UvhTxw+g3cIfxXHyJRc/uidd3yK1P242gzHds0udQBYzs3y8j4gCCW+ZJ7ad0yctld8RYO+bdurlvw==", + "license": "MIT" + }, + "node_modules/simple-concat": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/simple-concat/-/simple-concat-1.0.1.tgz", + "integrity": "sha512-cSFtAPtRhljv69IK0hTVZQ+OfE9nePi/rtJmw5UjHeVyVroEqJXP1sFztKUy1qU+xvz3u/sfYJLa947b7nAN2Q==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT" + }, + "node_modules/simple-get": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/simple-get/-/simple-get-4.0.1.tgz", + "integrity": "sha512-brv7p5WgH0jmQJr1ZDDfKDOSeWWg+OVypG99A/5vYGPqJ6pxiaHLy8nxtFjBA7oMa01ebA9gfh1uMCFqOuXxvA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT", + "dependencies": { + "decompress-response": "^6.0.0", + "once": "^1.3.1", + "simple-concat": "^1.0.0" + } + }, + "node_modules/sonic-boom": { + "version": "4.2.1", + "resolved": "https://registry.npmjs.org/sonic-boom/-/sonic-boom-4.2.1.tgz", + "integrity": "sha512-w6AxtubXa2wTXAUsZMMWERrsIRAdrK0Sc+FUytWvYAhBJLyuI4llrMIC1DtlNSdI99EI86KZum2MMq3EAZlF9Q==", + "license": "MIT", + "dependencies": { + "atomic-sleep": "^1.0.0" + } + }, + "node_modules/split2": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/split2/-/split2-4.2.0.tgz", + "integrity": "sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==", + "license": "ISC", + "engines": { + "node": ">= 10.x" + } + }, + "node_modules/string_decoder": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", + "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", + "license": "MIT", + "dependencies": { + "safe-buffer": "~5.2.0" + } + }, + "node_modules/strip-json-comments": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/strip-json-comments/-/strip-json-comments-2.0.1.tgz", + "integrity": "sha512-4gB8na07fecVVkOI6Rs4e7T6NOTki5EmL7TUduTs6bu3EdnSycntVJ4re8kgZA+wx9IueI2Y11bfbgwtzuE0KQ==", + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/tar-fs": { + "version": "2.1.4", + "resolved": "https://registry.npmjs.org/tar-fs/-/tar-fs-2.1.4.tgz", + "integrity": "sha512-mDAjwmZdh7LTT6pNleZ05Yt65HC3E+NiQzl672vQG38jIrehtJk/J3mNwIg+vShQPcLF/LV7CMnDW6vjj6sfYQ==", + "license": "MIT", + "dependencies": { + "chownr": "^1.1.1", + "mkdirp-classic": "^0.5.2", + "pump": "^3.0.0", + "tar-stream": "^2.1.4" + } + }, + "node_modules/tar-stream": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/tar-stream/-/tar-stream-2.2.0.tgz", + "integrity": "sha512-ujeqbceABgwMZxEJnk2HDY2DlnUZ+9oEcb1KzTVfYHio0UE6dG71n60d8D2I4qNvleWrrXpmjpt7vZeF1LnMZQ==", + "license": "MIT", + "dependencies": { + "bl": "^4.0.3", + "end-of-stream": "^1.4.1", + "fs-constants": "^1.0.0", + "inherits": "^2.0.3", + "readable-stream": "^3.1.1" + }, + "engines": { + "node": ">=6" + } + }, + "node_modules/thread-stream": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/thread-stream/-/thread-stream-3.1.0.tgz", + "integrity": "sha512-OqyPZ9u96VohAyMfJykzmivOrY2wfMSf3C5TtFJVgN+Hm6aj+voFhlK+kZEIv2FBh1X6Xp3DlnCOfEQ3B2J86A==", + "license": "MIT", + "dependencies": { + "real-require": "^0.2.0" + } + }, + "node_modules/toad-cache": { + "version": "3.7.1", + "resolved": "https://registry.npmjs.org/toad-cache/-/toad-cache-3.7.1.tgz", + "integrity": "sha512-5DXWzE4Vz7xNHsv+xQ+MGfJYyC78Aok3tEr0MNwHoRf7vZnga1mQXZ4/Nsodld4VR6Wd+VhfmqnNrsRJyYPfrQ==", + "license": "MIT", + "engines": { + "node": ">=20" + } + }, + "node_modules/tunnel-agent": { + "version": "0.6.0", + "resolved": "https://registry.npmjs.org/tunnel-agent/-/tunnel-agent-0.6.0.tgz", + "integrity": "sha512-McnNiV1l8RYeY8tBgEpuodCC1mLUdbSN+CYBL7kJsJNInOP8UjDDEwdk6Mw60vdLLrr5NHKZhMAOSrR2NZuQ+w==", + "license": "Apache-2.0", + "dependencies": { + "safe-buffer": "^5.0.1" + }, + "engines": { + "node": "*" + } + }, + "node_modules/util-deprecate": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", + "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", + "license": "MIT" + }, + "node_modules/wrappy": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", + "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==", + "license": "ISC" + }, + "node_modules/zod": { + "version": "3.25.76", + "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz", + "integrity": "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==", + "license": "MIT", + "funding": { + "url": "https://github.com/sponsors/colinhacks" + } + } + } +} diff --git a/ingestion/tests/k6/track.js b/ingestion/tests/k6/track.js new file mode 100644 index 0000000..79d3970 --- /dev/null +++ b/ingestion/tests/k6/track.js @@ -0,0 +1,83 @@ +// k6 load test — POST /v1/track against the local cdp-ingest service. +// +// Usage: +// brew install k6 # one-time +// k6 run tests/k6/track.js # defaults: 50 CCU, 1m +// +// Override at the CLI: +// k6 run -e WRITE_KEY=xxx -e BASE=http://localhost:3049 \ +// -e VUS=100 -e DURATION=2m tests/k6/track.js + +import http from 'k6/http'; +import { check } from 'k6'; +import encoding from 'k6/encoding'; + +const BASE = __ENV.BASE ?? 'http://localhost:3049'; +const WRITE_KEY = __ENV.WRITE_KEY ?? 'cdp_dev_writekey_1234567890'; +const VUS = parseInt(__ENV.VUS ?? '50', 10); +const DURATION = __ENV.DURATION ?? '1m'; + +// Segment-compatible auth: Basic base64(writeKey + ":") +const AUTH = 'Basic ' + encoding.b64encode(`${WRITE_KEY}:`); + +export const options = { + scenarios: { + constant_load: { + executor: 'constant-vus', + vus: VUS, + duration: DURATION, + }, + }, + thresholds: { + http_req_failed: ['rate<0.01'], // < 1% errors + http_req_duration: ['p(95)<300', 'p(99)<800'], + checks: ['rate>0.99'], + }, +}; + +export default function () { + const now = new Date().toISOString(); + const messageId = `k6_${__VU}_${__ITER}_${Date.now()}`; + + const payload = JSON.stringify({ + type: 'track', + messageId, + anonymousId: `anon_${__VU}`, + userId: `user_${__VU}@example.com`, + event: 'k6 Test Event', + properties: { + testProp: 'load test', + vu: __VU, + iter: __ITER, + price: 42.5, + }, + timestamp: now, + sentAt: now, + context: { + library_name: 'k6', + library_version: '0.1.0', + ip: '127.0.0.1', + userAgent: 'k6/loadtest', + locale: 'en-US', + page: { + path: '/', + host: 'example.com', + title: 'Example page', + url: 'https://example.com/', + }, + }, + }); + + const res = http.post(`${BASE}/v1/track`, payload, { + headers: { + 'Content-Type': 'application/json', + Authorization: AUTH, + }, + }); + + check(res, { + 'status 200': (r) => r.status === 200, + 'body ok': (r) => r.json('ok') === true, + 'fast (<500ms)': (r) => r.timings.duration < 500, + }); +}