package service

import (
	"context"
	"encoding/json"
	"time"

	"go.uber.org/zap"

	"github.com/dbiz/cdp/ingestion/ingest/internal/apperr"
	"github.com/dbiz/cdp/ingestion/ingest/internal/dedup"
	"github.com/dbiz/cdp/ingestion/ingest/internal/model"
	"github.com/dbiz/cdp/ingestion/ingest/internal/ratelimit"
	"github.com/dbiz/cdp/ingestion/ingest/internal/repo"
	"github.com/dbiz/cdp/ingestion/ingest/internal/schema"
)

// defaultLateAfter is the late-event window used when IngestDeps.LateAfter is
// not positive. It matches the "(>24h)" wording of the rejection message.
const defaultLateAfter = 24 * time.Hour

// Producer is the small surface IngestService needs from the Kafka client.
// Defined here so it can be stubbed in tests without pulling in franz-go.
type Producer interface {
	Produce(ctx context.Context, ev *model.IngestedEvent) error
	ProduceDLQ(ctx context.Context, workspaceID, sourceID, messageID, reason, field string, raw []byte) error
}

// IngestService is the core pipeline: ratelimit → timestamp normalize →
// late-check → validate → flatten → schema-conflict → dedup → push Kafka.
//
// Dedup is deliberately the last gate before the Kafka push. An event
// rejected by an earlier step therefore does not mark its messageId as seen,
// and a client retry is re-validated instead of being silently dropped as a
// duplicate.
type IngestService struct {
	producer     Producer
	limiter      ratelimit.Limiter
	dedup        dedup.Dedup
	schema       repo.SchemaRepo
	log          *zap.Logger
	lateAfter    time.Duration
	rateLimitRPS int // 0 = unlimited
}

// IngestDeps groups dependencies for cleaner construction.
type IngestDeps struct {
	Producer Producer
	Limiter  ratelimit.Limiter
	Dedup    dedup.Dedup
	Schema   repo.SchemaRepo
	// Log may be nil; a no-op logger is used instead.
	Log *zap.Logger
	// LateAfter is the maximum age of sentAt; a non-positive value means
	// defaultLateAfter (24h).
	LateAfter time.Duration
	// RateLimitRPS is the per-workspace request budget per second; values
	// <= 0 disable rate limiting.
	RateLimitRPS int
}

// NewIngestService builds an IngestService from d, normalizing unset values:
// a negative RateLimitRPS becomes 0 (unlimited), a non-positive LateAfter
// becomes defaultLateAfter, and a nil Log becomes a no-op logger.
func NewIngestService(d IngestDeps) *IngestService {
	rps := d.RateLimitRPS
	if rps < 0 {
		rps = 0
	}
	lateAfter := d.LateAfter
	if lateAfter <= 0 {
		// A zero window would reject every event that carries a sentAt in
		// the past, i.e. effectively every event.
		lateAfter = defaultLateAfter
	}
	log := d.Log
	if log == nil {
		log = zap.NewNop()
	}
	return &IngestService{
		producer:     d.Producer,
		limiter:      d.Limiter,
		dedup:        d.Dedup,
		schema:       d.Schema,
		log:          log,
		lateAfter:    lateAfter,
		rateLimitRPS: rps,
	}
}

// IngestContext carries per-request data set by middleware.
type IngestContext struct {
	WorkspaceID string
	SourceID    string
	IP          string
	UserAgent   string
	// RawBody is the original request body, used as the DLQ payload.
	// NOTE(review): for batch requests this is presumably the whole batch
	// envelope, so each failing event's DLQ record carries the full batch;
	// confirm with the handler.
	RawBody []byte
}

