init ingestion

This commit is contained in:
2026-05-24 22:59:24 +07:00
commit 4e8c11d545
80 changed files with 5639 additions and 0 deletions

View File

@@ -0,0 +1,107 @@
// Package batcher accumulates events from the consumer until either the
// size cap or the time cap is hit, then flushes them to the writer.
//
// Flush semantics:
// - on size cap: flush immediately
// - on time cap: flush whatever is buffered (even 0 events: no-op)
// - on shutdown: flush whatever is buffered, then return
package batcher
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"github.com/dbiz/cdp/ingestion/bulker/internal/model"
)
type FlushFunc func(ctx context.Context, events []*model.IngestedEvent) error
type Batcher struct {
size int
interval time.Duration
flush FlushFunc
log *zap.Logger
mu sync.Mutex
buffer []*model.IngestedEvent
}
func New(size int, interval time.Duration, flush FlushFunc, log *zap.Logger) *Batcher {
return &Batcher{
size: size,
interval: interval,
flush: flush,
log: log,
buffer: make([]*model.IngestedEvent, 0, size),
}
}
// Add appends an event. If the size cap is reached we flush synchronously
// before returning so the consumer commit can rely on durability.
func (b *Batcher) Add(ctx context.Context, e *model.IngestedEvent) error {
b.mu.Lock()
b.buffer = append(b.buffer, e)
if len(b.buffer) < b.size {
b.mu.Unlock()
return nil
}
batch := b.swap()
b.mu.Unlock()
return b.doFlush(ctx, batch)
}
// Run blocks until ctx is cancelled, flushing the buffer every interval.
func (b *Batcher) Run(ctx context.Context) {
t := time.NewTicker(b.interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
b.FlushNow(context.Background())
return
case <-t.C:
if err := b.FlushNow(ctx); err != nil {
b.log.Warn("batch flush failed", zap.Error(err))
}
}
}
}
// FlushNow swaps the buffer and flushes synchronously.
func (b *Batcher) FlushNow(ctx context.Context) error {
b.mu.Lock()
batch := b.swap()
b.mu.Unlock()
return b.doFlush(ctx, batch)
}
// swap returns the current buffer and replaces it with a fresh slice.
// Caller must hold b.mu.
func (b *Batcher) swap() []*model.IngestedEvent {
if len(b.buffer) == 0 {
return nil
}
out := b.buffer
b.buffer = make([]*model.IngestedEvent, 0, b.size)
return out
}
func (b *Batcher) doFlush(ctx context.Context, batch []*model.IngestedEvent) error {
if len(batch) == 0 {
return nil
}
start := time.Now()
if err := b.flush(ctx, batch); err != nil {
b.log.Error("flush failed",
zap.Int("count", len(batch)),
zap.Error(err))
return err
}
b.log.Info("flushed",
zap.Int("count", len(batch)),
zap.Int64("duration_ms", time.Since(start).Milliseconds()))
return nil
}

View File

@@ -0,0 +1,56 @@
package batcher
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/dbiz/cdp/ingestion/bulker/internal/model"
)
func TestBatcher_FlushesOnSizeCap(t *testing.T) {
var flushed int32
flush := func(_ context.Context, evs []*model.IngestedEvent) error {
atomic.AddInt32(&flushed, int32(len(evs)))
return nil
}
b := New(3, time.Hour, flush, zap.NewNop())
for i := 0; i < 3; i++ {
_ = b.Add(context.Background(), &model.IngestedEvent{MessageID: "x"})
}
assert.Equal(t, int32(3), atomic.LoadInt32(&flushed))
}
func TestBatcher_FlushNow_NoOpOnEmpty(t *testing.T) {
var called int32
flush := func(_ context.Context, _ []*model.IngestedEvent) error {
atomic.AddInt32(&called, 1)
return nil
}
b := New(10, time.Hour, flush, zap.NewNop())
_ = b.FlushNow(context.Background())
assert.Equal(t, int32(0), atomic.LoadInt32(&called))
}
func TestBatcher_FlushesOnTimer(t *testing.T) {
var flushed int32
flush := func(_ context.Context, evs []*model.IngestedEvent) error {
atomic.AddInt32(&flushed, int32(len(evs)))
return nil
}
b := New(1000, 50*time.Millisecond, flush, zap.NewNop())
ctx, cancel := context.WithCancel(context.Background())
go b.Run(ctx)
_ = b.Add(context.Background(), &model.IngestedEvent{MessageID: "a"})
_ = b.Add(context.Background(), &model.IngestedEvent{MessageID: "b"})
time.Sleep(120 * time.Millisecond)
cancel()
time.Sleep(10 * time.Millisecond)
assert.Equal(t, int32(2), atomic.LoadInt32(&flushed))
}

