From 4e8c11d545d737029a7cfe9039e6ca337d33c052 Mon Sep 17 00:00:00 2001 From: renolation Date: Sun, 24 May 2026 22:59:24 +0700 Subject: [PATCH] init ingestion --- .gitignore | 83 ++++ data-layer/CLAUDE_analytics.md | 415 ++++++++++++++++++ ingestion/.env.example | 54 +++ ingestion/.gitignore | 47 ++ ingestion/CLAUDE_ingestion.md | 300 +++++++++++++ ingestion/Makefile | 137 ++++++ ingestion/README.md | 33 ++ ingestion/bulker/Dockerfile | 12 + ingestion/bulker/cmd/server/main.go | 136 ++++++ ingestion/bulker/go.mod | 12 + ingestion/bulker/internal/batcher/batcher.go | 107 +++++ .../bulker/internal/batcher/batcher_test.go | 56 +++ ingestion/bulker/internal/config/config.go | 35 ++ .../bulker/internal/consumer/consumer.go | 130 ++++++ ingestion/bulker/internal/model/event.go | 41 ++ .../bulker/internal/writer/clickhouse.go | 250 +++++++++++ ingestion/console/Dockerfile | 11 + ingestion/console/index.html | 13 + ingestion/console/nginx.conf | 23 + ingestion/console/package.json | 44 ++ ingestion/console/postcss.config.js | 6 + ingestion/console/public/favicon.svg | 1 + ingestion/console/src/App.tsx | 32 ++ ingestion/console/src/api/client.ts | 90 ++++ ingestion/console/src/components/AppShell.tsx | 50 +++ ingestion/console/src/components/ui/badge.tsx | 25 ++ .../console/src/components/ui/button.tsx | 48 ++ ingestion/console/src/components/ui/card.tsx | 44 ++ ingestion/console/src/components/ui/input.tsx | 19 + ingestion/console/src/index.css | 46 ++ ingestion/console/src/lib/utils.ts | 6 + ingestion/console/src/main.tsx | 10 + ingestion/console/src/pages/Dashboard.tsx | 81 ++++ ingestion/console/src/pages/Destinations.tsx | 43 ++ ingestion/console/src/pages/Functions.tsx | 101 +++++ ingestion/console/src/pages/Live.tsx | 74 ++++ ingestion/console/src/pages/Settings.tsx | 30 ++ ingestion/console/src/pages/Sources.tsx | 46 ++ ingestion/console/src/stores/workspace.ts | 13 + ingestion/console/tailwind.config.ts | 50 +++ ingestion/console/tsconfig.json | 22 + ingestion/console/vite.config.ts | 20 + .../infra/clickhouse/000001_events.down.sql | 5 + .../infra/clickhouse/000001_events.up.sql | 117 +++++ ingestion/infra/docker/clickhouse-config.xml | 19 + ingestion/infra/docker/docker-compose.yml | 113 +++++ .../infra/migrations/000001_init.down.sql | 12 + ingestion/infra/migrations/000001_init.up.sql | 178 ++++++++ .../infra/migrations/000002_seed_dev.down.sql | 3 + .../infra/migrations/000002_seed_dev.up.sql | 24 + ingestion/infra/scripts/clickhouse_apply.sh | 67 +++ ingestion/ingest/Dockerfile | 12 + ingestion/ingest/cmd/server/main.go | 157 +++++++ ingestion/ingest/go.mod | 15 + ingestion/ingest/internal/apperr/apperr.go | 79 ++++ ingestion/ingest/internal/config/config.go | 41 ++ ingestion/ingest/internal/dedup/dedup.go | 50 +++ ingestion/ingest/internal/handler/handler.go | 209 +++++++++ ingestion/ingest/internal/kafka/producer.go | 110 +++++ .../ingest/internal/middleware/middleware.go | 193 ++++++++ ingestion/ingest/internal/model/event.go | 81 ++++ ingestion/ingest/internal/model/writekey.go | 19 + .../ingest/internal/ratelimit/ratelimit.go | 102 +++++ ingestion/ingest/internal/repo/pool.go | 33 ++ ingestion/ingest/internal/repo/schema_repo.go | 61 +++ .../ingest/internal/repo/writekey_repo.go | 66 +++ ingestion/ingest/internal/schema/flatten.go | 92 ++++ .../ingest/internal/schema/flatten_test.go | 53 +++ ingestion/ingest/internal/service/auth.go | 115 +++++ ingestion/ingest/internal/service/ingest.go | 223 ++++++++++ .../ingest/internal/service/ingest_test.go | 150 +++++++ ingestion/rotor/Dockerfile | 18 + ingestion/rotor/README.md | 45 ++ ingestion/rotor/package.json | 23 + ingestion/rotor/src/api/server.js | 126 ++++++ ingestion/rotor/src/config.js | 9 + ingestion/rotor/src/index.js | 34 ++ ingestion/rotor/src/registry/registry.js | 41 ++ ingestion/rotor/src/runtime/isolate.js | 97 ++++ ingestion/rotor/test/isolate.test.js | 51 +++ 80 files changed, 5639 insertions(+) create mode 100644 .gitignore create mode 100644 data-layer/CLAUDE_analytics.md create mode 100644 ingestion/.env.example create mode 100644 ingestion/.gitignore create mode 100644 ingestion/CLAUDE_ingestion.md create mode 100644 ingestion/Makefile create mode 100644 ingestion/README.md create mode 100644 ingestion/bulker/Dockerfile create mode 100644 ingestion/bulker/cmd/server/main.go create mode 100644 ingestion/bulker/go.mod create mode 100644 ingestion/bulker/internal/batcher/batcher.go create mode 100644 ingestion/bulker/internal/batcher/batcher_test.go create mode 100644 ingestion/bulker/internal/config/config.go create mode 100644 ingestion/bulker/internal/consumer/consumer.go create mode 100644 ingestion/bulker/internal/model/event.go create mode 100644 ingestion/bulker/internal/writer/clickhouse.go create mode 100644 ingestion/console/Dockerfile create mode 100644 ingestion/console/index.html create mode 100644 ingestion/console/nginx.conf create mode 100644 ingestion/console/package.json create mode 100644 ingestion/console/postcss.config.js create mode 100644 ingestion/console/public/favicon.svg create mode 100644 ingestion/console/src/App.tsx create mode 100644 ingestion/console/src/api/client.ts create mode 100644 ingestion/console/src/components/AppShell.tsx create mode 100644 ingestion/console/src/components/ui/badge.tsx create mode 100644 ingestion/console/src/components/ui/button.tsx create mode 100644 ingestion/console/src/components/ui/card.tsx create mode 100644 ingestion/console/src/components/ui/input.tsx create mode 100644 ingestion/console/src/index.css create mode 100644 ingestion/console/src/lib/utils.ts create mode 100644 ingestion/console/src/main.tsx create mode 100644 ingestion/console/src/pages/Dashboard.tsx create mode 100644 ingestion/console/src/pages/Destinations.tsx create mode 100644 ingestion/console/src/pages/Functions.tsx create mode 100644 ingestion/console/src/pages/Live.tsx create mode 100644 ingestion/console/src/pages/Settings.tsx create mode 100644 ingestion/console/src/pages/Sources.tsx create mode 100644 ingestion/console/src/stores/workspace.ts create mode 100644 ingestion/console/tailwind.config.ts create mode 100644 ingestion/console/tsconfig.json create mode 100644 ingestion/console/vite.config.ts create mode 100644 ingestion/infra/clickhouse/000001_events.down.sql create mode 100644 ingestion/infra/clickhouse/000001_events.up.sql create mode 100644 ingestion/infra/docker/clickhouse-config.xml create mode 100644 ingestion/infra/docker/docker-compose.yml create mode 100644 ingestion/infra/migrations/000001_init.down.sql create mode 100644 ingestion/infra/migrations/000001_init.up.sql create mode 100644 ingestion/infra/migrations/000002_seed_dev.down.sql create mode 100644 ingestion/infra/migrations/000002_seed_dev.up.sql create mode 100755 ingestion/infra/scripts/clickhouse_apply.sh create mode 100644 ingestion/ingest/Dockerfile create mode 100644 ingestion/ingest/cmd/server/main.go create mode 100644 ingestion/ingest/go.mod create mode 100644 ingestion/ingest/internal/apperr/apperr.go create mode 100644 ingestion/ingest/internal/config/config.go create mode 100644 ingestion/ingest/internal/dedup/dedup.go create mode 100644 ingestion/ingest/internal/handler/handler.go create mode 100644 ingestion/ingest/internal/kafka/producer.go create mode 100644 ingestion/ingest/internal/middleware/middleware.go create mode 100644 ingestion/ingest/internal/model/event.go create mode 100644 ingestion/ingest/internal/model/writekey.go create mode 100644 ingestion/ingest/internal/ratelimit/ratelimit.go create mode 100644 ingestion/ingest/internal/repo/pool.go create mode 100644 ingestion/ingest/internal/repo/schema_repo.go create mode 100644 ingestion/ingest/internal/repo/writekey_repo.go create mode 100644 ingestion/ingest/internal/schema/flatten.go create mode 100644 ingestion/ingest/internal/schema/flatten_test.go create mode 100644 ingestion/ingest/internal/service/auth.go create mode 100644 ingestion/ingest/internal/service/ingest.go create mode 100644 ingestion/ingest/internal/service/ingest_test.go create mode 100644 ingestion/rotor/Dockerfile create mode 100644 ingestion/rotor/README.md create mode 100644 ingestion/rotor/package.json create mode 100644 ingestion/rotor/src/api/server.js create mode 100644 ingestion/rotor/src/config.js create mode 100644 ingestion/rotor/src/index.js create mode 100644 ingestion/rotor/src/registry/registry.js create mode 100644 ingestion/rotor/src/runtime/isolate.js create mode 100644 ingestion/rotor/test/isolate.test.js diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..19a16a2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,83 @@ +### macOS +# Finder metadata +.DS_Store + +# Thumbnails +._* + +# Custom folder icons +Icon + +# Volume root files +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +### Go +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool +*.out + +# Go workspace file +go.work.sum + +# env file +.env + +### Node +# Dependencies +node_modules/ + +# Logs +*.log + +# Runtime data +*.pid +*.pid.lock + +# Coverage +coverage/ +*.lcov +.nyc_output + +# Build output +dist/ +build/Release + +# TypeScript cache +*.tsbuildinfo + +# Framework build output and caches +.cache +.parcel-cache +.next +out/ +.nuxt + +# dotenv environment variable files +.env +.env.local +.env.*.local + +# npm cache directory +.npm +*.tgz + +# yarn v2 +.yarn/cache +.yarn/unplugged +.yarn/install-state.gz +.pnp.* \ No newline at end of file diff --git a/data-layer/CLAUDE_analytics.md b/data-layer/CLAUDE_analytics.md new file mode 100644 index 0000000..abd7632 --- /dev/null +++ b/data-layer/CLAUDE_analytics.md @@ -0,0 +1,415 @@ +# CLAUDE.md — CDP Analytics Service + +> You are a senior software engineer building the **Analytics & Data Layer** for a self-hosted CDP platform. +> This service focuses on **query, explore, and activate** data already ingested into ClickHouse. +> +> **Scope boundary**: Read-side only. Never write raw events. Ingestion is handled by `cdp-ingestion`. + +--- + +## What This Service Does + +Exposes ingested event data via Query API for exploration and analysis. Computes Traits and Audience +Segments from event history via background workers. Activates segments to external tools via Reverse ETL +and webhooks. + +--- + +## Repository Layout + +``` +cdp-analytics/ +├── api/ # Go — Query API, Profile API (port 4000) +├── workers/ # Go — Background jobs: Computed Traits, Segment refresh +├── console/ # React + Vite + shadcn/ui + Tailwind — Analytics UI +└── infra/ + ├── migrations/ # PostgreSQL migrations (golang-migrate) + └── clickhouse/ # ClickHouse query templates (.sql files) +``` + +--- + +## Tech Stack + +### Go Services (api, workers) + +| Concern | Library | Notes | +|---------|---------|-------| +| HTTP router | `chi` | Lightweight, stdlib-compatible middleware | +| Logger | `zap` | Structured logging, fastest | +| PostgreSQL | `pgx/v5` | Native driver, no database/sql wrapper | +| ClickHouse | `clickhouse-go/v2` | Official driver, native protocol, good batch support | +| Redis | `rueidis` | Modern client, faster than go-redis | +| Job queue | `riverqueue/river` | Postgres-backed, pgx/v5 native, built-in scheduler + retry | +| Config | `caarlos0/env` | Parse env vars into structs, zero deps | +| Validation | `go-playground/validator/v10` | Struct tags validation | +| Migration | `golang-migrate` + pgx driver | CLI only — never auto-migrate on startup | +| Test assertion | `testify` | assert + require + mock | +| Integration test | `testcontainers-go` | Real PG / Redis / ClickHouse in tests | + +### React Console (console/) + +| Concern | Library | +|---------|---------| +| Build | Vite | +| UI components | shadcn/ui + Tailwind | +| Routing | React Router v6 | +| Server state | TanStack Query | +| Client state | Zustand | +| Forms | react-hook-form + zod | +| Charts | Recharts | +| Icons | lucide-react | + +> **No new technology** without discussion. All additions must justify why existing stack cannot handle it. + +--- + +## Go Project Structure + +### api/ + +``` +api/ +├── cmd/ +│ └── server/ +│ └── main.go # wire everything, start server +└── internal/ + ├── handler/ # HTTP handlers — parse request, call service, write response + ├── service/ # business logic — no HTTP, no DB concerns + ├── repo/ # DB queries — PostgreSQL via pgx, ClickHouse via clickhouse-go + ├── middleware/ # auth, request ID, logging + └── config/ # env parsing via caarlos0/env +``` + +### workers/ + +``` +workers/ +├── cmd/ +│ └── worker/ +│ └── main.go # register jobs, start river worker +└── internal/ + ├── job/ # job definitions (ComputeTraitsJob, RefreshSegmentJob, ReverseETLJob) + ├── handler/ # job handlers — business logic per job type + ├── repo/ # DB queries shared across job handlers + └── config/ +``` + +Rules: +- `handler` depends on `service` (api) or `handler` on `repo` (workers). Never reverse. +- `handler` never touches DB directly in api/. +- `service` never imports `chi` or any HTTP package. +- `repo` returns domain types, never raw `pgx.Rows` or `driver.Rows`. +- ClickHouse queries live as `.sql` files in `infra/clickhouse/` — no inline SQL strings for complex queries. + +--- + +## Error Handling + +Same `AppError` pattern as ingestion. Never return raw `pgx` or `clickhouse-go` errors to handlers. + +```go +// internal/apperr/apperr.go + +type AppError struct { + Code int // HTTP status code to return + Message string // user-facing message (safe to expose) + Field string // optional: which field caused the error + Err error // original error for logging (not exposed to user) +} + +func (e *AppError) Error() string { return e.Message } +func (e *AppError) Unwrap() error { return e.Err } + +// Constructors +func BadRequest(msg, field string, err error) *AppError +func NotFound(msg string) *AppError +func Forbidden(msg string) *AppError +func Internal(err error) *AppError +``` + +Handler pattern — one place handles all errors: + +```go +func writeError(w http.ResponseWriter, err error) { + var appErr *apperr.AppError + if errors.As(err, &appErr) { + render.JSON(w, appErr.Code, ErrorResponse{Error: appErr.Message, Field: appErr.Field}) + return + } + render.JSON(w, 500, ErrorResponse{Error: "internal server error"}) +} +``` + +--- + +## ClickHouse Query Pattern + +Use raw SQL only. No query builder — ClickHouse SQL has its own syntax that builders handle poorly. + +``` +infra/clickhouse/ +├── event_explorer.sql +├── funnel_analysis.sql +├── retention_cohort.sql +└── session_analysis.sql +``` + +Load templates at startup, inject parameters safely: + +```go +// Never fmt.Sprintf into SQL — use named parameters +query, err := templates.Load("funnel_analysis.sql") +rows, err := chConn.Query(ctx, query, clickhouse.Named("workspace_id", id), ...) +``` + +Rules: +- All ClickHouse queries must have a corresponding `.sql` file in `infra/clickhouse/` +- No multi-line SQL strings inline in Go code +- Every ClickHouse schema change must have a DDL file in `infra/clickhouse/` + +--- + +## Job Queue (river) + +Background workers use `riverqueue/river` backed by PostgreSQL. + +```go +// Define a job +type ComputeTraitsArgs struct { + WorkspaceID string `json:"workspace_id"` + TraitID string `json:"trait_id"` +} +func (ComputeTraitsArgs) Kind() string { return "compute_traits" } + +// Register handler +river.AddWorker(workers, &ComputeTraitsWorker{repo: repo}) + +// Enqueue +client.Insert(ctx, ComputeTraitsArgs{WorkspaceID: "ws_123", TraitID: "t_456"}, nil) +``` + +Scheduled jobs (periodic): + +```go +// Hourly trait recompute, hourly segment refresh +&river.PeriodicJob{ + ScheduleFunc: river.ScheduleFunc(func(t time.Time) time.Time { + return t.Add(time.Hour) + }), + ConstructorFunc: func() (river.JobArgs, *river.InsertOpts) { + return ComputeTraitsArgs{}, nil + }, +} +``` + +Rules: +- Workers must be idempotent — river may retry on failure +- Use `river`'s built-in retry with exponential backoff, do not implement custom retry +- Log job start, job end, duration, and error with full context (job_id, args) + +--- + +## Cache Strategy (Redis) + +Semantic key structure — allows per-workspace invalidation: + +``` +cache:query:events:{workspace_id}:{hash(params)} TTL 60s +cache:query:funnel:{workspace_id}:{hash(params)} TTL 60s +cache:query:retention:{workspace_id}:{hash(params)} TTL 60s +cache:dashboard:{workspace_id} TTL 60s +cache:profile:{workspace_id}:{profile_id} TTL 30s +``` + +Rules: +- Default TTL: 60s for aggregate queries, 30s for profile lookups +- TTL is configurable per query type via env vars +- On cache miss: query ClickHouse, write result to Redis, return result +- Never cache Custom SQL results — each query is arbitrary + +--- + +## Custom SQL Sandbox + +`POST /query/sql` allows arbitrary SQL on ClickHouse. Two layers of protection: + +**Layer 1 — App-level parse (Go):** +```go +// Reject anything that is not a SELECT statement +func validateReadOnly(sql string) error { + normalized := strings.TrimSpace(strings.ToUpper(sql)) + if !strings.HasPrefix(normalized, "SELECT") { + return apperr.BadRequest("only SELECT statements are allowed", "sql", nil) + } + // Reject common DDL/DML keywords + forbidden := []string{"INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER", "TRUNCATE"} + for _, kw := range forbidden { + if strings.Contains(normalized, kw) { + return apperr.BadRequest("statement contains forbidden keyword: "+kw, "sql", nil) + } + } + return nil +} +``` + +**Layer 2 — ClickHouse read-only user:** +- Custom SQL queries run as a separate ClickHouse user with `SELECT`-only grants +- DDL/DML rejected at DB level even if app-level check is bypassed + +--- + +## Testing Strategy + +### Unit tests — handler + service layer +- Mock interfaces with `testify/mock` +- No real DB, no real Redis, no real ClickHouse +- File: `foo_test.go` alongside the file being tested + +### Integration tests — repo layer only +- Use `testcontainers-go` to spin up real PostgreSQL, Redis, ClickHouse +- File: `internal/repo/event_repo_test.go` +- Tag: `//go:build integration` + +```bash +make test # unit only (fast, no containers) +make test/integration # repo layer with real DBs (slower, CI) +``` + +--- + +## Migration Workflow + +```bash +make migrate/new name=add_profile_traits # create up+down files +make migrate/up # apply all pending +make migrate/down # rollback one step +make migrate/status # show current version +``` + +- Migration files: `infra/migrations/{version}_{name}.up.sql` + `.down.sql` +- **Never** auto-run migrations on server startup +- Every PostgreSQL schema change **must** have a migration file + +--- + +## PostgreSQL Schema (Analytics-owned tables) + +```sql +-- Computed trait values per profile +profile_traits ( + profile_id UUID, + trait_key TEXT, + trait_value JSONB, + computed_at TIMESTAMPTZ +) + +-- Segment membership history (used for delta Reverse ETL) +segment_memberships ( + segment_id UUID, + profile_id UUID, + entered_at TIMESTAMPTZ, + exited_at TIMESTAMPTZ -- NULL = currently a member +) +``` + +--- + +## Data Sources (Read-only) + +This service **only reads** data written by `cdp-ingestion`. Never write to these tables. + +| Source | Data | +|--------|------| +| ClickHouse `events` | Flattened, schema-managed raw events | +| PostgreSQL `profiles` | Identity graph, unified profiles | +| PostgreSQL `sources` / `destinations` | Config metadata | +| PostgreSQL `schemas` | Schema registry from ingestion | + +--- + +## Key Design Decisions + +| Problem | Decision | +|---------|---------| +| Job queue | `river` on PostgreSQL — no Temporal, no Celery | +| Computed Traits refresh | Hourly default, configurable per trait | +| Segment re-evaluate | Full re-evaluate — simpler than incremental | +| Query cache | Redis semantic keys, TTL 60s default | +| Custom SQL | App-level SELECT-only check + ClickHouse read-only user | +| Reverse ETL | Delta only (entered/exited) — never push full member list | +| ClickHouse queries | Raw SQL in `.sql` template files — no query builder | +| Scaling | Vertical — increase RAM/CPU, not instances | +| Migration | CLI only — never auto-migrate on startup | + +--- + +## API Endpoints + +| Method | Path | Description | +|--------|------|-------------| +| `POST` | `/query/events` | Filter + query raw events | +| `POST` | `/query/sql` | Custom SQL on ClickHouse (SELECT only) | +| `POST` | `/query/funnel` | Funnel analysis | +| `POST` | `/query/retention` | Retention cohort | +| `GET` | `/profiles/:id` | Unified profile lookup | +| `GET` | `/profiles/:id/events` | User event timeline | +| `GET` | `/segments` | List segments | +| `POST` | `/segments` | Create segment | +| `GET` | `/segments/:id/members` | Segment members | +| `GET` | `/traits/definitions` | List computed trait definitions | +| `GET` | `/health` | Health check | +| `GET` | `/ready` | Readiness check | + +Every endpoint must have a request struct with `validate` tags. Validation runs before any business logic. + +--- + +## Feature Priorities + +| Priority | Features | +|----------|---------| +| **P0** | Event Explorer, Custom SQL, Profile Lookup, Event Timeline, Saved Queries | +| **P1** | Funnel Analysis, Retention Analysis, Session Analysis, Pre-built Dashboards | +| **P2** | Computed Traits, Audience Segments, Background Worker | +| **P3** | Reverse ETL, Webhook Push, Schema Registry, Data Catalog | + +Build in priority order. Do not start P1 before P0 is stable. + +--- + +## Logging Policy (zap) + +``` +Query requests → log workspace_id, query_type, duration_ms, rows_returned, cache_hit +Worker jobs → log job_id, job_kind, args, duration_ms, status (success/error) +Errors → log full error chain with context +``` + +--- + +## Coding Rules + +- **Do not write code unless asked** — discuss architecture/features first +- **Ask when scope is unclear**, especially when multiple valid approaches exist +- **YAGNI + KISS** — do not build what is not needed yet +- **Correctness before performance** — optimize only when profiling proves it necessary +- **Every PostgreSQL schema change must have a migration file** in `infra/migrations/` +- **Every ClickHouse query must have a `.sql` file** in `infra/clickhouse/` +- **Every API endpoint must have a request struct with `validate` tags** +- **Never write raw events** — this service is read-side only +- Discuss in **Vietnamese**, write code and comments in **English** + +--- + +## Common Pitfalls + +- Do not query ClickHouse directly for computed traits at request time — serve from PostgreSQL +- Do not run full segment scans on every API request — that is the worker's job +- Do not cache Custom SQL results — queries are arbitrary, cache would be useless +- Do not inline complex SQL strings in Go — use `.sql` template files +- Do not return raw `pgx` or `clickhouse-go` errors to HTTP handlers — wrap with `AppError` +- Do not run migrations on server startup — use `make migrate/up` explicitly +- Reverse ETL must push delta only (entered/exited), never the full member list per run +- Workers must be idempotent — `river` retries on failure, job may run more than once +- `service` layer must never import `net/http` or `chi` diff --git a/ingestion/.env.example b/ingestion/.env.example new file mode 100644 index 0000000..61939f0 --- /dev/null +++ b/ingestion/.env.example @@ -0,0 +1,54 @@ +# --------------------------------------------------------------------------- +# Shared infrastructure +# --------------------------------------------------------------------------- +POSTGRES_DSN=postgres://cdp:cdp@localhost:5432/cdp?sslmode=disable +REDIS_ADDR=localhost:6379 +KAFKA_BROKERS=localhost:9092 +CLICKHOUSE_ADDR=localhost:9000 +CLICKHOUSE_DB=cdp +CLICKHOUSE_USER=default +CLICKHOUSE_PASSWORD= + +# --------------------------------------------------------------------------- +# Ingest service +# --------------------------------------------------------------------------- +INGEST_HTTP_ADDR=:3049 +INGEST_LOG_LEVEL=info +INGEST_PAYLOAD_LIMIT_KB=100 +INGEST_BATCH_LIMIT_KB=4000 +INGEST_LATE_EVENT_HOURS=24 +INGEST_DEDUP_TTL_HOURS=24 +INGEST_WRITE_KEY_CACHE_TTL_SECONDS=45 +INGEST_LOG_PAYLOAD_ON_SUCCESS=false +INGEST_LOG_PAYLOAD_ON_ERROR=true +INGEST_SHUTDOWN_TIMEOUT_SECONDS=30 + +# Kafka topics +KAFKA_TOPIC_INGEST=events.ingest +KAFKA_TOPIC_DLQ=events.dlq +KAFKA_TOPIC_RETRY=events.retry + +# --------------------------------------------------------------------------- +# Bulker service +# --------------------------------------------------------------------------- +BULKER_HTTP_ADDR=:3042 +BULKER_LOG_LEVEL=info +BULKER_KAFKA_GROUP=bulker +BULKER_BATCH_SIZE=1000 +BULKER_BATCH_INTERVAL_SECONDS=5 +BULKER_SHUTDOWN_TIMEOUT_SECONDS=60 + +# --------------------------------------------------------------------------- +# Rotor (Node.js) +# --------------------------------------------------------------------------- +ROTOR_PORT=3401 +ROTOR_LOG_LEVEL=info +ROTOR_ISOLATE_MEMORY_MB=128 +ROTOR_FUNCTION_TIMEOUT_MS=2000 + +# --------------------------------------------------------------------------- +# Console (Vite) +# --------------------------------------------------------------------------- +VITE_API_BASE_URL=http://localhost:3049 +VITE_BULKER_BASE_URL=http://localhost:3042 +VITE_ROTOR_BASE_URL=http://localhost:3401 diff --git a/ingestion/.gitignore b/ingestion/.gitignore new file mode 100644 index 0000000..e03b90d --- /dev/null +++ b/ingestion/.gitignore @@ -0,0 +1,47 @@ +# Binaries +bin/ +*.exe +*.test +*.out + +# Go workspace +go.work +go.work.sum + +# Node +node_modules/ +dist/ +build/ +.next/ +*.log +npm-debug.log* +yarn-debug.log* +yarn-error.log* +pnpm-debug.log* + +# Env +.env +.env.local +.env.*.local + +# IDE +.vscode/ +.idea/ +*.swp +.DS_Store + +# Coverage +coverage/ +*.cover +*.coverage +coverage.out + +# Docker volumes +infra/docker/data/ + +# Console build output +console/dist/ +console/.vite/ + +# Rotor +rotor/dist/ diff --git a/ingestion/CLAUDE_ingestion.md b/ingestion/CLAUDE_ingestion.md new file mode 100644 index 0000000..24b2b43 --- /dev/null +++ b/ingestion/CLAUDE_ingestion.md @@ -0,0 +1,300 @@ +# CLAUDE.md — CDP Ingestion Service + +> You are a senior software engineer building the **Data Ingestion Service** for a self-hosted CDP platform, +> inspired by Jitsu. Focus: event streaming, JS functions, identity stitching. +> +> **Scope boundary**: Ingestion only. Analytics & Customer 360 live in a separate service (`cdp-analytics`). + +--- + +## What This Service Does + +Collects events from any source → validates, deduplicates, transforms via JS Functions → stores in ClickHouse +and exports to external warehouses. Segment-compatible API for easy migration. + +--- + +## Repository Layout + +``` +cdp-ingestion/ +├── ingest/ # Go — HTTP API, auth, validate, dedup, push to Kafka (port 3049) +├── rotor/ # Node.js — JS functions runner, V8 isolate (port 3401) +├── bulker/ # Go — Kafka consumer, batch write to ClickHouse/warehouses (port 3042) +├── console/ # React + Vite + shadcn/ui + Tailwind — management UI (port 3000) +└── infra/ + ├── docker/ + ├── clickhouse/ # ClickHouse DDL / migrations + └── migrations/ # PostgreSQL migrations (golang-migrate) +``` + +--- + +## Tech Stack + +### Go Services (ingest, bulker) + +| Concern | Library | Notes | +|---------|---------|-------| +| HTTP router | `chi` | Lightweight, stdlib-compatible middleware | +| Logger | `zap` | Structured logging, fastest | +| PostgreSQL | `pgx/v5` | Native driver, no database/sql wrapper | +| Kafka | `franz-go` | Pure Go, no CGO, best Redpanda support | +| Redis | `rueidis` | Modern client, faster than go-redis | +| Config | `caarlos0/env` | Parse env vars into structs, zero deps | +| Validation | `go-playground/validator/v10` | Struct tags validation | +| Migration | `golang-migrate` + pgx driver | CLI only — never auto-migrate on startup | +| Test assertion | `testify` | assert + require + mock | +| Integration test | `testcontainers-go` | Real PG / Redis / ClickHouse in tests | + +### React Console (console/) + +| Concern | Library | +|---------|---------| +| Build | Vite | +| UI components | shadcn/ui + Tailwind | +| Routing | React Router v6 | +| Server state | TanStack Query | +| Client state | Zustand | +| Forms | react-hook-form + zod | +| Charts | Recharts | +| Icons | lucide-react | + +> **No new technology** without discussion. All additions must justify why existing stack cannot handle it. + +--- + +## Go Project Structure + +Every Go service follows this layout: + +``` +ingest/ +├── cmd/ +│ └── server/ +│ └── main.go # wire everything, start server +└── internal/ + ├── handler/ # HTTP handlers — parse request, call service, write response + ├── service/ # business logic — no HTTP, no DB concerns + ├── repo/ # DB queries — PostgreSQL via pgx, ClickHouse + ├── kafka/ # producer (ingest) / consumer (bulker) + ├── middleware/ # auth, rate limit, request ID, logging + └── config/ # env parsing via caarlos0/env +``` + +Rules: +- `handler` depends on `service`. `service` depends on `repo`. Never reverse. +- `handler` never touches DB directly. +- `service` never imports `chi` or any HTTP package. +- `repo` returns domain types, never raw `pgx.Rows`. + +--- + +## Error Handling + +Use `AppError` for all domain errors. Never return raw `pgx` or stdlib errors to handlers. + +```go +// internal/apperr/apperr.go + +type AppError struct { + Code int // HTTP status code to return + Message string // user-facing message (safe to expose) + Field string // optional: which field caused the error (schema conflict, validation) + Err error // original error for logging (not exposed to user) +} + +func (e *AppError) Error() string { return e.Message } +func (e *AppError) Unwrap() error { return e.Err } + +// Constructors +func BadRequest(msg, field string, err error) *AppError +func Conflict(msg string, err error) *AppError +func TooManyRequests(retryAfter int) *AppError +func UnprocessableEntity(msg string) *AppError +func Internal(err error) *AppError +``` + +Handler pattern — one place to handle all errors: + +```go +func writeError(w http.ResponseWriter, err error) { + var appErr *apperr.AppError + if errors.As(err, &appErr) { + // log appErr.Err internally, return appErr.Message to user + render.JSON(w, appErr.Code, ErrorResponse{Error: appErr.Message, Field: appErr.Field}) + return + } + // unexpected — log full error, return generic 500 + render.JSON(w, 500, ErrorResponse{Error: "internal server error"}) +} +``` + +--- + +## Testing Strategy + +### Unit tests — handler + service layer +- Mock interfaces with `testify/mock` +- No real DB, no real Redis, no real Kafka +- File: `foo_test.go` alongside the file being tested + +```go +type EventServiceMock struct { mock.Mock } +func (m *EventServiceMock) Track(ctx context.Context, e *Event) error { + return m.Called(ctx, e).Error(0) +} +``` + +### Integration tests — repo layer only +- Use `testcontainers-go` to spin up real PostgreSQL, Redis, ClickHouse +- File: `internal/repo/event_repo_test.go` +- Tag: `//go:build integration` +- Run: `make test/integration` + +```bash +make test # unit only (fast, no containers) +make test/integration # repo layer with real DBs (slower, CI) +``` + +--- + +## Migration Workflow + +```bash +# Create new migration +make migrate/new name=add_segment_memberships + +# Apply +make migrate/up + +# Rollback one step +make migrate/down + +# Check status +make migrate/status +``` + +- Migration files live in `infra/migrations/` +- Format: `{version}_{name}.up.sql` + `{version}_{name}.down.sql` +- **Never** auto-run migrations on server startup +- Every PostgreSQL schema change **must** have a migration file — no exceptions + +--- + +## Ingest Pipeline (Step-by-Step) + +``` +HTTP Request + 1. Auth — Write Key → PostgreSQL lookup, cached in Redis (TTL 30–60s + pub/sub invalidation) + 2. Payload validate — size ≤ PAYLOAD_LIMIT_KB (default 100KB), struct + validator tags + 3. Rate limit — Redis sliding window per workspace_id; 429 + Retry-After on breach + 4. Timestamp — received_at = server time; client time preserved as sent_at + 5. Late event check — (received_at − sent_at) > 24h → 422 drop + 6. Deduplication — Redis SETNX message_id, TTL 24h + 7. JSON flatten — {"a":{"b":1}} → {"a_b":1} + 8. Schema validate — type conflict → 400 + field name → push to DLQ + 9. Push Kafka — partition key = anonymous_id (ordering for identity stitching) + 10. Return 200 OK — fire-and-forget, do not wait for Kafka ack +``` + +## Kafka Topics + +| Topic | Purpose | +|-------|---------| +| `events.ingest` | Happy path — valid events | +| `events.dlq` | Failed events — schema conflict, validation error, function crash | +| `events.retry` | Replay from DLQ after fix | + +--- + +## Key Design Decisions + +| Problem | Decision | +|---------|---------| +| Late events (> 24h) | Drop, return `422 Unprocessable` | +| Schema conflict | Reject `400`, include field name in response, push to DLQ | +| Timestamp authority | Server wins (`received_at`); client time kept as `sent_at` | +| Payload limit | Configurable, default 100KB; batch has separate limit | +| Partition key | `anonymous_id` — guarantees ordering for identity stitching | +| Enrich mode | Async by default — store raw event first, worker enriches after | +| Identity backfill | Async + lock per `anonymous_id` to avoid race condition | +| Write Key cache | Redis TTL 30–60s + pub/sub invalidation on revoke | +| Graceful shutdown | Drain in-flight requests on SIGTERM before exit | +| Migration | CLI only — never auto-migrate on startup | + +## Rate Limits + +| Tier | RPS | Events/day | Burst (5s) | +|------|-----|-----------|-----------| +| Default | 100 | 1M | 500 | +| Pro | 500 | 10M | 2,500 | +| Enterprise | custom | custom | custom | + +Rate limit key: `rate:{workspace_id}` — per workspace, not per IP. + +--- + +## Data Conventions + +- **Field names**: `snake_case`; sanitize on ingest (remove spaces, special chars) +- **Timestamps**: `received_at` (server), `sent_at` (client), `timestamp` (event time for analytics) +- **Dedup key**: `message_id`, Redis SETNX, TTL 24h +- **Nested objects**: auto-flatten before schema check +- **Type coercion**: none — type conflict → reject immediately +- **Write Key**: never log raw; always masked in logs + +--- + +## Logging Policy (zap) + +``` +Happy path → metadata only, no payload LOG_PAYLOAD_ON_SUCCESS=false (default) +Error/reject → full payload logged LOG_PAYLOAD_ON_ERROR=true (default) +Write Key → always masked, never raw +``` + +Fields logged on every request: `workspace_id`, `source_id`, `message_id`, `event_type`, `duration_ms`, `status_code`. + +--- + +## API Endpoints (Ingest) + +| Method | Path | Description | +|--------|------|-------------| +| `POST` | `/track` | Single event | +| `POST` | `/batch` | Batch events (Segment-compatible) | +| `POST` | `/identify` | Identify call | +| `POST` | `/page` | Page call | +| `POST` | `/group` | Group call | +| `GET` | `/health` | Health check | +| `GET` | `/ready` | Readiness check | + +Every endpoint must have a request struct with `validate` tags. Validation runs before any business logic. + +--- + +## Coding Rules + +- **Do not write code unless asked** — discuss architecture/features first +- **Ask when scope is unclear**, especially when multiple valid approaches exist +- **YAGNI + KISS** — do not build what is not needed yet +- **Correctness before performance** — optimize only when profiling proves it necessary +- **Every ClickHouse schema change must have a migration file** in `infra/clickhouse/` +- **Every PostgreSQL schema change must have a migration file** in `infra/migrations/` +- **Every API endpoint must have a request struct with `validate` tags** +- **Never write raw events from analytics layer** — ingestion is the sole writer +- Discuss in **Vietnamese**, write code and comments in **English** + +--- + +## Common Pitfalls + +- Do not skip dedup check even for bulk imports — use a different TTL bucket if needed +- Do not change partition key from `anonymous_id` — breaks identity stitching ordering +- Do not cache Write Keys without the pub/sub invalidation path — revoked keys must propagate within TTL +- `rotor` is Node.js, not Go — cross-service calls go over HTTP, never in-process +- DLQ events must be replayable — never mutate DLQ topic; write to `events.retry` for replay +- Do not return raw `pgx` errors to HTTP handlers — always wrap with `AppError` +- Do not run migrations on server startup — use `make migrate/up` explicitly +- `service` layer must never import `net/http` or `chi` — keep HTTP concerns in `handler` only diff --git a/ingestion/Makefile b/ingestion/Makefile new file mode 100644 index 0000000..a715211 --- /dev/null +++ b/ingestion/Makefile @@ -0,0 +1,137 @@ +.PHONY: help up down logs migrate/new migrate/up migrate/down migrate/status \ + clickhouse/up clickhouse/down \ + run/ingest run/bulker run/rotor run/console \ + build/ingest build/bulker \ + test test/integration \ + lint fmt tidy + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +POSTGRES_DSN ?= postgres://cdp:cdp@localhost:5432/cdp?sslmode=disable +CLICKHOUSE_DSN ?= clickhouse://default:@localhost:9000/cdp +MIGRATE_BIN ?= migrate +MIGRATIONS_DIR := infra/migrations +CLICKHOUSE_DIR := infra/clickhouse + +# --------------------------------------------------------------------------- +# Help +# --------------------------------------------------------------------------- + +help: + @echo "CDP Ingestion - common tasks" + @echo "" + @echo " make up docker-compose up infra (Postgres, Redis, Kafka, ClickHouse)" + @echo " make down docker-compose down" + @echo " make logs tail logs" + @echo "" + @echo " make migrate/new name=X create new PG migration" + @echo " make migrate/up apply PG migrations" + @echo " make migrate/down rollback one" + @echo " make migrate/status migration status" + @echo "" + @echo " make clickhouse/up apply ClickHouse DDL" + @echo " make clickhouse/down drop ClickHouse schema" + @echo "" + @echo " make run/ingest run ingest service (port 3049)" + @echo " make run/bulker run bulker service (port 3042)" + @echo " make run/rotor run rotor service (port 3401)" + @echo " make run/console run console UI (port 3000)" + @echo "" + @echo " make test unit tests" + @echo " make test/integration integration tests (testcontainers)" + +# --------------------------------------------------------------------------- +# Docker +# --------------------------------------------------------------------------- + +up: + docker compose -f infra/docker/docker-compose.yml up -d + +down: + docker compose -f infra/docker/docker-compose.yml down + +logs: + docker compose -f infra/docker/docker-compose.yml logs -f --tail=200 + +# --------------------------------------------------------------------------- +# PostgreSQL migrations +# --------------------------------------------------------------------------- + +migrate/new: + @if [ -z "$(name)" ]; then echo "usage: make migrate/new name=add_xxx"; exit 1; fi + $(MIGRATE_BIN) create -ext sql -dir $(MIGRATIONS_DIR) -seq $(name) + +migrate/up: + $(MIGRATE_BIN) -path $(MIGRATIONS_DIR) -database "$(POSTGRES_DSN)" up + +migrate/down: + $(MIGRATE_BIN) -path $(MIGRATIONS_DIR) -database "$(POSTGRES_DSN)" down 1 + +migrate/status: + $(MIGRATE_BIN) -path $(MIGRATIONS_DIR) -database "$(POSTGRES_DSN)" version + +# --------------------------------------------------------------------------- +# ClickHouse DDL +# --------------------------------------------------------------------------- + +clickhouse/up: + @bash infra/scripts/clickhouse_apply.sh up + +clickhouse/down: + @bash infra/scripts/clickhouse_apply.sh down + +# --------------------------------------------------------------------------- +# Run services +# --------------------------------------------------------------------------- + +run/ingest: + cd ingest && go run ./cmd/server + +run/bulker: + cd bulker && go run ./cmd/server + +run/rotor: + cd rotor && npm run dev + +run/console: + cd console && npm run dev + +# --------------------------------------------------------------------------- +# Build +# --------------------------------------------------------------------------- + +build/ingest: + cd ingest && CGO_ENABLED=0 go build -o ../bin/ingest ./cmd/server + +build/bulker: + cd bulker && CGO_ENABLED=0 go build -o ../bin/bulker ./cmd/server + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +test: + cd ingest && go test ./... -count=1 + cd bulker && go test ./... -count=1 + +test/integration: + cd ingest && go test -tags=integration ./... -count=1 -timeout=5m + cd bulker && go test -tags=integration ./... -count=1 -timeout=5m + +# --------------------------------------------------------------------------- +# Code quality +# --------------------------------------------------------------------------- + +lint: + cd ingest && golangci-lint run ./... + cd bulker && golangci-lint run ./... + +fmt: + cd ingest && gofmt -w . + cd bulker && gofmt -w . + +tidy: + cd ingest && go mod tidy + cd bulker && go mod tidy diff --git a/ingestion/README.md b/ingestion/README.md new file mode 100644 index 0000000..21f9f4c --- /dev/null +++ b/ingestion/README.md @@ -0,0 +1,33 @@ +# CDP Ingestion + +Self-hosted CDP ingestion platform inspired by Jitsu. Segment-compatible HTTP API. + +## Services + +| Service | Lang | Port | Role | +|---------|------|------|------| +| `ingest` | Go | 3049 | HTTP API → auth, validate, dedup, push to Kafka | +| `rotor` | Node.js | 3401 | JS functions runner (V8 isolates) | +| `bulker` | Go | 3042 | Kafka consumer → batch write ClickHouse / warehouses | +| `console` | React + Vite | 3000 | Management UI | + +## Quick start + +```bash +make up # docker-compose up infra (Postgres, Redis, Kafka, ClickHouse) +make migrate/up # apply PostgreSQL migrations +make clickhouse/up # apply ClickHouse DDL +make run/ingest # start ingest on :3049 +make run/bulker # start bulker on :3042 +make run/rotor # start rotor on :3401 +make run/console # start console on :3000 +``` + +## Testing + +```bash +make test # unit tests (no containers) +make test/integration # repo-layer integration tests (testcontainers) +``` + +See [CLAUDE_ingestion.md](./CLAUDE_ingestion.md) for the full design contract. diff --git a/ingestion/bulker/Dockerfile b/ingestion/bulker/Dockerfile new file mode 100644 index 0000000..6dc5048 --- /dev/null +++ b/ingestion/bulker/Dockerfile @@ -0,0 +1,12 @@ +FROM golang:1.22-alpine AS build +WORKDIR /src +COPY go.mod go.sum* ./ +RUN go mod download || true +COPY . . +RUN CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /out/bulker ./cmd/server + +FROM gcr.io/distroless/static-debian12:nonroot +COPY --from=build /out/bulker /bulker +EXPOSE 3042 +USER nonroot:nonroot +ENTRYPOINT ["/bulker"] diff --git a/ingestion/bulker/cmd/server/main.go b/ingestion/bulker/cmd/server/main.go new file mode 100644 index 0000000..f9d8865 --- /dev/null +++ b/ingestion/bulker/cmd/server/main.go @@ -0,0 +1,136 @@ +// Command server runs the CDP bulker -- Kafka consumer that batches events +// into ClickHouse (and other warehouses, when configured). +package main + +import ( + "context" + "encoding/json" + "errors" + "log" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/go-chi/chi/v5" + "go.uber.org/zap" + + "github.com/dbiz/cdp/ingestion/bulker/internal/batcher" + "github.com/dbiz/cdp/ingestion/bulker/internal/config" + "github.com/dbiz/cdp/ingestion/bulker/internal/consumer" + "github.com/dbiz/cdp/ingestion/bulker/internal/model" + "github.com/dbiz/cdp/ingestion/bulker/internal/writer" +) + +func main() { + if err := run(); err != nil { + log.Fatal(err) + } +} + +func run() error { + cfg, err := config.Load() + if err != nil { + return err + } + logger, err := newLogger(cfg.LogLevel) + if err != nil { + return err + } + defer func() { _ = logger.Sync() }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // ---- ClickHouse ------------------------------------------------------- + ch, err := writer.New(ctx, cfg.ClickHouseAddr, cfg.ClickHouseDB, cfg.ClickHouseUser, cfg.ClickHousePassword) + if err != nil { + return err + } + defer func() { _ = ch.Close() }() + + // ---- Batcher --------------------------------------------------------- + b := batcher.New(cfg.BatchSize, cfg.BatchInterval, + func(ctx context.Context, evs []*model.IngestedEvent) error { + _, err := ch.WriteEvents(ctx, evs) + return err + }, logger) + + go b.Run(ctx) + + // ---- Consumer -------------------------------------------------------- + cons, err := consumer.New(consumer.Config{ + Brokers: cfg.KafkaBrokers, + Group: cfg.KafkaGroup, + IngestTopic: cfg.KafkaTopicIngest, + DLQTopic: cfg.KafkaTopicDLQ, + }, b, ch, logger) + if err != nil { + return err + } + defer cons.Close() + + consumerErr := make(chan error, 1) + go func() { consumerErr <- cons.Run(ctx) }() + + // ---- HTTP (health) --------------------------------------------------- + r := chi.NewRouter() + r.Get("/health", func(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) + }) + r.Get("/ready", func(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, map[string]string{"status": "ready"}) + }) + + srv := &http.Server{ + Addr: cfg.HTTPAddr, + Handler: r, + ReadHeaderTimeout: 5 * time.Second, + } + + httpErr := make(chan error, 1) + go func() { + logger.Info("bulker http listening", zap.String("addr", cfg.HTTPAddr)) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + httpErr <- err + } + }() + + // ---- Signals --------------------------------------------------------- + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + select { + case <-sigCh: + logger.Info("shutdown signal received") + case err := <-consumerErr: + logger.Error("consumer stopped unexpectedly", zap.Error(err)) + case err := <-httpErr: + logger.Error("http stopped unexpectedly", zap.Error(err)) + } + + shutCtx, shutCancel := context.WithTimeout(context.Background(), cfg.ShutdownTimeout) + defer shutCancel() + cancel() // stop consumer + batcher + _ = srv.Shutdown(shutCtx) + _ = b.FlushNow(shutCtx) + return nil +} + +func writeJSON(w http.ResponseWriter, status int, body any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(body) +} + +func newLogger(level string) (*zap.Logger, error) { + lvl, err := zap.ParseAtomicLevel(level) + if err != nil { + lvl = zap.NewAtomicLevelAt(zap.InfoLevel) + } + cfg := zap.NewProductionConfig() + cfg.Level = lvl + cfg.EncoderConfig.TimeKey = "ts" + cfg.EncoderConfig.MessageKey = "msg" + return cfg.Build() +} diff --git a/ingestion/bulker/go.mod b/ingestion/bulker/go.mod new file mode 100644 index 0000000..6ab1517 --- /dev/null +++ b/ingestion/bulker/go.mod @@ -0,0 +1,12 @@ +module github.com/dbiz/cdp/ingestion/bulker + +go 1.22 + +require ( + github.com/ClickHouse/clickhouse-go/v2 v2.30.0 + github.com/caarlos0/env/v11 v11.2.2 + github.com/go-chi/chi/v5 v5.1.0 + github.com/stretchr/testify v1.9.0 + github.com/twmb/franz-go v1.17.1 + go.uber.org/zap v1.27.0 +) diff --git a/ingestion/bulker/internal/batcher/batcher.go b/ingestion/bulker/internal/batcher/batcher.go new file mode 100644 index 0000000..995b8d1 --- /dev/null +++ b/ingestion/bulker/internal/batcher/batcher.go @@ -0,0 +1,107 @@ +// Package batcher accumulates events from the consumer until either the +// size cap or the time cap is hit, then flushes them to the writer. +// +// Flush semantics: +// - on size cap: flush immediately +// - on time cap: flush whatever is buffered (even 0 events: no-op) +// - on shutdown: flush whatever is buffered, then return +package batcher + +import ( + "context" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/dbiz/cdp/ingestion/bulker/internal/model" +) + +type FlushFunc func(ctx context.Context, events []*model.IngestedEvent) error + +type Batcher struct { + size int + interval time.Duration + flush FlushFunc + log *zap.Logger + + mu sync.Mutex + buffer []*model.IngestedEvent +} + +func New(size int, interval time.Duration, flush FlushFunc, log *zap.Logger) *Batcher { + return &Batcher{ + size: size, + interval: interval, + flush: flush, + log: log, + buffer: make([]*model.IngestedEvent, 0, size), + } +} + +// Add appends an event. If the size cap is reached we flush synchronously +// before returning so the consumer commit can rely on durability. +func (b *Batcher) Add(ctx context.Context, e *model.IngestedEvent) error { + b.mu.Lock() + b.buffer = append(b.buffer, e) + if len(b.buffer) < b.size { + b.mu.Unlock() + return nil + } + batch := b.swap() + b.mu.Unlock() + return b.doFlush(ctx, batch) +} + +// Run blocks until ctx is cancelled, flushing the buffer every interval. +func (b *Batcher) Run(ctx context.Context) { + t := time.NewTicker(b.interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + b.FlushNow(context.Background()) + return + case <-t.C: + if err := b.FlushNow(ctx); err != nil { + b.log.Warn("batch flush failed", zap.Error(err)) + } + } + } +} + +// FlushNow swaps the buffer and flushes synchronously. +func (b *Batcher) FlushNow(ctx context.Context) error { + b.mu.Lock() + batch := b.swap() + b.mu.Unlock() + return b.doFlush(ctx, batch) +} + +// swap returns the current buffer and replaces it with a fresh slice. +// Caller must hold b.mu. +func (b *Batcher) swap() []*model.IngestedEvent { + if len(b.buffer) == 0 { + return nil + } + out := b.buffer + b.buffer = make([]*model.IngestedEvent, 0, b.size) + return out +} + +func (b *Batcher) doFlush(ctx context.Context, batch []*model.IngestedEvent) error { + if len(batch) == 0 { + return nil + } + start := time.Now() + if err := b.flush(ctx, batch); err != nil { + b.log.Error("flush failed", + zap.Int("count", len(batch)), + zap.Error(err)) + return err + } + b.log.Info("flushed", + zap.Int("count", len(batch)), + zap.Int64("duration_ms", time.Since(start).Milliseconds())) + return nil +} diff --git a/ingestion/bulker/internal/batcher/batcher_test.go b/ingestion/bulker/internal/batcher/batcher_test.go new file mode 100644 index 0000000..cfb1fa9 --- /dev/null +++ b/ingestion/bulker/internal/batcher/batcher_test.go @@ -0,0 +1,56 @@ +package batcher + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "github.com/dbiz/cdp/ingestion/bulker/internal/model" +) + +func TestBatcher_FlushesOnSizeCap(t *testing.T) { + var flushed int32 + flush := func(_ context.Context, evs []*model.IngestedEvent) error { + atomic.AddInt32(&flushed, int32(len(evs))) + return nil + } + b := New(3, time.Hour, flush, zap.NewNop()) + for i := 0; i < 3; i++ { + _ = b.Add(context.Background(), &model.IngestedEvent{MessageID: "x"}) + } + assert.Equal(t, int32(3), atomic.LoadInt32(&flushed)) +} + +func TestBatcher_FlushNow_NoOpOnEmpty(t *testing.T) { + var called int32 + flush := func(_ context.Context, _ []*model.IngestedEvent) error { + atomic.AddInt32(&called, 1) + return nil + } + b := New(10, time.Hour, flush, zap.NewNop()) + _ = b.FlushNow(context.Background()) + assert.Equal(t, int32(0), atomic.LoadInt32(&called)) +} + +func TestBatcher_FlushesOnTimer(t *testing.T) { + var flushed int32 + flush := func(_ context.Context, evs []*model.IngestedEvent) error { + atomic.AddInt32(&flushed, int32(len(evs))) + return nil + } + b := New(1000, 50*time.Millisecond, flush, zap.NewNop()) + ctx, cancel := context.WithCancel(context.Background()) + go b.Run(ctx) + + _ = b.Add(context.Background(), &model.IngestedEvent{MessageID: "a"}) + _ = b.Add(context.Background(), &model.IngestedEvent{MessageID: "b"}) + + time.Sleep(120 * time.Millisecond) + cancel() + time.Sleep(10 * time.Millisecond) + assert.Equal(t, int32(2), atomic.LoadInt32(&flushed)) +} diff --git a/ingestion/bulker/internal/config/config.go b/ingestion/bulker/internal/config/config.go new file mode 100644 index 0000000..300261c --- /dev/null +++ b/ingestion/bulker/internal/config/config.go @@ -0,0 +1,35 @@ +// Package config loads bulker runtime config from env. +package config + +import ( + "fmt" + "time" + + "github.com/caarlos0/env/v11" +) + +type Config struct { + HTTPAddr string `env:"BULKER_HTTP_ADDR" envDefault:":3042"` + LogLevel string `env:"BULKER_LOG_LEVEL" envDefault:"info"` + KafkaGroup string `env:"BULKER_KAFKA_GROUP" envDefault:"bulker"` + BatchSize int `env:"BULKER_BATCH_SIZE" envDefault:"1000"` + BatchInterval time.Duration `env:"BULKER_BATCH_INTERVAL_SECONDS" envDefault:"5s"` + ShutdownTimeout time.Duration `env:"BULKER_SHUTDOWN_TIMEOUT_SECONDS" envDefault:"60s"` + + KafkaBrokers []string `env:"KAFKA_BROKERS" envSeparator:"," envDefault:"localhost:9092"` + KafkaTopicIngest string `env:"KAFKA_TOPIC_INGEST" envDefault:"events.ingest"` + KafkaTopicDLQ string `env:"KAFKA_TOPIC_DLQ" envDefault:"events.dlq"` + + ClickHouseAddr string `env:"CLICKHOUSE_ADDR" envDefault:"localhost:9000"` + ClickHouseDB string `env:"CLICKHOUSE_DB" envDefault:"cdp"` + ClickHouseUser string `env:"CLICKHOUSE_USER" envDefault:"default"` + ClickHousePassword string `env:"CLICKHOUSE_PASSWORD" envDefault:""` +} + +func Load() (*Config, error) { + cfg := &Config{} + if err := env.Parse(cfg); err != nil { + return nil, fmt.Errorf("config load: %w", err) + } + return cfg, nil +} diff --git a/ingestion/bulker/internal/consumer/consumer.go b/ingestion/bulker/internal/consumer/consumer.go new file mode 100644 index 0000000..c57d6c5 --- /dev/null +++ b/ingestion/bulker/internal/consumer/consumer.go @@ -0,0 +1,130 @@ +// Package consumer reads from the ingest Kafka topics and feeds the batcher. +// +// We use franz-go's manual commit mode: commit only after a successful +// batcher flush. Combined with at-least-once semantics from the producer +// and idempotent inserts at the analytics layer this is sufficient. +package consumer + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/twmb/franz-go/pkg/kgo" + "go.uber.org/zap" + + "github.com/dbiz/cdp/ingestion/bulker/internal/batcher" + "github.com/dbiz/cdp/ingestion/bulker/internal/model" + "github.com/dbiz/cdp/ingestion/bulker/internal/writer" +) + +type Consumer struct { + client *kgo.Client + log *zap.Logger + batcher *batcher.Batcher + writer *writer.ClickHouse + dlqTopic string + ingestTopic string +} + +type Config struct { + Brokers []string + Group string + IngestTopic string + DLQTopic string +} + +func New(c Config, b *batcher.Batcher, w *writer.ClickHouse, log *zap.Logger) (*Consumer, error) { + cl, err := kgo.NewClient( + kgo.SeedBrokers(c.Brokers...), + kgo.ConsumerGroup(c.Group), + kgo.ConsumeTopics(c.IngestTopic, c.DLQTopic), + kgo.DisableAutoCommit(), + kgo.ClientID("cdp-bulker"), + kgo.SessionTimeout(45_000_000_000), // 45s + kgo.FetchMaxBytes(50 * 1024 * 1024), + ) + if err != nil { + return nil, fmt.Errorf("kafka client: %w", err) + } + return &Consumer{ + client: cl, + log: log, + batcher: b, + writer: w, + dlqTopic: c.DLQTopic, + ingestTopic: c.IngestTopic, + }, nil +} + +func (c *Consumer) Close() { + c.client.Close() +} + +// Run polls Kafka until ctx is cancelled. One iteration: +// 1. PollFetches +// 2. For each record, parse JSON and route to the right destination +// 3. Commit offsets only after the batch flush succeeded +func (c *Consumer) Run(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + c.log.Info("consumer stopping") + return nil + default: + } + + fetches := c.client.PollFetches(ctx) + if errs := fetches.Errors(); len(errs) > 0 { + for _, e := range errs { + c.log.Warn("fetch error", + zap.String("topic", e.Topic), + zap.Int32("partition", e.Partition), + zap.Error(e.Err)) + } + } + + var dlqBatch []*model.DLQRecord + fetches.EachRecord(func(r *kgo.Record) { + switch r.Topic { + case c.ingestTopic: + var ev model.IngestedEvent + if err := json.Unmarshal(r.Value, &ev); err != nil { + c.log.Warn("ingest decode failed", + zap.String("topic", r.Topic), + zap.Error(err)) + return + } + if err := c.batcher.Add(ctx, &ev); err != nil { + c.log.Error("batcher add failed", zap.Error(err)) + } + case c.dlqTopic: + var d model.DLQRecord + if err := json.Unmarshal(r.Value, &d); err != nil { + return + } + if d.ReceivedAt.IsZero() { + d.ReceivedAt = r.Timestamp + } + dlqBatch = append(dlqBatch, &d) + } + }) + + if len(dlqBatch) > 0 { + if err := c.writer.WriteDLQ(ctx, dlqBatch); err != nil { + c.log.Error("dlq write failed", zap.Error(err)) + } + } + + // Force a flush before committing so committed offsets reflect what's + // actually persisted. The batcher is idempotent for empty buffers. + if err := c.batcher.FlushNow(ctx); err != nil { + c.log.Warn("flush before commit failed", zap.Error(err)) + continue // do not commit -- retry on next poll + } + + if err := c.client.CommitUncommittedOffsets(ctx); err != nil { + c.log.Warn("commit failed", zap.Error(err)) + } + } +} diff --git a/ingestion/bulker/internal/model/event.go b/ingestion/bulker/internal/model/event.go new file mode 100644 index 0000000..792c40f --- /dev/null +++ b/ingestion/bulker/internal/model/event.go @@ -0,0 +1,41 @@ +package model + +import "time" + +// IngestedEvent mirrors the shape ingest publishes onto Kafka. +// Keep these two structs in lock-step (we are intentionally NOT importing +// ingest's package -- bulker compiles standalone). +type IngestedEvent struct { + WorkspaceID string `json:"workspace_id"` + SourceID string `json:"source_id"` + MessageID string `json:"message_id"` + Type string `json:"type"` + AnonymousID string `json:"anonymous_id,omitempty"` + UserID string `json:"user_id,omitempty"` + GroupID string `json:"group_id,omitempty"` + Event string `json:"event,omitempty"` + Name string `json:"name,omitempty"` + Category string `json:"category,omitempty"` + + Properties map[string]any `json:"properties,omitempty"` + Traits map[string]any `json:"traits,omitempty"` + Context map[string]any `json:"context,omitempty"` + + IP string `json:"ip,omitempty"` + UserAgent string `json:"user_agent,omitempty"` + + Timestamp time.Time `json:"timestamp"` + SentAt time.Time `json:"sent_at"` + ReceivedAt time.Time `json:"received_at"` +} + +// DLQRecord is the JSON shape the bulker reads from the DLQ topic. +type DLQRecord struct { + WorkspaceID string `json:"workspace_id"` + SourceID string `json:"source_id"` + MessageID string `json:"message_id"` + Reason string `json:"reason"` + Field string `json:"field"` + RawPayload string `json:"raw_payload"` + ReceivedAt time.Time `json:"received_at"` +} diff --git a/ingestion/bulker/internal/writer/clickhouse.go b/ingestion/bulker/internal/writer/clickhouse.go new file mode 100644 index 0000000..a3cb401 --- /dev/null +++ b/ingestion/bulker/internal/writer/clickhouse.go @@ -0,0 +1,250 @@ +// Package writer wraps the ClickHouse client for batch inserts. +// +// We use the native clickhouse-go v2 client. One PrepareBatch / Append / Send +// cycle per (table, batch). All maps are stringified before insertion -- the +// ClickHouse schema uses Map(String, String) which keeps the table flat and +// avoids column explosion. Analytics queries cast on read. +package writer + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + + "github.com/dbiz/cdp/ingestion/bulker/internal/model" +) + +type ClickHouse struct { + conn driver.Conn + db string +} + +func New(ctx context.Context, addr, db, user, password string) (*ClickHouse, error) { + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{addr}, + Auth: clickhouse.Auth{ + Database: db, + Username: user, + Password: password, + }, + Settings: clickhouse.Settings{ + "async_insert": 0, + "wait_for_async_insert": 0, + }, + }) + if err != nil { + return nil, fmt.Errorf("clickhouse open: %w", err) + } + if err := conn.Ping(ctx); err != nil { + return nil, fmt.Errorf("clickhouse ping: %w", err) + } + return &ClickHouse{conn: conn, db: db}, nil +} + +func (c *ClickHouse) Close() error { return c.conn.Close() } + +// WriteEvents fans out a mixed-type batch into the per-type tables. +// Returns the number of rows successfully inserted across all tables. +func (c *ClickHouse) WriteEvents(ctx context.Context, events []*model.IngestedEvent) (int, error) { + if len(events) == 0 { + return 0, nil + } + + // Bucket by event type so each insert hits one table. + buckets := map[string][]*model.IngestedEvent{} + for _, e := range events { + buckets[e.Type] = append(buckets[e.Type], e) + } + + total := 0 + for t, evs := range buckets { + var err error + switch t { + case "track": + err = c.writeTrack(ctx, evs) + case "identify": + err = c.writeIdentify(ctx, evs) + case "page", "screen": + err = c.writePage(ctx, evs) + case "group": + err = c.writeGroup(ctx, evs) + default: + // alias / unknown types -- write to track for now + err = c.writeTrack(ctx, evs) + } + if err != nil { + return total, fmt.Errorf("write %s: %w", t, err) + } + total += len(evs) + } + return total, nil +} + +// --------------------------------------------------------------------------- +// per-table batch inserts +// --------------------------------------------------------------------------- + +func (c *ClickHouse) writeTrack(ctx context.Context, evs []*model.IngestedEvent) error { + batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_track") + if err != nil { + return err + } + for _, e := range evs { + err := batch.Append( + e.WorkspaceID, e.SourceID, e.MessageID, + e.AnonymousID, e.UserID, e.Event, + e.Timestamp, e.SentAt, e.ReceivedAt, + mapToStr(e.Properties), mapToStr(e.Context), + e.IP, e.UserAgent, + libraryName(e.Context), libraryVersion(e.Context), + ) + if err != nil { + return err + } + } + return batch.Send() +} + +func (c *ClickHouse) writeIdentify(ctx context.Context, evs []*model.IngestedEvent) error { + batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_identify") + if err != nil { + return err + } + for _, e := range evs { + err := batch.Append( + e.WorkspaceID, e.SourceID, e.MessageID, + e.AnonymousID, e.UserID, + e.Timestamp, e.SentAt, e.ReceivedAt, + mapToStr(e.Traits), mapToStr(e.Context), + e.IP, e.UserAgent, + ) + if err != nil { + return err + } + } + return batch.Send() +} + +func (c *ClickHouse) writePage(ctx context.Context, evs []*model.IngestedEvent) error { + batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_page") + if err != nil { + return err + } + for _, e := range evs { + path, _ := e.Properties["path"].(string) + url, _ := e.Properties["url"].(string) + referrer, _ := e.Properties["referrer"].(string) + err := batch.Append( + e.WorkspaceID, e.SourceID, e.MessageID, + e.AnonymousID, e.UserID, e.Name, e.Category, + e.Timestamp, e.SentAt, e.ReceivedAt, + mapToStr(e.Properties), mapToStr(e.Context), + e.IP, e.UserAgent, + referrer, path, url, + ) + if err != nil { + return err + } + } + return batch.Send() +} + +func (c *ClickHouse) writeGroup(ctx context.Context, evs []*model.IngestedEvent) error { + batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_group") + if err != nil { + return err + } + for _, e := range evs { + err := batch.Append( + e.WorkspaceID, e.SourceID, e.MessageID, + e.AnonymousID, e.UserID, e.GroupID, + e.Timestamp, e.SentAt, e.ReceivedAt, + mapToStr(e.Traits), mapToStr(e.Context), + e.IP, e.UserAgent, + ) + if err != nil { + return err + } + } + return batch.Send() +} + +// WriteDLQ inserts records from the DLQ topic. +func (c *ClickHouse) WriteDLQ(ctx context.Context, recs []*model.DLQRecord) error { + if len(recs) == 0 { + return nil + } + batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_dlq") + if err != nil { + return err + } + for _, r := range recs { + if err := batch.Append( + r.WorkspaceID, r.SourceID, r.MessageID, r.ReceivedAt, + r.Reason, r.Field, r.RawPayload, + ); err != nil { + return err + } + } + return batch.Send() +} + +// --------------------------------------------------------------------------- +// helpers +// --------------------------------------------------------------------------- + +// mapToStr converts a map[string]any into the Map(String, String) shape +// ClickHouse expects. Non-string values are JSON-encoded. +func mapToStr(in map[string]any) map[string]string { + if in == nil { + return map[string]string{} + } + out := make(map[string]string, len(in)) + for k, v := range in { + out[k] = anyToStr(v) + } + return out +} + +func anyToStr(v any) string { + switch x := v.(type) { + case nil: + return "" + case string: + return x + case float64: + return strconv.FormatFloat(x, 'f', -1, 64) + case int: + return strconv.Itoa(x) + case int64: + return strconv.FormatInt(x, 10) + case bool: + return strconv.FormatBool(x) + default: + b, _ := json.Marshal(v) + return string(b) + } +} + +func libraryName(ctx map[string]any) string { + if ctx == nil { + return "" + } + if v, ok := ctx["library_name"].(string); ok { + return v + } + return "" +} +func libraryVersion(ctx map[string]any) string { + if ctx == nil { + return "" + } + if v, ok := ctx["library_version"].(string); ok { + return v + } + return "" +} diff --git a/ingestion/console/Dockerfile b/ingestion/console/Dockerfile new file mode 100644 index 0000000..8c947c7 --- /dev/null +++ b/ingestion/console/Dockerfile @@ -0,0 +1,11 @@ +FROM node:20-alpine AS build +WORKDIR /app +COPY package.json ./ +RUN npm install +COPY . . +RUN npm run build + +FROM nginx:1.27-alpine +COPY --from=build /app/dist /usr/share/nginx/html +COPY nginx.conf /etc/nginx/conf.d/default.conf +EXPOSE 3000 diff --git a/ingestion/console/index.html b/ingestion/console/index.html new file mode 100644 index 0000000..9d11ad7 --- /dev/null +++ b/ingestion/console/index.html @@ -0,0 +1,13 @@ + + + + + + + CDP Console + + +
+ + + diff --git a/ingestion/console/nginx.conf b/ingestion/console/nginx.conf new file mode 100644 index 0000000..8acaa27 --- /dev/null +++ b/ingestion/console/nginx.conf @@ -0,0 +1,23 @@ +server { + listen 3000; + root /usr/share/nginx/html; + index index.html; + + location / { + try_files $uri $uri/ /index.html; + } + + # Proxy to backends so the SPA can hit /api/ingest, /api/bulker, /api/rotor. + location /api/ingest/ { + rewrite ^/api/ingest/(.*)$ /$1 break; + proxy_pass http://ingest:3049; + } + location /api/bulker/ { + rewrite ^/api/bulker/(.*)$ /$1 break; + proxy_pass http://bulker:3042; + } + location /api/rotor/ { + rewrite ^/api/rotor/(.*)$ /$1 break; + proxy_pass http://rotor:3401; + } +} diff --git a/ingestion/console/package.json b/ingestion/console/package.json new file mode 100644 index 0000000..9365913 --- /dev/null +++ b/ingestion/console/package.json @@ -0,0 +1,44 @@ +{ + "name": "cdp-console", + "version": "0.1.0", + "private": true, + "type": "module", + "scripts": { + "dev": "vite", + "build": "tsc -b && vite build", + "preview": "vite preview --port 3000", + "lint": "eslint ." + }, + "dependencies": { + "@radix-ui/react-dialog": "^1.1.2", + "@radix-ui/react-dropdown-menu": "^2.1.2", + "@radix-ui/react-label": "^2.1.0", + "@radix-ui/react-slot": "^1.1.0", + "@radix-ui/react-tabs": "^1.1.1", + "@radix-ui/react-toast": "^1.2.2", + "@tanstack/react-query": "^5.59.16", + "class-variance-authority": "^0.7.0", + "clsx": "^2.1.1", + "lucide-react": "^0.451.0", + "react": "^18.3.1", + "react-dom": "^18.3.1", + "react-hook-form": "^7.53.0", + "react-router-dom": "^6.27.0", + "recharts": "^2.13.0", + "tailwind-merge": "^2.5.4", + "zod": "^3.23.8", + "zustand": "^5.0.0" + }, + "devDependencies": { + "@types/node": "^22.7.5", + "@types/react": "^18.3.11", + "@types/react-dom": "^18.3.0", + "@vitejs/plugin-react": "^4.3.2", + "autoprefixer": "^10.4.20", + "eslint": "^9.12.0", + "postcss": "^8.4.47", + "tailwindcss": "^3.4.13", + "typescript": "^5.6.3", + "vite": "^5.4.9" + } +} diff --git a/ingestion/console/postcss.config.js b/ingestion/console/postcss.config.js new file mode 100644 index 0000000..2aa7205 --- /dev/null +++ b/ingestion/console/postcss.config.js @@ -0,0 +1,6 @@ +export default { + plugins: { + tailwindcss: {}, + autoprefixer: {}, + }, +}; diff --git a/ingestion/console/public/favicon.svg b/ingestion/console/public/favicon.svg new file mode 100644 index 0000000..05931dc --- /dev/null +++ b/ingestion/console/public/favicon.svg @@ -0,0 +1 @@ +cdp diff --git a/ingestion/console/src/App.tsx b/ingestion/console/src/App.tsx new file mode 100644 index 0000000..dfa89cf --- /dev/null +++ b/ingestion/console/src/App.tsx @@ -0,0 +1,32 @@ +import { BrowserRouter, Route, Routes } from 'react-router-dom'; +import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; +import { AppShell } from '@/components/AppShell'; +import { DashboardPage } from '@/pages/Dashboard'; +import { SourcesPage } from '@/pages/Sources'; +import { DestinationsPage } from '@/pages/Destinations'; +import { FunctionsPage } from '@/pages/Functions'; +import { LivePage } from '@/pages/Live'; +import { SettingsPage } from '@/pages/Settings'; + +const qc = new QueryClient({ + defaultOptions: { queries: { retry: 1, staleTime: 30_000 } }, +}); + +export function App() { + return ( + + + + }> + } /> + } /> + } /> + } /> + } /> + } /> + + + + + ); +} diff --git a/ingestion/console/src/api/client.ts b/ingestion/console/src/api/client.ts new file mode 100644 index 0000000..0918348 --- /dev/null +++ b/ingestion/console/src/api/client.ts @@ -0,0 +1,90 @@ +// Thin fetch wrapper. Throws on non-2xx with a structured ApiError. + +export class ApiError extends Error { + status: number; + field?: string; + constructor(status: number, message: string, field?: string) { + super(message); + this.status = status; + this.field = field; + } +} + +const INGEST_BASE = import.meta.env.VITE_API_BASE_URL ?? '/api/ingest'; +const ROTOR_BASE = import.meta.env.VITE_ROTOR_BASE_URL ?? '/api/rotor'; +const BULKER_BASE = import.meta.env.VITE_BULKER_BASE_URL ?? '/api/bulker'; + +async function request(base: string, path: string, init?: RequestInit): Promise { + const res = await fetch(`${base}${path}`, { + ...init, + headers: { + 'content-type': 'application/json', + ...(init?.headers ?? {}), + }, + }); + const text = await res.text(); + const data = text ? safeJSON(text) : undefined; + if (!res.ok) { + const msg = (data as { error?: string })?.error ?? res.statusText; + const field = (data as { field?: string })?.field; + throw new ApiError(res.status, msg, field); + } + return data as T; +} + +function safeJSON(text: string): unknown { + try { + return JSON.parse(text); + } catch { + return text; + } +} + +// --------------------------------------------------------------------------- +// Ingest API +// --------------------------------------------------------------------------- + +export const ingest = { + health: () => request<{ status: string }>(INGEST_BASE, '/health'), + ready: () => request<{ status: string }>(INGEST_BASE, '/ready'), + + track: (writeKey: string, body: Record) => + request<{ ok: boolean }>(INGEST_BASE, '/track', { + method: 'POST', + headers: { Authorization: `Bearer ${writeKey}` }, + body: JSON.stringify(body), + }), +}; + +// --------------------------------------------------------------------------- +// Rotor API +// --------------------------------------------------------------------------- + +export interface RunRequest { + code: string; + event: Record; +} +export interface RunResponse { + result: unknown; +} + +export const rotor = { + run: (body: RunRequest) => + request(ROTOR_BASE, '/v1/run', { + method: 'POST', + body: JSON.stringify(body), + }), + upsert: (body: { workspace_id: string; slug: string; code: string }) => + request<{ ok: boolean }>(ROTOR_BASE, '/v1/functions', { + method: 'POST', + body: JSON.stringify(body), + }), +}; + +// --------------------------------------------------------------------------- +// Bulker API +// --------------------------------------------------------------------------- + +export const bulker = { + health: () => request<{ status: string }>(BULKER_BASE, '/health'), +}; diff --git a/ingestion/console/src/components/AppShell.tsx b/ingestion/console/src/components/AppShell.tsx new file mode 100644 index 0000000..ea4edab --- /dev/null +++ b/ingestion/console/src/components/AppShell.tsx @@ -0,0 +1,50 @@ +import { NavLink, Outlet } from 'react-router-dom'; +import { + Activity, BarChart3, Code2, Database, Settings, Workflow, +} from 'lucide-react'; +import { cn } from '@/lib/utils'; + +const nav = [ + { to: '/', label: 'Dashboard', icon: BarChart3 }, + { to: '/sources', label: 'Sources', icon: Workflow }, + { to: '/destinations', label: 'Destinations', icon: Database }, + { to: '/functions', label: 'Functions', icon: Code2 }, + { to: '/live', label: 'Live events', icon: Activity }, + { to: '/settings', label: 'Settings', icon: Settings }, +]; + +export function AppShell() { + return ( +
+ +
+ +
+
+ ); +} diff --git a/ingestion/console/src/components/ui/badge.tsx b/ingestion/console/src/components/ui/badge.tsx new file mode 100644 index 0000000..2737e27 --- /dev/null +++ b/ingestion/console/src/components/ui/badge.tsx @@ -0,0 +1,25 @@ +import * as React from 'react'; +import { cva, type VariantProps } from 'class-variance-authority'; +import { cn } from '@/lib/utils'; + +const badgeVariants = cva( + 'inline-flex items-center rounded-full border px-2.5 py-0.5 text-xs font-semibold transition-colors', + { + variants: { + variant: { + default: 'border-transparent bg-primary text-primary-foreground', + secondary: 'border-transparent bg-muted text-foreground', + destructive: 'border-transparent bg-destructive text-destructive-foreground', + outline: 'text-foreground', + success: 'border-transparent bg-emerald-500 text-white', + }, + }, + defaultVariants: { variant: 'default' }, + }, +); + +export interface BadgeProps extends React.HTMLAttributes, VariantProps {} + +export function Badge({ className, variant, ...props }: BadgeProps) { + return
; +} diff --git a/ingestion/console/src/components/ui/button.tsx b/ingestion/console/src/components/ui/button.tsx new file mode 100644 index 0000000..ed52fb8 --- /dev/null +++ b/ingestion/console/src/components/ui/button.tsx @@ -0,0 +1,48 @@ +import * as React from 'react'; +import { Slot } from '@radix-ui/react-slot'; +import { cva, type VariantProps } from 'class-variance-authority'; +import { cn } from '@/lib/utils'; + +const buttonVariants = cva( + 'inline-flex items-center justify-center whitespace-nowrap rounded-md text-sm font-medium ring-offset-background transition-colors focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-ring focus-visible:ring-offset-2 disabled:pointer-events-none disabled:opacity-50', + { + variants: { + variant: { + default: 'bg-primary text-primary-foreground hover:bg-primary/90', + destructive: 'bg-destructive text-destructive-foreground hover:bg-destructive/90', + outline: 'border border-input bg-background hover:bg-accent hover:text-accent-foreground', + ghost: 'hover:bg-accent hover:text-accent-foreground', + link: 'text-primary underline-offset-4 hover:underline', + }, + size: { + default: 'h-10 px-4 py-2', + sm: 'h-9 rounded-md px-3', + lg: 'h-11 rounded-md px-8', + icon: 'h-10 w-10', + }, + }, + defaultVariants: { variant: 'default', size: 'default' }, + }, +); + +export interface ButtonProps + extends React.ButtonHTMLAttributes, + VariantProps { + asChild?: boolean; +} + +export const Button = React.forwardRef( + ({ className, variant, size, asChild = false, ...props }, ref) => { + const Comp = asChild ? Slot : 'button'; + return ( + + ); + }, +); +Button.displayName = 'Button'; + +export { buttonVariants }; diff --git a/ingestion/console/src/components/ui/card.tsx b/ingestion/console/src/components/ui/card.tsx new file mode 100644 index 0000000..4093660 --- /dev/null +++ b/ingestion/console/src/components/ui/card.tsx @@ -0,0 +1,44 @@ +import * as React from 'react'; +import { cn } from '@/lib/utils'; + +export const Card = React.forwardRef>( + ({ className, ...props }, ref) => ( +
+ ), +); +Card.displayName = 'Card'; + +export const CardHeader = React.forwardRef>( + ({ className, ...props }, ref) => ( +
+ ), +); +CardHeader.displayName = 'CardHeader'; + +export const CardTitle = React.forwardRef>( + ({ className, ...props }, ref) => ( +
+ ), +); +CardTitle.displayName = 'CardTitle'; + +export const CardDescription = React.forwardRef>( + ({ className, ...props }, ref) => ( +
+ ), +); +CardDescription.displayName = 'CardDescription'; + +export const CardContent = React.forwardRef>( + ({ className, ...props }, ref) => ( +
+ ), +); +CardContent.displayName = 'CardContent'; + +export const CardFooter = React.forwardRef>( + ({ className, ...props }, ref) => ( +
+ ), +); +CardFooter.displayName = 'CardFooter'; diff --git a/ingestion/console/src/components/ui/input.tsx b/ingestion/console/src/components/ui/input.tsx new file mode 100644 index 0000000..0963199 --- /dev/null +++ b/ingestion/console/src/components/ui/input.tsx @@ -0,0 +1,19 @@ +import * as React from 'react'; +import { cn } from '@/lib/utils'; + +export type InputProps = React.InputHTMLAttributes; + +export const Input = React.forwardRef( + ({ className, type, ...props }, ref) => ( + + ), +); +Input.displayName = 'Input'; diff --git a/ingestion/console/src/index.css b/ingestion/console/src/index.css new file mode 100644 index 0000000..84cc7eb --- /dev/null +++ b/ingestion/console/src/index.css @@ -0,0 +1,46 @@ +@tailwind base; +@tailwind components; +@tailwind utilities; + +@layer base { + :root { + --background: 0 0% 100%; + --foreground: 222.2 84% 4.9%; + --card: 0 0% 100%; + --card-foreground: 222.2 84% 4.9%; + --primary: 222.2 47.4% 11.2%; + --primary-foreground: 210 40% 98%; + --muted: 210 40% 96.1%; + --muted-foreground: 215.4 16.3% 46.9%; + --accent: 210 40% 96.1%; + --accent-foreground: 222.2 47.4% 11.2%; + --destructive: 0 84.2% 60.2%; + --destructive-foreground: 210 40% 98%; + --border: 214.3 31.8% 91.4%; + --input: 214.3 31.8% 91.4%; + --ring: 222.2 84% 4.9%; + --radius: 0.5rem; + } + .dark { + --background: 222.2 84% 4.9%; + --foreground: 210 40% 98%; + --card: 222.2 84% 4.9%; + --card-foreground: 210 40% 98%; + --primary: 210 40% 98%; + --primary-foreground: 222.2 47.4% 11.2%; + --muted: 217.2 32.6% 17.5%; + --muted-foreground: 215 20.2% 65.1%; + --accent: 217.2 32.6% 17.5%; + --accent-foreground: 210 40% 98%; + --destructive: 0 62.8% 30.6%; + --destructive-foreground: 210 40% 98%; + --border: 217.2 32.6% 17.5%; + --input: 217.2 32.6% 17.5%; + --ring: 212.7 26.8% 83.9%; + } +} + +@layer base { + * { @apply border-border; } + body { @apply bg-background text-foreground; } +} diff --git a/ingestion/console/src/lib/utils.ts b/ingestion/console/src/lib/utils.ts new file mode 100644 index 0000000..9ad0df4 --- /dev/null +++ b/ingestion/console/src/lib/utils.ts @@ -0,0 +1,6 @@ +import { type ClassValue, clsx } from 'clsx'; +import { twMerge } from 'tailwind-merge'; + +export function cn(...inputs: ClassValue[]) { + return twMerge(clsx(inputs)); +} diff --git a/ingestion/console/src/main.tsx b/ingestion/console/src/main.tsx new file mode 100644 index 0000000..b2bc486 --- /dev/null +++ b/ingestion/console/src/main.tsx @@ -0,0 +1,10 @@ +import React from 'react'; +import ReactDOM from 'react-dom/client'; +import { App } from './App'; +import './index.css'; + +ReactDOM.createRoot(document.getElementById('root')!).render( + + + , +); diff --git a/ingestion/console/src/pages/Dashboard.tsx b/ingestion/console/src/pages/Dashboard.tsx new file mode 100644 index 0000000..67ad9b4 --- /dev/null +++ b/ingestion/console/src/pages/Dashboard.tsx @@ -0,0 +1,81 @@ +import { useQuery } from '@tanstack/react-query'; +import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card'; +import { Badge } from '@/components/ui/badge'; +import { bulker, ingest } from '@/api/client'; + +export function DashboardPage() { + const ingestHealth = useQuery({ + queryKey: ['health', 'ingest'], + queryFn: ingest.health, + refetchInterval: 5_000, + }); + const bulkerHealth = useQuery({ + queryKey: ['health', 'bulker'], + queryFn: bulker.health, + refetchInterval: 5_000, + }); + + return ( +
+
+