// Ingest runs the full pipeline for a single event.
//
// It returns nil when the event was pushed to Kafka or when it is a duplicate
// of an already-accepted messageId (duplicates are dropped silently).
// Otherwise it returns an apperr error: TooManyRequests when rate limited,
// UnprocessableEntity for late events, BadRequest for validation and schema
// failures, and Internal for limiter, dedup or producer failures.
func (s *IngestService) Ingest(ctx context.Context, ictx IngestContext, raw *model.RawEvent) error {
	now := time.Now().UTC()

	// 3. rate limit per workspace (skip when RateLimitRPS == 0 -- intended
	// for load testing only; do not run unbounded in production).
	if s.rateLimitRPS > 0 {
		dec, err := s.limiter.Allow(ctx, ictx.WorkspaceID, s.rateLimitRPS, time.Second)
		if err != nil {
			return apperr.Internal(err)
		}
		if !dec.Allowed {
			// Whole seconds, never 0 (assumes RetryAfterMS is a non-negative
			// integer count of milliseconds).
			retry := (dec.RetryAfterMS / 1000) + 1
			return apperr.TooManyRequests(retry)
		}
	}

	// 4-5. timestamps + late-event check. A missing sentAt falls back to the
	// receive time; a missing timestamp falls back to sentAt.
	sentAt := derefTime(raw.SentAt, now)
	if now.Sub(sentAt) > s.lateAfter {
		return apperr.UnprocessableEntity("event too old (>24h)")
	}
	timestamp := derefTime(raw.Timestamp, sentAt)

	// 6. required fields
	if raw.MessageID == "" {
		return apperr.BadRequest("messageId required", "messageId", nil)
	}

	// 7. flatten properties / traits / context
	props, err := decodeAndFlatten(raw.Properties)
	if err != nil {
		s.sendDLQ(ctx, ictx, raw, "properties_invalid_json", "properties")
		return apperr.BadRequest("properties is not valid JSON object", "properties", err)
	}
	traits, err := decodeAndFlatten(raw.Traits)
	if err != nil {
		s.sendDLQ(ctx, ictx, raw, "traits_invalid_json", "traits")
		return apperr.BadRequest("traits is not valid JSON object", "traits", err)
	}
	contextMap, err := decodeAndFlatten(raw.Context)
	if err != nil {
		// context is best-effort: keep going without it
		contextMap = nil
	}

	// 8. schema validation -- type conflict detection (best-effort, async upsert)
	if err := s.checkSchema(ctx, ictx.WorkspaceID, string(raw.Type), props); err != nil {
		s.sendDLQ(ctx, ictx, raw, "schema_conflict", "")
		return err
	}

	// 9. dedup -- only once the event has passed every validation step, so a
	// rejected event does not mark its messageId as seen.
	fresh, err := s.dedup.CheckAndSet(ctx, ictx.WorkspaceID, raw.MessageID)
	if err != nil {
		return apperr.Internal(err)
	}
	if !fresh {
		// silently drop -- duplicate message
		return nil
	}

	ev := &model.IngestedEvent{
		WorkspaceID: ictx.WorkspaceID,
		SourceID:    ictx.SourceID,
		MessageID:   raw.MessageID,
		Type:        raw.Type,
		AnonymousID: raw.AnonymousID,
		UserID:      raw.UserID,
		GroupID:     raw.GroupID,
		Event:       raw.Event,
		Name:        raw.Name,
		Category:    raw.Category,
		Properties:  props,
		Traits:      traits,
		Context:     contextMap,
		IP:          ictx.IP,
		UserAgent:   ictx.UserAgent,
		Timestamp:   timestamp,
		SentAt:      sentAt,
		ReceivedAt:  now,
	}

	// 10. push Kafka; a Produce error is returned to the caller.
	// NOTE(review): the messageId is already marked seen here, so if Produce
	// fails a client retry is dropped as a duplicate. Closing that window
	// needs a way to release the dedup key, which dedup.Dedup does not
	// visibly offer (only CheckAndSet is used).
	if err := s.producer.Produce(ctx, ev); err != nil {
		return apperr.Internal(err)
	}
	return nil
}

// IngestBatch processes a batch envelope by running Ingest on each event in
// order. A failing event does not stop the rest of the batch. The returned
// slice has exactly one entry per input event: nil on success (including
// silently dropped duplicates), otherwise that event's error. The handler
// can use it to report per-event status.
func (s *IngestService) IngestBatch(ctx context.Context, ictx IngestContext, batch []model.RawEvent) []error {
	errs := make([]error, len(batch))
	for i := range batch {
		errs[i] = s.Ingest(ctx, ictx, &batch[i])
	}
	return errs
}

// ---------------------------------------------------------------------------
// helpers
// ---------------------------------------------------------------------------

// derefTime returns *p in UTC, or fallback when p is nil or the zero time.
func derefTime(p *time.Time, fallback time.Time) time.Time {
	if p == nil || p.IsZero() {
		return fallback
	}
	return p.UTC()
}

// decodeAndFlatten decodes raw as a JSON object and flattens it with
// schema.Flatten. An empty input or a JSON null yields (nil, nil). Anything
// that does not decode into a map[string]any (e.g. an array or malformed
// JSON) returns the json.Unmarshal error.
func decodeAndFlatten(raw json.RawMessage) (map[string]any, error) {
	if len(raw) == 0 {
		return nil, nil
	}
	var m map[string]any
	if err := json.Unmarshal(raw, &m); err != nil {
		return nil, err
	}
	if m == nil {
		return nil, nil
	}
	return schema.Flatten(m), nil
}

// sendDLQ forwards a rejected event to the dead-letter queue. DLQ delivery is
// best-effort: a failure is logged and never changes the response returned
// to the client.
func (s *IngestService) sendDLQ(ctx context.Context, ictx IngestContext, raw *model.RawEvent, reason, field string) {
	if err := s.toDLQ(ctx, ictx, raw, reason, field); err != nil {
		s.log.Warn("dlq produce failed",
			zap.String("workspace_id", ictx.WorkspaceID),
			zap.String("message_id", raw.MessageID),
			zap.String("reason", reason),
			zap.Error(err))
	}
}

// checkSchema looks up the recorded type per (workspace, event_type, field)
// and rejects with 400 on conflict. New fields are recorded asynchronously --
// we do not block the request waiting on the DB write.
func (s *IngestService) checkSchema(ctx context.Context, workspaceID, eventType string, props map[string]any) error { for field, v := range props { dt := string(schema.Classify(v)) if dt == string(schema.TypeNull) { continue } existing, err := s.schema.GetType(ctx, workspaceID, eventType, field) if err != nil { // soft-fail: don't block ingest on schema DB errors s.log.Warn("schema lookup failed", zap.String("field", field), zap.Error(err)) continue } if existing == "" { // fire-and-forget upsert go func(f, t string) { if err := s.schema.UpsertField(context.Background(), workspaceID, eventType, f, t); err != nil { s.log.Warn("schema upsert failed", zap.String("field", f), zap.Error(err)) } }(field, dt) continue } if existing != dt { return apperr.BadRequest("schema type conflict", field, nil) } } return nil } func (s *IngestService) toDLQ(ctx context.Context, ictx IngestContext, raw *model.RawEvent, reason, field string) error { return s.producer.ProduceDLQ(ctx, ictx.WorkspaceID, ictx.SourceID, raw.MessageID, reason, field, ictx.RawBody) }