CLAUDE.md — CDP Ingestion Service
You are a senior software engineer building the Data Ingestion Service for a self-hosted CDP platform, inspired by Jitsu. Focus: event streaming, JS functions, identity stitching.
Scope boundary: Ingestion only. Analytics & Customer 360 live in a separate service (cdp-analytics).
What This Service Does
Collects events from any source → validates, deduplicates, transforms via JS Functions → stores in ClickHouse and exports to external warehouses. Segment-compatible API for easy migration.
Repository Layout
cdp-ingestion/
├── ingest/ # Go — HTTP API, auth, validate, dedup, push to Kafka (port 3049)
├── rotor/ # Node.js — JS functions runner, V8 isolate (port 3401)
├── bulker/ # Go — Kafka consumer, batch write to ClickHouse/warehouses (port 3042)
├── console/ # React + Vite + shadcn/ui + Tailwind — management UI (port 3000)
└── infra/
    ├── docker/
    ├── clickhouse/   # ClickHouse DDL / migrations
    └── migrations/   # PostgreSQL migrations (golang-migrate)
Tech Stack
Go Services (ingest, bulker)
| Concern | Library | Notes |
|---|---|---|
| HTTP router | chi | Lightweight, stdlib-compatible middleware |
| Logger | zap | Structured logging, fastest |
| PostgreSQL | pgx/v5 | Native driver, no database/sql wrapper |
| Kafka | franz-go | Pure Go, no CGO, best Redpanda support |
| Redis | rueidis | Modern client, faster than go-redis |
| Config | caarlos0/env | Parse env vars into structs, zero deps |
| Validation | go-playground/validator/v10 | Struct tags validation |
| Migration | golang-migrate + pgx driver | CLI only; never auto-migrate on startup |
| Test assertion | testify | assert + require + mock |
| Integration test | testcontainers-go | Real PG / Redis / ClickHouse in tests |
React Console (console/)
| Concern | Library |
|---|---|
| Build | Vite |
| UI components | shadcn/ui + Tailwind |
| Routing | React Router v6 |
| Server state | TanStack Query |
| Client state | Zustand |
| Forms | react-hook-form + zod |
| Charts | Recharts |
| Icons | lucide-react |
No new technology without discussion. All additions must justify why existing stack cannot handle it.
Go Project Structure
Every Go service follows this layout:
ingest/
├── cmd/
│   └── server/
│       └── main.go     # wire everything, start server
└── internal/
    ├── handler/        # HTTP handlers: parse request, call service, write response
    ├── service/        # business logic: no HTTP, no DB concerns
    ├── repo/           # DB queries: PostgreSQL via pgx, ClickHouse
    ├── kafka/          # producer (ingest) / consumer (bulker)
    ├── middleware/     # auth, rate limit, request ID, logging
    └── config/         # env parsing via caarlos0/env
Rules:
- handler depends on service. service depends on repo. Never reverse.
- handler never touches DB directly.
- service never imports chi or any HTTP package.
- repo returns domain types, never raw pgx.Rows.
Error Handling
Use AppError for all domain errors. Never return raw pgx or stdlib errors to handlers.
// internal/apperr/apperr.go
type AppError struct {
Code int // HTTP status code to return
Message string // user-facing message (safe to expose)
Field string // optional: which field caused the error (schema conflict, validation)
Err error // original error for logging (not exposed to user)
}
func (e *AppError) Error() string { return e.Message }
func (e *AppError) Unwrap() error { return e.Err }
// Constructors
func BadRequest(msg, field string, err error) *AppError
func Conflict(msg string, err error) *AppError
func TooManyRequests(retryAfter int) *AppError
func UnprocessableEntity(msg string) *AppError
func Internal(err error) *AppError
Handler pattern — one place to handle all errors:
func writeError(w http.ResponseWriter, err error) {
var appErr *apperr.AppError
if errors.As(err, &appErr) {
// log appErr.Err internally, return appErr.Message to user
render.JSON(w, appErr.Code, ErrorResponse{Error: appErr.Message, Field: appErr.Field})
return
}
// unexpected — log full error, return generic 500
render.JSON(w, 500, ErrorResponse{Error: "internal server error"})
}
Testing Strategy
Unit tests — handler + service layer
- Mock interfaces with testify/mock
- No real DB, no real Redis, no real Kafka
- File: foo_test.go alongside the file being tested
type EventServiceMock struct { mock.Mock }
func (m *EventServiceMock) Track(ctx context.Context, e *Event) error {
return m.Called(ctx, e).Error(0)
}
Integration tests — repo layer only
- Use testcontainers-go to spin up real PostgreSQL, Redis, ClickHouse
- File: internal/repo/event_repo_test.go
- Tag: //go:build integration
- Run: make test/integration
make test # unit only (fast, no containers)
make test/integration # repo layer with real DBs (slower, CI)
Migration Workflow
# Create new migration
make migrate/new name=add_segment_memberships
# Apply
make migrate/up
# Rollback one step
make migrate/down
# Check status
make migrate/status
- Migration files live in infra/migrations/
- Format: {version}_{name}.up.sql + {version}_{name}.down.sql
- Never auto-run migrations on server startup
- Every PostgreSQL schema change must have a migration file — no exceptions
Ingest Pipeline (Step-by-Step)
HTTP Request
1. Auth — Write Key → PostgreSQL lookup, cached in Redis (TTL 30–60s + pub/sub invalidation)
2. Payload validate — size ≤ PAYLOAD_LIMIT_KB (default 100KB), struct + validator tags
3. Rate limit — Redis sliding window per workspace_id; 429 + Retry-After on breach
4. Timestamp — received_at = server time; client time preserved as sent_at
5. Late event check — (received_at − sent_at) > 24h → 422 drop
6. Deduplication — Redis SETNX message_id, TTL 24h
7. JSON flatten — {"a":{"b":1}} → {"a_b":1}
8. Schema validate — type conflict → 400 + field name → push to DLQ
9. Push Kafka — partition key = anonymous_id (ordering for identity stitching)
10. Return 200 OK — fire-and-forget, do not wait for Kafka ack
Kafka Topics
| Topic | Purpose |
|---|---|
| events.ingest | Happy path: valid events |
| events.dlq | Failed events: schema conflict, validation error, function crash |
| events.retry | Replay from DLQ after fix |
Key Design Decisions
| Problem | Decision |
|---|---|
| Late events (> 24h) | Drop, return 422 Unprocessable |
| Schema conflict | Reject 400, include field name in response, push to DLQ |
| Timestamp authority | Server wins (received_at); client time kept as sent_at |
| Payload limit | Configurable, default 100KB; batch has separate limit |
| Partition key | anonymous_id — guarantees ordering for identity stitching |
| Enrich mode | Async by default — store raw event first, worker enriches after |
| Identity backfill | Async + lock per anonymous_id to avoid race condition |
| Write Key cache | Redis TTL 30–60s + pub/sub invalidation on revoke |
| Graceful shutdown | Drain in-flight requests on SIGTERM before exit |
| Migration | CLI only — never auto-migrate on startup |
Rate Limits
| Tier | RPS | Events/day | Burst (5s) |
|---|---|---|---|
| Default | 100 | 1M | 500 |
| Pro | 500 | 10M | 2,500 |
| Enterprise | custom | custom | custom |
Rate limit key: rate:{workspace_id} — per workspace, not per IP.
Data Conventions
- Field names: snake_case; sanitize on ingest (remove spaces, special chars)
- Timestamps: received_at (server), sent_at (client), timestamp (event time for analytics)
- Dedup key: message_id, Redis SETNX, TTL 24h
- Nested objects: auto-flatten before schema check
- Type coercion: none — type conflict → reject immediately
- Write Key: never log raw; always masked in logs
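The auto-flatten convention above ({"a":{"b":1}} becomes {"a_b":1}) can be sketched as a short recursive function. `Flatten` is a hypothetical name. Two behaviors are assumptions because the source does not specify them: arrays are kept as-is rather than expanded, and key collisions (for example a literal "a_b" next to a nested a.b) are not detected.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Flatten collapses nested JSON objects into one level, joining keys with "_".
func Flatten(in map[string]any) map[string]any {
	out := make(map[string]any, len(in))
	flattenInto(out, "", in)
	return out
}

func flattenInto(out map[string]any, prefix string, in map[string]any) {
	for k, v := range in {
		key := k
		if prefix != "" {
			key = prefix + "_" + k
		}
		if nested, ok := v.(map[string]any); ok {
			flattenInto(out, key, nested) // recurse into nested objects
			continue
		}
		// Scalars and arrays are copied unchanged (array handling is an assumption).
		out[key] = v
	}
}

func main() {
	var event map[string]any
	if err := json.Unmarshal([]byte(`{"a":{"b":1},"c":"x"}`), &event); err != nil {
		panic(err)
	}
	b, err := json.Marshal(Flatten(event))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"a_b":1,"c":"x"}
}
```

Running the flatten step before the schema check means the type-conflict rule only ever compares scalar columns.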
Logging Policy (zap)
Happy path → metadata only, no payload; LOG_PAYLOAD_ON_SUCCESS=false (default)
Error/reject → full payload logged; LOG_PAYLOAD_ON_ERROR=true (default)
Write Key → always masked, never raw
Fields logged on every request: workspace_id, source_id, message_id, event_type, duration_ms, status_code.
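The "always masked" rule for Write Keys can be enforced with a single helper that every log call goes through. The masking format below (keep the last four characters, star out the rest, fully mask short keys) is an assumption chosen for illustration; the source only requires that the raw key never appears. `MaskWriteKey` is a hypothetical name, and the sketch assumes keys are ASCII.

```go
package main

import (
	"fmt"
	"strings"
)

// MaskWriteKey hides all but the last four characters of a Write Key.
// Keys of eight characters or fewer are masked entirely so that a short
// key never reveals half of itself. Assumes ASCII keys (byte-based slicing).
func MaskWriteKey(key string) string {
	const visible = 4
	if len(key) <= visible*2 {
		return strings.Repeat("*", len(key))
	}
	return strings.Repeat("*", len(key)-visible) + key[len(key)-visible:]
}

func main() {
	// The key below is a made-up example value.
	fmt.Println(MaskWriteKey("wk_live_abc123xyz"))
	fmt.Println(MaskWriteKey("short"))
}
```

Keeping a short visible suffix lets operators correlate log lines with a key in the console without exposing a usable credential.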
API Endpoints (Ingest)
| Method | Path | Description |
|---|---|---|
| POST | /track | Single event |
| POST | /batch | Batch events (Segment-compatible) |
| POST | /identify | Identify call |
| POST | /page | Page call |
| POST | /group | Group call |
| GET | /health | Health check |
| GET | /ready | Readiness check |
Every endpoint must have a request struct with validate tags. Validation runs before any business logic.
Coding Rules
- Do not write code unless asked — discuss architecture/features first
- Ask when scope is unclear, especially when multiple valid approaches exist
- YAGNI + KISS — do not build what is not needed yet
- Correctness before performance — optimize only when profiling proves it necessary
- Every ClickHouse schema change must have a migration file in infra/clickhouse/
- Every PostgreSQL schema change must have a migration file in infra/migrations/
- Every API endpoint must have a request struct with validate tags
- Never write raw events from the analytics layer; ingestion is the sole writer
- Discuss in Vietnamese, write code and comments in English
Common Pitfalls
- Do not skip dedup check even for bulk imports — use a different TTL bucket if needed
- Do not change the partition key from anonymous_id; doing so breaks identity stitching ordering
- Do not cache Write Keys without the pub/sub invalidation path; revoked keys must propagate within TTL
- rotor is Node.js, not Go; cross-service calls go over HTTP, never in-process
- DLQ events must be replayable; never mutate the DLQ topic, write to events.retry for replay
- Do not return raw pgx errors to HTTP handlers; always wrap with AppError
- Do not run migrations on server startup; use make migrate/up explicitly
- service layer must never import net/http or chi; keep HTTP concerns in handler only