Dashboard

+

Operational status of the ingestion stack.

+
+ +
+ + + +
+ + + + Getting started + Send a test event with the dev write key. + + +
+{`curl -X POST http://localhost:3049/track \\
+  -H 'Authorization: Bearer cdp_dev_writekey_1234567890' \\
+  -H 'Content-Type: application/json' \\
+  -d '{
+    "type": "track",
+    "messageId": "m_${'${'}Date.now()${'}'}",
+    "anonymousId": "anon_1",
+    "event": "Signed Up",
+    "properties": { "plan": "pro" }
+  }'`}
+          
+
+
+
+ ); +} + +function ServiceCard({ name, status, port }: { name: string; status: ServiceStatus; port: number }) { + return ( + + +
+ {name} + +
+ localhost:{port} +
+
+ ); +} + +type ServiceStatus = 'ok' | 'down' | 'unknown'; + +function statusFromQuery(q: { isLoading: boolean; isError: boolean; data?: { status: string } }): ServiceStatus { + if (q.isLoading) return 'unknown'; + if (q.isError) return 'down'; + return q.data?.status === 'ok' ? 'ok' : 'down'; +} + +function StatusBadge({ status }: { status: ServiceStatus }) { + if (status === 'ok') return healthy; + if (status === 'down') return down; + return unknown; +} diff --git a/ingestion/console/src/pages/Destinations.tsx b/ingestion/console/src/pages/Destinations.tsx new file mode 100644 index 0000000..65f8381 --- /dev/null +++ b/ingestion/console/src/pages/Destinations.tsx @@ -0,0 +1,43 @@ +import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card'; +import { Badge } from '@/components/ui/badge'; +import { Button } from '@/components/ui/button'; +import { Database, Plus } from 'lucide-react'; + +const destinations = [ + { id: '1', name: 'ClickHouse (warehouse)', kind: 'clickhouse', enabled: true }, + { id: '2', name: 'BigQuery (BI)', kind: 'bigquery', enabled: false }, +]; + +export function DestinationsPage() { + return ( +
+
+
+

