Files
2026-05-24 22:59:24 +07:00

131 lines
3.3 KiB
Go

// Package consumer reads from the ingest Kafka topics and feeds the batcher.
//
// We use franz-go's manual commit mode: commit only after a successful
// batcher flush. Combined with at-least-once semantics from the producer
// and idempotent inserts at the analytics layer this is sufficient.
package consumer
import (
"context"
"encoding/json"
"fmt"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"github.com/dbiz/cdp/ingestion/bulker/internal/batcher"
"github.com/dbiz/cdp/ingestion/bulker/internal/model"
"github.com/dbiz/cdp/ingestion/bulker/internal/writer"
)
// Consumer reads records from the ingest and DLQ Kafka topics. Ingest events
// are handed to the batcher; DLQ records are written through the writer.
type Consumer struct {
// client is the franz-go client used for polling and committing offsets.
client *kgo.Client
// log receives decode, write, flush, and commit diagnostics.
log *zap.Logger
// batcher receives every successfully decoded ingest event.
batcher *batcher.Batcher
// writer persists the DLQ records collected during one poll.
writer *writer.ClickHouse
// dlqTopic and ingestTopic are compared against each record's topic to
// decide how the record is routed.
dlqTopic string
ingestTopic string
}
// Config holds the Kafka settings that New passes to the franz-go client.
type Config struct {
// Brokers are the seed broker addresses.
Brokers []string
// Group is the consumer group name.
Group string
// IngestTopic is the topic whose records are decoded as ingest events.
IngestTopic string
// DLQTopic is the topic whose records are decoded as DLQ records.
DLQTopic string
}
// New creates the Kafka client described by c and wraps it, together with the
// batcher, writer, and logger, in a Consumer.
//
// The client joins consumer group c.Group, subscribes to both c.IngestTopic
// and c.DLQTopic, and has auto-commit disabled so offsets are only committed
// explicitly. A client construction failure is returned wrapped with the
// "kafka client" prefix.
func New(c Config, b *batcher.Batcher, w *writer.ClickHouse, log *zap.Logger) (*Consumer, error) {
	opts := []kgo.Opt{
		kgo.SeedBrokers(c.Brokers...),
		kgo.ConsumerGroup(c.Group),
		kgo.ConsumeTopics(c.IngestTopic, c.DLQTopic),
		kgo.DisableAutoCommit(),
		kgo.ClientID("cdp-bulker"),
		kgo.SessionTimeout(45_000_000_000), // 45s, expressed in nanoseconds
		kgo.FetchMaxBytes(50 << 20),        // 50 MiB
	}

	client, err := kgo.NewClient(opts...)
	if err != nil {
		return nil, fmt.Errorf("kafka client: %w", err)
	}

	consumer := &Consumer{
		client:      client,
		log:         log,
		batcher:     b,
		writer:      w,
		dlqTopic:    c.DLQTopic,
		ingestTopic: c.IngestTopic,
	}
	return consumer, nil
}
// Close closes the underlying Kafka client.
func (c *Consumer) Close() {
c.client.Close()
}
// Run polls Kafka until ctx is cancelled or the client is closed. It always
// returns nil. One iteration:
//  1. PollFetches, logging any per-partition fetch errors.
//  2. Decode each record as JSON and route it by topic: ingest events go to
//     the batcher, DLQ records are collected and written with one WriteDLQ
//     call.
//  3. Force a batcher flush.
//  4. Commit offsets only if every Add, the DLQ write, and the flush
//     succeeded.
//
// Records that fail to decode are logged and skipped; they do not prevent
// the commit.
func (c *Consumer) Run(ctx context.Context) error {
	for ctx.Err() == nil {
		fetches := c.client.PollFetches(ctx)
		if fetches.IsClientClosed() || ctx.Err() != nil {
			// A closed client would make every further poll return
			// immediately with the same error (a hot loop), and a cancelled
			// context is reported by PollFetches as a fetch error. Neither is
			// worth logging as a failure or flushing with a dead context.
			break
		}

		for _, e := range fetches.Errors() {
			c.log.Warn("fetch error",
				zap.String("topic", e.Topic),
				zap.Int32("partition", e.Partition),
				zap.Error(e.Err))
		}

		// persisted turns false as soon as any record of this poll could not
		// be handed to its destination; it gates the commit below.
		persisted := true
		var dlqBatch []*model.DLQRecord
		fetches.EachRecord(func(r *kgo.Record) {
			switch r.Topic {
			case c.ingestTopic:
				var ev model.IngestedEvent
				if err := json.Unmarshal(r.Value, &ev); err != nil {
					c.log.Warn("ingest decode failed",
						zap.String("topic", r.Topic),
						zap.Error(err))
					return
				}
				if err := c.batcher.Add(ctx, &ev); err != nil {
					c.log.Error("batcher add failed", zap.Error(err))
					persisted = false
				}
			case c.dlqTopic:
				var d model.DLQRecord
				if err := json.Unmarshal(r.Value, &d); err != nil {
					// Same policy as the ingest branch: skip the record, but
					// leave a trace instead of dropping it silently.
					c.log.Warn("dlq decode failed",
						zap.String("topic", r.Topic),
						zap.Int32("partition", r.Partition),
						zap.Int64("offset", r.Offset),
						zap.Error(err))
					return
				}
				// Fall back to the Kafka record timestamp when the payload
				// carries no receive time.
				if d.ReceivedAt.IsZero() {
					d.ReceivedAt = r.Timestamp
				}
				dlqBatch = append(dlqBatch, &d)
			}
		})

		if len(dlqBatch) > 0 {
			if err := c.writer.WriteDLQ(ctx, dlqBatch); err != nil {
				c.log.Error("dlq write failed", zap.Error(err))
				persisted = false
			}
		}

		// Force a flush before committing so committed offsets reflect what's
		// actually persisted. The flush runs even after an Add/DLQ failure so
		// the events that were accepted still get written.
		if err := c.batcher.FlushNow(ctx); err != nil {
			c.log.Warn("flush before commit failed", zap.Error(err))
			continue // do not commit
		}
		if !persisted {
			// NOTE(review): skipping the commit keeps these offsets
			// uncommitted, so the records are re-read after a restart or
			// rebalance. The client does not re-deliver them in-process, and
			// a later successful commit will move past them -- confirm
			// whether a rewind or fail-fast is wanted here.
			c.log.Warn("skipping commit: not all records were persisted")
			continue
		}
		if err := c.client.CommitUncommittedOffsets(ctx); err != nil {
			c.log.Warn("commit failed", zap.Error(err))
		}
	}

	c.log.Info("consumer stopping")
	return nil
}