Files
cdp/data-layer/api/internal/repo/event_repo.go
2026-05-25 13:38:20 +07:00

212 lines
5.2 KiB
Go

package repo
import (
"context"
"fmt"
"strconv"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/dbiz/cdp/data-layer/api/internal/model"
"github.com/dbiz/cdp/data-layer/api/internal/templates"
)
// chTime formats a Go time.Time for ClickHouse server-side query parameters.
// clickhouse-go v2 routes args declared via {name:Type} syntax through the
// server-side parameter protocol, which only accepts string values -- typed
// helpers like clickhouse.DateNamed fail with
// "expected string value in NamedValue for query parameter".
// We emit the format ClickHouse parses for DateTime64(3,'UTC').
func chTime(t time.Time) string {
return t.UTC().Format("2006-01-02 15:04:05.000")
}
func chUint(n uint64) string { return strconv.FormatUint(n, 10) }
type EventRepo struct {
ch driver.Conn
tpl *templates.Store
}
func NewEventRepo(ch driver.Conn, tpl *templates.Store) *EventRepo {
return &EventRepo{ch: ch, tpl: tpl}
}
// QueryEvents renders the event_explorer template against q.Table and returns
// columns+rows. The query is parameterized -- user input never lands in the
// SQL string, only in clickhouse.Named bindings.
func (r *EventRepo) QueryEvents(ctx context.Context, q model.EventQuery) (*model.QueryResult, error) {
if !q.Table.Valid() {
return nil, fmt.Errorf("invalid event table: %q", q.Table)
}
sql, err := r.tpl.Render("event_explorer.sql.tmpl", map[string]any{
"Table": string(q.Table),
"HasUserID": q.UserID != "",
"HasAnonymousID": q.AnonymousID != "",
"HasEventName": q.EventName != "" && q.Table == model.EventTableTrack,
})
if err != nil {
return nil, err
}
args := []any{
clickhouse.Named("workspace_id", q.WorkspaceID),
clickhouse.Named("from", chTime(q.From)),
clickhouse.Named("to", chTime(q.To)),
clickhouse.Named("limit", chUint(uint64(q.Limit))),
clickhouse.Named("offset", chUint(uint64(q.Offset))),
}
if q.UserID != "" {
args = append(args, clickhouse.Named("user_id", q.UserID))
}
if q.AnonymousID != "" {
args = append(args, clickhouse.Named("anonymous_id", q.AnonymousID))
}
if q.EventName != "" && q.Table == model.EventTableTrack {
args = append(args, clickhouse.Named("event", q.EventName))
}
rows, err := r.ch.Query(ctx, sql, args...)
if err != nil {
return nil, fmt.Errorf("clickhouse query: %w", err)
}
defer rows.Close()
return ScanRows(rows)
}
// QueryProfileTimeline returns recent events for a profile (resolved to
// user_id) across all four event tables, ordered by received_at desc.
func (r *EventRepo) QueryProfileTimeline(ctx context.Context, workspaceID, userID string, limit, offset int) (*model.QueryResult, error) {
sql, err := r.tpl.Render("profile_timeline.sql.tmpl", nil)
if err != nil {
return nil, err
}
rows, err := r.ch.Query(ctx, sql,
clickhouse.Named("workspace_id", workspaceID),
clickhouse.Named("user_id", userID),
clickhouse.Named("limit", chUint(uint64(limit))),
clickhouse.Named("offset", chUint(uint64(offset))),
)
if err != nil {
return nil, fmt.Errorf("clickhouse query: %w", err)
}
defer rows.Close()
return ScanRows(rows)
}
// ScanRows turns a driver.Rows iterator into a generic QueryResult. Column
// types come from rows.ColumnTypes() so we allocate the right pointer kinds.
func ScanRows(rows driver.Rows) (*model.QueryResult, error) {
cols := rows.Columns()
colTypes := rows.ColumnTypes()
out := &model.QueryResult{Columns: cols, Rows: [][]any{}}
for rows.Next() {
dest := make([]any, len(colTypes))
for i, ct := range colTypes {
dest[i] = newScanTarget(ct.ScanType().String())
}
if err := rows.Scan(dest...); err != nil {
return nil, fmt.Errorf("scan row: %w", err)
}
row := make([]any, len(dest))
for i, p := range dest {
row[i] = derefScanTarget(p)
}
out.Rows = append(out.Rows, row)
}
if err := rows.Err(); err != nil {
return nil, err
}
out.RowCount = len(out.Rows)
return out, nil
}
// newScanTarget returns a pointer matching ClickHouse's reported Go scan type.
// We keep this list small -- the analytics tables share a handful of types.
func newScanTarget(typeName string) any {
switch typeName {
case "string":
var v string
return &v
case "uint8":
var v uint8
return &v
case "uint16":
var v uint16
return &v
case "uint32":
var v uint32
return &v
case "uint64":
var v uint64
return &v
case "int32":
var v int32
return &v
case "int64":
var v int64
return &v
case "float32":
var v float32
return &v
case "float64":
var v float64
return &v
case "bool":
var v bool
return &v
case "time.Time":
var v time.Time
return &v
case "map[string]string":
var v map[string]string
return &v
case "[]string":
var v []string
return &v
default:
// Fallback: untyped pointer; driver decides.
var v any
return &v
}
}
func derefScanTarget(p any) any {
switch v := p.(type) {
case *string:
return *v
case *uint8:
return *v
case *uint16:
return *v
case *uint32:
return *v
case *uint64:
return *v
case *int32:
return *v
case *int64:
return *v
case *float32:
return *v
case *float64:
return *v
case *bool:
return *v
case *map[string]string:
return *v
case *[]string:
return *v
case *time.Time:
return *v
case *any:
return *v
default:
return v
}
}