View File

@@ -0,0 +1,35 @@
// Package config loads bulker runtime config from env.
package config
import (
"fmt"
"time"
"github.com/caarlos0/env/v11"
)
type Config struct {
HTTPAddr string `env:"BULKER_HTTP_ADDR" envDefault:":3042"`
LogLevel string `env:"BULKER_LOG_LEVEL" envDefault:"info"`
KafkaGroup string `env:"BULKER_KAFKA_GROUP" envDefault:"bulker"`
BatchSize int `env:"BULKER_BATCH_SIZE" envDefault:"1000"`
BatchInterval time.Duration `env:"BULKER_BATCH_INTERVAL_SECONDS" envDefault:"5s"`
ShutdownTimeout time.Duration `env:"BULKER_SHUTDOWN_TIMEOUT_SECONDS" envDefault:"60s"`
KafkaBrokers []string `env:"KAFKA_BROKERS" envSeparator:"," envDefault:"localhost:9092"`
KafkaTopicIngest string `env:"KAFKA_TOPIC_INGEST" envDefault:"events.ingest"`
KafkaTopicDLQ string `env:"KAFKA_TOPIC_DLQ" envDefault:"events.dlq"`
ClickHouseAddr string `env:"CLICKHOUSE_ADDR" envDefault:"localhost:9000"`
ClickHouseDB string `env:"CLICKHOUSE_DB" envDefault:"cdp"`
ClickHouseUser string `env:"CLICKHOUSE_USER" envDefault:"default"`
ClickHousePassword string `env:"CLICKHOUSE_PASSWORD" envDefault:""`
}
func Load() (*Config, error) {
cfg := &Config{}
if err := env.Parse(cfg); err != nil {
return nil, fmt.Errorf("config load: %w", err)
}
return cfg, nil
}

View File

@@ -0,0 +1,130 @@
// Package consumer reads from the ingest Kafka topics and feeds the batcher.
//
// We use franz-go's manual commit mode: commit only after a successful
// batcher flush. Combined with at-least-once semantics from the producer
// and idempotent inserts at the analytics layer this is sufficient.
package consumer
import (
"context"
"encoding/json"
"fmt"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"github.com/dbiz/cdp/ingestion/bulker/internal/batcher"
"github.com/dbiz/cdp/ingestion/bulker/internal/model"
"github.com/dbiz/cdp/ingestion/bulker/internal/writer"
)
type Consumer struct {
client *kgo.Client
log *zap.Logger
batcher *batcher.Batcher
writer *writer.ClickHouse
dlqTopic string
ingestTopic string
}
type Config struct {
Brokers []string
Group string
IngestTopic string
DLQTopic string
}
func New(c Config, b *batcher.Batcher, w *writer.ClickHouse, log *zap.Logger) (*Consumer, error) {
cl, err := kgo.NewClient(
kgo.SeedBrokers(c.Brokers...),
kgo.ConsumerGroup(c.Group),
kgo.ConsumeTopics(c.IngestTopic, c.DLQTopic),
kgo.DisableAutoCommit(),
kgo.ClientID("cdp-bulker"),
kgo.SessionTimeout(45_000_000_000), // 45s
kgo.FetchMaxBytes(50 * 1024 * 1024),
)
if err != nil {
return nil, fmt.Errorf("kafka client: %w", err)
}
return &Consumer{
client: cl,
log: log,
batcher: b,
writer: w,
dlqTopic: c.DLQTopic,
ingestTopic: c.IngestTopic,
}, nil
}
func (c *Consumer) Close() {
c.client.Close()
}
// Run polls Kafka until ctx is cancelled. One iteration:
// 1. PollFetches
// 2. For each record, parse JSON and route to the right destination
// 3. Commit offsets only after the batch flush succeeded
func (c *Consumer) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
c.log.Info("consumer stopping")
return nil
default:
}
fetches := c.client.PollFetches(ctx)
if errs := fetches.Errors(); len(errs) > 0 {
for _, e := range errs {
c.log.Warn("fetch error",
zap.String("topic", e.Topic),
zap.Int32("partition", e.Partition),
zap.Error(e.Err))
}
}
var dlqBatch []*model.DLQRecord
fetches.EachRecord(func(r *kgo.Record) {
switch r.Topic {
case c.ingestTopic:
var ev model.IngestedEvent
if err := json.Unmarshal(r.Value, &ev); err != nil {
c.log.Warn("ingest decode failed",
zap.String("topic", r.Topic),
zap.Error(err))
return
}
if err := c.batcher.Add(ctx, &ev); err != nil {
c.log.Error("batcher add failed", zap.Error(err))
}
case c.dlqTopic:
var d model.DLQRecord
if err := json.Unmarshal(r.Value, &d); err != nil {
return
}
if d.ReceivedAt.IsZero() {
d.ReceivedAt = r.Timestamp
}
dlqBatch = append(dlqBatch, &d)
}
})
if len(dlqBatch) > 0 {
if err := c.writer.WriteDLQ(ctx, dlqBatch); err != nil {
c.log.Error("dlq write failed", zap.Error(err))
}
}
// Force a flush before committing so committed offsets reflect what's
// actually persisted. The batcher is idempotent for empty buffers.
if err := c.batcher.FlushNow(ctx); err != nil {
c.log.Warn("flush before commit failed", zap.Error(err))
continue // do not commit -- retry on next poll
}
if err := c.client.CommitUncommittedOffsets(ctx); err != nil {
c.log.Warn("commit failed", zap.Error(err))
}
}
}

