From a428170fef31b2940bb724cfaac35c366984f1ff Mon Sep 17 00:00:00 2001 From: renolation Date: Mon, 25 May 2026 08:38:26 +0700 Subject: [PATCH] data layer --- data-layer/.env.example | 38 ++++ data-layer/.gitignore | 41 ++++ data-layer/Makefile | 120 +++++++++++ data-layer/README.md | 91 ++++++++ data-layer/api/Dockerfile | 12 ++ data-layer/api/cmd/server/main.go | 167 +++++++++++++++ data-layer/api/go.mod | 15 ++ data-layer/api/internal/apperr/apperr.go | 75 +++++++ data-layer/api/internal/cache/cache.go | 52 +++++ data-layer/api/internal/config/config.go | 47 +++++ data-layer/api/internal/handler/.gitkeep | 0 .../api/internal/handler/analytics_handler.go | 132 ++++++++++++ data-layer/api/internal/handler/decode.go | 36 ++++ .../api/internal/handler/event_handler.go | 74 +++++++ .../api/internal/handler/profile_handler.go | 85 ++++++++ data-layer/api/internal/handler/render.go | 47 +++++ .../internal/handler/saved_query_handler.go | 125 +++++++++++ .../api/internal/handler/sql_handler.go | 47 +++++ .../api/internal/middleware/middleware.go | 111 ++++++++++ .../api/internal/middleware/workspace.go | 51 +++++ data-layer/api/internal/model/profile.go | 27 +++ data-layer/api/internal/model/query.go | 47 +++++ data-layer/api/internal/repo/.gitkeep | 0 .../api/internal/repo/analytics_repo.go | 167 +++++++++++++++ data-layer/api/internal/repo/chconn.go | 58 ++++++ data-layer/api/internal/repo/event_repo.go | 194 ++++++++++++++++++ data-layer/api/internal/repo/pool.go | 28 +++ data-layer/api/internal/repo/profile_repo.go | 70 +++++++ .../api/internal/repo/saved_query_repo.go | 120 +++++++++++ data-layer/api/internal/service/.gitkeep | 0 .../api/internal/service/profile_service.go | 66 ++++++ .../api/internal/service/query_service.go | 87 ++++++++ .../api/internal/service/sql_service.go | 98 +++++++++ .../api/internal/templates/templates.go | 65 ++++++ data-layer/console/Dockerfile | 11 + data-layer/console/index.html | 13 ++ data-layer/console/nginx.conf | 14 ++ 
data-layer/console/package.json | 44 ++++ data-layer/console/postcss.config.js | 6 + data-layer/console/public/favicon.svg | 1 + data-layer/console/src/App.tsx | 34 +++ data-layer/console/src/api/client.ts | 122 +++++++++++ .../console/src/components/AppShell.tsx | 51 +++++ .../console/src/components/ui/badge.tsx | 25 +++ .../console/src/components/ui/button.tsx | 48 +++++ data-layer/console/src/components/ui/card.tsx | 44 ++++ .../console/src/components/ui/input.tsx | 19 ++ data-layer/console/src/hooks/.gitkeep | 0 data-layer/console/src/index.css | 46 +++++ data-layer/console/src/lib/utils.ts | 6 + data-layer/console/src/main.tsx | 10 + data-layer/console/src/pages/Explore.tsx | 139 +++++++++++++ data-layer/console/src/pages/Funnels.tsx | 10 + data-layer/console/src/pages/Profiles.tsx | 10 + data-layer/console/src/pages/Retention.tsx | 10 + data-layer/console/src/pages/SQL.tsx | 85 ++++++++ data-layer/console/src/pages/Segments.tsx | 10 + data-layer/console/src/pages/Traits.tsx | 10 + data-layer/console/src/stores/workspace.ts | 15 ++ data-layer/console/tailwind.config.ts | 50 +++++ data-layer/console/tsconfig.json | 22 ++ data-layer/console/vite.config.ts | 22 ++ data-layer/infra/clickhouse/.gitkeep | 0 .../infra/clickhouse/event_explorer.sql.tmpl | 60 ++++++ .../infra/clickhouse/funnel_analysis.sql.tmpl | 35 ++++ .../clickhouse/profile_timeline.sql.tmpl | 57 +++++ .../clickhouse/retention_cohort.sql.tmpl | 41 ++++ .../clickhouse/session_analysis.sql.tmpl | 52 +++++ data-layer/infra/docker/.gitkeep | 0 .../infra/migrations/000001_init.down.sql | 5 + .../infra/migrations/000001_init.up.sql | 95 +++++++++ data-layer/infra/scripts/clickhouse_apply.sh | 67 ++++++ data-layer/workers/Dockerfile | 12 ++ data-layer/workers/cmd/worker/main.go | 148 +++++++++++++ data-layer/workers/cmd/worker/slog_adapter.go | 14 ++ data-layer/workers/go.mod | 15 ++ data-layer/workers/internal/apperr/apperr.go | 33 +++ data-layer/workers/internal/config/config.go | 37 ++++ 
data-layer/workers/internal/handler/.gitkeep | 0 data-layer/workers/internal/job/.gitkeep | 0 data-layer/workers/internal/repo/.gitkeep | 0 81 files changed, 3941 insertions(+) create mode 100644 data-layer/.env.example create mode 100644 data-layer/.gitignore create mode 100644 data-layer/Makefile create mode 100644 data-layer/README.md create mode 100644 data-layer/api/Dockerfile create mode 100644 data-layer/api/cmd/server/main.go create mode 100644 data-layer/api/go.mod create mode 100644 data-layer/api/internal/apperr/apperr.go create mode 100644 data-layer/api/internal/cache/cache.go create mode 100644 data-layer/api/internal/config/config.go create mode 100644 data-layer/api/internal/handler/.gitkeep create mode 100644 data-layer/api/internal/handler/analytics_handler.go create mode 100644 data-layer/api/internal/handler/decode.go create mode 100644 data-layer/api/internal/handler/event_handler.go create mode 100644 data-layer/api/internal/handler/profile_handler.go create mode 100644 data-layer/api/internal/handler/render.go create mode 100644 data-layer/api/internal/handler/saved_query_handler.go create mode 100644 data-layer/api/internal/handler/sql_handler.go create mode 100644 data-layer/api/internal/middleware/middleware.go create mode 100644 data-layer/api/internal/middleware/workspace.go create mode 100644 data-layer/api/internal/model/profile.go create mode 100644 data-layer/api/internal/model/query.go create mode 100644 data-layer/api/internal/repo/.gitkeep create mode 100644 data-layer/api/internal/repo/analytics_repo.go create mode 100644 data-layer/api/internal/repo/chconn.go create mode 100644 data-layer/api/internal/repo/event_repo.go create mode 100644 data-layer/api/internal/repo/pool.go create mode 100644 data-layer/api/internal/repo/profile_repo.go create mode 100644 data-layer/api/internal/repo/saved_query_repo.go create mode 100644 data-layer/api/internal/service/.gitkeep create mode 100644 
data-layer/api/internal/service/profile_service.go create mode 100644 data-layer/api/internal/service/query_service.go create mode 100644 data-layer/api/internal/service/sql_service.go create mode 100644 data-layer/api/internal/templates/templates.go create mode 100644 data-layer/console/Dockerfile create mode 100644 data-layer/console/index.html create mode 100644 data-layer/console/nginx.conf create mode 100644 data-layer/console/package.json create mode 100644 data-layer/console/postcss.config.js create mode 100644 data-layer/console/public/favicon.svg create mode 100644 data-layer/console/src/App.tsx create mode 100644 data-layer/console/src/api/client.ts create mode 100644 data-layer/console/src/components/AppShell.tsx create mode 100644 data-layer/console/src/components/ui/badge.tsx create mode 100644 data-layer/console/src/components/ui/button.tsx create mode 100644 data-layer/console/src/components/ui/card.tsx create mode 100644 data-layer/console/src/components/ui/input.tsx create mode 100644 data-layer/console/src/hooks/.gitkeep create mode 100644 data-layer/console/src/index.css create mode 100644 data-layer/console/src/lib/utils.ts create mode 100644 data-layer/console/src/main.tsx create mode 100644 data-layer/console/src/pages/Explore.tsx create mode 100644 data-layer/console/src/pages/Funnels.tsx create mode 100644 data-layer/console/src/pages/Profiles.tsx create mode 100644 data-layer/console/src/pages/Retention.tsx create mode 100644 data-layer/console/src/pages/SQL.tsx create mode 100644 data-layer/console/src/pages/Segments.tsx create mode 100644 data-layer/console/src/pages/Traits.tsx create mode 100644 data-layer/console/src/stores/workspace.ts create mode 100644 data-layer/console/tailwind.config.ts create mode 100644 data-layer/console/tsconfig.json create mode 100644 data-layer/console/vite.config.ts create mode 100644 data-layer/infra/clickhouse/.gitkeep create mode 100644 data-layer/infra/clickhouse/event_explorer.sql.tmpl create mode 
100644 data-layer/infra/clickhouse/funnel_analysis.sql.tmpl create mode 100644 data-layer/infra/clickhouse/profile_timeline.sql.tmpl create mode 100644 data-layer/infra/clickhouse/retention_cohort.sql.tmpl create mode 100644 data-layer/infra/clickhouse/session_analysis.sql.tmpl create mode 100644 data-layer/infra/docker/.gitkeep create mode 100644 data-layer/infra/migrations/000001_init.down.sql create mode 100644 data-layer/infra/migrations/000001_init.up.sql create mode 100755 data-layer/infra/scripts/clickhouse_apply.sh create mode 100644 data-layer/workers/Dockerfile create mode 100644 data-layer/workers/cmd/worker/main.go create mode 100644 data-layer/workers/cmd/worker/slog_adapter.go create mode 100644 data-layer/workers/go.mod create mode 100644 data-layer/workers/internal/apperr/apperr.go create mode 100644 data-layer/workers/internal/config/config.go create mode 100644 data-layer/workers/internal/handler/.gitkeep create mode 100644 data-layer/workers/internal/job/.gitkeep create mode 100644 data-layer/workers/internal/repo/.gitkeep diff --git a/data-layer/.env.example b/data-layer/.env.example new file mode 100644 index 0000000..2439507 --- /dev/null +++ b/data-layer/.env.example @@ -0,0 +1,38 @@ +# --------------------------------------------------------------------------- +# Shared infrastructure (matches cdp-ingestion) +# --------------------------------------------------------------------------- +POSTGRES_DSN=postgres://cdp:cdp@localhost:5432/cdp?sslmode=disable +REDIS_ADDR=localhost:6379 +CLICKHOUSE_ADDR=localhost:9000 +CLICKHOUSE_DB=cdp +CLICKHOUSE_USER=default +CLICKHOUSE_PASSWORD= + +# --------------------------------------------------------------------------- +# Analytics API +# --------------------------------------------------------------------------- +ANALYTICS_HTTP_ADDR=:4000 +ANALYTICS_LOG_LEVEL=info +ANALYTICS_SHUTDOWN_TIMEOUT_SECONDS=30s +ANALYTICS_CACHE_TTL_QUERY_SECONDS=60s +ANALYTICS_CACHE_TTL_PROFILE_SECONDS=30s 
+ANALYTICS_CH_TEMPLATES_DIR=../infra/clickhouse + +# Custom SQL ClickHouse user — must have SELECT-only grants +ANALYTICS_CH_SQL_USER=analytics_ro +ANALYTICS_CH_SQL_PASSWORD= + +# --------------------------------------------------------------------------- +# Analytics Worker (river) +# --------------------------------------------------------------------------- +WORKER_HTTP_ADDR=:4001 +WORKER_LOG_LEVEL=info +WORKER_SHUTDOWN_TIMEOUT_SECONDS=60s +WORKER_MAX_WORKERS=50 +WORKER_COMPUTE_TRAITS_EVERY=1h +WORKER_REFRESH_SEGMENT_EVERY=1h + +# --------------------------------------------------------------------------- +# Analytics Console (Vite) +# --------------------------------------------------------------------------- +VITE_ANALYTICS_BASE_URL=http://localhost:4000 diff --git a/data-layer/.gitignore b/data-layer/.gitignore new file mode 100644 index 0000000..6f704ab --- /dev/null +++ b/data-layer/.gitignore @@ -0,0 +1,41 @@ +# Binaries +bin/ +*.exe +*.test +*.out + +# Go workspace +go.work +go.work.sum + +# Node +node_modules/ +dist/ +build/ +.next/ +*.log +npm-debug.log* +yarn-debug.log* +yarn-error.log* +pnpm-debug.log* + +# Env +.env +.env.local +.env.*.local + +# IDE +.vscode/ +.idea/ +*.swp +.DS_Store + +# Coverage +coverage/ +*.cover +*.coverage +coverage.out + +# Console build output +console/dist/ +console/.vite/ diff --git a/data-layer/Makefile b/data-layer/Makefile new file mode 100644 index 0000000..2be08dd --- /dev/null +++ b/data-layer/Makefile @@ -0,0 +1,120 @@ +.PHONY: help \ + migrate/new migrate/up migrate/down migrate/status \ + clickhouse/up clickhouse/down \ + run/api run/workers run/console \ + build/api build/workers \ + test test/integration \ + lint fmt tidy + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +POSTGRES_DSN ?= postgres://cdp:cdp@localhost:5432/cdp?sslmode=disable +CLICKHOUSE_DSN ?= 
clickhouse://default:@localhost:9000/cdp +MIGRATE_BIN ?= migrate +MIGRATIONS_DIR := infra/migrations +CLICKHOUSE_DIR := infra/clickhouse + +# --------------------------------------------------------------------------- +# Help +# --------------------------------------------------------------------------- + +help: + @echo "CDP Analytics (data-layer) - common tasks" + @echo "" + @echo " make migrate/new name=X create new PG migration" + @echo " make migrate/up apply PG migrations" + @echo " make migrate/down rollback one" + @echo " make migrate/status migration status" + @echo "" + @echo " make clickhouse/up apply analytics ClickHouse DDL" + @echo " make clickhouse/down drop analytics ClickHouse schema" + @echo "" + @echo " make run/api run analytics API (port 4000)" + @echo " make run/workers run analytics worker (port 4001)" + @echo " make run/console run analytics console (port 4002)" + @echo "" + @echo " make test unit tests" + @echo " make test/integration integration tests (testcontainers)" + @echo "" + @echo " Shared infra (postgres, redis, clickhouse) lives in ../ingestion." + @echo " Run 'make up' from there first." 
+ +# --------------------------------------------------------------------------- +# PostgreSQL migrations +# --------------------------------------------------------------------------- + +migrate/new: + @if [ -z "$(name)" ]; then echo "usage: make migrate/new name=add_xxx"; exit 1; fi + $(MIGRATE_BIN) create -ext sql -dir $(MIGRATIONS_DIR) -seq $(name) + +migrate/up: + $(MIGRATE_BIN) -path $(MIGRATIONS_DIR) -database "$(POSTGRES_DSN)" up + +migrate/down: + $(MIGRATE_BIN) -path $(MIGRATIONS_DIR) -database "$(POSTGRES_DSN)" down 1 + +migrate/status: + $(MIGRATE_BIN) -path $(MIGRATIONS_DIR) -database "$(POSTGRES_DSN)" version + +# --------------------------------------------------------------------------- +# ClickHouse DDL +# --------------------------------------------------------------------------- + +clickhouse/up: + @bash infra/scripts/clickhouse_apply.sh up + +clickhouse/down: + @bash infra/scripts/clickhouse_apply.sh down + +# --------------------------------------------------------------------------- +# Run services +# --------------------------------------------------------------------------- + +run/api: + cd api && go run ./cmd/server + +run/workers: + cd workers && go run ./cmd/worker + +run/console: + cd console && npm run dev + +# --------------------------------------------------------------------------- +# Build +# --------------------------------------------------------------------------- + +build/api: + cd api && CGO_ENABLED=0 go build -o ../bin/api ./cmd/server + +build/workers: + cd workers && CGO_ENABLED=0 go build -o ../bin/worker ./cmd/worker + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +test: + cd api && go test ./... -count=1 + cd workers && go test ./... -count=1 + +test/integration: + cd api && go test -tags=integration ./... -count=1 -timeout=5m + cd workers && go test -tags=integration ./... 
-count=1 -timeout=5m + +# --------------------------------------------------------------------------- +# Code quality +# --------------------------------------------------------------------------- + +lint: + cd api && golangci-lint run ./... + cd workers && golangci-lint run ./... + +fmt: + cd api && gofmt -w . + cd workers && gofmt -w . + +tidy: + cd api && go mod tidy + cd workers && go mod tidy diff --git a/data-layer/README.md b/data-layer/README.md new file mode 100644 index 0000000..3f7b5a5 --- /dev/null +++ b/data-layer/README.md @@ -0,0 +1,91 @@ +# CDP Analytics (data-layer) + +Read-side of the self-hosted CDP platform. Queries events written by +`cdp-ingestion`, computes traits and segments, and activates segments +to external tools. + +## Services + +| Service | Lang | Port | Role | +|-----------|-------------|------|------| +| `api` | Go | 4000 | Query API, Profile API, Custom SQL sandbox | +| `workers` | Go (river) | 4001 | Computed Traits, Segment refresh, Reverse ETL | +| `console` | React + Vite| 4002 | Analytics UI | + +## Quick start + +Shared infra (Postgres / Redis / ClickHouse) is brought up by the ingestion +repo. Start it there first: + +```bash +cd ../ingestion && make up +``` + +Then in this directory: + +```bash +make migrate/up # apply analytics PostgreSQL migrations +make clickhouse/up # apply analytics ClickHouse DDL (if any) + +# First time only: +(cd api && go mod tidy) +(cd workers && go mod tidy) +(cd console && npm install) + +make run/api # start API on :4000 +make run/workers # start worker on :4001 +make run/console # start console on :4002 +``` + +## Endpoints (shipped) + +All endpoints below except `/health` and `/ready` require an `X-Workspace-Id` header (UUID). Workspace +membership / auth is a TODO; the header is the only authority for now.
+ +| Method | Path | Priority | Description | +|--------|-------------------------------|----------|-------------| +| GET | `/health` | - | Liveness | +| GET | `/ready` | - | Readiness | +| POST | `/query/events` | P0 | Filter raw events on one of `events_track/identify/page/group` | +| POST | `/query/sql` | P0 | Arbitrary `SELECT` on ClickHouse (read-only user) | +| GET | `/profiles/{id}` | P0 | Unified profile lookup | +| GET | `/profiles/{id}/events` | P0 | Merged event timeline for the profile's `user_id` | +| POST | `/queries` | P0 | Create saved query | +| GET | `/queries` | P0 | List saved queries | +| GET | `/queries/{id}` | P0 | Get saved query | +| PUT | `/queries/{id}` | P0 | Update saved query | +| DELETE | `/queries/{id}` | P0 | Delete saved query | +| POST | `/query/funnel` | P1 | Windowed funnel via ClickHouse `windowFunnel()` | +| POST | `/query/retention` | P1 | Cohort retention via ClickHouse `retention()` | +| POST | `/query/session` | P1 | Session bucketing with inactivity timeout | + +Cache: 60s default for query results, 30s for profile lookups. Per-query-type +TTLs are configurable via `ANALYTICS_CACHE_TTL_*_SECONDS` (Go duration strings such as `60s`). Custom SQL is never +cached. + +## Console pages (shipped) + +- **Explore** — wired to `/query/events` +- **Custom SQL** — wired to `/query/sql` +- Profiles / Funnels / Retention / Segments / Traits — placeholders + +## Testing + +```bash +make test # unit tests (no containers) +make test/integration # repo-layer integration tests (testcontainers) +``` + +## Caveats + +- The `profiles` table is a **read-only contract from cdp-ingestion**; it does + not exist yet in the ingestion migrations. `repo/profile_repo.go` assumes + `profiles(id, workspace_id, user_id, anonymous_ids, traits, first_seen_at, + last_seen_at)` — align before shipping. +- `/query/sql` ideally runs against an `analytics_ro` ClickHouse user with + `SELECT`-only grants.
If that account does not exist the server falls back + to the main connection and logs a warning — fix before production. +- Auth: every request must supply `X-Workspace-Id`. Wire the console's + workspace store to a real session/JWT once the auth scheme is decided. + +See [CLAUDE_analytics.md](./CLAUDE_analytics.md) for the full design contract. diff --git a/data-layer/api/Dockerfile b/data-layer/api/Dockerfile new file mode 100644 index 0000000..b77f662 --- /dev/null +++ b/data-layer/api/Dockerfile @@ -0,0 +1,12 @@ +FROM golang:1.22-alpine AS build +WORKDIR /src +COPY go.mod go.sum* ./ +RUN go mod download || true +COPY . . +RUN CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /out/api ./cmd/server + +FROM gcr.io/distroless/static-debian12:nonroot +COPY --from=build /out/api /api +EXPOSE 4000 +USER nonroot:nonroot +ENTRYPOINT ["/api"] diff --git a/data-layer/api/cmd/server/main.go b/data-layer/api/cmd/server/main.go new file mode 100644 index 0000000..d107a46 --- /dev/null +++ b/data-layer/api/cmd/server/main.go @@ -0,0 +1,167 @@ +// Command server runs the CDP analytics HTTP API. 
+package main + +import ( + "context" + "errors" + "log" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/go-chi/chi/v5" + "github.com/redis/rueidis" + "go.uber.org/zap" + + "github.com/dbiz/cdp/data-layer/api/internal/cache" + "github.com/dbiz/cdp/data-layer/api/internal/config" + "github.com/dbiz/cdp/data-layer/api/internal/handler" + mw "github.com/dbiz/cdp/data-layer/api/internal/middleware" + "github.com/dbiz/cdp/data-layer/api/internal/repo" + "github.com/dbiz/cdp/data-layer/api/internal/service" + "github.com/dbiz/cdp/data-layer/api/internal/templates" +) + +func main() { + if err := run(); err != nil { + log.Fatal(err) + } +} + +func run() error { + cfg, err := config.Load() + if err != nil { + return err + } + + logger, err := newLogger(cfg.LogLevel) + if err != nil { + return err + } + defer func() { _ = logger.Sync() }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // ---- infra clients ---------------------------------------------------- + pg, err := repo.NewPool(ctx, cfg.PostgresDSN) + if err != nil { + return err + } + defer pg.Close() + + redisClient, err := rueidis.NewClient(rueidis.ClientOption{ + InitAddress: []string{cfg.RedisAddr}, + }) + if err != nil { + return err + } + defer redisClient.Close() + + chMain, err := repo.NewClickHouse(ctx, cfg.ClickHouseAddr, cfg.ClickHouseDB, cfg.ClickHouseUser, cfg.ClickHousePassword) + if err != nil { + return err + } + defer func() { _ = chMain.Close() }() + + chRO, err := repo.NewClickHouseReadOnly(ctx, cfg.ClickHouseAddr, cfg.ClickHouseDB, cfg.ClickHouseSQLUser, cfg.ClickHouseSQLPassword) + if err != nil { + // Read-only user might not be provisioned in dev. Log + fall back to + // the main connection so /query/sql still works locally; production + // must provide separate credentials. 
+ logger.Warn("read-only clickhouse user unavailable; /query/sql will use the main connection (dev only)", + zap.Error(err)) + chRO = chMain + } else { + defer func() { _ = chRO.Close() }() + } + + // ---- shared singletons ------------------------------------------------ + tpl := templates.New(cfg.ClickHouseTemplatesDir) + c := cache.New(redisClient) + + eventRepo := repo.NewEventRepo(chMain, tpl) + analyticsRepo := repo.NewAnalyticsRepo(chMain, tpl) + profileRepo := repo.NewProfileRepo(pg) + savedRepo := repo.NewSavedQueryRepo(pg) + + querySvc := service.NewQueryService(eventRepo, analyticsRepo, c, cfg.CacheTTLQuery, logger) + sqlSvc := service.NewSQLService(chRO, logger) + profileSvc := service.NewProfileService(profileRepo, eventRepo, c, cfg.CacheTTLProfile, logger) + + eventH := handler.NewEventHandler(querySvc, logger) + sqlH := handler.NewSQLHandler(sqlSvc, logger) + profileH := handler.NewProfileHandler(profileSvc, logger) + analyticsH := handler.NewAnalyticsHandler(querySvc, logger) + savedH := handler.NewSavedQueryHandler(savedRepo, logger) + + // ---- HTTP router ------------------------------------------------------ + r := chi.NewRouter() + r.Use(mw.RequestID) + r.Use(mw.Recover(logger)) + r.Use(mw.Logger(logger)) + r.Use(mw.CORS) + + r.Get("/health", eventH.Health) + r.Get("/ready", eventH.Ready) + + r.Group(func(rr chi.Router) { + rr.Use(mw.Workspace) + + rr.Post("/query/events", eventH.QueryEvents) + rr.Post("/query/sql", sqlH.CustomSQL) + rr.Post("/query/funnel", analyticsH.Funnel) + rr.Post("/query/retention", analyticsH.Retention) + rr.Post("/query/session", analyticsH.Session) + + rr.Get("/profiles/{id}", profileH.Get) + rr.Get("/profiles/{id}/events", profileH.Timeline) + + rr.Post("/queries", savedH.Create) + rr.Get("/queries", savedH.List) + rr.Get("/queries/{id}", savedH.Get) + rr.Put("/queries/{id}", savedH.Update) + rr.Delete("/queries/{id}", savedH.Delete) + }) + + srv := &http.Server{ + Addr: cfg.HTTPAddr, + Handler: r, + 
ReadHeaderTimeout: 5 * time.Second, + ReadTimeout: 60 * time.Second, + WriteTimeout: 60 * time.Second, + IdleTimeout: 120 * time.Second, + } + + // ---- graceful shutdown ------------------------------------------------ + shutdownErr := make(chan error, 1) + go func() { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + logger.Info("shutdown signal received; draining...") + shutCtx, cancel := context.WithTimeout(context.Background(), cfg.ShutdownTimeout) + defer cancel() + shutdownErr <- srv.Shutdown(shutCtx) + }() + + logger.Info("analytics api listening", zap.String("addr", cfg.HTTPAddr)) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return <-shutdownErr +} + +func newLogger(level string) (*zap.Logger, error) { + lvl, err := zap.ParseAtomicLevel(level) + if err != nil { + lvl = zap.NewAtomicLevelAt(zap.InfoLevel) + } + cfg := zap.NewProductionConfig() + cfg.Level = lvl + cfg.EncoderConfig.TimeKey = "ts" + cfg.EncoderConfig.MessageKey = "msg" + return cfg.Build() +} diff --git a/data-layer/api/go.mod b/data-layer/api/go.mod new file mode 100644 index 0000000..bbd50d8 --- /dev/null +++ b/data-layer/api/go.mod @@ -0,0 +1,15 @@ +module github.com/dbiz/cdp/data-layer/api + +go 1.22 + +require ( + github.com/ClickHouse/clickhouse-go/v2 v2.30.0 + github.com/caarlos0/env/v11 v11.2.2 + github.com/go-chi/chi/v5 v5.1.0 + github.com/go-playground/validator/v10 v10.22.1 + github.com/google/uuid v1.6.0 + github.com/jackc/pgx/v5 v5.6.0 + github.com/redis/rueidis v1.0.45 + github.com/stretchr/testify v1.9.0 + go.uber.org/zap v1.27.0 +) diff --git a/data-layer/api/internal/apperr/apperr.go b/data-layer/api/internal/apperr/apperr.go new file mode 100644 index 0000000..1a1a0ba --- /dev/null +++ b/data-layer/api/internal/apperr/apperr.go @@ -0,0 +1,75 @@ +// Package apperr defines AppError, the single error type returned by every +// service/repo function. 
Handlers translate AppError into HTTP responses. +package apperr + +import ( + "errors" + "fmt" + "net/http" +) + +type AppError struct { + Code int // HTTP status to return + Message string // user-facing message (safe to expose) + Field string // optional: which field caused the error + RetryAfter int // seconds, for 429 + Err error // original error for logging (never exposed) +} + +func (e *AppError) Error() string { + if e.Err != nil { + return fmt.Sprintf("%s: %v", e.Message, e.Err) + } + return e.Message +} + +func (e *AppError) Unwrap() error { return e.Err } + +// As reports whether err is or wraps an *AppError. +func As(err error) (*AppError, bool) { + var ae *AppError + if errors.As(err, &ae) { + return ae, true + } + return nil, false +} + +// --------------------------------------------------------------------------- +// Constructors +// --------------------------------------------------------------------------- + +func BadRequest(msg, field string, err error) *AppError { + return &AppError{Code: http.StatusBadRequest, Message: msg, Field: field, Err: err} +} + +func Unauthorized(msg string) *AppError { + return &AppError{Code: http.StatusUnauthorized, Message: msg} +} + +func Forbidden(msg string) *AppError { + return &AppError{Code: http.StatusForbidden, Message: msg} +} + +func NotFound(msg string) *AppError { + return &AppError{Code: http.StatusNotFound, Message: msg} +} + +func Conflict(msg string, err error) *AppError { + return &AppError{Code: http.StatusConflict, Message: msg, Err: err} +} + +func UnprocessableEntity(msg string) *AppError { + return &AppError{Code: http.StatusUnprocessableEntity, Message: msg} +} + +func TooManyRequests(retryAfterSeconds int) *AppError { + return &AppError{ + Code: http.StatusTooManyRequests, + Message: "rate limit exceeded", + RetryAfter: retryAfterSeconds, + } +} + +func Internal(err error) *AppError { + return &AppError{Code: http.StatusInternalServerError, Message: "internal server error", Err: err} +} diff 
--git a/data-layer/api/internal/cache/cache.go b/data-layer/api/internal/cache/cache.go new file mode 100644 index 0000000..caad024 --- /dev/null +++ b/data-layer/api/internal/cache/cache.go @@ -0,0 +1,52 @@ +// Package cache wraps rueidis with the semantic-key convention used by the +// analytics service. Keys follow cache:<kind>:<workspace_id>:<params_hash> +// so a workspace can be invalidated without scanning unrelated entries. +package cache + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/redis/rueidis" +) + +type Cache struct { + client rueidis.Client +} + +func New(client rueidis.Client) *Cache { return &Cache{client: client} } + +// Key builds a deterministic cache key for the given (kind, workspace, params). +// Params must JSON-serialize stably -- use a struct or a sorted map. +func Key(kind, workspaceID string, params any) (string, error) { + raw, err := json.Marshal(params) + if err != nil { + return "", fmt.Errorf("cache key marshal: %w", err) + } + sum := sha256.Sum256(raw) + return fmt.Sprintf("cache:%s:%s:%s", kind, workspaceID, hex.EncodeToString(sum[:16])), nil +} + +// Get returns (value, true) on hit and (nil, false) on miss. Any redis error +// is treated as a miss -- the caller falls through to the underlying source. +func (c *Cache) Get(ctx context.Context, key string) ([]byte, bool) { + res := c.client.Do(ctx, c.client.B().Get().Key(key).Build()) + b, err := res.AsBytes() + if err != nil { + if errors.Is(err, rueidis.Nil) { + return nil, false + } + return nil, false + } + return b, true +} + +// Set writes the value with a TTL.
+func (c *Cache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return c.client.Do(ctx, c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).Ex(ttl).Build()).Error() +} diff --git a/data-layer/api/internal/config/config.go b/data-layer/api/internal/config/config.go new file mode 100644 index 0000000..ac4da75 --- /dev/null +++ b/data-layer/api/internal/config/config.go @@ -0,0 +1,47 @@ +// Package config loads runtime configuration from environment variables. +// +// Vars prefixed with ANALYTICS_ are owned by this service; un-prefixed ones +// (POSTGRES_DSN, REDIS_ADDR, CLICKHOUSE_*) are shared with cdp-ingestion. +package config + +import ( + "fmt" + "time" + + "github.com/caarlos0/env/v11" +) + +type Config struct { + HTTPAddr string `env:"ANALYTICS_HTTP_ADDR" envDefault:":4000"` + LogLevel string `env:"ANALYTICS_LOG_LEVEL" envDefault:"info"` + ShutdownTimeout time.Duration `env:"ANALYTICS_SHUTDOWN_TIMEOUT_SECONDS" envDefault:"30s"` + + // Cache TTLs — configurable per query type. + CacheTTLQuery time.Duration `env:"ANALYTICS_CACHE_TTL_QUERY_SECONDS" envDefault:"60s"` + CacheTTLProfile time.Duration `env:"ANALYTICS_CACHE_TTL_PROFILE_SECONDS" envDefault:"30s"` + + // Where ClickHouse SQL templates live on disk. Resolved relative to the + // process working directory; default matches `cd api && go run ./cmd/server`. + ClickHouseTemplatesDir string `env:"ANALYTICS_CH_TEMPLATES_DIR" envDefault:"../infra/clickhouse"` + + // Custom SQL ClickHouse credentials — separate read-only user. 
+ ClickHouseSQLUser string `env:"ANALYTICS_CH_SQL_USER" envDefault:"analytics_ro"` + ClickHouseSQLPassword string `env:"ANALYTICS_CH_SQL_PASSWORD"` + + // Shared infra ---------------------------------------------------------- + PostgresDSN string `env:"POSTGRES_DSN,required"` + RedisAddr string `env:"REDIS_ADDR" envDefault:"localhost:6379"` + + ClickHouseAddr string `env:"CLICKHOUSE_ADDR" envDefault:"localhost:9000"` + ClickHouseDB string `env:"CLICKHOUSE_DB" envDefault:"cdp"` + ClickHouseUser string `env:"CLICKHOUSE_USER" envDefault:"default"` + ClickHousePassword string `env:"CLICKHOUSE_PASSWORD"` +} + +func Load() (*Config, error) { + cfg := &Config{} + if err := env.Parse(cfg); err != nil { + return nil, fmt.Errorf("config load: %w", err) + } + return cfg, nil +} diff --git a/data-layer/api/internal/handler/.gitkeep b/data-layer/api/internal/handler/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/data-layer/api/internal/handler/analytics_handler.go b/data-layer/api/internal/handler/analytics_handler.go new file mode 100644 index 0000000..7a31395 --- /dev/null +++ b/data-layer/api/internal/handler/analytics_handler.go @@ -0,0 +1,132 @@ +package handler + +import ( + "net/http" + "time" + + "go.uber.org/zap" + + "github.com/dbiz/cdp/data-layer/api/internal/middleware" + "github.com/dbiz/cdp/data-layer/api/internal/repo" + "github.com/dbiz/cdp/data-layer/api/internal/service" +) + +type AnalyticsHandler struct { + svc *service.QueryService + log *zap.Logger +} + +func NewAnalyticsHandler(svc *service.QueryService, log *zap.Logger) *AnalyticsHandler { + return &AnalyticsHandler{svc: svc, log: log} +} + +// --------------------------------------------------------------------------- +// Funnel +// --------------------------------------------------------------------------- + +type funnelRequest struct { + Steps []string `json:"steps" validate:"required,min=2,max=10,dive,min=1"` + From *time.Time `json:"from" validate:"required"` + To *time.Time 
`json:"to" validate:"required,gtfield=From"` + WindowSeconds uint32 `json:"window_seconds" validate:"required,min=1,max=2592000"` // up to 30d +} + +func (h *AnalyticsHandler) Funnel(w http.ResponseWriter, r *http.Request) { + var req funnelRequest + if err := decodeAndValidate(r, &req); err != nil { + writeError(w, err) + return + } + ws := middleware.WorkspaceFromCtx(r.Context()) + res, err := h.svc.Funnel(r.Context(), repo.FunnelQuery{ + WorkspaceID: ws, + Steps: req.Steps, + From: *req.From, + To: *req.To, + WindowSeconds: req.WindowSeconds, + }) + if err != nil { + h.log.Error("funnel", zap.String("workspace_id", ws), zap.Error(err)) + writeError(w, err) + return + } + writeJSON(w, http.StatusOK, res) +} + +// --------------------------------------------------------------------------- +// Retention +// --------------------------------------------------------------------------- + +type retentionRequest struct { + InitialEvent string `json:"initial_event" validate:"required,min=1"` + ReturnEvent string `json:"return_event" validate:"required,min=1"` + From *time.Time `json:"from" validate:"required"` + To *time.Time `json:"to" validate:"required,gtfield=From"` + Periods int `json:"periods" validate:"omitempty,min=1,max=90"` +} + +func (h *AnalyticsHandler) Retention(w http.ResponseWriter, r *http.Request) { + var req retentionRequest + if err := decodeAndValidate(r, &req); err != nil { + writeError(w, err) + return + } + ws := middleware.WorkspaceFromCtx(r.Context()) + res, err := h.svc.Retention(r.Context(), repo.RetentionQuery{ + WorkspaceID: ws, + InitialEvent: req.InitialEvent, + ReturnEvent: req.ReturnEvent, + From: *req.From, + To: *req.To, + Periods: req.Periods, + }) + if err != nil { + h.log.Error("retention", zap.String("workspace_id", ws), zap.Error(err)) + writeError(w, err) + return + } + writeJSON(w, http.StatusOK, res) +} + +// --------------------------------------------------------------------------- +// Session +// 
--------------------------------------------------------------------------- + +type sessionRequest struct { + From *time.Time `json:"from" validate:"required"` + To *time.Time `json:"to" validate:"required,gtfield=From"` + TimeoutSeconds uint32 `json:"timeout_seconds" validate:"omitempty,min=60,max=86400"` + UserID string `json:"user_id"` + Limit int `json:"limit" validate:"omitempty,min=1,max=1000"` + Offset int `json:"offset" validate:"omitempty,min=0"` +} + +func (h *AnalyticsHandler) Session(w http.ResponseWriter, r *http.Request) { + var req sessionRequest + if err := decodeAndValidate(r, &req); err != nil { + writeError(w, err) + return + } + if req.TimeoutSeconds == 0 { + req.TimeoutSeconds = 30 * 60 + } + if req.Limit == 0 { + req.Limit = 100 + } + ws := middleware.WorkspaceFromCtx(r.Context()) + res, err := h.svc.Sessions(r.Context(), repo.SessionQuery{ + WorkspaceID: ws, + UserID: req.UserID, + From: *req.From, + To: *req.To, + TimeoutSeconds: req.TimeoutSeconds, + Limit: req.Limit, + Offset: req.Offset, + }) + if err != nil { + h.log.Error("session", zap.String("workspace_id", ws), zap.Error(err)) + writeError(w, err) + return + } + writeJSON(w, http.StatusOK, res) +} diff --git a/data-layer/api/internal/handler/decode.go b/data-layer/api/internal/handler/decode.go new file mode 100644 index 0000000..a643f79 --- /dev/null +++ b/data-layer/api/internal/handler/decode.go @@ -0,0 +1,36 @@ +package handler + +import ( + "encoding/json" + "errors" + "io" + "net/http" + + "github.com/go-playground/validator/v10" + + "github.com/dbiz/cdp/data-layer/api/internal/apperr" +) + +var validate = validator.New(validator.WithRequiredStructEnabled()) + +// decodeAndValidate reads JSON into `dst`, then runs validator tags. Returns +// a wrapped AppError so handlers can pass it straight to writeError. 
+//
+// The body must hold exactly one JSON value: anything other than whitespace
+// after it is rejected with 400 instead of being silently ignored.
+func decodeAndValidate(r *http.Request, dst any) error {
+	dec := json.NewDecoder(r.Body)
+	// Keys that do not map onto a field of dst are an error, not dropped.
+	dec.DisallowUnknownFields()
+	if err := dec.Decode(dst); err != nil {
+		if errors.Is(err, io.EOF) {
+			return apperr.BadRequest("request body is empty", "", err)
+		}
+		return apperr.BadRequest("invalid JSON: "+err.Error(), "", err)
+	}
+	// Decode stops after the first value, so `{"a":1}{"b":2}` or `{"a":1} junk`
+	// would otherwise pass. A second Decode must report io.EOF; any other
+	// outcome (including a successful decode) means trailing data.
+	if err := dec.Decode(&struct{}{}); !errors.Is(err, io.EOF) {
+		return apperr.BadRequest("request body must contain a single JSON value", "", err)
+	}
+	if err := validate.Struct(dst); err != nil {
+		var verrs validator.ValidationErrors
+		if errors.As(err, &verrs) && len(verrs) > 0 {
+			// Report only the first failing field; the response carries a single
+			// field name.
+			ve := verrs[0]
+			return apperr.BadRequest("validation failed on "+ve.Field()+": "+ve.Tag(), ve.Field(), err)
+		}
+		return apperr.BadRequest("validation failed: "+err.Error(), "", err)
+	}
+	return nil
+}
diff --git a/data-layer/api/internal/handler/event_handler.go b/data-layer/api/internal/handler/event_handler.go
new file mode 100644
index 0000000..dc8fc77
--- /dev/null
+++ b/data-layer/api/internal/handler/event_handler.go
@@ -0,0 +1,74 @@
+package handler
+
+import (
+	"net/http"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/dbiz/cdp/data-layer/api/internal/middleware"
+	"github.com/dbiz/cdp/data-layer/api/internal/model"
+	"github.com/dbiz/cdp/data-layer/api/internal/service"
+)
+
+type EventHandler struct {
+	svc *service.QueryService
+	log *zap.Logger
+}
+
+func NewEventHandler(svc *service.QueryService, log *zap.Logger) *EventHandler {
+	return &EventHandler{svc: svc, log: log}
+}
+
+type queryEventsRequest struct {
+	Table       string     `json:"table" validate:"required,oneof=events_track events_identify events_page events_group"`
+	From        *time.Time `json:"from" validate:"required"`
+	To          *time.Time `json:"to" validate:"required,gtfield=From"`
+	UserID      string     `json:"user_id"`
+	AnonymousID string     `json:"anonymous_id"`
+	EventName   string     `json:"event"`
+	Limit       int        `json:"limit" validate:"omitempty,min=1,max=1000"`
+	Offset      int        `json:"offset" validate:"omitempty,min=0"`
+}
+
+// QueryEvents handles POST /query/events.
+func (h *EventHandler) QueryEvents(w http.ResponseWriter, r *http.Request) { + var req queryEventsRequest + if err := decodeAndValidate(r, &req); err != nil { + writeError(w, err) + return + } + if req.Limit == 0 { + req.Limit = 100 + } + + ws := middleware.WorkspaceFromCtx(r.Context()) + res, err := h.svc.Events(r.Context(), model.EventQuery{ + WorkspaceID: ws, + Table: model.EventTable(req.Table), + From: *req.From, + To: *req.To, + UserID: req.UserID, + AnonymousID: req.AnonymousID, + EventName: req.EventName, + Limit: req.Limit, + Offset: req.Offset, + }) + if err != nil { + h.log.Error("query events", + zap.String("workspace_id", ws), + zap.String("table", req.Table), + zap.Error(err)) + writeError(w, err) + return + } + writeJSON(w, http.StatusOK, res) +} + +// Health / Ready -- shared between all handlers but parked here for now. +func (h *EventHandler) Health(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} +func (h *EventHandler) Ready(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, map[string]string{"status": "ready"}) +} diff --git a/data-layer/api/internal/handler/profile_handler.go b/data-layer/api/internal/handler/profile_handler.go new file mode 100644 index 0000000..30e6231 --- /dev/null +++ b/data-layer/api/internal/handler/profile_handler.go @@ -0,0 +1,85 @@ +package handler + +import ( + "net/http" + "strconv" + + "github.com/go-chi/chi/v5" + "github.com/google/uuid" + "go.uber.org/zap" + + "github.com/dbiz/cdp/data-layer/api/internal/apperr" + "github.com/dbiz/cdp/data-layer/api/internal/middleware" + "github.com/dbiz/cdp/data-layer/api/internal/service" +) + +type ProfileHandler struct { + svc *service.ProfileService + log *zap.Logger +} + +func NewProfileHandler(svc *service.ProfileService, log *zap.Logger) *ProfileHandler { + return &ProfileHandler{svc: svc, log: log} +} + +// Get handles GET /profiles/:id. 
+func (h *ProfileHandler) Get(w http.ResponseWriter, r *http.Request) { + id, err := parseProfileID(r) + if err != nil { + writeError(w, err) + return + } + ws := middleware.WorkspaceFromCtx(r.Context()) + p, err := h.svc.Get(r.Context(), ws, id) + if err != nil { + writeError(w, err) + return + } + writeJSON(w, http.StatusOK, p) +} + +// Timeline handles GET /profiles/:id/events. +func (h *ProfileHandler) Timeline(w http.ResponseWriter, r *http.Request) { + id, err := parseProfileID(r) + if err != nil { + writeError(w, err) + return + } + limit, offset := parsePagination(r, 100, 1000) + + ws := middleware.WorkspaceFromCtx(r.Context()) + res, err := h.svc.Timeline(r.Context(), ws, id, limit, offset) + if err != nil { + writeError(w, err) + return + } + writeJSON(w, http.StatusOK, res) +} + +func parseProfileID(r *http.Request) (string, error) { + raw := chi.URLParam(r, "id") + if raw == "" { + return "", apperr.BadRequest("missing profile id", "id", nil) + } + if _, err := uuid.Parse(raw); err != nil { + return "", apperr.BadRequest("profile id must be uuid", "id", err) + } + return raw, nil +} + +// parsePagination reads ?limit & ?offset with bounds. Invalid values fall back +// to the defaults rather than erroring -- the endpoints are GET, not strict. +func parsePagination(r *http.Request, def, max int) (limit, offset int) { + limit = def + if v := r.URL.Query().Get("limit"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= max { + limit = n + } + } + if v := r.URL.Query().Get("offset"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n >= 0 { + offset = n + } + } + return +} diff --git a/data-layer/api/internal/handler/render.go b/data-layer/api/internal/handler/render.go new file mode 100644 index 0000000..79c7e01 --- /dev/null +++ b/data-layer/api/internal/handler/render.go @@ -0,0 +1,47 @@ +// Package handler holds HTTP handlers. 
Handlers parse the request, call into +// service, and translate the result (or error) into an HTTP response. +package handler + +import ( + "encoding/json" + "net/http" + + "github.com/dbiz/cdp/data-layer/api/internal/apperr" +) + +type errorResponse struct { + Error string `json:"error"` + Field string `json:"field,omitempty"` +} + +func writeJSON(w http.ResponseWriter, status int, body any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(body) +} + +func writeError(w http.ResponseWriter, err error) { + if ae, ok := apperr.As(err); ok { + if ae.RetryAfter > 0 { + w.Header().Set("Retry-After", itoa(ae.RetryAfter)) + } + writeJSON(w, ae.Code, errorResponse{Error: ae.Message, Field: ae.Field}) + return + } + writeJSON(w, http.StatusInternalServerError, errorResponse{Error: "internal server error"}) +} + +func itoa(i int) string { + const digits = "0123456789" + if i == 0 { + return "0" + } + var buf [20]byte + pos := len(buf) + for i > 0 { + pos-- + buf[pos] = digits[i%10] + i /= 10 + } + return string(buf[pos:]) +} diff --git a/data-layer/api/internal/handler/saved_query_handler.go b/data-layer/api/internal/handler/saved_query_handler.go new file mode 100644 index 0000000..e8c770c --- /dev/null +++ b/data-layer/api/internal/handler/saved_query_handler.go @@ -0,0 +1,125 @@ +package handler + +import ( + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/google/uuid" + "go.uber.org/zap" + + "github.com/dbiz/cdp/data-layer/api/internal/apperr" + "github.com/dbiz/cdp/data-layer/api/internal/middleware" + "github.com/dbiz/cdp/data-layer/api/internal/model" + "github.com/dbiz/cdp/data-layer/api/internal/repo" +) + +type SavedQueryHandler struct { + repo *repo.SavedQueryRepo + log *zap.Logger +} + +func NewSavedQueryHandler(r *repo.SavedQueryRepo, log *zap.Logger) *SavedQueryHandler { + return &SavedQueryHandler{repo: r, log: log} +} + +type createSavedQueryRequest struct { + Name string `json:"name" 
validate:"required,min=1,max=200"` + Kind string `json:"kind" validate:"required,oneof=events sql funnel retention session"` + Spec map[string]any `json:"spec" validate:"required"` +} + +type updateSavedQueryRequest struct { + Name string `json:"name" validate:"required,min=1,max=200"` + Spec map[string]any `json:"spec" validate:"required"` +} + +func (h *SavedQueryHandler) Create(w http.ResponseWriter, r *http.Request) { + var req createSavedQueryRequest + if err := decodeAndValidate(r, &req); err != nil { + writeError(w, err) + return + } + ws := middleware.WorkspaceFromCtx(r.Context()) + q, err := h.repo.Create(r.Context(), model.SavedQuery{ + WorkspaceID: ws, + Name: req.Name, + Kind: req.Kind, + Spec: req.Spec, + }) + if err != nil { + writeError(w, err) + return + } + writeJSON(w, http.StatusCreated, q) +} + +func (h *SavedQueryHandler) List(w http.ResponseWriter, r *http.Request) { + limit, offset := parsePagination(r, 50, 500) + ws := middleware.WorkspaceFromCtx(r.Context()) + qs, err := h.repo.List(r.Context(), ws, limit, offset) + if err != nil { + writeError(w, err) + return + } + writeJSON(w, http.StatusOK, map[string]any{"items": qs, "limit": limit, "offset": offset}) +} + +func (h *SavedQueryHandler) Get(w http.ResponseWriter, r *http.Request) { + id, err := parseSavedQueryID(r) + if err != nil { + writeError(w, err) + return + } + ws := middleware.WorkspaceFromCtx(r.Context()) + q, err := h.repo.Get(r.Context(), ws, id) + if err != nil { + writeError(w, err) + return + } + writeJSON(w, http.StatusOK, q) +} + +func (h *SavedQueryHandler) Update(w http.ResponseWriter, r *http.Request) { + id, err := parseSavedQueryID(r) + if err != nil { + writeError(w, err) + return + } + var req updateSavedQueryRequest + if err := decodeAndValidate(r, &req); err != nil { + writeError(w, err) + return + } + ws := middleware.WorkspaceFromCtx(r.Context()) + q, err := h.repo.Update(r.Context(), ws, id, req.Name, req.Spec) + if err != nil { + writeError(w, err) + return + 
} + writeJSON(w, http.StatusOK, q) +} + +func (h *SavedQueryHandler) Delete(w http.ResponseWriter, r *http.Request) { + id, err := parseSavedQueryID(r) + if err != nil { + writeError(w, err) + return + } + ws := middleware.WorkspaceFromCtx(r.Context()) + if err := h.repo.Delete(r.Context(), ws, id); err != nil { + writeError(w, err) + return + } + w.WriteHeader(http.StatusNoContent) +} + +func parseSavedQueryID(r *http.Request) (string, error) { + raw := chi.URLParam(r, "id") + if raw == "" { + return "", apperr.BadRequest("missing query id", "id", nil) + } + if _, err := uuid.Parse(raw); err != nil { + return "", apperr.BadRequest("query id must be uuid", "id", err) + } + return raw, nil +} diff --git a/data-layer/api/internal/handler/sql_handler.go b/data-layer/api/internal/handler/sql_handler.go new file mode 100644 index 0000000..87ab225 --- /dev/null +++ b/data-layer/api/internal/handler/sql_handler.go @@ -0,0 +1,47 @@ +package handler + +import ( + "net/http" + + "go.uber.org/zap" + + "github.com/dbiz/cdp/data-layer/api/internal/middleware" + "github.com/dbiz/cdp/data-layer/api/internal/service" +) + +type SQLHandler struct { + svc *service.SQLService + log *zap.Logger +} + +func NewSQLHandler(svc *service.SQLService, log *zap.Logger) *SQLHandler { + return &SQLHandler{svc: svc, log: log} +} + +type customSQLRequest struct { + SQL string `json:"sql" validate:"required,min=1,max=20000"` +} + +// CustomSQL handles POST /query/sql. 
+func (h *SQLHandler) CustomSQL(w http.ResponseWriter, r *http.Request) { + var req customSQLRequest + if err := decodeAndValidate(r, &req); err != nil { + writeError(w, err) + return + } + ws := middleware.WorkspaceFromCtx(r.Context()) + + res, err := h.svc.Run(r.Context(), req.SQL) + if err != nil { + h.log.Warn("custom sql rejected", + zap.String("workspace_id", ws), + zap.Error(err)) + writeError(w, err) + return + } + h.log.Info("custom sql ok", + zap.String("workspace_id", ws), + zap.Int("rows", res.RowCount), + zap.Int64("duration_ms", res.DurationMS)) + writeJSON(w, http.StatusOK, res) +} diff --git a/data-layer/api/internal/middleware/middleware.go b/data-layer/api/internal/middleware/middleware.go new file mode 100644 index 0000000..08689bd --- /dev/null +++ b/data-layer/api/internal/middleware/middleware.go @@ -0,0 +1,111 @@ +// Package middleware provides chi-compatible HTTP middleware: +// request-id, panic recovery, structured logging, CORS. +package middleware + +import ( + "context" + "net/http" + "runtime/debug" + "strings" + "time" + + "github.com/google/uuid" + "go.uber.org/zap" +) + +type ctxKey string + +const ctxKeyRequestID ctxKey = "request_id" + +// RequestID assigns a uuid v4 to each request and stores it in context. +func RequestID(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + id := r.Header.Get("X-Request-Id") + if id == "" { + id = uuid.NewString() + } + ctx := context.WithValue(r.Context(), ctxKeyRequestID, id) + w.Header().Set("X-Request-Id", id) + next.ServeHTTP(w, r.WithContext(ctx)) + }) +} + +func RequestIDFromCtx(ctx context.Context) string { + v, _ := ctx.Value(ctxKeyRequestID).(string) + return v +} + +// Recover handles panics so a buggy handler can't take down the server. 
+func Recover(log *zap.Logger) func(http.Handler) http.Handler {
+	return func(next http.Handler) http.Handler {
+		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			defer func() {
+				rec := recover()
+				if rec == nil {
+					return
+				}
+				// http.ErrAbortHandler is net/http's sentinel for deliberately
+				// aborting a response; re-panic so the server handles it instead
+				// of logging it as a bug and writing a 500 body.
+				if rec == http.ErrAbortHandler {
+					panic(rec)
+				}
+				log.Error("panic in handler",
+					zap.Any("panic", rec),
+					zap.String("path", r.URL.Path),
+					zap.String("request_id", RequestIDFromCtx(r.Context())),
+					zap.ByteString("stack", debug.Stack()))
+				// Written by hand rather than via http.Error, which would label
+				// this JSON body as text/plain.
+				w.Header().Set("Content-Type", "application/json")
+				w.WriteHeader(http.StatusInternalServerError)
+				_, _ = w.Write([]byte(`{"error":"internal server error"}` + "\n"))
+			}()
+			next.ServeHTTP(w, r)
+		})
+	}
+}
+
+// Logger logs one structured line per request: method, path, response status,
+// duration in milliseconds, request id and client IP. The line is emitted
+// after the wrapped handler returns.
+func Logger(log *zap.Logger) func(http.Handler) http.Handler {
+	return func(next http.Handler) http.Handler {
+		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			start := time.Now()
+			// Default to 200: a handler that never calls WriteHeader still
+			// produces an implicit 200 response.
+			rw := &statusRecorder{ResponseWriter: w, status: http.StatusOK}
+			next.ServeHTTP(rw, r)
+			log.Info("http",
+				zap.String("method", r.Method),
+				zap.String("path", r.URL.Path),
+				zap.Int("status", rw.status),
+				zap.Int64("duration_ms", time.Since(start).Milliseconds()),
+				zap.String("request_id", RequestIDFromCtx(r.Context())),
+				zap.String("ip", clientIP(r)))
+		})
+	}
+}
+
+// CORS returns a permissive CORS handler. The Analytics console calls the API
+// directly from the browser during development.
+func CORS(next http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		h := w.Header()
+		h.Set("Access-Control-Allow-Origin", "*")
+		// The saved-query handlers expose update and delete operations, so the
+		// write verbs must be allowed as well as GET/POST.
+		h.Set("Access-Control-Allow-Methods", "GET, POST, PUT, PATCH, DELETE, OPTIONS")
+		// X-Workspace-Id is required by the Workspace middleware; a browser will
+		// not send a custom header that the preflight response does not allow.
+		h.Set("Access-Control-Allow-Headers", "Authorization, Content-Type, X-Request-Id, X-Workspace-Id")
+		h.Set("Access-Control-Max-Age", "86400")
+		// Preflight requests are answered here and never reach the router.
+		if r.Method == http.MethodOptions {
+			w.WriteHeader(http.StatusNoContent)
+			return
+		}
+		next.ServeHTTP(w, r)
+	})
+}
+
+// clientIP returns a best-effort client address for logging: the first entry
+// of X-Forwarded-For, else X-Real-Ip, else the connection's RemoteAddr. Both
+// headers can be set by the client, so the result is informational only.
+func clientIP(r *http.Request) string {
+	if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
+		if i := strings.Index(xff, ","); i >= 0 {
+			return strings.TrimSpace(xff[:i])
+		}
+		return strings.TrimSpace(xff)
+	}
+	if rip := r.Header.Get("X-Real-Ip"); rip != "" {
+		return rip
+	}
+	return r.RemoteAddr
+}
+
+// statusRecorder wraps a ResponseWriter and remembers the status code passed
+// to WriteHeader so Logger can report it.
+type statusRecorder struct {
+	http.ResponseWriter
+	status int
+}
+
+// WriteHeader records code and forwards it to the wrapped writer.
+func (s *statusRecorder) WriteHeader(code int) {
+	s.status = code
+	s.ResponseWriter.WriteHeader(code)
+}
diff --git a/data-layer/api/internal/middleware/workspace.go b/data-layer/api/internal/middleware/workspace.go
new file mode 100644
index 0000000..c9f4429
--- /dev/null
+++ b/data-layer/api/internal/middleware/workspace.go
@@ -0,0 +1,51 @@
+package middleware
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+
+	"github.com/google/uuid"
+
+	"github.com/dbiz/cdp/data-layer/api/internal/apperr"
+)
+
+const ctxKeyWorkspace ctxKey = "workspace_id"
+
+// Workspace pulls the active workspace UUID from the X-Workspace-Id header
+// and stores it in context. Returns 400 for missing / malformed values.
+//
+// TODO(auth): wire this to the console session / JWT once the auth scheme
+// for the data-layer is finalized. For now the header drives everything.
+func Workspace(next http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		raw := r.Header.Get("X-Workspace-Id")
+		if raw == "" {
+			writeAppErr(w, apperr.BadRequest("missing X-Workspace-Id header", "workspace_id", nil))
+			return
+		}
+		id, err := uuid.Parse(raw)
+		if err != nil {
+			writeAppErr(w, apperr.BadRequest("invalid X-Workspace-Id", "workspace_id", err))
+			return
+		}
+		// Store the parsed id's canonical string form rather than the raw header.
+		ctx := context.WithValue(r.Context(), ctxKeyWorkspace, id.String())
+		next.ServeHTTP(w, r.WithContext(ctx))
+	})
+}
+
+// WorkspaceFromCtx returns the workspace id set by Workspace middleware, or
+// the empty string when the middleware did not run for this request.
+func WorkspaceFromCtx(ctx context.Context) string {
+	v, _ := ctx.Value(ctxKeyWorkspace).(string)
+	return v
+}
+
+// writeAppErr writes err as a JSON body {"error": ..., "field": ...} with
+// err.Code as the HTTP status; "field" is omitted when err.Field is empty.
+// The body is produced by encoding/json so quotes, backslashes and control
+// characters in the message cannot yield malformed JSON.
+func writeAppErr(w http.ResponseWriter, err *apperr.AppError) {
+	body, mErr := json.Marshal(struct {
+		Error string `json:"error"`
+		Field string `json:"field,omitempty"`
+	}{Error: err.Message, Field: err.Field})
+	if mErr != nil {
+		// Not expected for two plain strings; still send a well-formed body.
+		body = []byte(`{"error":"internal server error"}`)
+	}
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(err.Code)
+	_, _ = w.Write(body)
+}
diff --git a/data-layer/api/internal/model/profile.go b/data-layer/api/internal/model/profile.go
new file mode 100644
index 0000000..e14a5d0
--- /dev/null
+++ b/data-layer/api/internal/model/profile.go
@@ -0,0 +1,27 @@
+package model
+
+import "time"
+
+// Profile is the unified-profile shape returned by /profiles/:id. The
+// underlying table is owned by cdp-ingestion (identity-resolution).
+type Profile struct {
+	ID           string         `json:"id"`
+	WorkspaceID  string         `json:"workspace_id"`
+	UserID       string         `json:"user_id,omitempty"`
+	AnonymousIDs []string       `json:"anonymous_ids,omitempty"`
+	Traits       map[string]any `json:"traits,omitempty"`
+	FirstSeenAt  time.Time      `json:"first_seen_at"`
+	LastSeenAt   time.Time      `json:"last_seen_at"`
+}
+
+// SavedQuery mirrors the saved_queries table.
+type SavedQuery struct { + ID string `json:"id"` + WorkspaceID string `json:"workspace_id"` + OwnerID string `json:"owner_id,omitempty"` + Name string `json:"name"` + Kind string `json:"kind"` + Spec map[string]any `json:"spec"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} diff --git a/data-layer/api/internal/model/query.go b/data-layer/api/internal/model/query.go new file mode 100644 index 0000000..f591f5e --- /dev/null +++ b/data-layer/api/internal/model/query.go @@ -0,0 +1,47 @@ +// Package model defines domain types passed between layers. +package model + +import "time" + +// EventTable enumerates the four ClickHouse event tables written by +// cdp-ingestion. Used to whitelist `events` queries so we never interpolate +// an untrusted table name into a template. +type EventTable string + +const ( + EventTableTrack EventTable = "events_track" + EventTableIdentify EventTable = "events_identify" + EventTablePage EventTable = "events_page" + EventTableGroup EventTable = "events_group" +) + +func (t EventTable) Valid() bool { + switch t { + case EventTableTrack, EventTableIdentify, EventTablePage, EventTableGroup: + return true + } + return false +} + +// EventQuery is the parsed filter passed to repo.QueryEvents. +type EventQuery struct { + WorkspaceID string + Table EventTable + From time.Time + To time.Time + UserID string // optional + AnonymousID string // optional + EventName string // optional, only meaningful when Table == events_track + Limit int + Offset int +} + +// QueryResult is a generic columns+rows envelope returned by Query API endpoints. 
+type QueryResult struct { + Columns []string `json:"columns"` + Rows [][]any `json:"rows"` + RowCount int `json:"row_count"` + DurationMS int64 `json:"duration_ms"` + CacheHit bool `json:"cache_hit"` + Meta map[string]any `json:"meta,omitempty"` +} diff --git a/data-layer/api/internal/repo/.gitkeep b/data-layer/api/internal/repo/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/data-layer/api/internal/repo/analytics_repo.go b/data-layer/api/internal/repo/analytics_repo.go new file mode 100644 index 0000000..af5e611 --- /dev/null +++ b/data-layer/api/internal/repo/analytics_repo.go @@ -0,0 +1,167 @@ +package repo + +import ( + "context" + "fmt" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + + "github.com/dbiz/cdp/data-layer/api/internal/model" + "github.com/dbiz/cdp/data-layer/api/internal/templates" +) + +// AnalyticsRepo runs the higher-level P1 query templates (funnel, retention, +// session) against ClickHouse. It shares the read connection with EventRepo +// but lives in its own file because the templates need their own data shapes. 
+type AnalyticsRepo struct { + ch driver.Conn + tpl *templates.Store +} + +func NewAnalyticsRepo(ch driver.Conn, tpl *templates.Store) *AnalyticsRepo { + return &AnalyticsRepo{ch: ch, tpl: tpl} +} + +// --------------------------------------------------------------------------- +// Funnel +// --------------------------------------------------------------------------- + +type FunnelQuery struct { + WorkspaceID string + Steps []string + From time.Time + To time.Time + WindowSeconds uint32 +} + +func (r *AnalyticsRepo) Funnel(ctx context.Context, q FunnelQuery) (*model.QueryResult, error) { + if len(q.Steps) < 2 { + return nil, fmt.Errorf("funnel requires at least 2 steps") + } + + type stepTpl struct { + Index int + Last bool + } + stepsTpl := make([]stepTpl, len(q.Steps)) + for i := range q.Steps { + stepsTpl[i] = stepTpl{Index: i, Last: i == len(q.Steps)-1} + } + + sql, err := r.tpl.Render("funnel_analysis.sql.tmpl", map[string]any{ + "Steps": stepsTpl, + "StepCount": len(q.Steps), + }) + if err != nil { + return nil, err + } + + args := []any{ + clickhouse.Named("workspace_id", q.WorkspaceID), + clickhouse.DateNamed("from", q.From, clickhouse.MilliSeconds), + clickhouse.DateNamed("to", q.To, clickhouse.MilliSeconds), + clickhouse.Named("window_seconds", q.WindowSeconds), + } + for i, name := range q.Steps { + args = append(args, clickhouse.Named(fmt.Sprintf("step%d", i), name)) + } + + rows, err := r.ch.Query(ctx, sql, args...) + if err != nil { + return nil, fmt.Errorf("clickhouse funnel: %w", err) + } + defer rows.Close() + return ScanRows(rows) +} + +// --------------------------------------------------------------------------- +// Retention +// --------------------------------------------------------------------------- + +type RetentionQuery struct { + WorkspaceID string + InitialEvent string + ReturnEvent string + From time.Time + To time.Time + Periods int // e.g. 
14 => D0..D13 +} + +func (r *AnalyticsRepo) Retention(ctx context.Context, q RetentionQuery) (*model.QueryResult, error) { + if q.Periods < 1 { + q.Periods = 14 + } + type periodTpl struct { + RIndex int + OffsetDay int + Last bool + } + outer := make([]periodTpl, q.Periods) + for i := 0; i < q.Periods; i++ { + outer[i] = periodTpl{RIndex: i + 2, OffsetDay: i + 1, Last: i == q.Periods-1} + } + + sql, err := r.tpl.Render("retention_cohort.sql.tmpl", map[string]any{ + "Outer": outer, + }) + if err != nil { + return nil, err + } + + rows, err := r.ch.Query(ctx, sql, + clickhouse.Named("workspace_id", q.WorkspaceID), + clickhouse.DateNamed("from", q.From, clickhouse.MilliSeconds), + clickhouse.DateNamed("to", q.To, clickhouse.MilliSeconds), + clickhouse.Named("initial_event", q.InitialEvent), + clickhouse.Named("return_event", q.ReturnEvent), + ) + if err != nil { + return nil, fmt.Errorf("clickhouse retention: %w", err) + } + defer rows.Close() + return ScanRows(rows) +} + +// --------------------------------------------------------------------------- +// Session +// --------------------------------------------------------------------------- + +type SessionQuery struct { + WorkspaceID string + UserID string // optional + From time.Time + To time.Time + TimeoutSeconds uint32 + Limit int + Offset int +} + +func (r *AnalyticsRepo) Sessions(ctx context.Context, q SessionQuery) (*model.QueryResult, error) { + sql, err := r.tpl.Render("session_analysis.sql.tmpl", map[string]any{ + "HasUserID": q.UserID != "", + }) + if err != nil { + return nil, err + } + + args := []any{ + clickhouse.Named("workspace_id", q.WorkspaceID), + clickhouse.DateNamed("from", q.From, clickhouse.MilliSeconds), + clickhouse.DateNamed("to", q.To, clickhouse.MilliSeconds), + clickhouse.Named("timeout_seconds", q.TimeoutSeconds), + clickhouse.Named("limit", uint32(q.Limit)), + clickhouse.Named("offset", uint32(q.Offset)), + } + if q.UserID != "" { + args = append(args, clickhouse.Named("user_id", 
q.UserID)) + } + + rows, err := r.ch.Query(ctx, sql, args...) + if err != nil { + return nil, fmt.Errorf("clickhouse session: %w", err) + } + defer rows.Close() + return ScanRows(rows) +} diff --git a/data-layer/api/internal/repo/chconn.go b/data-layer/api/internal/repo/chconn.go new file mode 100644 index 0000000..22224f2 --- /dev/null +++ b/data-layer/api/internal/repo/chconn.go @@ -0,0 +1,58 @@ +package repo + +import ( + "context" + "fmt" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" +) + +// NewClickHouse opens a native-protocol ClickHouse connection. The returned +// driver.Conn is safe for concurrent use. Caller owns Close(). +func NewClickHouse(ctx context.Context, addr, db, user, password string) (driver.Conn, error) { + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{addr}, + Auth: clickhouse.Auth{ + Database: db, + Username: user, + Password: password, + }, + Settings: clickhouse.Settings{ + "readonly": 0, // analytics queries; per-user read-only enforced for /query/sql separately + }, + }) + if err != nil { + return nil, fmt.Errorf("open clickhouse: %w", err) + } + if err := conn.Ping(ctx); err != nil { + _ = conn.Close() + return nil, fmt.Errorf("ping clickhouse: %w", err) + } + return conn, nil +} + +// NewClickHouseReadOnly opens a ClickHouse connection using a SELECT-only +// account. Used to back the /query/sql sandbox: DDL/DML are rejected at the DB +// level even if the app-level keyword guard is bypassed. 
+func NewClickHouseReadOnly(ctx context.Context, addr, db, user, password string) (driver.Conn, error) { + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{addr}, + Auth: clickhouse.Auth{ + Database: db, + Username: user, + Password: password, + }, + Settings: clickhouse.Settings{ + "readonly": 2, // belt-and-braces: server-side enforce read-only + }, + }) + if err != nil { + return nil, fmt.Errorf("open clickhouse (ro): %w", err) + } + if err := conn.Ping(ctx); err != nil { + _ = conn.Close() + return nil, fmt.Errorf("ping clickhouse (ro): %w", err) + } + return conn, nil +} diff --git a/data-layer/api/internal/repo/event_repo.go b/data-layer/api/internal/repo/event_repo.go new file mode 100644 index 0000000..0e3f589 --- /dev/null +++ b/data-layer/api/internal/repo/event_repo.go @@ -0,0 +1,194 @@ +package repo + +import ( + "context" + "fmt" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + + "github.com/dbiz/cdp/data-layer/api/internal/model" + "github.com/dbiz/cdp/data-layer/api/internal/templates" +) + +type EventRepo struct { + ch driver.Conn + tpl *templates.Store +} + +func NewEventRepo(ch driver.Conn, tpl *templates.Store) *EventRepo { + return &EventRepo{ch: ch, tpl: tpl} +} + +// QueryEvents renders the event_explorer template against q.Table and returns +// columns+rows. The query is parameterized -- user input never lands in the +// SQL string, only in clickhouse.Named bindings. 
+func (r *EventRepo) QueryEvents(ctx context.Context, q model.EventQuery) (*model.QueryResult, error) { + if !q.Table.Valid() { + return nil, fmt.Errorf("invalid event table: %q", q.Table) + } + sql, err := r.tpl.Render("event_explorer.sql.tmpl", map[string]any{ + "Table": string(q.Table), + "HasUserID": q.UserID != "", + "HasAnonymousID": q.AnonymousID != "", + "HasEventName": q.EventName != "" && q.Table == model.EventTableTrack, + }) + if err != nil { + return nil, err + } + + args := []any{ + clickhouse.Named("workspace_id", q.WorkspaceID), + clickhouse.DateNamed("from", q.From, clickhouse.MilliSeconds), + clickhouse.DateNamed("to", q.To, clickhouse.MilliSeconds), + clickhouse.Named("limit", uint32(q.Limit)), + clickhouse.Named("offset", uint32(q.Offset)), + } + if q.UserID != "" { + args = append(args, clickhouse.Named("user_id", q.UserID)) + } + if q.AnonymousID != "" { + args = append(args, clickhouse.Named("anonymous_id", q.AnonymousID)) + } + if q.EventName != "" && q.Table == model.EventTableTrack { + args = append(args, clickhouse.Named("event", q.EventName)) + } + + rows, err := r.ch.Query(ctx, sql, args...) + if err != nil { + return nil, fmt.Errorf("clickhouse query: %w", err) + } + defer rows.Close() + + return ScanRows(rows) +} + +// QueryProfileTimeline returns recent events for a profile (resolved to +// user_id) across all four event tables, ordered by received_at desc. 
+func (r *EventRepo) QueryProfileTimeline(ctx context.Context, workspaceID, userID string, limit, offset int) (*model.QueryResult, error) {
+	// limit and offset are bound as uint32; a negative int would wrap to a
+	// huge value instead of failing, so reject it up front.
+	if limit < 0 || offset < 0 {
+		return nil, fmt.Errorf("invalid pagination: limit=%d offset=%d", limit, offset)
+	}
+	sql, err := r.tpl.Render("profile_timeline.sql.tmpl", nil)
+	if err != nil {
+		return nil, err
+	}
+	rows, err := r.ch.Query(ctx, sql,
+		clickhouse.Named("workspace_id", workspaceID),
+		clickhouse.Named("user_id", userID),
+		clickhouse.Named("limit", uint32(limit)),
+		clickhouse.Named("offset", uint32(offset)),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("clickhouse query: %w", err)
+	}
+	defer rows.Close()
+	return ScanRows(rows)
+}
+
+// ScanRows turns a driver.Rows iterator into a generic QueryResult.
+//
+// Every scan target is allocated with reflect.New from the column's own
+// ScanType, so the driver always receives a pointer of exactly the Go type it
+// reported for that column, with no hand-maintained list of type names to keep
+// in sync. Each cell is stored as the pointed-to value, not the pointer.
+//
+// NOTE(review): the earlier name-based lookup handed the driver a *any for
+// time.Time and for every unlisted type; the native clickhouse-go Scan looks
+// like it rejects *any targets -- confirm against the driver version in go.mod.
+//
+// Returns a wrapped error if a row fails to scan, or the iterator's own error
+// if iteration stopped early. Rows is an empty (non-nil) slice when the query
+// yields nothing.
+func ScanRows(rows driver.Rows) (*model.QueryResult, error) {
+	cols := rows.Columns()
+	colTypes := rows.ColumnTypes()
+	out := &model.QueryResult{Columns: cols, Rows: [][]any{}}
+
+	for rows.Next() {
+		// Fresh targets per row: the values are copied out below, but pointer
+		// and slice/map typed cells must not be shared between rows.
+		dest := make([]any, len(colTypes))
+		for i, ct := range colTypes {
+			dest[i] = reflect.New(ct.ScanType()).Interface()
+		}
+		if err := rows.Scan(dest...); err != nil {
+			return nil, fmt.Errorf("scan row: %w", err)
+		}
+		row := make([]any, len(dest))
+		for i, p := range dest {
+			row[i] = reflect.ValueOf(p).Elem().Interface()
+		}
+		out.Rows = append(out.Rows, row)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	out.RowCount = len(out.Rows)
+	return out, nil
+}
+
+// newScanTarget returns a pointer matching ClickHouse's reported Go scan type.
+// We keep this list small -- the analytics tables share a handful of types.
+func newScanTarget(typeName string) any { + switch typeName { + case "string": + var v string + return &v + case "uint8": + var v uint8 + return &v + case "uint16": + var v uint16 + return &v + case "uint32": + var v uint32 + return &v + case "uint64": + var v uint64 + return &v + case "int32": + var v int32 + return &v + case "int64": + var v int64 + return &v + case "float32": + var v float32 + return &v + case "float64": + var v float64 + return &v + case "bool": + var v bool + return &v + case "time.Time": + return new(any) // let driver fill, deref below handles it + case "map[string]string": + var v map[string]string + return &v + case "[]string": + var v []string + return &v + default: + // Fallback: untyped pointer; driver decides. + var v any + return &v + } +} + +func derefScanTarget(p any) any { + switch v := p.(type) { + case *string: + return *v + case *uint8: + return *v + case *uint16: + return *v + case *uint32: + return *v + case *uint64: + return *v + case *int32: + return *v + case *int64: + return *v + case *float32: + return *v + case *float64: + return *v + case *bool: + return *v + case *map[string]string: + return *v + case *[]string: + return *v + case *any: + return *v + default: + return v + } +} diff --git a/data-layer/api/internal/repo/pool.go b/data-layer/api/internal/repo/pool.go new file mode 100644 index 0000000..6ad0a31 --- /dev/null +++ b/data-layer/api/internal/repo/pool.go @@ -0,0 +1,28 @@ +// Package repo holds data-access code. PostgreSQL handles owned tables +// (trait_definitions, profile_traits, segment_*, saved_queries) and read-only +// joins onto ingestion-owned tables (workspaces, profiles, sources, ...). +package repo + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// NewPool returns a pgxpool ready for use. Caller owns Close(). 
+func NewPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) {
+	cfg, err := pgxpool.ParseConfig(dsn)
+	if err != nil {
+		return nil, fmt.Errorf("parse pg dsn: %w", err)
+	}
+	pool, err := pgxpool.NewWithConfig(ctx, cfg)
+	if err != nil {
+		return nil, fmt.Errorf("open pg pool: %w", err)
+	}
+	// Ping so an unreachable server fails here rather than on the first
+	// query; the pool is closed first so a failed start does not leak it.
+	if err := pool.Ping(ctx); err != nil {
+		pool.Close()
+		return nil, fmt.Errorf("ping pg: %w", err)
+	}
+	return pool, nil
+}
diff --git a/data-layer/api/internal/repo/profile_repo.go b/data-layer/api/internal/repo/profile_repo.go
new file mode 100644
index 0000000..1fbe2dd
--- /dev/null
+++ b/data-layer/api/internal/repo/profile_repo.go
@@ -0,0 +1,70 @@
+package repo
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
+
+	"github.com/dbiz/cdp/data-layer/api/internal/apperr"
+	"github.com/dbiz/cdp/data-layer/api/internal/model"
+)
+
+// ProfileRepo reads the unified-profile table owned by cdp-ingestion.
+//
+// Assumed schema (TODO: align with cdp-ingestion once that migration lands):
+//
+//	profiles (
+//	  id            UUID,
+//	  workspace_id  UUID,
+//	  user_id       TEXT,
+//	  anonymous_ids TEXT[],
+//	  traits        JSONB,
+//	  first_seen_at TIMESTAMPTZ,
+//	  last_seen_at  TIMESTAMPTZ
+//	)
+type ProfileRepo struct {
+	pg *pgxpool.Pool
+}
+
+// NewProfileRepo builds a ProfileRepo on top of an existing pool.
+func NewProfileRepo(pg *pgxpool.Pool) *ProfileRepo { return &ProfileRepo{pg: pg} }
+
+const selectProfileByID = `
+SELECT id, workspace_id, user_id, anonymous_ids, traits, first_seen_at, last_seen_at
+FROM profiles
+WHERE workspace_id = $1 AND id = $2
+`
+
+// GetByID loads one profile, scoped to workspaceID. It returns
+// apperr.NotFound when no row matches and apperr.Internal for any other scan
+// or JSON-decoding failure.
+//
+// NOTE(review): user_id is scanned straight into p.UserID; if the column is
+// nullable, model.Profile.UserID has to be a nullable type -- confirm.
+func (r *ProfileRepo) GetByID(ctx context.Context, workspaceID, profileID string) (*model.Profile, error) {
+	row := r.pg.QueryRow(ctx, selectProfileByID, workspaceID, profileID)
+	var p model.Profile
+	var traitsRaw []byte
+	if err := row.Scan(&p.ID, &p.WorkspaceID, &p.UserID, &p.AnonymousIDs, &traitsRaw, &p.FirstSeenAt, &p.LastSeenAt); err != nil {
+		if errors.Is(err, pgx.ErrNoRows) {
+			return nil, apperr.NotFound("profile not found")
+		}
+		return nil, apperr.Internal(err)
+	}
+	// traits is decoded separately so an empty/NULL column leaves p.Traits at
+	// its zero value instead of failing the whole read.
+	if len(traitsRaw) > 0 {
+		if err := json.Unmarshal(traitsRaw, &p.Traits); err != nil {
+			return nil, apperr.Internal(err)
+		}
+	}
+	return &p, nil
+}
+
+// GetUserIDForProfile resolves a profile UUID back to its primary user_id so
+// the timeline query can target ClickHouse events on that key.
+//
+// A NULL user_id is returned as "" (via COALESCE) instead of surfacing as a
+// scan error, so ProfileService.Timeline's empty-string check can report the
+// profile as not timelineable rather than the request failing as internal.
+// Returns apperr.NotFound when no row matches.
+func (r *ProfileRepo) GetUserIDForProfile(ctx context.Context, workspaceID, profileID string) (string, error) {
+	const q = `SELECT COALESCE(user_id, '') FROM profiles WHERE workspace_id = $1 AND id = $2`
+	var uid string
+	if err := r.pg.QueryRow(ctx, q, workspaceID, profileID).Scan(&uid); err != nil {
+		if errors.Is(err, pgx.ErrNoRows) {
+			return "", apperr.NotFound("profile not found")
+		}
+		return "", apperr.Internal(err)
+	}
+	return uid, nil
+}
diff --git a/data-layer/api/internal/repo/saved_query_repo.go b/data-layer/api/internal/repo/saved_query_repo.go
new file mode 100644
index 0000000..eb6abc5
--- /dev/null
+++ b/data-layer/api/internal/repo/saved_query_repo.go
@@ -0,0 +1,120 @@
+package repo
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
+
+	"github.com/dbiz/cdp/data-layer/api/internal/apperr"
+	"github.com/dbiz/cdp/data-layer/api/internal/model"
+)
+
+// SavedQueryRepo is the CRUD store for the saved_queries table. Every
+// statement is filtered by workspace_id.
+type SavedQueryRepo struct {
+	pg *pgxpool.Pool
+}
+
+// NewSavedQueryRepo builds a SavedQueryRepo on top of an existing pool.
+func NewSavedQueryRepo(pg *pgxpool.Pool) *SavedQueryRepo { return &SavedQueryRepo{pg: pg} }
+
+// All statements return the same column list in the same order, which is the
+// order scanSavedQuery reads them in. owner_id is optional: '' is stored as
+// NULL on insert and NULL is read back as ''.
+const (
+	insertSavedQuery = `
+INSERT INTO saved_queries (workspace_id, owner_id, name, kind, spec)
+VALUES ($1, NULLIF($2, '')::uuid, $3, $4, $5)
+RETURNING id, workspace_id, COALESCE(owner_id::text, '') AS owner_id, name, kind, spec, created_at, updated_at
+`
+	selectSavedQueries = `
+SELECT id, workspace_id, COALESCE(owner_id::text, '') AS owner_id, name, kind, spec, created_at, updated_at
+FROM saved_queries
+WHERE workspace_id = $1
+ORDER BY updated_at DESC
+LIMIT $2 OFFSET $3
+`
+	selectSavedQuery = `
+SELECT id, workspace_id, COALESCE(owner_id::text, '') AS owner_id, name, kind, spec, created_at, updated_at
+FROM saved_queries
+WHERE workspace_id = $1 AND id = $2
+`
+	updateSavedQuery = `
+UPDATE saved_queries
+SET name = $3, spec = $4, updated_at = now()
+WHERE workspace_id = $1 AND id = $2
+RETURNING id, workspace_id, COALESCE(owner_id::text, '') AS owner_id, name, kind, spec, created_at, updated_at
+`
+	deleteSavedQuery = `DELETE FROM saved_queries WHERE workspace_id = $1 AND id = $2`
+)
+
+// Create inserts q and returns the stored row, including the generated id and
+// timestamps. A spec that cannot be JSON-encoded is reported as a bad request.
+func (r *SavedQueryRepo) Create(ctx context.Context, q model.SavedQuery) (*model.SavedQuery, error) {
+	spec, err := json.Marshal(q.Spec)
+	if err != nil {
+		return nil, apperr.BadRequest("spec must be valid json", "spec", err)
+	}
+	row := r.pg.QueryRow(ctx, insertSavedQuery, q.WorkspaceID, q.OwnerID, q.Name, q.Kind, spec)
+	return scanSavedQuery(row)
+}
+
+// List returns one page of a workspace's saved queries, most recently updated
+// first. The result is an empty (non-nil) slice when there are none.
+//
+// An error raised while iterating is wrapped in apperr.Internal like every
+// other database failure in this repo, and no partial page is returned with
+// it.
+func (r *SavedQueryRepo) List(ctx context.Context, workspaceID string, limit, offset int) ([]model.SavedQuery, error) {
+	rows, err := r.pg.Query(ctx, selectSavedQueries, workspaceID, limit, offset)
+	if err != nil {
+		return nil, apperr.Internal(err)
+	}
+	defer rows.Close()
+
+	out := []model.SavedQuery{}
+	for rows.Next() {
+		q, err := scanSavedQuery(rows)
+		if err != nil {
+			return nil, err
+		}
+		out = append(out, *q)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, apperr.Internal(err)
+	}
+	return out, nil
+}
+
+// Get returns a single saved query, or apperr.NotFound when the id does not
+// exist in the workspace.
+func (r *SavedQueryRepo) Get(ctx context.Context, workspaceID, id string) (*model.SavedQuery, error) {
+	row := r.pg.QueryRow(ctx, selectSavedQuery, workspaceID, id)
+	return scanSavedQuery(row)
+}
+
+// Update replaces the name and spec of a saved query and returns the updated
+// row; kind and owner are left as stored. Returns apperr.NotFound when the id
+// does not exist in the workspace.
+func (r *SavedQueryRepo) Update(ctx context.Context, workspaceID, id, name string, spec map[string]any) (*model.SavedQuery, error) {
+	specJSON, err := json.Marshal(spec)
+	if err != nil {
+		return nil, apperr.BadRequest("spec must be valid json", "spec", err)
+	}
+	row := r.pg.QueryRow(ctx, updateSavedQuery, workspaceID, id, name, specJSON)
+	return scanSavedQuery(row)
+}
+
+// Delete removes a saved query. Deleting an id that does not exist in the
+// workspace is reported as apperr.NotFound rather than succeeding silently.
+func (r *SavedQueryRepo) Delete(ctx context.Context, workspaceID, id string) error {
+	ct, err := r.pg.Exec(ctx, deleteSavedQuery, workspaceID, id)
+	if err != nil {
+		return apperr.Internal(err)
+	}
+	if ct.RowsAffected() == 0 {
+		return apperr.NotFound("saved query not found")
+	}
+	return nil
+}
+
+// scanner is the Scan method shared by pgx.Row and pgx.Rows, so
+// scanSavedQuery can read from either.
+type scanner interface {
+	Scan(dest ...any) error
+}
+
+// scanSavedQuery reads one saved_queries row in the column order used by the
+// statements above and decodes its JSON spec. pgx.ErrNoRows becomes
+// apperr.NotFound; any other failure becomes apperr.Internal.
+func scanSavedQuery(s scanner) (*model.SavedQuery, error) {
+	var q model.SavedQuery
+	var specRaw []byte
+	if err := s.Scan(&q.ID, &q.WorkspaceID, &q.OwnerID, &q.Name, &q.Kind, &specRaw, &q.CreatedAt, &q.UpdatedAt); err != nil {
+		if errors.Is(err, pgx.ErrNoRows) {
+			return nil, apperr.NotFound("saved query not found")
+		}
+		return nil, apperr.Internal(err)
+	}
+	if len(specRaw) > 0 {
+		if err := json.Unmarshal(specRaw, &q.Spec); err != nil {
+			return nil, apperr.Internal(err)
+		}
+	}
+	return &q, nil
+}
diff --git a/data-layer/api/internal/service/.gitkeep b/data-layer/api/internal/service/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/data-layer/api/internal/service/profile_service.go b/data-layer/api/internal/service/profile_service.go
new file mode 100644
index 0000000..6792f97
--- /dev/null
+++ b/data-layer/api/internal/service/profile_service.go
@@ -0,0 +1,66 @@
+package service
+
+import (
+	"context"
+	"encoding/json"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/dbiz/cdp/data-layer/api/internal/apperr"
+	"github.com/dbiz/cdp/data-layer/api/internal/cache"
+	"github.com/dbiz/cdp/data-layer/api/internal/model"
+	"github.com/dbiz/cdp/data-layer/api/internal/repo"
+)
+
+// ProfileService serves profile reads (cached for profileTTL) and profile
+// event timelines (never cached).
+type ProfileService struct {
+	profiles   *repo.ProfileRepo
+	events     *repo.EventRepo
+	cache      *cache.Cache
+	profileTTL time.Duration
+	log        *zap.Logger
+}
+
+// NewProfileService wires a ProfileService from its collaborators.
+func NewProfileService(p *repo.ProfileRepo, e *repo.EventRepo, c *cache.Cache, profileTTL time.Duration, log *zap.Logger) *ProfileService {
+	return &ProfileService{profiles: p, events: e, cache: c, profileTTL: profileTTL, log: log}
+}
+
+// Get returns a profile, preferring a cached copy. Caching is best-effort: a
+// cached entry that no longer decodes is ignored and the profile is re-read
+// from the repo, and a failure to store the fresh copy is only logged.
+func (s *ProfileService) Get(ctx context.Context, workspaceID, profileID string) (*model.Profile, error) {
+	key, err := cache.Key("profile", workspaceID, profileID)
+	if err != nil {
+		return nil, apperr.Internal(err)
+	}
+	if b, ok := s.cache.Get(ctx, key); ok {
+		var p model.Profile
+		if jerr := json.Unmarshal(b, &p); jerr == nil {
+			return &p, nil
+		}
+	}
+	p, err := s.profiles.GetByID(ctx, workspaceID, profileID)
+	if err != nil {
+		return nil, err
+	}
+	if b, jerr := json.Marshal(p); jerr == nil {
+		if cerr := s.cache.Set(ctx, key, b, s.profileTTL); cerr != nil {
+			s.log.Warn("cache set", zap.String("key", key), zap.Error(cerr))
+		}
+	}
+	return p, nil
+}
+
+// Timeline returns a page of events for a profile. The profile id is first
+// resolved to its user_id; a profile without one cannot be looked up in the
+// event store and is reported as apperr.NotFound. DurationMS covers only the
+// event query, not the user_id lookup.
+func (s *ProfileService) Timeline(ctx context.Context, workspaceID, profileID string, limit, offset int) (*model.QueryResult, error) {
+	uid, err := s.profiles.GetUserIDForProfile(ctx, workspaceID, profileID)
+	if err != nil {
+		return nil, err
+	}
+	if uid == "" {
+		return nil, apperr.NotFound("profile has no user_id and cannot be timelined")
+	}
+	start := time.Now()
+	res, err := s.events.QueryProfileTimeline(ctx, workspaceID, uid, limit, offset)
+	if err != nil {
+		return nil, apperr.Internal(err)
+	}
+	res.DurationMS = time.Since(start).Milliseconds()
+	return res, nil
+}
diff --git a/data-layer/api/internal/service/query_service.go b/data-layer/api/internal/service/query_service.go
new file mode 100644
index 0000000..932a07b
--- /dev/null
+++ b/data-layer/api/internal/service/query_service.go
@@ -0,0 +1,87 @@
+// Package service holds business logic. It owns cache orchestration around
+// the read repos and never touches HTTP/chi or the SQL drivers directly.
+package service
+
+import (
+	"context"
+	"encoding/json"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/dbiz/cdp/data-layer/api/internal/apperr"
+	"github.com/dbiz/cdp/data-layer/api/internal/cache"
+	"github.com/dbiz/cdp/data-layer/api/internal/model"
+	"github.com/dbiz/cdp/data-layer/api/internal/repo"
+)
+
+// QueryService runs the event and analytics queries behind a result cache
+// whose entries live for queryTTL.
+type QueryService struct {
+	events    *repo.EventRepo
+	analytics *repo.AnalyticsRepo
+	cache     *cache.Cache
+	queryTTL  time.Duration
+	log       *zap.Logger
+}
+
+// NewQueryService wires a QueryService from its collaborators.
+func NewQueryService(events *repo.EventRepo, analytics *repo.AnalyticsRepo, c *cache.Cache, queryTTL time.Duration, log *zap.Logger) *QueryService {
+	return &QueryService{
+		events:    events,
+		analytics: analytics,
+		cache:     c,
+		queryTTL:  queryTTL,
+		log:       log,
+	}
+}
+
+// cached is the read-through wrapper shared by every query method. The cache
+// key is derived from kind, workspaceID and params. A decodable cached entry
+// is returned with CacheHit set to true; otherwise fetch runs, its result is
+// stamped with DurationMS and CacheHit=false, and a JSON copy is stored.
+// Storing is best-effort: an encoding failure is ignored and a cache write
+// failure is only logged.
+func (s *QueryService) cached(
+	ctx context.Context,
+	kind, workspaceID string,
+	params any,
+	fetch func(context.Context) (*model.QueryResult, error),
+) (*model.QueryResult, error) {
+	cacheKey, err := cache.Key(kind, workspaceID, params)
+	if err != nil {
+		return nil, apperr.Internal(err)
+	}
+
+	// Hit path. An entry that fails to decode is treated as a miss.
+	if raw, found := s.cache.Get(ctx, cacheKey); found {
+		hit := new(model.QueryResult)
+		if json.Unmarshal(raw, hit) == nil {
+			hit.CacheHit = true
+			return hit, nil
+		}
+	}
+
+	// Miss path: run the real query and time it.
+	began := time.Now()
+	fresh, err := fetch(ctx)
+	if err != nil {
+		return nil, apperr.Internal(err)
+	}
+	fresh.DurationMS = time.Since(began).Milliseconds()
+	fresh.CacheHit = false
+
+	encoded, encErr := json.Marshal(fresh)
+	if encErr != nil {
+		return fresh, nil
+	}
+	if setErr := s.cache.Set(ctx, cacheKey, encoded, s.queryTTL); setErr != nil {
+		s.log.Warn("cache set", zap.String("key", cacheKey), zap.Error(setErr))
+	}
+	return fresh, nil
+}
+
+// Events runs the event-explorer query through the cache.
+func (s *QueryService) Events(ctx context.Context, q model.EventQuery) (*model.QueryResult, error) {
+	run := func(inner context.Context) (*model.QueryResult, error) {
+		return s.events.QueryEvents(inner, q)
+	}
+	return s.cached(ctx, "query:events", q.WorkspaceID, q, run)
+}
+
+// Funnel runs the funnel query through the cache.
+func (s *QueryService) Funnel(ctx context.Context, q repo.FunnelQuery) (*model.QueryResult, error) {
+	run := func(inner context.Context) (*model.QueryResult, error) {
+		return s.analytics.Funnel(inner, q)
+	}
+	return s.cached(ctx, "query:funnel", q.WorkspaceID, q, run)
+}
+
+// Retention runs the retention query through the cache.
+func (s *QueryService) Retention(ctx context.Context, q repo.RetentionQuery) (*model.QueryResult, error) {
+	run := func(inner context.Context) (*model.QueryResult, error) {
+		return s.analytics.Retention(inner, q)
+	}
+	return s.cached(ctx, "query:retention", q.WorkspaceID, q, run)
+}
+
+// Sessions runs the session query through the cache.
+func (s *QueryService) Sessions(ctx context.Context, q repo.SessionQuery) (*model.QueryResult, error) {
+	run := func(inner context.Context) (*model.QueryResult, error) {
+		return s.analytics.Sessions(inner, q)
+	}
+	return s.cached(ctx, "query:session", q.WorkspaceID, q, run)
+}
diff --git a/data-layer/api/internal/service/sql_service.go b/data-layer/api/internal/service/sql_service.go
new file mode 100644
index 0000000..145bbe1
--- /dev/null
+++ b/data-layer/api/internal/service/sql_service.go
@@ -0,0 +1,98 @@
+package service
+
+import (
+	"context"
+	"strings"
+	"time"
+
+	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+	"go.uber.org/zap"
+
+	"github.com/dbiz/cdp/data-layer/api/internal/apperr"
+	"github.com/dbiz/cdp/data-layer/api/internal/model"
+	"github.com/dbiz/cdp/data-layer/api/internal/repo"
+)
+
+// SQLService backs the Custom SQL sandbox. It applies two layers of guard:
+//  1. App-level: parse the statement, reject anything that is not a single
+//     SELECT and anything containing DDL/DML keywords.
+//  2. DB-level: queries run against a SELECT-only ClickHouse account so the
+//     server rejects writes even if app-level checks are bypassed.
+type SQLService struct { + ch driver.Conn // read-only conn + log *zap.Logger +} + +func NewSQLService(roConn driver.Conn, log *zap.Logger) *SQLService { + return &SQLService{ch: roConn, log: log} +} + +var forbiddenKeywords = []string{ + "INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER", "TRUNCATE", + "GRANT", "REVOKE", "ATTACH", "DETACH", "OPTIMIZE", "RENAME", "EXCHANGE", +} + +// validateReadOnly rejects multi-statement input and obvious DDL/DML. +func validateReadOnly(sql string) error { + trimmed := strings.TrimSpace(sql) + if trimmed == "" { + return apperr.BadRequest("sql is empty", "sql", nil) + } + // Reject multiple statements -- the ClickHouse driver also rejects this, + // but we want a friendly error before hitting the wire. + if strings.Contains(strings.TrimRight(trimmed, ";"), ";") { + return apperr.BadRequest("only a single statement is allowed", "sql", nil) + } + upper := strings.ToUpper(trimmed) + if !strings.HasPrefix(upper, "SELECT") && !strings.HasPrefix(upper, "WITH") { + return apperr.BadRequest("only SELECT statements are allowed", "sql", nil) + } + // Token-level keyword scan: \bKW\b to avoid false positives like "created_at". + for _, kw := range forbiddenKeywords { + if hasWord(upper, kw) { + return apperr.BadRequest("statement contains forbidden keyword: "+kw, "sql", nil) + } + } + return nil +} + +func hasWord(s, word string) bool { + for { + idx := strings.Index(s, word) + if idx < 0 { + return false + } + left := idx == 0 || !isIdent(s[idx-1]) + right := idx+len(word) == len(s) || !isIdent(s[idx+len(word)]) + if left && right { + return true + } + s = s[idx+len(word):] + } +} + +func isIdent(c byte) bool { + return c == '_' || (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') +} + +// Run executes the (validated) SQL against the read-only ClickHouse user. +// Results are never cached -- queries are arbitrary. 
+func (s *SQLService) Run(ctx context.Context, sql string) (*model.QueryResult, error) { + if err := validateReadOnly(sql); err != nil { + return nil, err + } + start := time.Now() + rows, err := s.ch.Query(ctx, sql) + if err != nil { + // ClickHouse syntax / permission errors are user-visible, not 500. + return nil, apperr.BadRequest("clickhouse rejected query: "+err.Error(), "sql", err) + } + defer rows.Close() + + res, err := repo.ScanRows(rows) + if err != nil { + return nil, apperr.Internal(err) + } + res.DurationMS = time.Since(start).Milliseconds() + return res, nil +} diff --git a/data-layer/api/internal/templates/templates.go b/data-layer/api/internal/templates/templates.go new file mode 100644 index 0000000..7c88fbe --- /dev/null +++ b/data-layer/api/internal/templates/templates.go @@ -0,0 +1,65 @@ +// Package templates loads ClickHouse SQL templates from disk. Templates are +// rendered via text/template so we can interpolate validated structural bits +// (e.g. which event table to read from); value parameters are bound via +// clickhouse.Named at call site rather than rendered. +package templates + +import ( + "bytes" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "text/template" +) + +type Store struct { + dir string + mu sync.RWMutex + cache map[string]*template.Template +} + +func New(dir string) *Store { + return &Store{dir: dir, cache: map[string]*template.Template{}} +} + +// Render loads `name` (with a `.sql.tmpl` suffix appended if not given) and +// renders it against `data`. Templates are parsed once and cached. 
+func (s *Store) Render(name string, data any) (string, error) {
+	parsed, err := s.load(name)
+	if err != nil {
+		return "", err
+	}
+	out := new(bytes.Buffer)
+	if execErr := parsed.Execute(out, data); execErr != nil {
+		// Whatever was written before the failure is discarded.
+		return "", fmt.Errorf("render %s: %w", name, execErr)
+	}
+	return out.String(), nil
+}
+
+// load returns the parsed template for name, reading and parsing the file
+// under s.dir on first use. A name that ends in neither ".sql" nor
+// ".sql.tmpl" gets ".sql.tmpl" appended. No lock is held while the file is
+// read and parsed, so two concurrent first uses may both parse it; whichever
+// stores last stays in the cache.
+func (s *Store) load(name string) (*template.Template, error) {
+	hasKnownSuffix := strings.HasSuffix(name, ".sql") || strings.HasSuffix(name, ".sql.tmpl")
+	if !hasKnownSuffix {
+		name += ".sql.tmpl"
+	}
+
+	s.mu.RLock()
+	cached, hit := s.cache[name]
+	s.mu.RUnlock()
+	if hit {
+		return cached, nil
+	}
+
+	path := filepath.Join(s.dir, name)
+	source, readErr := os.ReadFile(path)
+	if readErr != nil {
+		return nil, fmt.Errorf("read template %s: %w", path, readErr)
+	}
+	parsed, parseErr := template.New(name).Parse(string(source))
+	if parseErr != nil {
+		return nil, fmt.Errorf("parse template %s: %w", path, parseErr)
+	}
+
+	s.mu.Lock()
+	s.cache[name] = parsed
+	s.mu.Unlock()
+	return parsed, nil
+}
diff --git a/data-layer/console/Dockerfile b/data-layer/console/Dockerfile
new file mode 100644
index 0000000..1b53f5a
--- /dev/null
+++ b/data-layer/console/Dockerfile
@@ -0,0 +1,11 @@
+FROM node:20-alpine AS build
+WORKDIR /app
+COPY package.json ./
+RUN npm install
+COPY . .
+RUN npm run build
+
+FROM nginx:1.27-alpine
+COPY --from=build /app/dist /usr/share/nginx/html
+COPY nginx.conf /etc/nginx/conf.d/default.conf
+EXPOSE 4002
diff --git a/data-layer/console/index.html b/data-layer/console/index.html
new file mode 100644
index 0000000..8da7666
--- /dev/null
+++ b/data-layer/console/index.html
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+    CDP Analytics
+
+
+
+ + + diff --git a/data-layer/console/nginx.conf b/data-layer/console/nginx.conf new file mode 100644 index 0000000..582f1b7 --- /dev/null +++ b/data-layer/console/nginx.conf @@ -0,0 +1,14 @@ +server { + listen 4002; + root /usr/share/nginx/html; + index index.html; + + location / { + try_files $uri $uri/ /index.html; + } + + location /api/analytics/ { + rewrite ^/api/analytics/(.*)$ /$1 break; + proxy_pass http://api:4000; + } +} diff --git a/data-layer/console/package.json b/data-layer/console/package.json new file mode 100644 index 0000000..bb45818 --- /dev/null +++ b/data-layer/console/package.json @@ -0,0 +1,44 @@ +{ + "name": "cdp-analytics-console", + "version": "0.1.0", + "private": true, + "type": "module", + "scripts": { + "dev": "vite", + "build": "tsc -b && vite build", + "preview": "vite preview --port 4002", + "lint": "eslint ." + }, + "dependencies": { + "@radix-ui/react-dialog": "^1.1.2", + "@radix-ui/react-dropdown-menu": "^2.1.2", + "@radix-ui/react-label": "^2.1.0", + "@radix-ui/react-slot": "^1.1.0", + "@radix-ui/react-tabs": "^1.1.1", + "@radix-ui/react-toast": "^1.2.2", + "@tanstack/react-query": "^5.59.16", + "class-variance-authority": "^0.7.0", + "clsx": "^2.1.1", + "lucide-react": "^0.451.0", + "react": "^18.3.1", + "react-dom": "^18.3.1", + "react-hook-form": "^7.53.0", + "react-router-dom": "^6.27.0", + "recharts": "^2.13.0", + "tailwind-merge": "^2.5.4", + "zod": "^3.23.8", + "zustand": "^5.0.0" + }, + "devDependencies": { + "@types/node": "^22.7.5", + "@types/react": "^18.3.11", + "@types/react-dom": "^18.3.0", + "@vitejs/plugin-react": "^4.3.2", + "autoprefixer": "^10.4.20", + "eslint": "^9.12.0", + "postcss": "^8.4.47", + "tailwindcss": "^3.4.13", + "typescript": "^5.6.3", + "vite": "^5.4.9" + } +} diff --git a/data-layer/console/postcss.config.js b/data-layer/console/postcss.config.js new file mode 100644 index 0000000..2aa7205 --- /dev/null +++ b/data-layer/console/postcss.config.js @@ -0,0 +1,6 @@ +export default { + plugins: { + 
tailwindcss: {}, + autoprefixer: {}, + }, +}; diff --git a/data-layer/console/public/favicon.svg b/data-layer/console/public/favicon.svg new file mode 100644 index 0000000..4f69b3c --- /dev/null +++ b/data-layer/console/public/favicon.svg @@ -0,0 +1 @@ + diff --git a/data-layer/console/src/App.tsx b/data-layer/console/src/App.tsx new file mode 100644 index 0000000..d30b207 --- /dev/null +++ b/data-layer/console/src/App.tsx @@ -0,0 +1,34 @@ +import { BrowserRouter, Route, Routes } from 'react-router-dom'; +import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; +import { AppShell } from '@/components/AppShell'; +import { ExplorePage } from '@/pages/Explore'; +import { SQLPage } from '@/pages/SQL'; +import { ProfilesPage } from '@/pages/Profiles'; +import { FunnelsPage } from '@/pages/Funnels'; +import { RetentionPage } from '@/pages/Retention'; +import { SegmentsPage } from '@/pages/Segments'; +import { TraitsPage } from '@/pages/Traits'; + +const qc = new QueryClient({ + defaultOptions: { queries: { retry: 1, staleTime: 30_000 } }, +}); + +export function App() { + return ( + + + + }> + } /> + } /> + } /> + } /> + } /> + } /> + } /> + + + + + ); +} diff --git a/data-layer/console/src/api/client.ts b/data-layer/console/src/api/client.ts new file mode 100644 index 0000000..728d207 --- /dev/null +++ b/data-layer/console/src/api/client.ts @@ -0,0 +1,122 @@ +// Thin fetch wrapper for the analytics API. Throws ApiError on non-2xx. + +export class ApiError extends Error { + status: number; + field?: string; + constructor(status: number, message: string, field?: string) { + super(message); + this.status = status; + this.field = field; + } +} + +const BASE = import.meta.env.VITE_ANALYTICS_BASE_URL ?? 
'/api/analytics'; + +async function request(path: string, workspaceID: string, init?: RequestInit): Promise { + const res = await fetch(`${BASE}${path}`, { + ...init, + headers: { + 'content-type': 'application/json', + 'X-Workspace-Id': workspaceID, + ...(init?.headers ?? {}), + }, + }); + const text = await res.text(); + const data = text ? safeJSON(text) : undefined; + if (!res.ok) { + const msg = (data as { error?: string })?.error ?? res.statusText; + const field = (data as { field?: string })?.field; + throw new ApiError(res.status, msg, field); + } + return data as T; +} + +function safeJSON(text: string): unknown { + try { + return JSON.parse(text); + } catch { + return text; + } +} + +// --------------------------------------------------------------------------- +// Common shapes +// --------------------------------------------------------------------------- + +export interface QueryResult { + columns: string[]; + rows: unknown[][]; + row_count: number; + duration_ms: number; + cache_hit: boolean; + meta?: Record; +} + +export interface Profile { + id: string; + workspace_id: string; + user_id?: string; + anonymous_ids?: string[]; + traits?: Record; + first_seen_at: string; + last_seen_at: string; +} + +export interface SavedQuery { + id: string; + workspace_id: string; + owner_id?: string; + name: string; + kind: 'events' | 'sql' | 'funnel' | 'retention' | 'session'; + spec: Record; + created_at: string; + updated_at: string; +} + +// --------------------------------------------------------------------------- +// Endpoints (current workspace passed by each caller) +// --------------------------------------------------------------------------- + +export const analytics = (workspaceID: string) => ({ + health: () => request<{ status: string }>('/health', workspaceID), + ready: () => request<{ status: string }>('/ready', workspaceID), + + queryEvents: (body: { + table: 'events_track' | 'events_identify' | 'events_page' | 'events_group'; + from: string; to: 
string; + user_id?: string; anonymous_id?: string; event?: string; + limit?: number; offset?: number; + }) => request('/query/events', workspaceID, { method: 'POST', body: JSON.stringify(body) }), + + querySQL: (body: { sql: string }) => + request('/query/sql', workspaceID, { method: 'POST', body: JSON.stringify(body) }), + + queryFunnel: (body: { steps: string[]; from: string; to: string; window_seconds: number }) => + request('/query/funnel', workspaceID, { method: 'POST', body: JSON.stringify(body) }), + + queryRetention: (body: { + initial_event: string; return_event: string; + from: string; to: string; periods?: number; + }) => request('/query/retention', workspaceID, { method: 'POST', body: JSON.stringify(body) }), + + querySession: (body: { + from: string; to: string; + timeout_seconds?: number; user_id?: string; + limit?: number; offset?: number; + }) => request('/query/session', workspaceID, { method: 'POST', body: JSON.stringify(body) }), + + getProfile: (id: string) => + request(`/profiles/${id}`, workspaceID), + + getProfileTimeline: (id: string, limit = 100, offset = 0) => + request(`/profiles/${id}/events?limit=${limit}&offset=${offset}`, workspaceID), + + listSavedQueries: () => + request<{ items: SavedQuery[]; limit: number; offset: number }>('/queries', workspaceID), + + createSavedQuery: (body: { name: string; kind: SavedQuery['kind']; spec: Record }) => + request('/queries', workspaceID, { method: 'POST', body: JSON.stringify(body) }), + + deleteSavedQuery: (id: string) => + request(`/queries/${id}`, workspaceID, { method: 'DELETE' }), +}); diff --git a/data-layer/console/src/components/AppShell.tsx b/data-layer/console/src/components/AppShell.tsx new file mode 100644 index 0000000..9be0634 --- /dev/null +++ b/data-layer/console/src/components/AppShell.tsx @@ -0,0 +1,51 @@ +import { NavLink, Outlet } from 'react-router-dom'; +import { + Activity, Code2, LineChart, Search, Settings, Tags, Users, +} from 'lucide-react'; +import { cn } from 
'@/lib/utils'; + +const nav = [ + { to: '/', label: 'Explore', icon: Search }, + { to: '/sql', label: 'Custom SQL', icon: Code2 }, + { to: '/profiles', label: 'Profiles', icon: Users }, + { to: '/funnels', label: 'Funnels', icon: LineChart }, + { to: '/retention', label: 'Retention', icon: Activity }, + { to: '/segments', label: 'Segments', icon: Tags }, + { to: '/traits', label: 'Traits', icon: Settings }, +]; + +export function AppShell() { + return ( +
+ +
+ +
+
+ ); +} diff --git a/data-layer/console/src/components/ui/badge.tsx b/data-layer/console/src/components/ui/badge.tsx new file mode 100644 index 0000000..2737e27 --- /dev/null +++ b/data-layer/console/src/components/ui/badge.tsx @@ -0,0 +1,25 @@ +import * as React from 'react'; +import { cva, type VariantProps } from 'class-variance-authority'; +import { cn } from '@/lib/utils'; + +const badgeVariants = cva( + 'inline-flex items-center rounded-full border px-2.5 py-0.5 text-xs font-semibold transition-colors', + { + variants: { + variant: { + default: 'border-transparent bg-primary text-primary-foreground', + secondary: 'border-transparent bg-muted text-foreground', + destructive: 'border-transparent bg-destructive text-destructive-foreground', + outline: 'text-foreground', + success: 'border-transparent bg-emerald-500 text-white', + }, + }, + defaultVariants: { variant: 'default' }, + }, +); + +export interface BadgeProps extends React.HTMLAttributes, VariantProps {} + +export function Badge({ className, variant, ...props }: BadgeProps) { + return
; +} diff --git a/data-layer/console/src/components/ui/button.tsx b/data-layer/console/src/components/ui/button.tsx new file mode 100644 index 0000000..ed52fb8 --- /dev/null +++ b/data-layer/console/src/components/ui/button.tsx @@ -0,0 +1,48 @@ +import * as React from 'react'; +import { Slot } from '@radix-ui/react-slot'; +import { cva, type VariantProps } from 'class-variance-authority'; +import { cn } from '@/lib/utils'; + +const buttonVariants = cva( + 'inline-flex items-center justify-center whitespace-nowrap rounded-md text-sm font-medium ring-offset-background transition-colors focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-ring focus-visible:ring-offset-2 disabled:pointer-events-none disabled:opacity-50', + { + variants: { + variant: { + default: 'bg-primary text-primary-foreground hover:bg-primary/90', + destructive: 'bg-destructive text-destructive-foreground hover:bg-destructive/90', + outline: 'border border-input bg-background hover:bg-accent hover:text-accent-foreground', + ghost: 'hover:bg-accent hover:text-accent-foreground', + link: 'text-primary underline-offset-4 hover:underline', + }, + size: { + default: 'h-10 px-4 py-2', + sm: 'h-9 rounded-md px-3', + lg: 'h-11 rounded-md px-8', + icon: 'h-10 w-10', + }, + }, + defaultVariants: { variant: 'default', size: 'default' }, + }, +); + +export interface ButtonProps + extends React.ButtonHTMLAttributes, + VariantProps { + asChild?: boolean; +} + +export const Button = React.forwardRef( + ({ className, variant, size, asChild = false, ...props }, ref) => { + const Comp = asChild ? 
Slot : 'button'; + return ( + + ); + }, +); +Button.displayName = 'Button'; + +export { buttonVariants }; diff --git a/data-layer/console/src/components/ui/card.tsx b/data-layer/console/src/components/ui/card.tsx new file mode 100644 index 0000000..4093660 --- /dev/null +++ b/data-layer/console/src/components/ui/card.tsx @@ -0,0 +1,44 @@ +import * as React from 'react'; +import { cn } from '@/lib/utils'; + +export const Card = React.forwardRef>( + ({ className, ...props }, ref) => ( +
+ ), +); +Card.displayName = 'Card'; + +export const CardHeader = React.forwardRef>( + ({ className, ...props }, ref) => ( +
+ ), +); +CardHeader.displayName = 'CardHeader'; + +export const CardTitle = React.forwardRef>( + ({ className, ...props }, ref) => ( +
+ ), +); +CardTitle.displayName = 'CardTitle'; + +export const CardDescription = React.forwardRef>( + ({ className, ...props }, ref) => ( +
+ ), +); +CardDescription.displayName = 'CardDescription'; + +export const CardContent = React.forwardRef>( + ({ className, ...props }, ref) => ( +
+ ), +); +CardContent.displayName = 'CardContent'; + +export const CardFooter = React.forwardRef>( + ({ className, ...props }, ref) => ( +
+ ), +); +CardFooter.displayName = 'CardFooter'; diff --git a/data-layer/console/src/components/ui/input.tsx b/data-layer/console/src/components/ui/input.tsx new file mode 100644 index 0000000..0963199 --- /dev/null +++ b/data-layer/console/src/components/ui/input.tsx @@ -0,0 +1,19 @@ +import * as React from 'react'; +import { cn } from '@/lib/utils'; + +export type InputProps = React.InputHTMLAttributes; + +export const Input = React.forwardRef( + ({ className, type, ...props }, ref) => ( + + ), +); +Input.displayName = 'Input'; diff --git a/data-layer/console/src/hooks/.gitkeep b/data-layer/console/src/hooks/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/data-layer/console/src/index.css b/data-layer/console/src/index.css new file mode 100644 index 0000000..84cc7eb --- /dev/null +++ b/data-layer/console/src/index.css @@ -0,0 +1,46 @@ +@tailwind base; +@tailwind components; +@tailwind utilities; + +@layer base { + :root { + --background: 0 0% 100%; + --foreground: 222.2 84% 4.9%; + --card: 0 0% 100%; + --card-foreground: 222.2 84% 4.9%; + --primary: 222.2 47.4% 11.2%; + --primary-foreground: 210 40% 98%; + --muted: 210 40% 96.1%; + --muted-foreground: 215.4 16.3% 46.9%; + --accent: 210 40% 96.1%; + --accent-foreground: 222.2 47.4% 11.2%; + --destructive: 0 84.2% 60.2%; + --destructive-foreground: 210 40% 98%; + --border: 214.3 31.8% 91.4%; + --input: 214.3 31.8% 91.4%; + --ring: 222.2 84% 4.9%; + --radius: 0.5rem; + } + .dark { + --background: 222.2 84% 4.9%; + --foreground: 210 40% 98%; + --card: 222.2 84% 4.9%; + --card-foreground: 210 40% 98%; + --primary: 210 40% 98%; + --primary-foreground: 222.2 47.4% 11.2%; + --muted: 217.2 32.6% 17.5%; + --muted-foreground: 215 20.2% 65.1%; + --accent: 217.2 32.6% 17.5%; + --accent-foreground: 210 40% 98%; + --destructive: 0 62.8% 30.6%; + --destructive-foreground: 210 40% 98%; + --border: 217.2 32.6% 17.5%; + --input: 217.2 32.6% 17.5%; + --ring: 212.7 26.8% 83.9%; + } +} + +@layer base { + * { @apply 
border-border; } + body { @apply bg-background text-foreground; } +} diff --git a/data-layer/console/src/lib/utils.ts b/data-layer/console/src/lib/utils.ts new file mode 100644 index 0000000..9ad0df4 --- /dev/null +++ b/data-layer/console/src/lib/utils.ts @@ -0,0 +1,6 @@ +import { type ClassValue, clsx } from 'clsx'; +import { twMerge } from 'tailwind-merge'; + +export function cn(...inputs: ClassValue[]) { + return twMerge(clsx(inputs)); +} diff --git a/data-layer/console/src/main.tsx b/data-layer/console/src/main.tsx new file mode 100644 index 0000000..b2bc486 --- /dev/null +++ b/data-layer/console/src/main.tsx @@ -0,0 +1,10 @@ +import React from 'react'; +import ReactDOM from 'react-dom/client'; +import { App } from './App'; +import './index.css'; + +ReactDOM.createRoot(document.getElementById('root')!).render( + + + , +); diff --git a/data-layer/console/src/pages/Explore.tsx b/data-layer/console/src/pages/Explore.tsx new file mode 100644 index 0000000..9eb4a46 --- /dev/null +++ b/data-layer/console/src/pages/Explore.tsx @@ -0,0 +1,139 @@ +import { useState } from 'react'; +import { useMutation } from '@tanstack/react-query'; +import { analytics, ApiError, type QueryResult } from '@/api/client'; +import { useWorkspace } from '@/stores/workspace'; +import { Button } from '@/components/ui/button'; +import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card'; +import { Input } from '@/components/ui/input'; +import { Badge } from '@/components/ui/badge'; + +type Table = 'events_track' | 'events_identify' | 'events_page' | 'events_group'; + +const TABLE_OPTIONS: Table[] = ['events_track', 'events_identify', 'events_page', 'events_group']; + +function isoNow(offsetHours = 0) { + return new Date(Date.now() + offsetHours * 3_600_000).toISOString(); +} + +export function ExplorePage() { + const workspace = useWorkspace((s) => s.currentWorkspace); + const [table, setTable] = useState('events_track'); + const [from, setFrom] = useState(isoNow(-24)); + 
const [to, setTo] = useState(isoNow(0)); + const [userID, setUserID] = useState(''); + const [eventName, setEventName] = useState(''); + const [limit, setLimit] = useState(100); + + const mutation = useMutation({ + mutationFn: () => + analytics(workspace).queryEvents({ + table, + from, to, + user_id: userID || undefined, + event: eventName || undefined, + limit, + }), + }); + + return ( +
+
+

Event Explorer

+

+ Filter raw events. Backed by POST /query/events. +

+
+ + + Filter + + + + + + + {table === 'events_track' && ( + + )} +
+ +
+
+
+ + {mutation.error && ( + + Error + + {mutation.error.status}{' '} + {mutation.error.message} + + + )} + + {mutation.data && } +
+ ); +} + +function ResultsTable({ result }: { result: QueryResult }) { + return ( + + + + Results + + {result.row_count} rows · {result.duration_ms} ms + {result.cache_hit ? ' · cache hit' : ''} + + + + +
+ + + {result.columns.map((c) => )} + + + + {result.rows.map((row, ri) => ( + + {row.map((cell, ci) => ( + + ))} + + ))} + +
{c}
+ {typeof cell === 'object' ? JSON.stringify(cell) : String(cell)} +
+ + + ); +} diff --git a/data-layer/console/src/pages/Funnels.tsx b/data-layer/console/src/pages/Funnels.tsx new file mode 100644 index 0000000..69b54d1 --- /dev/null +++ b/data-layer/console/src/pages/Funnels.tsx @@ -0,0 +1,10 @@ +export function FunnelsPage() { + return ( +
+

Funnels

+

+ Multi-step conversion analysis. +

+
+ ); +} diff --git a/data-layer/console/src/pages/Profiles.tsx b/data-layer/console/src/pages/Profiles.tsx new file mode 100644 index 0000000..89ef41d --- /dev/null +++ b/data-layer/console/src/pages/Profiles.tsx @@ -0,0 +1,10 @@ +export function ProfilesPage() { + return ( +
+

Profiles

+

+ Unified profile lookup and event timeline. +

+
+ ); +} diff --git a/data-layer/console/src/pages/Retention.tsx b/data-layer/console/src/pages/Retention.tsx new file mode 100644 index 0000000..091bc17 --- /dev/null +++ b/data-layer/console/src/pages/Retention.tsx @@ -0,0 +1,10 @@ +export function RetentionPage() { + return ( +
+

Retention

+

+ Cohort retention curves. +

+
+ ); +} diff --git a/data-layer/console/src/pages/SQL.tsx b/data-layer/console/src/pages/SQL.tsx new file mode 100644 index 0000000..90409fd --- /dev/null +++ b/data-layer/console/src/pages/SQL.tsx @@ -0,0 +1,85 @@ +import { useState } from 'react'; +import { useMutation } from '@tanstack/react-query'; +import { analytics, ApiError, type QueryResult } from '@/api/client'; +import { useWorkspace } from '@/stores/workspace'; +import { Button } from '@/components/ui/button'; +import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card'; +import { Badge } from '@/components/ui/badge'; + +export function SQLPage() { + const workspace = useWorkspace((s) => s.currentWorkspace); + const [sql, setSQL] = useState('SELECT count() FROM events_track'); + + const mutation = useMutation({ + mutationFn: () => analytics(workspace).querySQL({ sql }), + }); + + return ( +
+
+

Custom SQL

+

+ SELECT-only. Runs as the analytics_ro ClickHouse user. +

+
+ + + Query + +