Destinations

+

Where events end up.

+
+ +
+ +
+ {destinations.map((d) => ( + + +
+
+
+ {d.name} + {d.kind} +
+
+ {d.enabled ? on : off} +
+ +
+ ))} +
+
+ ); +} diff --git a/ingestion/console/src/pages/Functions.tsx b/ingestion/console/src/pages/Functions.tsx new file mode 100644 index 0000000..51115c2 --- /dev/null +++ b/ingestion/console/src/pages/Functions.tsx @@ -0,0 +1,101 @@ +import { useState } from 'react'; +import { useMutation } from '@tanstack/react-query'; +import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card'; +import { Button } from '@/components/ui/button'; +import { ApiError, rotor } from '@/api/client'; + +const DEFAULT_CODE = `// Transform an event before it's stored. +// Return the (possibly modified) event, null to drop, or an array to fan out. +function transform(event) { + event.properties = event.properties || {}; + event.properties.tagged_at = new Date().toISOString(); + return event; +}`; + +const DEFAULT_EVENT = JSON.stringify({ + workspace_id: 'ws-1', + source_id: 'src-1', + message_id: 'm-1', + type: 'track', + event: 'Signed Up', + properties: { plan: 'pro' }, +}, null, 2); + +export function FunctionsPage() { + const [code, setCode] = useState(DEFAULT_CODE); + const [eventText, setEventText] = useState(DEFAULT_EVENT); + const [output, setOutput] = useState(''); + + const run = useMutation({ + mutationFn: async () => { + let event: Record; + try { + event = JSON.parse(eventText); + } catch (err) { + throw new ApiError(400, `event is not valid JSON: ${(err as Error).message}`); + } + return rotor.run({ code, event }); + }, + onSuccess: (data) => setOutput(JSON.stringify(data.result, null, 2)), + onError: (err: ApiError) => setOutput(`ERROR (${err.status}): ${err.message}`), + }); + + return ( +
+
+

Functions

+

Author and test transformation functions.

+
+ +
+ + + Code + Define transform(event). + + +