131 lines
3.3 KiB
Go
131 lines
3.3 KiB
Go
// Package consumer reads from the ingest and DLQ Kafka topics: ingest events
// are fed to the batcher and DLQ records are written to ClickHouse.
//
// We use franz-go's manual commit mode: offsets are committed only after a
// successful batcher flush. Combined with at-least-once semantics from the
// producer and idempotent inserts at the analytics layer, this gives
// at-least-once ingestion whose redeliveries are absorbed by the inserts.
|
|
package consumer
|
|
|
|
import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
	"go.uber.org/zap"

	"github.com/dbiz/cdp/ingestion/bulker/internal/batcher"
	"github.com/dbiz/cdp/ingestion/bulker/internal/model"
	"github.com/dbiz/cdp/ingestion/bulker/internal/writer"
)
|
|
|
|
type Consumer struct {
|
|
client *kgo.Client
|
|
log *zap.Logger
|
|
batcher *batcher.Batcher
|
|
writer *writer.ClickHouse
|
|
dlqTopic string
|
|
ingestTopic string
|
|
}
|
|
|
|
// Config carries the Kafka settings New needs to build a Consumer.
type Config struct {
	// Brokers are the seed broker addresses used to bootstrap the client.
	Brokers []string

	// Group is the Kafka consumer group to join.
	Group string

	// IngestTopic carries events destined for the batcher; DLQTopic carries
	// dead-letter records that are written directly to ClickHouse.
	IngestTopic, DLQTopic string
}
|
|
|
|
func New(c Config, b *batcher.Batcher, w *writer.ClickHouse, log *zap.Logger) (*Consumer, error) {
|
|
cl, err := kgo.NewClient(
|
|
kgo.SeedBrokers(c.Brokers...),
|
|
kgo.ConsumerGroup(c.Group),
|
|
kgo.ConsumeTopics(c.IngestTopic, c.DLQTopic),
|
|
kgo.DisableAutoCommit(),
|
|
kgo.ClientID("cdp-bulker"),
|
|
kgo.SessionTimeout(45_000_000_000), // 45s
|
|
kgo.FetchMaxBytes(50 * 1024 * 1024),
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kafka client: %w", err)
|
|
}
|
|
return &Consumer{
|
|
client: cl,
|
|
log: log,
|
|
batcher: b,
|
|
writer: w,
|
|
dlqTopic: c.DLQTopic,
|
|
ingestTopic: c.IngestTopic,
|
|
}, nil
|
|
}
|
|
|
|
func (c *Consumer) Close() {
|
|
c.client.Close()
|
|
}
|
|
|
|
// Run polls Kafka until ctx is cancelled. One iteration:
|
|
// 1. PollFetches
|
|
// 2. For each record, parse JSON and route to the right destination
|
|
// 3. Commit offsets only after the batch flush succeeded
|
|
func (c *Consumer) Run(ctx context.Context) error {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
c.log.Info("consumer stopping")
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
fetches := c.client.PollFetches(ctx)
|
|
if errs := fetches.Errors(); len(errs) > 0 {
|
|
for _, e := range errs {
|
|
c.log.Warn("fetch error",
|
|
zap.String("topic", e.Topic),
|
|
zap.Int32("partition", e.Partition),
|
|
zap.Error(e.Err))
|
|
}
|
|
}
|
|
|
|
var dlqBatch []*model.DLQRecord
|
|
fetches.EachRecord(func(r *kgo.Record) {
|
|
switch r.Topic {
|
|
case c.ingestTopic:
|
|
var ev model.IngestedEvent
|
|
if err := json.Unmarshal(r.Value, &ev); err != nil {
|
|
c.log.Warn("ingest decode failed",
|
|
zap.String("topic", r.Topic),
|
|
zap.Error(err))
|
|
return
|
|
}
|
|
if err := c.batcher.Add(ctx, &ev); err != nil {
|
|
c.log.Error("batcher add failed", zap.Error(err))
|
|
}
|
|
case c.dlqTopic:
|
|
var d model.DLQRecord
|
|
if err := json.Unmarshal(r.Value, &d); err != nil {
|
|
return
|
|
}
|
|
if d.ReceivedAt.IsZero() {
|
|
d.ReceivedAt = r.Timestamp
|
|
}
|
|
dlqBatch = append(dlqBatch, &d)
|
|
}
|
|
})
|
|
|
|
if len(dlqBatch) > 0 {
|
|
if err := c.writer.WriteDLQ(ctx, dlqBatch); err != nil {
|
|
c.log.Error("dlq write failed", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
// Force a flush before committing so committed offsets reflect what's
|
|
// actually persisted. The batcher is idempotent for empty buffers.
|
|
if err := c.batcher.FlushNow(ctx); err != nil {
|
|
c.log.Warn("flush before commit failed", zap.Error(err))
|
|
continue // do not commit -- retry on next poll
|
|
}
|
|
|
|
if err := c.client.CommitUncommittedOffsets(ctx); err != nil {
|
|
c.log.Warn("commit failed", zap.Error(err))
|
|
}
|
|
}
|
|
}
|