This commit is contained in:
2026-05-25 11:00:13 +07:00
parent c5e980aa52
commit 81ba67f346
12 changed files with 1534 additions and 77 deletions

View File

@@ -23,6 +23,10 @@ type Config struct {
LogPayloadOnSuccess bool `env:"INGEST_LOG_PAYLOAD_ON_SUCCESS" envDefault:"false"`
LogPayloadOnError bool `env:"INGEST_LOG_PAYLOAD_ON_ERROR" envDefault:"true"`
// RateLimitRPS caps requests per workspace per second. 0 disables the
// limiter entirely (use for load tests; never in production).
RateLimitRPS int `env:"INGEST_RATE_LIMIT_RPS" envDefault:"100"`
PostgresDSN string `env:"POSTGRES_DSN,required"`
RedisAddr string `env:"REDIS_ADDR" envDefault:"localhost:6379"`

View File

@@ -0,0 +1,39 @@
package handler
import (
"net/http"
"go.uber.org/zap"
"github.com/dbiz/cdp/ingestion/ingest/internal/live"
)
// LiveHandler streams Kafka events over SSE for the console's Live page.
//
// Note: this endpoint is unauthenticated in the scaffold. Wire it behind the
// console session / a workspace token before exposing it publicly.
type LiveHandler struct {
stream *live.Streamer
log *zap.Logger
}
func NewLiveHandler(s *live.Streamer, log *zap.Logger) *LiveHandler {
return &LiveHandler{stream: s, log: log}
}
// Stream handles GET /live/events. Optional query params:
//
// ?workspace_id=... filter by workspace
// ?source_id=... filter by source
// ?type=track filter by event type (track|identify|page|group|...)
func (h *LiveHandler) Stream(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
flt := live.Filter{
WorkspaceID: q.Get("workspace_id"),
SourceID: q.Get("source_id"),
EventType: q.Get("type"),
}
if err := h.stream.Stream(r.Context(), w, flt); err != nil {
h.log.Warn("live stream ended", zap.Error(err))
}
}

View File

@@ -0,0 +1,176 @@
// Package live streams events from the Kafka ingest topic over Server-Sent
// Events so the console can show what is flowing through the pipeline in
// real time. Each SSE connection spins up its own consumer group so the
// bulker's offsets are untouched.
package live
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"github.com/google/uuid"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
)
type Streamer struct {
brokers []string
topic string
log *zap.Logger
}
func New(brokers []string, topic string, log *zap.Logger) *Streamer {
return &Streamer{brokers: brokers, topic: topic, log: log}
}
// Filter narrows which records are forwarded. Empty values mean "no filter".
type Filter struct {
WorkspaceID string
SourceID string
EventType string // track | identify | page | group
}
// Stream writes SSE frames to w until the request context is cancelled. It
// joins a fresh consumer group seeded at the latest offset so the client
// sees events that arrive *after* subscription (no replay of history).
func (s *Streamer) Stream(ctx context.Context, w http.ResponseWriter, flt Filter) error {
flusher, ok := w.(http.Flusher)
if !ok {
return fmt.Errorf("response writer does not support flushing")
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no") // disable nginx proxy buffering
w.WriteHeader(http.StatusOK)
// Tell the client we're alive.
fmt.Fprintf(w, ": connected\n\n")
flusher.Flush()
groupID := "live-" + uuid.NewString()
cl, err := kgo.NewClient(
kgo.SeedBrokers(s.brokers...),
kgo.ConsumerGroup(groupID),
kgo.ConsumeTopics(s.topic),
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
kgo.DisableAutoCommit(), // tail mode -- never commit
kgo.ClientID("cdp-live"),
)
if err != nil {
return fmt.Errorf("kafka client: %w", err)
}
defer cl.Close()
// keep-alive comments every 25s so proxies don't time out the connection.
keepAlive := time.NewTicker(25 * time.Second)
defer keepAlive.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-keepAlive.C:
fmt.Fprintf(w, ": keepalive\n\n")
flusher.Flush()
default:
}
// Short poll so we react quickly to ctx cancel.
pollCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
fetches := cl.PollFetches(pollCtx)
cancel()
if ctx.Err() != nil {
return nil
}
if errs := fetches.Errors(); len(errs) > 0 {
for _, e := range errs {
if e.Err == context.DeadlineExceeded || e.Err == context.Canceled {
continue
}
s.log.Warn("live fetch error", zap.Error(e.Err))
}
}
var stopped bool
fetches.EachRecord(func(r *kgo.Record) {
if stopped {
return
}
if !matches(r, flt) {
return
}
frame, err := sseFrame(r)
if err != nil {
return
}
if _, werr := w.Write(frame); werr != nil {
stopped = true
return
}
flusher.Flush()
})
if stopped {
return nil
}
}
}
// matches returns true when the record passes the filter.
func matches(r *kgo.Record, flt Filter) bool {
if flt.WorkspaceID == "" && flt.SourceID == "" && flt.EventType == "" {
return true
}
get := func(key string) string {
for _, h := range r.Headers {
if h.Key == key {
return string(h.Value)
}
}
return ""
}
if flt.WorkspaceID != "" && get("workspace_id") != flt.WorkspaceID {
return false
}
if flt.SourceID != "" && get("source_id") != flt.SourceID {
return false
}
if flt.EventType != "" && !strings.EqualFold(get("type"), flt.EventType) {
return false
}
return true
}
// sseFrame builds an `event: ...\ndata: ...\n\n` block from a Kafka record.
func sseFrame(r *kgo.Record) ([]byte, error) {
// We pass the raw event value through; the console decodes it.
// Each frame also carries Kafka metadata under `meta`.
envelope := struct {
Topic string `json:"topic"`
Partition int32 `json:"partition"`
Offset int64 `json:"offset"`
Timestamp time.Time `json:"timestamp"`
Event json.RawMessage `json:"event"`
}{
Topic: r.Topic,
Partition: r.Partition,
Offset: r.Offset,
Timestamp: r.Timestamp,
Event: r.Value,
}
body, err := json.Marshal(envelope)
if err != nil {
return nil, err
}
out := make([]byte, 0, len(body)+16)
out = append(out, "event: ingest\ndata: "...)
out = append(out, body...)
out = append(out, '\n', '\n')
return out, nil
}

