// Package consumer reads from the ingest Kafka topics and feeds the batcher. // // We use franz-go's manual commit mode: commit only after a successful // batcher flush. Combined with at-least-once semantics from the producer // and idempotent inserts at the analytics layer this is sufficient. package consumer import ( "context" "encoding/json" "fmt" "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/zap" "github.com/dbiz/cdp/ingestion/bulker/internal/batcher" "github.com/dbiz/cdp/ingestion/bulker/internal/model" "github.com/dbiz/cdp/ingestion/bulker/internal/writer" ) type Consumer struct { client *kgo.Client log *zap.Logger batcher *batcher.Batcher writer *writer.ClickHouse dlqTopic string ingestTopic string } type Config struct { Brokers []string Group string IngestTopic string DLQTopic string } func New(c Config, b *batcher.Batcher, w *writer.ClickHouse, log *zap.Logger) (*Consumer, error) { cl, err := kgo.NewClient( kgo.SeedBrokers(c.Brokers...), kgo.ConsumerGroup(c.Group), kgo.ConsumeTopics(c.IngestTopic, c.DLQTopic), kgo.DisableAutoCommit(), kgo.ClientID("cdp-bulker"), kgo.SessionTimeout(45_000_000_000), // 45s kgo.FetchMaxBytes(50 * 1024 * 1024), ) if err != nil { return nil, fmt.Errorf("kafka client: %w", err) } return &Consumer{ client: cl, log: log, batcher: b, writer: w, dlqTopic: c.DLQTopic, ingestTopic: c.IngestTopic, }, nil } func (c *Consumer) Close() { c.client.Close() } // Run polls Kafka until ctx is cancelled. One iteration: // 1. PollFetches // 2. For each record, parse JSON and route to the right destination // 3. Commit offsets only after the batch flush succeeded func (c *Consumer) Run(ctx context.Context) error { for { select { case <-ctx.Done(): c.log.Info("consumer stopping") return nil default: } fetches := c.client.PollFetches(ctx) if errs := fetches.Errors(); len(errs) > 0 { for _, e := range errs { c.log.Warn("fetch error", zap.String("topic", e.Topic), zap.Int32("partition", e.Partition), zap.Error(e.Err)) } } var dlqBatch []*model.DLQRecord fetches.EachRecord(func(r *kgo.Record) { switch r.Topic { case c.ingestTopic: var ev model.IngestedEvent if err := json.Unmarshal(r.Value, &ev); err != nil { c.log.Warn("ingest decode failed", zap.String("topic", r.Topic), zap.Error(err)) return } if err := c.batcher.Add(ctx, &ev); err != nil { c.log.Error("batcher add failed", zap.Error(err)) } case c.dlqTopic: var d model.DLQRecord if err := json.Unmarshal(r.Value, &d); err != nil { return } if d.ReceivedAt.IsZero() { d.ReceivedAt = r.Timestamp } dlqBatch = append(dlqBatch, &d) } }) if len(dlqBatch) > 0 { if err := c.writer.WriteDLQ(ctx, dlqBatch); err != nil { c.log.Error("dlq write failed", zap.Error(err)) } } // Force a flush before committing so committed offsets reflect what's // actually persisted. The batcher is idempotent for empty buffers. if err := c.batcher.FlushNow(ctx); err != nil { c.log.Warn("flush before commit failed", zap.Error(err)) continue // do not commit -- retry on next poll } if err := c.client.CommitUncommittedOffsets(ctx); err != nil { c.log.Warn("commit failed", zap.Error(err)) } } }