232 lines
6.8 KiB
Go
232 lines
6.8 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/dbiz/cdp/ingestion/ingest/internal/apperr"
|
|
"github.com/dbiz/cdp/ingestion/ingest/internal/dedup"
|
|
"github.com/dbiz/cdp/ingestion/ingest/internal/model"
|
|
"github.com/dbiz/cdp/ingestion/ingest/internal/ratelimit"
|
|
"github.com/dbiz/cdp/ingestion/ingest/internal/repo"
|
|
"github.com/dbiz/cdp/ingestion/ingest/internal/schema"
|
|
)
|
|
|
|
// Producer is the small surface IngestService needs from the Kafka client.
|
|
// Defined here so it can be stubbed in tests without pulling in franz-go.
|
|
type Producer interface {
|
|
Produce(ctx context.Context, ev *model.IngestedEvent) error
|
|
ProduceDLQ(ctx context.Context, workspaceID, sourceID, messageID, reason, field string, raw []byte) error
|
|
}
|
|
|
|
// IngestService is the core pipeline: validate → ratelimit → timestamp normalize
|
|
// → late-check → dedup → flatten → schema-conflict → push Kafka.
|
|
type IngestService struct {
|
|
producer Producer
|
|
limiter ratelimit.Limiter
|
|
dedup dedup.Dedup
|
|
schema repo.SchemaRepo
|
|
log *zap.Logger
|
|
lateAfter time.Duration
|
|
rateLimitRPS int // 0 = unlimited
|
|
}
|
|
|
|
// IngestDeps groups dependencies for cleaner construction.
|
|
type IngestDeps struct {
|
|
Producer Producer
|
|
Limiter ratelimit.Limiter
|
|
Dedup dedup.Dedup
|
|
Schema repo.SchemaRepo
|
|
Log *zap.Logger
|
|
LateAfter time.Duration
|
|
RateLimitRPS int
|
|
}
|
|
|
|
func NewIngestService(d IngestDeps) *IngestService {
|
|
rps := d.RateLimitRPS
|
|
if rps < 0 {
|
|
rps = 0
|
|
}
|
|
return &IngestService{
|
|
producer: d.Producer,
|
|
limiter: d.Limiter,
|
|
dedup: d.Dedup,
|
|
schema: d.Schema,
|
|
log: d.Log,
|
|
lateAfter: d.LateAfter,
|
|
rateLimitRPS: rps,
|
|
}
|
|
}
|
|
|
|
// IngestContext holds the per-request values that middleware attaches before
// the event reaches the service.
type IngestContext struct {
	WorkspaceID string // key for rate limiting, dedup and schema lookups
	SourceID    string // copied onto the produced event and DLQ records
	IP          string // copied onto the produced event
	UserAgent   string // copied onto the produced event
	RawBody     []byte // original body, used for DLQ payload
}
|
|
|
|
// Ingest runs the full pipeline for a single event.
|
|
func (s *IngestService) Ingest(ctx context.Context, ictx IngestContext, raw *model.RawEvent) error {
|
|
now := time.Now().UTC()
|
|
|
|
// 3. rate limit per workspace (skip when RateLimitRPS == 0 -- intended
|
|
// for load testing only; do not run unbounded in production).
|
|
if s.rateLimitRPS > 0 {
|
|
dec, err := s.limiter.Allow(ctx, ictx.WorkspaceID, s.rateLimitRPS, time.Second)
|
|
if err != nil {
|
|
return apperr.Internal(err)
|
|
}
|
|
if !dec.Allowed {
|
|
retry := (dec.RetryAfterMS / 1000) + 1
|
|
return apperr.TooManyRequests(retry)
|
|
}
|
|
}
|
|
|
|
// 4-5. timestamps + late-event check
|
|
sentAt := derefTime(raw.SentAt, now)
|
|
if now.Sub(sentAt) > s.lateAfter {
|
|
return apperr.UnprocessableEntity("event too old (>24h)")
|
|
}
|
|
timestamp := derefTime(raw.Timestamp, sentAt)
|
|
|
|
// 6. dedup
|
|
if raw.MessageID == "" {
|
|
return apperr.BadRequest("messageId required", "messageId", nil)
|
|
}
|
|
fresh, err := s.dedup.CheckAndSet(ctx, ictx.WorkspaceID, raw.MessageID)
|
|
if err != nil {
|
|
return apperr.Internal(err)
|
|
}
|
|
if !fresh {
|
|
// silently drop -- duplicate message
|
|
return nil
|
|
}
|
|
|
|
// 7. flatten properties / traits / context
|
|
props, err := decodeAndFlatten(raw.Properties)
|
|
if err != nil {
|
|
_ = s.toDLQ(ctx, ictx, raw, "properties_invalid_json", "properties")
|
|
return apperr.BadRequest("properties is not valid JSON object", "properties", err)
|
|
}
|
|
traits, err := decodeAndFlatten(raw.Traits)
|
|
if err != nil {
|
|
_ = s.toDLQ(ctx, ictx, raw, "traits_invalid_json", "traits")
|
|
return apperr.BadRequest("traits is not valid JSON object", "traits", err)
|
|
}
|
|
contextMap, err := decodeAndFlatten(raw.Context)
|
|
if err != nil {
|
|
// context is best-effort: keep going without it
|
|
contextMap = nil
|
|
}
|
|
|
|
// 8. schema validation -- type conflict detection (best-effort, async upsert)
|
|
if err := s.checkSchema(ctx, ictx.WorkspaceID, string(raw.Type), props); err != nil {
|
|
_ = s.toDLQ(ctx, ictx, raw, "schema_conflict", "")
|
|
return err
|
|
}
|
|
|
|
ev := &model.IngestedEvent{
|
|
WorkspaceID: ictx.WorkspaceID,
|
|
SourceID: ictx.SourceID,
|
|
MessageID: raw.MessageID,
|
|
Type: raw.Type,
|
|
AnonymousID: raw.AnonymousID,
|
|
UserID: raw.UserID,
|
|
GroupID: raw.GroupID,
|
|
Event: raw.Event,
|
|
Name: raw.Name,
|
|
Category: raw.Category,
|
|
Properties: props,
|
|
Traits: traits,
|
|
Context: contextMap,
|
|
IP: ictx.IP,
|
|
UserAgent: ictx.UserAgent,
|
|
Timestamp: timestamp,
|
|
SentAt: sentAt,
|
|
ReceivedAt: now,
|
|
}
|
|
|
|
// 9. push Kafka -- fire-and-forget
|
|
if err := s.producer.Produce(ctx, ev); err != nil {
|
|
return apperr.Internal(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// IngestBatch processes a batch envelope; each failure is recorded but the
|
|
// good events still ship. Returns the first error so the handler can pick a
|
|
// status; in practice batch endpoints return 200 with per-event status.
|
|
func (s *IngestService) IngestBatch(ctx context.Context, ictx IngestContext, batch []model.RawEvent) []error {
|
|
errs := make([]error, len(batch))
|
|
for i := range batch {
|
|
errs[i] = s.Ingest(ctx, ictx, &batch[i])
|
|
}
|
|
return errs
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// derefTime returns *p converted to UTC, or fallback converted to UTC when p
// is nil or points at the zero time.
//
// Both branches normalize to UTC so the result has the same location no
// matter which input supplied it. The instant itself is never changed.
func derefTime(p *time.Time, fallback time.Time) time.Time {
	if p != nil && !p.IsZero() {
		return p.UTC()
	}
	return fallback.UTC()
}
|
|
|
|
func decodeAndFlatten(raw json.RawMessage) (map[string]any, error) {
|
|
if len(raw) == 0 {
|
|
return nil, nil
|
|
}
|
|
var m map[string]any
|
|
if err := json.Unmarshal(raw, &m); err != nil {
|
|
return nil, err
|
|
}
|
|
if m == nil {
|
|
return nil, nil
|
|
}
|
|
return schema.Flatten(m), nil
|
|
}
|
|
|
|
// checkSchema looks up the recorded type per (workspace, event_type, field)
|
|
// and rejects with 400 on conflict. New fields are recorded asynchronously --
|
|
// we do not block the request waiting on the DB write.
|
|
func (s *IngestService) checkSchema(ctx context.Context, workspaceID, eventType string, props map[string]any) error {
|
|
for field, v := range props {
|
|
dt := string(schema.Classify(v))
|
|
if dt == string(schema.TypeNull) {
|
|
continue
|
|
}
|
|
existing, err := s.schema.GetType(ctx, workspaceID, eventType, field)
|
|
if err != nil {
|
|
// soft-fail: don't block ingest on schema DB errors
|
|
s.log.Warn("schema lookup failed", zap.String("field", field), zap.Error(err))
|
|
continue
|
|
}
|
|
if existing == "" {
|
|
// fire-and-forget upsert
|
|
go func(f, t string) {
|
|
if err := s.schema.UpsertField(context.Background(), workspaceID, eventType, f, t); err != nil {
|
|
s.log.Warn("schema upsert failed", zap.String("field", f), zap.Error(err))
|
|
}
|
|
}(field, dt)
|
|
continue
|
|
}
|
|
if existing != dt {
|
|
return apperr.BadRequest("schema type conflict", field, nil)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *IngestService) toDLQ(ctx context.Context, ictx IngestContext, raw *model.RawEvent, reason, field string) error {
|
|
return s.producer.ProduceDLQ(ctx,
|
|
ictx.WorkspaceID, ictx.SourceID, raw.MessageID, reason, field, ictx.RawBody)
|
|
}
|