View File

@@ -0,0 +1,41 @@
package model
import "time"
// IngestedEvent mirrors the shape ingest publishes onto Kafka.
// Keep these two structs in lock-step (we are intentionally NOT importing
// ingest's package -- bulker compiles standalone).
type IngestedEvent struct {
WorkspaceID string `json:"workspace_id"`
SourceID string `json:"source_id"`
MessageID string `json:"message_id"`
Type string `json:"type"`
AnonymousID string `json:"anonymous_id,omitempty"`
UserID string `json:"user_id,omitempty"`
GroupID string `json:"group_id,omitempty"`
Event string `json:"event,omitempty"`
Name string `json:"name,omitempty"`
Category string `json:"category,omitempty"`
Properties map[string]any `json:"properties,omitempty"`
Traits map[string]any `json:"traits,omitempty"`
Context map[string]any `json:"context,omitempty"`
IP string `json:"ip,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
Timestamp time.Time `json:"timestamp"`
SentAt time.Time `json:"sent_at"`
ReceivedAt time.Time `json:"received_at"`
}
// DLQRecord is the JSON shape the bulker reads from the DLQ topic.
type DLQRecord struct {
WorkspaceID string `json:"workspace_id"`
SourceID string `json:"source_id"`
MessageID string `json:"message_id"`
Reason string `json:"reason"`
Field string `json:"field"`
RawPayload string `json:"raw_payload"`
ReceivedAt time.Time `json:"received_at"`
}

View File

@@ -0,0 +1,250 @@
// Package writer wraps the ClickHouse client for batch inserts.
//
// We use the native clickhouse-go v2 client. One PrepareBatch / Append / Send
// cycle per (table, batch). All maps are stringified before insertion -- the
// ClickHouse schema uses Map(String, String) which keeps the table flat and
// avoids column explosion. Analytics queries cast on read.
package writer
import (
"context"
"encoding/json"
"fmt"
"strconv"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/dbiz/cdp/ingestion/bulker/internal/model"
)
type ClickHouse struct {
conn driver.Conn
db string
}
func New(ctx context.Context, addr, db, user, password string) (*ClickHouse, error) {
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{addr},
Auth: clickhouse.Auth{
Database: db,
Username: user,
Password: password,
},
Settings: clickhouse.Settings{
"async_insert": 0,
"wait_for_async_insert": 0,
},
})
if err != nil {
return nil, fmt.Errorf("clickhouse open: %w", err)
}
if err := conn.Ping(ctx); err != nil {
return nil, fmt.Errorf("clickhouse ping: %w", err)
}
return &ClickHouse{conn: conn, db: db}, nil
}
func (c *ClickHouse) Close() error { return c.conn.Close() }
// WriteEvents fans out a mixed-type batch into the per-type tables.
// Returns the number of rows successfully inserted across all tables.
func (c *ClickHouse) WriteEvents(ctx context.Context, events []*model.IngestedEvent) (int, error) {
if len(events) == 0 {
return 0, nil
}
// Bucket by event type so each insert hits one table.
buckets := map[string][]*model.IngestedEvent{}
for _, e := range events {
buckets[e.Type] = append(buckets[e.Type], e)
}
total := 0
for t, evs := range buckets {
var err error
switch t {
case "track":
err = c.writeTrack(ctx, evs)
case "identify":
err = c.writeIdentify(ctx, evs)
case "page", "screen":
err = c.writePage(ctx, evs)
case "group":
err = c.writeGroup(ctx, evs)
default:
// alias / unknown types -- write to track for now
err = c.writeTrack(ctx, evs)
}
if err != nil {
return total, fmt.Errorf("write %s: %w", t, err)
}
total += len(evs)
}
return total, nil
}
// ---------------------------------------------------------------------------
// per-table batch inserts
// ---------------------------------------------------------------------------
func (c *ClickHouse) writeTrack(ctx context.Context, evs []*model.IngestedEvent) error {
batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_track")
if err != nil {
return err
}
for _, e := range evs {
err := batch.Append(
e.WorkspaceID, e.SourceID, e.MessageID,
e.AnonymousID, e.UserID, e.Event,
e.Timestamp, e.SentAt, e.ReceivedAt,
mapToStr(e.Properties), mapToStr(e.Context),
e.IP, e.UserAgent,
libraryName(e.Context), libraryVersion(e.Context),
)
if err != nil {
return err
}
}
return batch.Send()
}
func (c *ClickHouse) writeIdentify(ctx context.Context, evs []*model.IngestedEvent) error {
batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_identify")
if err != nil {
return err
}
for _, e := range evs {
err := batch.Append(
e.WorkspaceID, e.SourceID, e.MessageID,
e.AnonymousID, e.UserID,
e.Timestamp, e.SentAt, e.ReceivedAt,
mapToStr(e.Traits), mapToStr(e.Context),
e.IP, e.UserAgent,
)
if err != nil {
return err
}
}
return batch.Send()
}
func (c *ClickHouse) writePage(ctx context.Context, evs []*model.IngestedEvent) error {
batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_page")
if err != nil {
return err
}
for _, e := range evs {
path, _ := e.Properties["path"].(string)
url, _ := e.Properties["url"].(string)
referrer, _ := e.Properties["referrer"].(string)
err := batch.Append(
e.WorkspaceID, e.SourceID, e.MessageID,
e.AnonymousID, e.UserID, e.Name, e.Category,
e.Timestamp, e.SentAt, e.ReceivedAt,
mapToStr(e.Properties), mapToStr(e.Context),
e.IP, e.UserAgent,
referrer, path, url,
)
if err != nil {
return err
}
}
return batch.Send()
}
func (c *ClickHouse) writeGroup(ctx context.Context, evs []*model.IngestedEvent) error {
batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_group")
if err != nil {
return err
}
for _, e := range evs {
err := batch.Append(
e.WorkspaceID, e.SourceID, e.MessageID,
e.AnonymousID, e.UserID, e.GroupID,
e.Timestamp, e.SentAt, e.ReceivedAt,
mapToStr(e.Traits), mapToStr(e.Context),
e.IP, e.UserAgent,
)
if err != nil {
return err
}
}
return batch.Send()
}
// WriteDLQ inserts records from the DLQ topic.
func (c *ClickHouse) WriteDLQ(ctx context.Context, recs []*model.DLQRecord) error {
if len(recs) == 0 {
return nil
}
batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_dlq")
if err != nil {
return err
}
for _, r := range recs {
if err := batch.Append(
r.WorkspaceID, r.SourceID, r.MessageID, r.ReceivedAt,
r.Reason, r.Field, r.RawPayload,
); err != nil {
return err
}
}
return batch.Send()
}
// ---------------------------------------------------------------------------
// helpers
// ---------------------------------------------------------------------------
// mapToStr converts a map[string]any into the Map(String, String) shape
// ClickHouse expects. Non-string values are JSON-encoded.
func mapToStr(in map[string]any) map[string]string {
if in == nil {
return map[string]string{}
}
out := make(map[string]string, len(in))
for k, v := range in {
out[k] = anyToStr(v)
}
return out
}
func anyToStr(v any) string {
switch x := v.(type) {
case nil:
return ""
case string:
return x
case float64:
return strconv.FormatFloat(x, 'f', -1, 64)
case int:
return strconv.Itoa(x)
case int64:
return strconv.FormatInt(x, 10)
case bool:
return strconv.FormatBool(x)
default:
b, _ := json.Marshal(v)
return string(b)
}
}
func libraryName(ctx map[string]any) string {
if ctx == nil {
return ""
}
if v, ok := ctx["library_name"].(string); ok {
return v
}
return ""
}
func libraryVersion(ctx map[string]any) string {
if ctx == nil {
return ""
}
if v, ok := ctx["library_version"].(string); ok {
return v
}
return ""
}