# CLAUDE.md — CDP Ingestion Service

> You are a senior software engineer building the **Data Ingestion Service** for a self-hosted CDP platform, inspired by Jitsu. Focus: event streaming, JS functions, identity stitching.
>
> **Scope boundary**: Ingestion only. Analytics & Customer 360 live in a separate service (`cdp-analytics`).

---

## What This Service Does

Collects events from any source → validates, deduplicates, transforms via JS Functions → stores in ClickHouse and exports to external warehouses. Segment-compatible API for easy migration.

---

## Repository Layout

```
cdp-ingestion/
├── ingest/          # Go: HTTP API, auth, validate, dedup, push to Kafka (port 3049)
├── rotor/           # Node.js: JS functions runner, V8 isolate (port 3401)
├── bulker/          # Go: Kafka consumer, batch write to ClickHouse/warehouses (port 3042)
├── console/         # React + Vite + shadcn/ui + Tailwind: management UI (port 3000)
└── infra/
    ├── docker/
    ├── clickhouse/  # ClickHouse DDL / migrations
    └── migrations/  # PostgreSQL migrations (golang-migrate)
```

---

## Tech Stack

### Go Services (ingest, bulker)

| Concern | Library | Notes |
|---------|---------|-------|
| HTTP router | `chi` | Lightweight, stdlib-compatible middleware |
| Logger | `zap` | Structured, leveled logging; very low allocation overhead |
| PostgreSQL | `pgx/v5` | Native driver, no `database/sql` wrapper |
| Kafka | `franz-go` | Pure Go, no CGO, strong Redpanda support |
| Redis | `rueidis` | Modern client with auto-pipelining; faster than go-redis in its own benchmarks |
| Config | `caarlos0/env` | Parses env vars into structs, zero deps |
| Validation | `go-playground/validator/v10` | Validation via struct tags |
| Migration | `golang-migrate` + pgx driver | CLI only; never auto-migrate on startup |
| Test assertion | `testify` | `assert` + `require` + `mock` |
| Integration test | `testcontainers-go` | Real PG / Redis / ClickHouse in tests |

### React Console (console/)

| Concern | Library |
|---------|---------|
| Build | Vite |
| UI components | shadcn/ui + Tailwind |
| Routing | React Router v6 |
| Server state | TanStack Query |
| Client state | Zustand |
| Forms | react-hook-form + zod |
| Charts | Recharts |
| Icons | lucide-react |

> **No new technology** without discussion. Every addition must justify why the existing stack cannot handle it.

---

## Go Project Structure

Every Go service follows this layout:

```
ingest/
├── cmd/
│   └── server/
│       └── main.go     # wire everything, start server
└── internal/
    ├── handler/        # HTTP handlers: parse request, call service, write response
    ├── service/        # business logic: no HTTP, no DB concerns
    ├── repo/           # DB queries: PostgreSQL via pgx, ClickHouse
    ├── kafka/          # producer (ingest) / consumer (bulker)
    ├── middleware/     # auth, rate limit, request ID, logging
    └── config/         # env parsing via caarlos0/env
```

Rules:

- `handler` depends on `service`. `service` depends on `repo`. Never reverse.
- `handler` never touches the DB directly.
- `service` never imports `chi` or any HTTP package.
- `repo` returns domain types, never raw `pgx.Rows`.

---

## Error Handling

Use `AppError` for all domain errors. Never return raw `pgx` or stdlib errors to handlers.

```go
// internal/apperr/apperr.go
package apperr

import "net/http"

type AppError struct {
	Code       int    // HTTP status code to return
	Message    string // user-facing message (safe to expose)
	Field      string // optional: which field caused the error (schema conflict, validation)
	RetryAfter int    // optional: seconds for the Retry-After header (429 only)
	Err        error  // original error for logging (not exposed to user)
}

func (e *AppError) Error() string { return e.Message }
func (e *AppError) Unwrap() error { return e.Err }

// Constructors
func BadRequest(msg, field string, err error) *AppError {
	return &AppError{Code: http.StatusBadRequest, Message: msg, Field: field, Err: err}
}

func Conflict(msg string, err error) *AppError {
	return &AppError{Code: http.StatusConflict, Message: msg, Err: err}
}

func TooManyRequests(retryAfter int) *AppError {
	return &AppError{Code: http.StatusTooManyRequests, Message: "rate limit exceeded", RetryAfter: retryAfter}
}

func UnprocessableEntity(msg string) *AppError {
	return &AppError{Code: http.StatusUnprocessableEntity, Message: msg}
}

func Internal(err error) *AppError {
	return &AppError{Code: http.StatusInternalServerError, Message: "internal server error", Err: err}
}
```

Handler pattern: one place to handle all errors.

```go
func writeError(w http.ResponseWriter, err error) {
	var appErr *apperr.AppError
	if errors.As(err, &appErr) {
		// log appErr.Err internally, return appErr.Message to user
		render.JSON(w, appErr.Code, ErrorResponse{Error: appErr.Message, Field: appErr.Field})
		return
	}
	// unexpected: log full error, return generic 500
	render.JSON(w, http.StatusInternalServerError, ErrorResponse{Error: "internal server error"})
}
```

---

## Testing Strategy

### Unit tests: handler + service layer

- Mock interfaces with `testify/mock`
- No real DB, no real Redis, no real Kafka
- File: `foo_test.go` alongside the file being tested

```go
// EventServiceMock stands in for the event service in handler tests.
type EventServiceMock struct{ mock.Mock }

func (m *EventServiceMock) Track(ctx context.Context, e *Event) error {
	return m.Called(ctx, e).Error(0)
}
```

### Integration tests: repo layer only

- Use `testcontainers-go` to spin up real PostgreSQL, Redis, ClickHouse
- File: `internal/repo/event_repo_test.go`
- Build constraint: `//go:build integration` at the top of the file
- Run: `make test/integration`

```bash
make test               # unit only (fast, no containers)
make test/integration   # repo layer with real DBs (slower, CI)
```

---

## Migration Workflow

```bash
# Create new migration
make migrate/new name=add_segment_memberships

# Apply
make migrate/up

# Rollback one step
make migrate/down

# Check status
make migrate/status
```

- Migration files live in `infra/migrations/`
- Format: `{version}_{name}.up.sql` + `{version}_{name}.down.sql`
- **Never** auto-run migrations on server startup
- Every PostgreSQL schema change **must** have a migration file; no exceptions

---

## Ingest Pipeline (Step-by-Step)

```
HTTP Request
 1. Auth              Write Key → PostgreSQL lookup, cached in Redis (TTL 30–60s + pub/sub invalidation)
 2. Payload validate  size ≤ PAYLOAD_LIMIT_KB (default 100KB), struct + validator tags
 3. Rate limit        Redis sliding window per workspace_id; 429 + Retry-After on breach
 4. Timestamp         received_at = server time; client time preserved as sent_at
 5. Late event check  (received_at − sent_at) > 24h → 422, drop
 6. Deduplication     Redis SETNX message_id, TTL 24h
 7. JSON flatten      {"a":{"b":1}} → {"a_b":1}
 8. Schema validate   type conflict → 400 + field name → push to DLQ
 9. Push Kafka        partition key = anonymous_id (ordering for identity stitching)
10. Return 200 OK     fire-and-forget, do not wait for Kafka ack
```

## Kafka Topics

| Topic | Purpose |
|-------|---------|
| `events.ingest` | Happy path: valid events |
| `events.dlq` | Failed events: schema conflict, validation error, function crash |
| `events.retry` | Replay from DLQ after fix |
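
Step 8 and the topic table combine into one routing decision. A minimal sketch, assuming the per-source schema is a map from flattened field name to JSON type name; `checkSchema`, `route`, `SchemaConflict`, and the rule that unknown fields and `null` values pass are hypothetical, not taken from the document. Only the topic names `events.ingest` and `events.dlq` come from the table.

```go
package main

import "fmt"

// Topic names from the Kafka Topics table.
const (
	TopicIngest = "events.ingest"
	TopicDLQ    = "events.dlq"
)

// jsonType names the JSON type of a value decoded by encoding/json into `any`
// (objects → map[string]any, arrays → []any, numbers → float64).
func jsonType(v any) string {
	switch v.(type) {
	case nil:
		return "null"
	case bool:
		return "boolean"
	case float64:
		return "number"
	case string:
		return "string"
	case []any:
		return "array"
	case map[string]any:
		return "object"
	default:
		return fmt.Sprintf("%T", v)
	}
}

// SchemaConflict carries the offending field so the handler can return it with the 400.
type SchemaConflict struct {
	Field, Want, Got string
}

func (c *SchemaConflict) Error() string {
	return fmt.Sprintf("field %q: expected %s, got %s", c.Field, c.Want, c.Got)
}

// checkSchema compares a flattened event against known field types with no coercion.
// Assumptions: unknown fields and null values pass; with several conflicts,
// which one is reported depends on map iteration order.
func checkSchema(schema map[string]string, event map[string]any) error {
	for field, v := range event {
		want, known := schema[field]
		if !known || v == nil {
			continue
		}
		if got := jsonType(v); got != want {
			return &SchemaConflict{Field: field, Want: want, Got: got}
		}
	}
	return nil
}

// route picks the topic: valid events go to events.ingest, conflicts to events.dlq.
func route(schema map[string]string, event map[string]any) (string, error) {
	if err := checkSchema(schema, event); err != nil {
		return TopicDLQ, err
	}
	return TopicIngest, nil
}
```

The returned `*SchemaConflict` gives the handler both the `400` body field name and the reason to publish to the DLQ.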

---

## Key Design Decisions

| Problem | Decision |
|---------|----------|
| Late events (> 24h) | Drop, return `422 Unprocessable Entity` |
| Schema conflict | Reject with `400`, include field name in response, push to DLQ |
| Timestamp authority | Server wins (`received_at`); client time kept as `sent_at` |
| Payload limit | Configurable, default 100KB; batch has a separate limit |
| Partition key | `anonymous_id`: keeps each user's events in one partition, preserving their order for identity stitching |
| Enrich mode | Async by default: store raw event first, worker enriches after |
| Identity backfill | Async + lock per `anonymous_id` to avoid race conditions |
| Write Key cache | Redis TTL 30–60s + pub/sub invalidation on revoke |
| Graceful shutdown | Drain in-flight requests on SIGTERM before exit |
| Migration | CLI only; never auto-migrate on startup |

## Rate Limits

| Tier | RPS | Events/day | Burst (5s) |
|------|-----|------------|------------|
| Default | 100 | 1M | 500 |
| Pro | 500 | 10M | 2,500 |
| Enterprise | custom | custom | custom |

Rate limit key: `rate:{workspace_id}` (per workspace, not per IP).

---

## Data Conventions

- **Field names**: `snake_case`; sanitize on ingest (remove spaces, special chars)
- **Timestamps**: `received_at` (server), `sent_at` (client), `timestamp` (event time for analytics)
- **Dedup key**: `message_id`, Redis SETNX, TTL 24h
- **Nested objects**: auto-flatten before schema check
- **Type coercion**: none; type conflict → reject immediately
- **Write Key**: never log raw; always masked in logs
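
The field-name rule can be sketched as a single pass over the runes. The exact rules here are assumptions, since the document only says `snake_case` with spaces and special characters removed: camelCase humps and any non-alphanumeric rune become `_`, runs of `_` collapse, leading and trailing `_` are dropped, and letters are lower-cased. `sanitizeField` is a hypothetical name.

```go
package main

import (
	"strings"
	"unicode"
)

// sanitizeField converts an incoming property name to snake_case under the
// assumed rules above, e.g. "Order Total" → "order_total", "userId" → "user_id".
func sanitizeField(name string) string {
	var b strings.Builder
	prevLowerOrDigit := false // was the previous rune a lowercase letter or digit?
	lastUnderscore := true    // true at the start so no leading "_" is written
	for _, r := range name {
		switch {
		case unicode.IsUpper(r):
			if prevLowerOrDigit && !lastUnderscore {
				b.WriteByte('_') // camelCase hump
			}
			b.WriteRune(unicode.ToLower(r))
			prevLowerOrDigit, lastUnderscore = false, false
		case unicode.IsLetter(r) || unicode.IsDigit(r):
			b.WriteRune(r)
			prevLowerOrDigit, lastUnderscore = true, false
		default: // spaces, punctuation, symbols, "_"
			if !lastUnderscore {
				b.WriteByte('_')
			}
			prevLowerOrDigit, lastUnderscore = false, true
		}
	}
	return strings.TrimRight(b.String(), "_")
}
```

Acronyms are not split under these rules (`URLPath` becomes `urlpath`).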

---

## Logging Policy (zap)

```
Happy path    → metadata only, no payload   LOG_PAYLOAD_ON_SUCCESS=false (default)
Error/reject  → full payload logged         LOG_PAYLOAD_ON_ERROR=true (default)
Write Key     → always masked, never raw
```

Fields logged on every request: `workspace_id`, `source_id`, `message_id`, `event_type`, `duration_ms`, `status_code`.
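
Masking can be a small helper applied before any logging call. A minimal sketch; `maskWriteKey` is hypothetical, and showing only the last four characters behind a fixed-width `****` (so the key length is not leaked) is an assumed format.

```go
package main

// maskWriteKey returns a log-safe form of a Write Key: a fixed "****" prefix
// plus the last 4 characters. Keys of 8 characters or fewer are fully masked.
func maskWriteKey(key string) string {
	r := []rune(key)
	if len(r) <= 8 {
		return "****"
	}
	return "****" + string(r[len(r)-4:])
}
```

With zap this would be used as `logger.Info("request", zap.String("write_key", maskWriteKey(key)))`.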

---

## API Endpoints (Ingest)

| Method | Path | Description |
|--------|------|-------------|
| `POST` | `/track` | Single event |
| `POST` | `/batch` | Batch events (Segment-compatible) |
| `POST` | `/identify` | Identify call |
| `POST` | `/page` | Page call |
| `POST` | `/group` | Group call |
| `GET` | `/health` | Health check |
| `GET` | `/ready` | Readiness check |

Every endpoint must have a request struct with `validate` tags. Validation runs before any business logic.

---

## Coding Rules

- **Do not write code unless asked**: discuss architecture/features first
- **Ask when scope is unclear**, especially when multiple valid approaches exist
- **YAGNI + KISS**: do not build what is not needed yet
- **Correctness before performance**: optimize only when profiling proves it necessary
- **Every ClickHouse schema change must have a migration file** in `infra/clickhouse/`
- **Every PostgreSQL schema change must have a migration file** in `infra/migrations/`
- **Every API endpoint must have a request struct with `validate` tags**
- **Never write raw events from the analytics layer**: ingestion is the sole writer
- Discuss in **Vietnamese**, write code and comments in **English**

---

## Common Pitfalls

- Do not skip the dedup check, even for bulk imports; use a different TTL bucket if needed
- Do not change the partition key from `anonymous_id`; doing so breaks identity-stitching ordering
- Do not cache Write Keys without the pub/sub invalidation path; revoked keys must propagate within the TTL
- `rotor` is Node.js, not Go: cross-service calls go over HTTP, never in-process
- DLQ events must be replayable: never mutate the DLQ topic; write to `events.retry` for replay
- Do not return raw `pgx` errors to HTTP handlers; always wrap them with `AppError`
- Do not run migrations on server startup; use `make migrate/up` explicitly
- The `service` layer must never import `net/http` or `chi`; keep HTTP concerns in `handler` only