From b40568dd30e7b804b0e78138140e6d104574509d Mon Sep 17 00:00:00 2001 From: renolation Date: Mon, 25 May 2026 13:38:20 +0700 Subject: [PATCH] ok --- .../api/internal/repo/analytics_repo.go | 20 +- data-layer/api/internal/repo/event_repo.go | 31 +- data-layer/console/src/pages/Dashboard.tsx | 405 ++++++++++-------- data-layer/console/src/pages/Retention.tsx | 334 ++++++++++++++- .../clickhouse/retention_cohort.sql.tmpl | 48 ++- ingestion/tests/k6/cohort.js | 211 +++++++++ ingestion/tests/k6/track.js | 158 +++++-- 7 files changed, 948 insertions(+), 259 deletions(-) create mode 100644 ingestion/tests/k6/cohort.js diff --git a/data-layer/api/internal/repo/analytics_repo.go b/data-layer/api/internal/repo/analytics_repo.go index af5e611..f25be14 100644 --- a/data-layer/api/internal/repo/analytics_repo.go +++ b/data-layer/api/internal/repo/analytics_repo.go @@ -60,9 +60,9 @@ func (r *AnalyticsRepo) Funnel(ctx context.Context, q FunnelQuery) (*model.Query args := []any{ clickhouse.Named("workspace_id", q.WorkspaceID), - clickhouse.DateNamed("from", q.From, clickhouse.MilliSeconds), - clickhouse.DateNamed("to", q.To, clickhouse.MilliSeconds), - clickhouse.Named("window_seconds", q.WindowSeconds), + clickhouse.Named("from", chTime(q.From)), + clickhouse.Named("to", chTime(q.To)), + clickhouse.Named("window_seconds", chUint(uint64(q.WindowSeconds))), } for i, name := range q.Steps { args = append(args, clickhouse.Named(fmt.Sprintf("step%d", i), name)) @@ -112,8 +112,8 @@ func (r *AnalyticsRepo) Retention(ctx context.Context, q RetentionQuery) (*model rows, err := r.ch.Query(ctx, sql, clickhouse.Named("workspace_id", q.WorkspaceID), - clickhouse.DateNamed("from", q.From, clickhouse.MilliSeconds), - clickhouse.DateNamed("to", q.To, clickhouse.MilliSeconds), + clickhouse.Named("from", chTime(q.From)), + clickhouse.Named("to", chTime(q.To)), clickhouse.Named("initial_event", q.InitialEvent), clickhouse.Named("return_event", q.ReturnEvent), ) @@ -148,11 +148,11 @@ func (r *AnalyticsRepo) Sessions(ctx context.Context, q SessionQuery) (*model.Qu args := []any{ clickhouse.Named("workspace_id", q.WorkspaceID), - clickhouse.DateNamed("from", q.From, clickhouse.MilliSeconds), - clickhouse.DateNamed("to", q.To, clickhouse.MilliSeconds), - clickhouse.Named("timeout_seconds", q.TimeoutSeconds), - clickhouse.Named("limit", uint32(q.Limit)), - clickhouse.Named("offset", uint32(q.Offset)), + clickhouse.Named("from", chTime(q.From)), + clickhouse.Named("to", chTime(q.To)), + clickhouse.Named("timeout_seconds", chUint(uint64(q.TimeoutSeconds))), + clickhouse.Named("limit", chUint(uint64(q.Limit))), + clickhouse.Named("offset", chUint(uint64(q.Offset))), } if q.UserID != "" { args = append(args, clickhouse.Named("user_id", q.UserID)) diff --git a/data-layer/api/internal/repo/event_repo.go b/data-layer/api/internal/repo/event_repo.go index 0e3f589..e8c29e7 100644 --- a/data-layer/api/internal/repo/event_repo.go +++ b/data-layer/api/internal/repo/event_repo.go @@ -3,6 +3,8 @@ package repo import ( "context" "fmt" + "strconv" + "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -11,6 +13,18 @@ import ( "github.com/dbiz/cdp/data-layer/api/internal/templates" ) +// chTime formats a Go time.Time for ClickHouse server-side query parameters. +// clickhouse-go v2 routes args declared via {name:Type} syntax through the +// server-side parameter protocol, which only accepts string values -- typed +// helpers like clickhouse.DateNamed fail with +// "expected string value in NamedValue for query parameter". +// We emit the format ClickHouse parses for DateTime64(3,'UTC'). +func chTime(t time.Time) string { + return t.UTC().Format("2006-01-02 15:04:05.000") +} + +func chUint(n uint64) string { return strconv.FormatUint(n, 10) } + type EventRepo struct { ch driver.Conn tpl *templates.Store @@ -39,10 +53,10 @@ func (r *EventRepo) QueryEvents(ctx context.Context, q model.EventQuery) (*model args := []any{ clickhouse.Named("workspace_id", q.WorkspaceID), - clickhouse.DateNamed("from", q.From, clickhouse.MilliSeconds), - clickhouse.DateNamed("to", q.To, clickhouse.MilliSeconds), - clickhouse.Named("limit", uint32(q.Limit)), - clickhouse.Named("offset", uint32(q.Offset)), + clickhouse.Named("from", chTime(q.From)), + clickhouse.Named("to", chTime(q.To)), + clickhouse.Named("limit", chUint(uint64(q.Limit))), + clickhouse.Named("offset", chUint(uint64(q.Offset))), } if q.UserID != "" { args = append(args, clickhouse.Named("user_id", q.UserID)) @@ -73,8 +87,8 @@ func (r *EventRepo) QueryProfileTimeline(ctx context.Context, workspaceID, userI rows, err := r.ch.Query(ctx, sql, clickhouse.Named("workspace_id", workspaceID), clickhouse.Named("user_id", userID), - clickhouse.Named("limit", uint32(limit)), - clickhouse.Named("offset", uint32(offset)), + clickhouse.Named("limit", chUint(uint64(limit))), + clickhouse.Named("offset", chUint(uint64(offset))), ) if err != nil { return nil, fmt.Errorf("clickhouse query: %w", err) @@ -146,7 +160,8 @@ func newScanTarget(typeName string) any { var v bool return &v case "time.Time": - return new(any) // let driver fill, deref below handles it + var v time.Time + return &v case "map[string]string": var v map[string]string return &v @@ -186,6 +201,8 @@ func derefScanTarget(p any) any { return *v case *[]string: return *v + case *time.Time: + return *v case *any: return *v default: diff --git a/data-layer/console/src/pages/Dashboard.tsx b/data-layer/console/src/pages/Dashboard.tsx index 9d8e297..a7243f5 100644 --- a/data-layer/console/src/pages/Dashboard.tsx +++ b/data-layer/console/src/pages/Dashboard.tsx @@ -1,94 +1,102 @@ import { useQuery } from '@tanstack/react-query'; import { - Area, AreaChart, CartesianGrid, ResponsiveContainer, Tooltip, XAxis, YAxis, + Bar, BarChart, CartesianGrid, Cell, ResponsiveContainer, Tooltip, XAxis, YAxis, } from 'recharts'; import { analytics, type QueryResult } from '@/api/client'; import { useWorkspace } from '@/stores/workspace'; import { Card, CardContent, CardHeader, CardTitle, CardDescription } from '@/components/ui/card'; -import { Badge } from '@/components/ui/badge'; /** - * Dashboard runs a handful of canned SELECTs against ClickHouse via - * POST /query/sql and renders them. Every card refreshes every 10 seconds. + * Business overview for the e-commerce demo data (Product Viewed / + * Product Added / Order Completed). Auto-refreshes every 15 s. * - * We bias toward small queries with explicit time bounds so the workspace's - * Redis cache rejects them and the dashboard stays live. + * Pure ClickHouse queries against analytics.events_track via POST /query/sql. */ -const REFRESH_MS = 10_000; +const REFRESH_MS = 15_000; +const WINDOW = '1 HOUR'; // --------------------------------------------------------------------------- -// SQL bundles +// SQL // --------------------------------------------------------------------------- const SQL = { - totalLastHour: ` - SELECT count() AS total + // Revenue, orders, AOV all from the same row to keep network round-trips down. + orderKpis: ` + SELECT + countIf(event = 'Order Completed') AS orders, + sumIf(toFloat64OrZero(properties['total']), event = 'Order Completed') AS revenue, + avgIf(toFloat64OrZero(properties['total']), event = 'Order Completed') AS aov FROM analytics.events_track - WHERE received_at >= now() - INTERVAL 1 HOUR`, + WHERE received_at >= now() - INTERVAL ${WINDOW}`, - uniqueUsersLastHour: ` - SELECT uniqExact(user_id) AS uniq_users + activeCustomers: ` + SELECT uniqExact(user_id) AS active FROM analytics.events_track - WHERE received_at >= now() - INTERVAL 1 HOUR + WHERE received_at >= now() - INTERVAL ${WINDOW} AND user_id != ''`, - uniqueAnonLastHour: ` - SELECT uniqExact(anonymous_id) AS uniq_anon - FROM analytics.events_track - WHERE received_at >= now() - INTERVAL 1 HOUR - AND anonymous_id != ''`, - - // Last 5 minutes, grouped by second. ClickHouse fills gaps via WITH FILL. - throughputPerSecond: ` + // Funnel counts for the canonical 3-step e-commerce funnel. + funnel: ` SELECT - toStartOfSecond(received_at) AS sec, - count() AS events + countIf(event = 'Product Viewed') AS viewed, + countIf(event = 'Product Added') AS added, + countIf(event = 'Order Completed') AS completed FROM analytics.events_track - WHERE received_at >= now() - INTERVAL 5 MINUTE - GROUP BY sec - ORDER BY sec WITH FILL STEP INTERVAL 1 SECOND`, + WHERE received_at >= now() - INTERVAL ${WINDOW}`, - topEvents: ` + // Top products by views, with the matching "added to cart" count for context. + topProducts: ` SELECT - event AS name, - count() AS events + properties['name'] AS product, + properties['category'] AS category, + countIf(event = 'Product Viewed') AS views, + countIf(event = 'Product Added') AS added, + round( + if(countIf(event = 'Product Viewed') > 0, + countIf(event = 'Product Added') / countIf(event = 'Product Viewed'), + 0 + ) * 100, 1 + ) AS add_rate_pct FROM analytics.events_track - WHERE received_at >= now() - INTERVAL 1 HOUR - AND event != '' - GROUP BY name - ORDER BY events DESC + WHERE event IN ('Product Viewed', 'Product Added') + AND received_at >= now() - INTERVAL ${WINDOW} + AND properties['name'] != '' + GROUP BY product, category + ORDER BY views DESC LIMIT 10`, - latencyPercentiles: ` - SELECT - quantile(0.5)(dateDiff('millisecond', sent_at, received_at)) AS p50_ms, - quantile(0.95)(dateDiff('millisecond', sent_at, received_at)) AS p95_ms, - quantile(0.99)(dateDiff('millisecond', sent_at, received_at)) AS p99_ms, - max(dateDiff('millisecond', sent_at, received_at)) AS max_ms - FROM analytics.events_track - WHERE received_at >= now() - INTERVAL 1 HOUR - AND sent_at > toDateTime64('1971-01-01', 3)`, - - recent: ` + recentOrders: ` SELECT received_at, - event, user_id, - anonymous_id, - message_id + properties['order_id'] AS order_id, + toFloat64OrZero(properties['total']) AS total, + properties['currency'] AS currency, + length(JSONExtractArrayRaw(properties['products'])) AS line_items FROM analytics.events_track + WHERE event = 'Order Completed' ORDER BY received_at DESC - LIMIT 20`, + LIMIT 10`, - dlqLastHour: ` - SELECT count() AS failed - FROM analytics.events_dlq - WHERE received_at >= now() - INTERVAL 1 HOUR`, + // Customers ranked by spend in the window. + topCustomers: ` + SELECT + user_id, + anyHeavy(traits['email']) AS email, + anyHeavy(traits['plan']) AS plan, + countIf(event = 'Order Completed') AS orders, + sumIf(toFloat64OrZero(properties['total']), event = 'Order Completed') AS revenue + FROM analytics.events_track + WHERE received_at >= now() - INTERVAL ${WINDOW} + AND user_id != '' + GROUP BY user_id + ORDER BY revenue DESC + LIMIT 5`, }; // --------------------------------------------------------------------------- -// Hooks +// Hooks / helpers // --------------------------------------------------------------------------- function useSQL(key: string, sql: string) { @@ -101,108 +109,168 @@ function useSQL(key: string, sql: string) { }); } -// Convenience: read a single scalar cell out of a 1-row query. function scalar(res: QueryResult | undefined, col = 0, fallback: T = 0 as T): T { if (!res || res.rows.length === 0) return fallback; - const v = res.rows[0][col]; - return (v ?? fallback) as T; + return (res.rows[0][col] ?? fallback) as T; } +const fmtNumber = (n: number) => + n >= 1_000_000 ? (n / 1_000_000).toFixed(1) + 'M' + : n >= 1_000 ? (n / 1_000).toFixed(1) + 'K' + : new Intl.NumberFormat().format(Math.round(n)); + +const fmtMoney = (n: number, currency = 'USD') => + new Intl.NumberFormat('en-US', { style: 'currency', currency, maximumFractionDigits: 2 }).format(n); + // --------------------------------------------------------------------------- // Page // --------------------------------------------------------------------------- export function DashboardPage() { - const total = useSQL('total', SQL.totalLastHour); - const users = useSQL('users', SQL.uniqueUsersLastHour); - const anon = useSQL('anon', SQL.uniqueAnonLastHour); - const dlq = useSQL('dlq', SQL.dlqLastHour); - const tput = useSQL('tput', SQL.throughputPerSecond); - const top = useSQL('top', SQL.topEvents); - const lat = useSQL('lat', SQL.latencyPercentiles); - const recent = useSQL('recent', SQL.recent); + const orderKpis = useSQL('order_kpis', SQL.orderKpis); + const activeCustomers = useSQL('active', SQL.activeCustomers); + const funnel = useSQL('funnel', SQL.funnel); + const topProducts = useSQL('top_products', SQL.topProducts); + const recentOrders = useSQL('recent_orders', SQL.recentOrders); + const topCustomers = useSQL('top_customers', SQL.topCustomers); - const tputData = (tput.data?.rows ?? []).map(([sec, events]) => ({ - sec: new Date(String(sec)).toLocaleTimeString(), - events: Number(events ?? 0), - })); + const orders = Number(scalar(orderKpis.data, 0)); + const revenue = Number(scalar(orderKpis.data, 1)); + const aov = Number(scalar(orderKpis.data, 2)); + const active = Number(scalar(activeCustomers.data, 0)); + + const funnelRow = funnel.data?.rows[0] ?? [0, 0, 0]; + const viewed = Number(funnelRow[0] ?? 0); + const added = Number(funnelRow[1] ?? 0); + const completed = Number(funnelRow[2] ?? 0); + const funnelData = [ + { stage: 'Product Viewed', count: viewed, pct: 100 }, + { stage: 'Product Added', count: added, pct: viewed > 0 ? (added / viewed) * 100 : 0 }, + { stage: 'Order Completed', count: completed, pct: viewed > 0 ? (completed / viewed) * 100 : 0 }, + ]; return (

Overview

- Live view of analytics.events_track. Auto-refresh every {REFRESH_MS / 1000}s. + Last 1 hour · refreshes every {REFRESH_MS / 1000}s

{/* KPI strip */}
- (total.data))} loading={total.isPending} /> - (users.data))} loading={users.isPending} /> - (anon.data))} loading={anon.isPending} /> - (dlq.data))} - loading={dlq.isPending} - tone={scalar(dlq.data) > 0 ? 'destructive' : 'default'} - /> + + + +
- {/* Throughput chart */} - - - Throughput (events/sec) - Last 5 minutes, bucketed per second. - - - {tput.isPending ? ( - - ) : ( - - - - - - - - - - - - - - - - )} - - - - {/* Two-up row: top events + latency */} + {/* Funnel + Top products */}
- Top events (1h) - Top 10 by count. + Purchase funnel + Product Viewed → Added → Order Completed + + + {funnel.isPending ? ( + + ) : ( + + + + + + + [`${fmtNumber(v)} (${ctx.payload.pct.toFixed(1)}%)`, 'count'] + } + /> + + {funnelData.map((_, i) => ( + + ))} + + + + )} + + + + + + Top products + Most-viewed, with add-to-cart rate. - {top.isPending ? ( + {topProducts.isPending ? ( ) : ( - - + + + + - {(top.data?.rows ?? []).map(([name, n], i) => ( + {(topProducts.data?.rows ?? []).map(([product, _cat, views, added, rate], i) => ( - - + + + + ))} + {(topProducts.data?.rows ?? []).length === 0 && ( + + )} + +
eventcountproductviewsaddedrate
{String(name)}{fmt(Number(n))}{String(product)}{fmtNumber(Number(views))}{fmtNumber(Number(added))}{Number(rate).toFixed(1)}%
— no product events yet —
+ )} +
+
+
+ + {/* Top customers + Recent orders */} +
+ + + Top customers + Ranked by revenue in the window. + + + {topCustomers.isPending ? ( + + ) : ( + + + + + + + + + + + {(topCustomers.data?.rows ?? []).map(([uid, email, plan, count, rev], i) => ( + + + + + + + ))} + {(topCustomers.data?.rows ?? []).length === 0 && ( + + )}
customerplanordersrevenue
+
{String(email || uid)}
+
{String(uid)}
+
{String(plan ?? '')}{fmtNumber(Number(count))}{fmtMoney(Number(rev))}
— no customer activity yet —
)} @@ -211,53 +279,42 @@ export function DashboardPage() { - Sent → received latency (1h) - Client clock vs server clock, milliseconds. + Recent orders + Last 10 completed orders. - - (lat.data, 0)} loading={lat.isPending} /> - (lat.data, 1)} loading={lat.isPending} /> - (lat.data, 2)} loading={lat.isPending} /> - (lat.data, 3)} loading={lat.isPending} /> + + {recentOrders.isPending ? ( + + ) : ( + + + + + + + + + + + {(recentOrders.data?.rows ?? []).map(([ts, uid, _orderId, total, currency, items], i) => ( + + + + + + + ))} + {(recentOrders.data?.rows ?? []).length === 0 && ( + + )} + +
whencustomeritemstotal
+ {new Date(String(ts)).toLocaleTimeString()} + {String(uid)}{fmtNumber(Number(items))}{fmtMoney(Number(total), String(currency || 'USD'))}
— no orders yet —
+ )}
- - {/* Recent events table */} - - - Recent events - 20 most recent across the whole workspace. - - - {recent.isPending ? ( - - ) : ( - - - - - - - - - - - - {(recent.data?.rows ?? []).map((row, i) => ( - - - - - - - - ))} - -
received_ateventuser_idanonymous_idmessage_id
{String(row[0])}{String(row[1])}{String(row[2] ?? '')}{String(row[3] ?? '')}{String(row[4])}
- )} -
-
); } @@ -266,9 +323,7 @@ export function DashboardPage() { // Small components // --------------------------------------------------------------------------- -function Kpi({ - title, value, loading, tone = 'default', -}: { title: string; value: string; loading: boolean; tone?: 'default' | 'destructive' }) { +function Kpi({ title, value, loading }: { title: string; value: string; loading: boolean }) { return ( @@ -276,24 +331,13 @@ function Kpi({ {loading - ?
- :
{value}
} + ?
+ :
{value}
} ); } -function LatStat({ label, v, loading }: { label: string; v: number; loading: boolean }) { - return ( -
-
{label}
- {loading - ?
- :
{Number(v).toFixed(1)} ms
} -
- ); -} - function Skeleton() { return (
@@ -303,12 +347,3 @@ function Skeleton() {
); } - -function fmt(n: number) { - if (n >= 1_000_000) return (n / 1_000_000).toFixed(1) + 'M'; - if (n >= 1_000) return (n / 1_000).toFixed(1) + 'K'; - return new Intl.NumberFormat().format(n); -} - -// keep one Badge import alive for future use -void Badge; diff --git a/data-layer/console/src/pages/Retention.tsx b/data-layer/console/src/pages/Retention.tsx index 091bc17..3f16dad 100644 --- a/data-layer/console/src/pages/Retention.tsx +++ b/data-layer/console/src/pages/Retention.tsx @@ -1,10 +1,334 @@ +import { useEffect, useState } from 'react'; +import { useMutation } from '@tanstack/react-query'; +import { + Activity, BookmarkPlus, Repeat, ShoppingCart, Sparkles, TrendingUp, Wand2, +} from 'lucide-react'; +import { analytics, ApiError, type QueryResult } from '@/api/client'; +import { useWorkspace } from '@/stores/workspace'; +import { Button } from '@/components/ui/button'; +import { Input } from '@/components/ui/input'; +import { Card, CardContent, CardHeader, CardTitle, CardDescription } from '@/components/ui/card'; +import { Badge } from '@/components/ui/badge'; +import { cn } from '@/lib/utils'; + +/** + * Cohort retention with pre-baked templates (PostHog-style). + * + * Users pick a template card -> form auto-fills and a query fires. Power + * users can still tweak the form below before re-running. A future "Custom + * builder" will replace the raw form with a typed expression UI. + */ + +const DEFAULT_PERIODS = 7; + +// --------------------------------------------------------------------------- +// Templates +// --------------------------------------------------------------------------- + +interface Template { + id: string; + name: string; + description: string; + icon: typeof Sparkles; + initial_event: string; + return_event: string; + periods: number; +} + +const TEMPLATES: Template[] = [ + { + id: 'engaged-browsers', + name: 'Engaged browsers', + description: 'Of users who viewed a product, how many come back to browse on day N.', + icon: Activity, + initial_event: 'Product Viewed', + return_event: 'Product Viewed', + periods: 7, + }, + { + id: 'cart-to-purchase', + name: 'Cart → purchase', + description: 'Users who added to cart, then completed an order on day N. Conversion proxy.', + icon: ShoppingCart, + initial_event: 'Product Added', + return_event: 'Order Completed', + periods: 7, + }, + { + id: 'repeat-buyers', + name: 'Repeat buyers', + description: 'Of users who completed an order, how many bought again on day N. Loyalty.', + icon: Repeat, + initial_event: 'Order Completed', + return_event: 'Order Completed', + periods: 14, + }, + { + id: 'post-purchase-browse', + name: 'Post-purchase browsing', + description: 'After buying, do customers come back to browse? Engagement after revenue.', + icon: TrendingUp, + initial_event: 'Order Completed', + return_event: 'Product Viewed', + periods: 7, + }, + { + id: 're-engagement', + name: 'Re-engagement', + description: 'Of browsers, how many converted to a purchase on day N. Close the loop.', + icon: Sparkles, + initial_event: 'Product Viewed', + return_event: 'Order Completed', + periods: 14, + }, + { + id: 'custom', + name: 'Custom', + description: 'Use the form below to define your own cohort.', + icon: Wand2, + initial_event: 'Product Viewed', + return_event: 'Product Viewed', + periods: DEFAULT_PERIODS, + }, +]; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function isoDate(daysOffset: number): string { + const d = new Date(); + d.setUTCHours(0, 0, 0, 0); + d.setUTCDate(d.getUTCDate() + daysOffset); + return d.toISOString(); +} + +// --------------------------------------------------------------------------- +// Page +// --------------------------------------------------------------------------- + export function RetentionPage() { + const workspace = useWorkspace((s) => s.currentWorkspace); + + const [activeId, setActiveId] = useState(TEMPLATES[0].id); + const [initialEvent, setInitialEvent] = useState(TEMPLATES[0].initial_event); + const [returnEvent, setReturnEvent] = useState(TEMPLATES[0].return_event); + const [periods, setPeriods] = useState(TEMPLATES[0].periods); + const [from, setFrom] = useState(isoDate(-(TEMPLATES[0].periods + 1))); + const [to, setTo] = useState(isoDate(1)); + + const run = useMutation({ + mutationFn: () => + analytics(workspace).queryRetention({ + initial_event: initialEvent, + return_event: returnEvent, + from, to, + periods, + }), + }); + + function applyTemplate(t: Template, shouldRun = true) { + setActiveId(t.id); + setInitialEvent(t.initial_event); + setReturnEvent(t.return_event); + setPeriods(t.periods); + setFrom(isoDate(-(t.periods + 1))); + setTo(isoDate(1)); + if (shouldRun && t.id !== 'custom') { + // Defer so the state above is committed before the request fires. + setTimeout(() => run.mutate(), 0); + } + } + + // Auto-run the first template once on mount. + useEffect(() => { + run.mutate(); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []); + return ( -
-

Retention

-

- Cohort retention curves. -

+
+
+

Cohort retention

+

+ Pick a template, or tweak the form below for a one-off cohort. +

+
+ + {/* Template gallery */} +
+ {TEMPLATES.map((t) => { + const Icon = t.icon; + const active = t.id === activeId; + return ( + + ); + })} +
+ + {/* Form (always visible; flipping to custom on any field edit) */} + + +
+ Definition + + {activeId === 'custom' ? 'Build your own cohort.' : 'Tweak the selected template.'} + +
+ +
+ + + + + + +
+ +
+
+
+ + {run.error && ( + + Error + + {run.error.status}{' '} + {run.error.message} + + + )} + + {run.data && ( + t.id === activeId)?.name ?? 'Custom cohort'} + initialEvent={initialEvent} + returnEvent={returnEvent} + /> + )}
); } + +// --------------------------------------------------------------------------- +// Heatmap-ish retention matrix +// --------------------------------------------------------------------------- + +function Matrix({ + result, periods, headline, initialEvent, returnEvent, +}: { + result: QueryResult; + periods: number; + headline: string; + initialEvent: string; + returnEvent: string; +}) { + const rows = result.rows ?? []; + + return ( + + + {headline} + + Cohort = day a user first triggered {initialEvent}. + {' '}Cells show the share who triggered {returnEvent} on D+k. + + + + {rows.length === 0 ? ( +
— no cohorts matched the filter —
+ ) : ( + + + + + + {Array.from({ length: periods }).map((_, i) => ( + + ))} + + + + {rows.map((row, i) => { + const cohortDay = String(row[0]).slice(0, 10); + const cohortSize = Number(row[1] ?? 0); + const dCells = row.slice(2, 2 + periods).map(Number); + return ( + + + + {dCells.map((n, di) => { + const pct = cohortSize > 0 ? (n / cohortSize) * 100 : 0; + return ( + + ); + })} + + ); + })} + +
cohort daysizeD{i}
{cohortDay}{cohortSize} 60 ? 'white' : undefined }} + title={`${n} of ${cohortSize}`} + > + {cohortSize === 0 ? '–' : `${pct.toFixed(0)}%`} +
+ )} +
+
+ ); +} + +function heat(pct: number): string { + if (pct <= 0) return 'transparent'; + const alpha = 0.08 + (Math.min(pct, 100) / 100) * 0.87; + return `rgba(59, 130, 246, ${alpha.toFixed(2)})`; +} diff --git a/data-layer/infra/clickhouse/retention_cohort.sql.tmpl b/data-layer/infra/clickhouse/retention_cohort.sql.tmpl index a653bdc..29251f1 100644 --- a/data-layer/infra/clickhouse/retention_cohort.sql.tmpl +++ b/data-layer/infra/clickhouse/retention_cohort.sql.tmpl @@ -1,41 +1,43 @@ -- Retention Cohort -- of users whose first `initial_event` lands on day D, -- what share triggered `return_event` on day D+k for k in 1..Periods. -- --- Required parameters (clickhouse.Named): +-- We compute the cohort day in a CTE first, then LEFT JOIN events_track and +-- count distinct returners per (cohort_day, day_offset). Doing it this way +-- avoids ClickHouse's "aggregate inside another aggregate" restriction that +-- the older retention()-based form ran into. +-- +-- Required parameters (clickhouse.Named, string-valued): -- workspace_id : String --- from : DateTime64(3,'UTC') +-- from : DateTime64(3,'UTC') (formatted as 'YYYY-MM-DD HH:MM:SS.mmm') -- to : DateTime64(3,'UTC') -- initial_event : String -- return_event : String -- -- Template inputs: --- .Outer : []{ RIndex int; OffsetDay int; Last bool } --- One entry per follow-up day. RIndex is the position in the retention() --- output array; OffsetDay is the day delta from the cohort day. -SELECT - cohort_day, - countIf(arrayElement(r, 1)) AS cohort_size, -{{- range $p := .Outer }} - countIf(arrayElement(r, {{ $p.RIndex }})) AS retained_d{{ $p.OffsetDay }}{{ if not $p.Last }},{{ end }} -{{- end }} -FROM ( +-- .Outer : []{ OffsetDay int; Last bool } +WITH cohorts AS ( SELECT user_id, - toDate(min(if(event = {initial_event:String}, timestamp, NULL))) AS cohort_day, - retention( - event = {initial_event:String} AND toDate(timestamp) = cohort_day, -{{- range $p := .Outer }} - event = {return_event:String} AND toDate(timestamp) = addDays(cohort_day, {{ $p.OffsetDay }}){{ if not $p.Last }},{{ end }} -{{- end }} - ) AS r + toDate(min(timestamp)) AS cohort_day FROM events_track WHERE workspace_id = {workspace_id:String} AND received_at >= {from:DateTime64(3,'UTC')} AND received_at < {to:DateTime64(3,'UTC')} AND user_id != '' - AND event IN ({initial_event:String}, {return_event:String}) + AND event = {initial_event:String} GROUP BY user_id - HAVING cohort_day IS NOT NULL ) -GROUP BY cohort_day -ORDER BY cohort_day +SELECT + c.cohort_day AS cohort_day, + uniqExact(c.user_id) AS cohort_size, +{{- range $p := .Outer }} + uniqExactIf(c.user_id, e.event = {return_event:String} AND toDate(e.timestamp) = addDays(c.cohort_day, {{ $p.OffsetDay }})) AS retained_d{{ $p.OffsetDay }}{{ if not $p.Last }},{{ end }} +{{- end }} +FROM cohorts AS c +LEFT JOIN events_track AS e + ON e.workspace_id = {workspace_id:String} + AND e.user_id = c.user_id + AND e.received_at >= {from:DateTime64(3,'UTC')} + AND e.received_at < {to:DateTime64(3,'UTC')} +GROUP BY c.cohort_day +ORDER BY c.cohort_day diff --git a/ingestion/tests/k6/cohort.js b/ingestion/tests/k6/cohort.js new file mode 100644 index 0000000..b751fd9 --- /dev/null +++ b/ingestion/tests/k6/cohort.js @@ -0,0 +1,211 @@ +// k6 generator -- backfills events with timestamps spread over the last N days +// so the analytics console can show a realistic cohort retention matrix. +// +// How it works +// 1. setup() pre-computes a schedule: each of S shoppers gets a "first day" +// between [-N, 0], then revisits later days with a decay probability. +// 2. The default function fires one scheduled event per iteration; we run +// `shared-iterations` so all VUs collaborate to drain the schedule. +// +// We deliberately omit `sentAt` from the payload. The ingest's 24h late-event +// check is on sent_at (default = now when omitted), not on timestamp; so the +// timestamp column lands in the past while the request itself looks live. +// +// Usage: +// k6 run tests/k6/cohort.js +// k6 run -e DAYS=14 -e SHOPPERS=80 -e VUS=10 tests/k6/cohort.js + +import http from 'k6/http'; +import { check } from 'k6'; +import encoding from 'k6/encoding'; +import { randomItem, randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.4.0/index.js'; + +const BASE = __ENV.BASE ?? 'http://localhost:3049'; +const WRITE_KEY = __ENV.WRITE_KEY ?? 'cdp_dev_writekey_1234567890'; +const SHOPPERS = parseInt(__ENV.SHOPPERS ?? '50', 10); // unique users +const DAYS = parseInt(__ENV.DAYS ?? '7', 10); // cohort window +const VUS = parseInt(__ENV.VUS ?? '5', 10); + +const AUTH = 'Basic ' + encoding.b64encode(`${WRITE_KEY}:`); + +// --------------------------------------------------------------------------- +// Fixtures +// --------------------------------------------------------------------------- + +const PRODUCTS = [ + { id: 'sku_alpha', name: 'Alpha Hoodie', category: 'apparel', brand: 'CDP', price: 49.0 }, + { id: 'sku_beta', name: 'Beta Mug', category: 'drinkware', brand: 'CDP', price: 12.5 }, + { id: 'sku_gamma', name: 'Gamma Backpack', category: 'bags', brand: 'CDP', price: 89.0 }, + { id: 'sku_delta', name: 'Delta Sneakers', category: 'footwear', brand: 'Athleta', price: 129.0 }, + { id: 'sku_eps', name: 'Epsilon Headset', category: 'electronics',brand: 'Sonix', price: 199.0 }, +]; + +const PLANS = ['free', 'pro', 'team']; +const COUNTRIES = ['US', 'VN', 'GB', 'SG', 'JP', 'DE', 'FR']; + +// --------------------------------------------------------------------------- +// Pre-compute schedule (runs once before any VU starts) +// --------------------------------------------------------------------------- + +function rand(min, max) { return Math.floor(Math.random() * (max - min + 1)) + min; } +function pick(arr) { return arr[Math.floor(Math.random() * arr.length)]; } + +function isoOnDay(daysAgo, hourSeed) { + // anchor at "today 12:00 UTC" minus N days, jitter a few hours so events + // spread across the day rather than clumping at the same minute. + const d = new Date(); + d.setUTCHours(12, 0, 0, 0); + d.setUTCDate(d.getUTCDate() - daysAgo); + d.setUTCHours(d.getUTCHours() + (hourSeed % 9) - 4); + d.setUTCMinutes(d.getUTCMinutes() + rand(0, 59)); + return d.toISOString(); +} + +export function setup() { + const events = []; + for (let i = 1; i <= SHOPPERS; i++) { + const userId = `c_${String(i).padStart(3, '0')}`; + const traits = { + email: `cohort${i}@example.com`, + plan: pick(PLANS), + country: pick(COUNTRIES), + }; + + // first day = day -N .. day -1 (no first-day-today so retention makes sense) + const firstDay = rand(1, DAYS - 1); + + // Day 0 of this user's lifecycle = signup behavior. + push(events, userId, traits, firstDay, 'Product Viewed', null); + if (Math.random() < 0.7) push(events, userId, traits, firstDay, 'Product Added', null); + if (Math.random() < 0.4) push(events, userId, traits, firstDay, 'Order Completed', null); + + // Later days: decaying return probability. + for (let d = firstDay - 1; d >= 0; d--) { + const since = firstDay - d; // days since first + const pReturn = Math.max(0.05, 0.85 - 0.15 * since); // 85% → 5% + if (Math.random() < pReturn) { + push(events, userId, traits, d, 'Product Viewed', null); + if (Math.random() < 0.3) push(events, userId, traits, d, 'Product Added', null); + if (Math.random() < 0.15) push(events, userId, traits, d, 'Order Completed', null); + } + } + } + // Shuffle so partition keys spread evenly. + for (let i = events.length - 1; i > 0; i--) { + const j = Math.floor(Math.random() * (i + 1)); + [events[i], events[j]] = [events[j], events[i]]; + } + return { events }; +} + +function push(events, userId, traits, daysAgo, eventName, _) { + events.push({ + userId, + traits, + daysAgo, + eventName, + seed: events.length, + }); +} + +// --------------------------------------------------------------------------- +// Scenario +// --------------------------------------------------------------------------- + +export const options = { + scenarios: { + backfill: { + executor: 'shared-iterations', + vus: VUS, + // Set this big enough; setup() decides the real cap. We just stop once + // we run out of events (returns early in default()). + iterations: 20_000, + maxDuration: '5m', + }, + }, + thresholds: { + http_req_failed: ['rate<0.02'], + checks: ['rate>0.98'], + }, +}; + +export default function (data) { + const ev = data.events[__ITER]; + if (!ev) return; // schedule exhausted + + const ts = isoOnDay(ev.daysAgo, ev.seed); + const product = randomItem(PRODUCTS); + const messageId = `cohort_${ev.userId}_${ev.seed}_${Date.now()}`; + + let properties; + switch (ev.eventName) { + case 'Product Viewed': + properties = productProps(product); + break; + case 'Product Added': + properties = { ...productProps(product), quantity: randomIntBetween(1, 3) }; + break; + case 'Order Completed': + properties = orderProps(); + break; + } + + const payload = JSON.stringify({ + type: 'track', + messageId, + userId: ev.userId, + anonymousId: `anon_${ev.userId}`, + event: ev.eventName, + properties, + traits: ev.traits, + // event time in the past... + timestamp: ts, + // ...request time intentionally omitted so the ingest's late-event guard + // (which checks sent_at, not timestamp) does not reject us. + context: { + library_name: 'k6-cohort-sim', + library_version: '0.1.0', + ip: '127.0.0.1', + userAgent: 'k6/cohort', + locale: 'en-US', + }, + }); + + const res = http.post(`${BASE}/v1/track`, payload, { + headers: { 'Content-Type': 'application/json', Authorization: AUTH }, + tags: { event: ev.eventName }, + }); + + check(res, { + 'status 200': (r) => r.status === 200, + 'body ok': (r) => r.json('ok') === true, + }); +} + +function productProps(p) { + return { + product_id: p.id, sku: p.id, name: p.name, category: p.category, + brand: p.brand, price: p.price, currency: 'USD', + }; +} + +function orderProps() { + const lines = []; + const n = randomIntBetween(1, 3); + let total = 0; + for (let i = 0; i < n; i++) { + const p = randomItem(PRODUCTS); + const qty = randomIntBetween(1, 2); + total += p.price * qty; + lines.push({ product_id: p.id, sku: p.id, name: p.name, price: p.price, quantity: qty }); + } + return { + order_id: `ord_${Date.now()}_${randomIntBetween(1000, 9999)}`, + revenue: Number(total.toFixed(2)), + currency: 'USD', + tax: Number((total * 0.08).toFixed(2)), + shipping: 5, + total: Number((total + total * 0.08 + 5).toFixed(2)), + products: lines, + }; +} diff --git a/ingestion/tests/k6/track.js b/ingestion/tests/k6/track.js index 79d3970..f1b382b 100644 --- a/ingestion/tests/k6/track.js +++ b/ingestion/tests/k6/track.js @@ -1,69 +1,121 @@ -// k6 load test — POST /v1/track against the local cdp-ingest service. +// k6 load test simulating realistic e-commerce traffic for 5 shoppers. +// +// Each VU plays one shopper with a stable userId + anonymousId. Per iteration +// the shopper fires a random Segment-spec event from a small catalog +// (Product Viewed / Product Added / Order Completed), then "thinks" for a +// short while before the next action. // // Usage: -// brew install k6 # one-time -// k6 run tests/k6/track.js # defaults: 50 CCU, 1m +// brew install k6 +// k6 run tests/k6/track.js // -// Override at the CLI: +// Overrides: // k6 run -e WRITE_KEY=xxx -e BASE=http://localhost:3049 \ -// -e VUS=100 -e DURATION=2m tests/k6/track.js +// -e VUS=5 -e DURATION=2m tests/k6/track.js import http from 'k6/http'; -import { check } from 'k6'; +import { check, sleep } from 'k6'; +import { randomItem, randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.4.0/index.js'; import encoding from 'k6/encoding'; -const BASE = __ENV.BASE ?? 'http://localhost:3049'; -const WRITE_KEY = __ENV.WRITE_KEY ?? 'cdp_dev_writekey_1234567890'; -const VUS = parseInt(__ENV.VUS ?? '50', 10); -const DURATION = __ENV.DURATION ?? '1m'; +const BASE = __ENV.BASE ?? 'http://localhost:3049'; +const WRITE_KEY = __ENV.WRITE_KEY ?? 'cdp_dev_writekey_1234567890'; +const VUS = parseInt(__ENV.VUS ?? '5', 10); +const DURATION = __ENV.DURATION ?? '1m'; -// Segment-compatible auth: Basic base64(writeKey + ":") const AUTH = 'Basic ' + encoding.b64encode(`${WRITE_KEY}:`); export const options = { scenarios: { - constant_load: { + shoppers: { executor: 'constant-vus', vus: VUS, duration: DURATION, }, }, thresholds: { - http_req_failed: ['rate<0.01'], // < 1% errors - http_req_duration: ['p(95)<300', 'p(99)<800'], + http_req_failed: ['rate<0.01'], + http_req_duration: ['p(95)<500', 'p(99)<1500'], checks: ['rate>0.99'], }, }; +// --------------------------------------------------------------------------- +// Fixtures +// --------------------------------------------------------------------------- + +const SHOPPERS = [ + { user_id: 'u_001', email: 'alice@example.com', plan: 'pro', country: 'US' }, + { user_id: 'u_002', email: 'bob@example.com', plan: 'free', country: 'VN' }, + { user_id: 'u_003', email: 'charlie@example.com', plan: 'pro', country: 'GB' }, + { user_id: 'u_004', email: 'dana@example.com', plan: 'free', country: 'SG' }, + { user_id: 'u_005', email: 'eric@example.com', plan: 'team', country: 'JP' }, +]; + +const PRODUCTS = [ + { id: 'sku_alpha', name: 'Alpha Hoodie', category: 'apparel', brand: 'CDP', price: 49.0 }, + { id: 'sku_beta', name: 'Beta Mug', category: 'drinkware', brand: 'CDP', price: 12.5 }, + { id: 'sku_gamma', name: 'Gamma Backpack', category: 'bags', brand: 'CDP', price: 89.0 }, + { id: 'sku_delta', name: 'Delta Sneakers', category: 'footwear', brand: 'Athleta', price: 129.0 }, + { id: 'sku_eps', name: 'Epsilon Headset', category: 'electronics',brand: 'Sonix', price: 199.0 }, +]; + +const EVENT_KINDS = ['Product Viewed', 'Product Added', 'Order Completed']; + +// --------------------------------------------------------------------------- +// Per-iteration +// --------------------------------------------------------------------------- + export default function () { + // Stable per-VU identity. __VU starts at 1. + const shopper = SHOPPERS[(__VU - 1) % SHOPPERS.length]; + const anonymousId = `anon_${shopper.user_id}`; + + const eventName = randomItem(EVENT_KINDS); + let properties; + switch (eventName) { + case 'Product Viewed': + properties = productProps(randomItem(PRODUCTS)); + break; + case 'Product Added': + properties = { + ...productProps(randomItem(PRODUCTS)), + quantity: randomIntBetween(1, 3), + }; + break; + case 'Order Completed': + properties = orderProps(); + break; + } + const now = new Date().toISOString(); - const messageId = `k6_${__VU}_${__ITER}_${Date.now()}`; + const messageId = `k6_${shopper.user_id}_${__ITER}_${Date.now()}`; const payload = JSON.stringify({ type: 'track', messageId, - anonymousId: `anon_${__VU}`, - userId: `user_${__VU}@example.com`, - event: 'k6 Test Event', - properties: { - testProp: 'load test', - vu: __VU, - iter: __ITER, - price: 42.5, + userId: shopper.user_id, + anonymousId, + event: eventName, + properties, + traits: { + email: shopper.email, + plan: shopper.plan, + country: shopper.country, }, timestamp: now, sentAt: now, context: { - library_name: 'k6', - library_version: '0.1.0', + library_name: 'k6-ecommerce-sim', + library_version: '0.2.0', ip: '127.0.0.1', userAgent: 'k6/loadtest', locale: 'en-US', page: { - path: '/', - host: 'example.com', - title: 'Example page', - url: 'https://example.com/', + path: '/checkout', + host: 'shop.example.com', + title: 'Shop', + url: 'https://shop.example.com/checkout', }, }, }); @@ -73,6 +125,7 @@ export default function () { 'Content-Type': 'application/json', Authorization: AUTH, }, + tags: { event: eventName }, }); check(res, { @@ -80,4 +133,51 @@ export default function () { 'body ok': (r) => r.json('ok') === true, 'fast (<500ms)': (r) => r.timings.duration < 500, }); + + // Think time -- a real shopper does not click 100x/sec. + sleep(randomIntBetween(1, 4)); +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function productProps(p) { + return { + product_id: p.id, + sku: p.id, + name: p.name, + category: p.category, + brand: p.brand, + price: p.price, + currency: 'USD', + }; +} + +function orderProps() { + // 1 - 3 line items + const lines = []; + const n = randomIntBetween(1, 3); + let total = 0; + for (let i = 0; i < n; i++) { + const p = randomItem(PRODUCTS); + const qty = randomIntBetween(1, 2); + total += p.price * qty; + lines.push({ + product_id: p.id, + sku: p.id, + name: p.name, + price: p.price, + quantity: qty, + }); + } + return { + order_id: `ord_${Date.now()}_${randomIntBetween(1000, 9999)}`, + revenue: Number(total.toFixed(2)), + currency: 'USD', + tax: Number((total * 0.08).toFixed(2)), + shipping: 5, + total: Number((total + total * 0.08 + 5).toFixed(2)), + products: lines, + }; }