View File

@@ -191,3 +191,11 @@ func (s *statusRecorder) WriteHeader(code int) {
s.ResponseWriter.WriteHeader(code)
}
// Flush delegates so SSE handlers can still call w.(http.Flusher).Flush()
// after the Logger middleware wraps the original ResponseWriter.
func (s *statusRecorder) Flush() {
if f, ok := s.ResponseWriter.(http.Flusher); ok {
f.Flush()
}
}

View File

@@ -2,19 +2,25 @@ package repo
import (
"context"
"errors"
"fmt"
"sync"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/dbiz/cdp/ingestion/ingest/internal/apperr"
)
// SchemaRepo records the data type observed for each (workspace, event_type, field)
// triple. The bulker / analytics layer uses this to detect type conflicts.
// triple. The ingest hot path calls GetType per field to detect type conflicts,
// so we wrap PG with an in-memory cache. Cache misses fall through to PG; the
// resolved type (including the "not seen yet" empty string) is memoised.
//
// In the ingest hot path we only *check* for conflict via UpsertField; the
// rebuild of the cached map is left to a background loader. We do not block
// the request waiting for upsert -- it is fire-and-forget.
// Cache invalidation: UpsertField writes through, so the writer also refreshes.
// Other ingest instances are eventually consistent -- a tier-1 PG conflict will
// surface on the next request that re-fetches. Acceptable for an append-only
// schema registry.
type SchemaRepo interface {
// GetType returns the recorded type, or "" if the field has never been seen.
GetType(ctx context.Context, workspaceID, eventType, field string) (string, error)
@@ -23,26 +29,71 @@ type SchemaRepo interface {
}
type schemaRepo struct {
db *pgxpool.Pool
db *pgxpool.Pool
cache *schemaCache
}
func NewSchemaRepo(db *pgxpool.Pool) SchemaRepo {
return &schemaRepo{db: db}
return &schemaRepo{
db: db,
cache: newSchemaCache(),
}
}
// ---------------------------------------------------------------------------
// cache
// ---------------------------------------------------------------------------
type schemaCache struct {
mu sync.RWMutex
// "" means "looked up, never seen" -- distinct from "absent from cache".
data map[string]string
}
func newSchemaCache() *schemaCache {
return &schemaCache{data: make(map[string]string, 256)}
}
func (c *schemaCache) key(ws, et, field string) string {
return ws + "|" + et + "|" + field
}
func (c *schemaCache) get(ws, et, field string) (string, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
v, ok := c.data[c.key(ws, et, field)]
return v, ok
}
func (c *schemaCache) set(ws, et, field, dataType string) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[c.key(ws, et, field)] = dataType
}
// ---------------------------------------------------------------------------
// repo methods
// ---------------------------------------------------------------------------
func (r *schemaRepo) GetType(ctx context.Context, workspaceID, eventType, field string) (string, error) {
if v, ok := r.cache.get(workspaceID, eventType, field); ok {
return v, nil
}
const q = `
SELECT data_type FROM schema_fields
WHERE workspace_id = $1::uuid AND event_type = $2 AND field = $3`
var t string
err := r.db.QueryRow(ctx, q, workspaceID, eventType, field).Scan(&t)
if errors.Is(err, pgx.ErrNoRows) {
// negative cache: avoid hammering PG for fields that don't exist yet.
r.cache.set(workspaceID, eventType, field, "")
return "", nil
}
if err != nil {
// pgx.ErrNoRows → return "" with nil error so caller treats as new field
if err.Error() == "no rows in result set" {
return "", nil
}
return "", apperr.Internal(fmt.Errorf("schema get: %w", err))
}
r.cache.set(workspaceID, eventType, field, t)
return t, nil
}
@@ -53,9 +104,10 @@ func (r *schemaRepo) UpsertField(ctx context.Context, workspaceID, eventType, fi
ON CONFLICT (workspace_id, event_type, field) DO UPDATE
SET last_seen_at = now(),
sample_count = schema_fields.sample_count + 1`
_, err := r.db.Exec(ctx, q, workspaceID, eventType, field, dataType)
if err != nil {
if _, err := r.db.Exec(ctx, q, workspaceID, eventType, field, dataType); err != nil {
return apperr.Internal(fmt.Errorf("schema upsert: %w", err))
}
// Write-through: keep the local cache consistent with what we just stored.
r.cache.set(workspaceID, eventType, field, dataType)
return nil
}

View File

@@ -25,32 +25,39 @@ type Producer interface {
// IngestService is the core pipeline: validate → ratelimit → timestamp normalize
// → late-check → dedup → flatten → schema-conflict → push Kafka.
type IngestService struct {
producer Producer
limiter ratelimit.Limiter
dedup dedup.Dedup
schema repo.SchemaRepo
log *zap.Logger
lateAfter time.Duration
producer Producer
limiter ratelimit.Limiter
dedup dedup.Dedup
schema repo.SchemaRepo
log *zap.Logger
lateAfter time.Duration
rateLimitRPS int // 0 = unlimited
}
// IngestDeps groups dependencies for cleaner construction.
type IngestDeps struct {
Producer Producer
Limiter ratelimit.Limiter
Dedup dedup.Dedup
Schema repo.SchemaRepo
Log *zap.Logger
LateAfter time.Duration
Producer Producer
Limiter ratelimit.Limiter
Dedup dedup.Dedup
Schema repo.SchemaRepo
Log *zap.Logger
LateAfter time.Duration
RateLimitRPS int
}
func NewIngestService(d IngestDeps) *IngestService {
rps := d.RateLimitRPS
if rps < 0 {
rps = 0
}
return &IngestService{
producer: d.Producer,
limiter: d.Limiter,
dedup: d.Dedup,
schema: d.Schema,
log: d.Log,
lateAfter: d.LateAfter,
producer: d.Producer,
limiter: d.Limiter,
dedup: d.Dedup,
schema: d.Schema,
log: d.Log,
lateAfter: d.LateAfter,
rateLimitRPS: rps,
}
}
@@ -67,14 +74,17 @@ type IngestContext struct {
func (s *IngestService) Ingest(ctx context.Context, ictx IngestContext, raw *model.RawEvent) error {
now := time.Now().UTC()
// 3. rate limit per workspace
dec, err := s.limiter.Allow(ctx, ictx.WorkspaceID, defaultTierLimit, time.Second)
if err != nil {
return apperr.Internal(err)
}
if !dec.Allowed {
retry := (dec.RetryAfterMS / 1000) + 1
return apperr.TooManyRequests(retry)
// 3. rate limit per workspace (skip when RateLimitRPS == 0 -- intended
// for load testing only; do not run unbounded in production).
if s.rateLimitRPS > 0 {
dec, err := s.limiter.Allow(ctx, ictx.WorkspaceID, s.rateLimitRPS, time.Second)
if err != nil {
return apperr.Internal(err)
}
if !dec.Allowed {
retry := (dec.RetryAfterMS / 1000) + 1
return apperr.TooManyRequests(retry)
}
}
// 4-5. timestamps + late-event check
@@ -163,8 +173,6 @@ func (s *IngestService) IngestBatch(ctx context.Context, ictx IngestContext, bat
// helpers
// ---------------------------------------------------------------------------
const defaultTierLimit = 100 // rps; per-tier override comes from workspace.tier later
func derefTime(p *time.Time, fallback time.Time) time.Time {
if p == nil || p.IsZero() {
return fallback