Files
cdp/ingestion/ingest/internal/kafka/producer.go
2026-05-25 10:16:31 +07:00

118 lines
3.5 KiB
Go

// Package kafka wraps franz-go for the ingest producer.
//
// Design notes:
//   - ProduceSync is used only for DLQ writes (rare; correctness > latency).
//   - Happy-path Produce is fire-and-forget: we return 200 OK before the
//     broker acknowledges the write. franz-go buffers records internally
//     and retries failed sends.
//   - The happy-topic partition key is anonymous_id, so all events for a
//     single visitor land on the same partition (ordering for stitching).
package kafka
import (
	"context"
	"encoding/json"
	"fmt"
	"time"
	"unicode/utf8"

	"github.com/twmb/franz-go/pkg/kgo"
	"go.uber.org/zap"

	"github.com/dbiz/cdp/ingestion/ingest/internal/model"
)
// Producer publishes ingest events to Kafka through a single franz-go client.
// Build one with NewProducer and release it with Close.
type Producer struct {
	client *kgo.Client // shared franz-go client used for every topic
	log    *zap.Logger // receives asynchronous delivery failures

	// Destination topics, set once by NewProducer.
	topicIngest, topicDLQ, topicRetry string
}
// NewProducer creates a franz-go client for the given seed brokers and checks
// that at least one broker answers a ping. It returns a Producer that writes
// happy-path events to topicIngest and dead letters to topicDLQ.
// topicRetry is stored on the Producer. NOTE(review): no method in this file
// reads it — confirm it is still needed.
//
// A nil log is replaced with a no-op logger, so the asynchronous delivery
// callback in Produce never dereferences a nil *zap.Logger. The ping is bounded
// by a timeout, so start-up fails fast instead of stalling on unreachable
// brokers. On any error the client is closed and a wrapped error is returned.
func NewProducer(brokers []string, topicIngest, topicDLQ, topicRetry string, log *zap.Logger) (*Producer, error) {
	// pingTimeout bounds the start-up connectivity check.
	const pingTimeout = 10 * time.Second

	if log == nil {
		log = zap.NewNop()
	}

	cl, err := kgo.NewClient(
		kgo.SeedBrokers(brokers...),
		kgo.ProducerLinger(5*time.Millisecond), // batch small bursts
		kgo.ProducerBatchCompression(kgo.ZstdCompression()),
		kgo.MaxBufferedRecords(100_000),
		// franz-go enables idempotent writes by default, which requires acks=all.
		kgo.RequiredAcks(kgo.AllISRAcks()),
		kgo.ClientID("cdp-ingest"),
	)
	if err != nil {
		return nil, fmt.Errorf("kafka client: %w", err)
	}

	pingCtx, cancel := context.WithTimeout(context.Background(), pingTimeout)
	defer cancel()
	if err := cl.Ping(pingCtx); err != nil {
		cl.Close()
		return nil, fmt.Errorf("kafka ping: %w", err)
	}

	return &Producer{
		client:      cl,
		log:         log,
		topicIngest: topicIngest,
		topicDLQ:    topicDLQ,
		topicRetry:  topicRetry,
	}, nil
}
// Close flushes records still buffered by fire-and-forget Produce calls, then
// shuts down the underlying client.
//
// kgo.Client.Close fails any record that is still buffered. Without the Flush,
// events already acknowledged to HTTP callers would be dropped on shutdown.
// The flush is bounded so an unreachable cluster cannot hang shutdown. If it
// times out, Close fails whatever remains, and those failures go to the
// records' delivery callbacks.
func (p *Producer) Close() {
	// closeFlushTimeout bounds how long shutdown waits for buffered records.
	const closeFlushTimeout = 10 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), closeFlushTimeout)
	defer cancel()
	if err := p.client.Flush(ctx); err != nil {
		p.log.Warn("kafka flush before close failed", zap.Error(err))
	}
	p.client.Close()
}
// Produce publishes ev to the happy-path ingest topic without waiting for the
// broker acknowledgement (fire-and-forget). It returns an error only when ev
// is nil or cannot be JSON-encoded. Delivery failures surface later and are
// only logged from the produce callback.
//
// The record is keyed by ev.PartitionKey() and carries workspace_id, source_id
// and type headers.
//
// Cancellation is deliberately detached from ctx. The HTTP handler returns 200
// as soon as the record is buffered, and the request context is cancelled
// right after. franz-go fails a still-buffered record whose context is done,
// so passing ctx straight through would drop events we already acknowledged.
// context.WithoutCancel keeps ctx's values (e.g. tracing) but removes its
// deadline and Done channel.
//
// NOTE(review): because the context can no longer be cancelled, a Produce call
// made while the client already holds MaxBufferedRecords records (e.g. brokers
// unreachable) may block until space frees up or the client closes. Confirm
// this back-pressure is acceptable, or switch to TryProduce with explicit load
// shedding.
func (p *Producer) Produce(ctx context.Context, ev *model.IngestedEvent) error {
	if ev == nil {
		return fmt.Errorf("produce: nil event")
	}
	payload, err := json.Marshal(ev)
	if err != nil {
		return fmt.Errorf("marshal event: %w", err)
	}
	rec := &kgo.Record{
		Topic: p.topicIngest,
		Key:   []byte(ev.PartitionKey()),
		Value: payload,
		Headers: []kgo.RecordHeader{
			{Key: "workspace_id", Value: []byte(ev.WorkspaceID)},
			{Key: "source_id", Value: []byte(ev.SourceID)},
			{Key: "type", Value: []byte(ev.Type)},
		},
	}
	// The callback runs asynchronously, after Produce has returned. Copy what it
	// needs now instead of reading ev later: the caller may reuse or mutate it,
	// and capturing ev would keep the whole event alive until the ack.
	messageID := ev.MessageID
	p.client.Produce(context.WithoutCancel(ctx), rec, func(r *kgo.Record, err error) {
		if err != nil {
			p.log.Error("kafka produce failed",
				zap.String("topic", r.Topic),
				zap.String("message_id", messageID),
				zap.Error(err))
		}
	})
	return nil
}
// ProduceDLQ writes a rejected event to the DLQ topic and waits for the result
// of the produce. The caller therefore knows the dead letter landed before it
// answers the user with the original error.
//
// The record value is a JSON envelope with workspace_id, source_id,
// message_id, reason, field and raw_payload (raw converted to a string).
// encoding/json replaces invalid UTF-8 with U+FFFD, which would silently
// corrupt raw_payload for non-UTF-8 bodies. In that case the exact bytes are
// also stored, base64-encoded, under raw_payload_b64. The record is keyed by
// workspaceID and carries the reason as a header.
//
// It returns an error if the envelope cannot be encoded or the synchronous
// produce reports a failure.
func (p *Producer) ProduceDLQ(ctx context.Context, workspaceID, sourceID, messageID, reason, field string, raw []byte) error {
	envelope := map[string]any{
		"workspace_id": workspaceID,
		"source_id":    sourceID,
		"message_id":   messageID,
		"reason":       reason,
		"field":        field,
		"raw_payload":  string(raw),
	}
	if !utf8.Valid(raw) {
		// encoding/json encodes []byte values as standard base64, preserving
		// the original bytes exactly for replay.
		envelope["raw_payload_b64"] = raw
	}
	payload, err := json.Marshal(envelope)
	if err != nil {
		return fmt.Errorf("dlq marshal: %w", err)
	}
	rec := &kgo.Record{
		Topic: p.topicDLQ,
		Key:   []byte(workspaceID),
		Value: payload,
		Headers: []kgo.RecordHeader{
			{Key: "reason", Value: []byte(reason)},
		},
	}
	if err := p.client.ProduceSync(ctx, rec).FirstErr(); err != nil {
		return fmt.Errorf("dlq produce: %w", err)
	}
	return nil
}