212 lines
5.2 KiB
Go
212 lines
5.2 KiB
Go
package repo
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2"
|
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
|
|
|
"github.com/dbiz/cdp/data-layer/api/internal/model"
|
|
"github.com/dbiz/cdp/data-layer/api/internal/templates"
|
|
)
|
|
|
|
// chTime renders t as the string form of a ClickHouse server-side query
// parameter: the instant is converted to UTC and printed with millisecond
// precision, the format ClickHouse parses for DateTime64(3,'UTC').
//
// A string is produced instead of a typed value because clickhouse-go v2
// sends args declared via the {name:Type} syntax through the server-side
// parameter protocol, which only accepts string values -- typed helpers like
// clickhouse.DateNamed fail with
// "expected string value in NamedValue for query parameter".
func chTime(t time.Time) string {
	const layout = "2006-01-02 15:04:05.000"
	utc := t.In(time.UTC)
	return utc.Format(layout)
}
|
|
|
|
// chUint renders n as a base-10 string so it can be passed as the value of a
// named query parameter.
func chUint(n uint64) string {
	const base = 10
	return strconv.FormatUint(n, base)
}
|
|
|
|
type EventRepo struct {
|
|
ch driver.Conn
|
|
tpl *templates.Store
|
|
}
|
|
|
|
func NewEventRepo(ch driver.Conn, tpl *templates.Store) *EventRepo {
|
|
return &EventRepo{ch: ch, tpl: tpl}
|
|
}
|
|
|
|
// QueryEvents renders the event_explorer template against q.Table and returns
|
|
// columns+rows. The query is parameterized -- user input never lands in the
|
|
// SQL string, only in clickhouse.Named bindings.
|
|
func (r *EventRepo) QueryEvents(ctx context.Context, q model.EventQuery) (*model.QueryResult, error) {
|
|
if !q.Table.Valid() {
|
|
return nil, fmt.Errorf("invalid event table: %q", q.Table)
|
|
}
|
|
sql, err := r.tpl.Render("event_explorer.sql.tmpl", map[string]any{
|
|
"Table": string(q.Table),
|
|
"HasUserID": q.UserID != "",
|
|
"HasAnonymousID": q.AnonymousID != "",
|
|
"HasEventName": q.EventName != "" && q.Table == model.EventTableTrack,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
args := []any{
|
|
clickhouse.Named("workspace_id", q.WorkspaceID),
|
|
clickhouse.Named("from", chTime(q.From)),
|
|
clickhouse.Named("to", chTime(q.To)),
|
|
clickhouse.Named("limit", chUint(uint64(q.Limit))),
|
|
clickhouse.Named("offset", chUint(uint64(q.Offset))),
|
|
}
|
|
if q.UserID != "" {
|
|
args = append(args, clickhouse.Named("user_id", q.UserID))
|
|
}
|
|
if q.AnonymousID != "" {
|
|
args = append(args, clickhouse.Named("anonymous_id", q.AnonymousID))
|
|
}
|
|
if q.EventName != "" && q.Table == model.EventTableTrack {
|
|
args = append(args, clickhouse.Named("event", q.EventName))
|
|
}
|
|
|
|
rows, err := r.ch.Query(ctx, sql, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("clickhouse query: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
return ScanRows(rows)
|
|
}
|
|
|
|
// QueryProfileTimeline returns recent events for a profile (resolved to
|
|
// user_id) across all four event tables, ordered by received_at desc.
|
|
func (r *EventRepo) QueryProfileTimeline(ctx context.Context, workspaceID, userID string, limit, offset int) (*model.QueryResult, error) {
|
|
sql, err := r.tpl.Render("profile_timeline.sql.tmpl", nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rows, err := r.ch.Query(ctx, sql,
|
|
clickhouse.Named("workspace_id", workspaceID),
|
|
clickhouse.Named("user_id", userID),
|
|
clickhouse.Named("limit", chUint(uint64(limit))),
|
|
clickhouse.Named("offset", chUint(uint64(offset))),
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("clickhouse query: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
return ScanRows(rows)
|
|
}
|
|
|
|
// ScanRows turns a driver.Rows iterator into a generic QueryResult. Column
|
|
// types come from rows.ColumnTypes() so we allocate the right pointer kinds.
|
|
func ScanRows(rows driver.Rows) (*model.QueryResult, error) {
|
|
cols := rows.Columns()
|
|
colTypes := rows.ColumnTypes()
|
|
out := &model.QueryResult{Columns: cols, Rows: [][]any{}}
|
|
|
|
for rows.Next() {
|
|
dest := make([]any, len(colTypes))
|
|
for i, ct := range colTypes {
|
|
dest[i] = newScanTarget(ct.ScanType().String())
|
|
}
|
|
if err := rows.Scan(dest...); err != nil {
|
|
return nil, fmt.Errorf("scan row: %w", err)
|
|
}
|
|
row := make([]any, len(dest))
|
|
for i, p := range dest {
|
|
row[i] = derefScanTarget(p)
|
|
}
|
|
out.Rows = append(out.Rows, row)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
out.RowCount = len(out.Rows)
|
|
return out, nil
|
|
}
|
|
|
|
// newScanTarget returns a pointer to a fresh zero value of the Go type named
// by typeName, for use as a row scan destination. typeName is the String()
// form of a column's reported scan type (for example "uint64" or
// "time.Time").
//
// The list is deliberately small -- the analytics tables share a handful of
// types. Any name not listed yields a *any, leaving it to the driver to
// decide what to store.
func newScanTarget(typeName string) any {
	switch typeName {
	case "string":
		return new(string)

	// Unsigned integers.
	case "uint8":
		return new(uint8)
	case "uint16":
		return new(uint16)
	case "uint32":
		return new(uint32)
	case "uint64":
		return new(uint64)

	// Signed integers.
	case "int32":
		return new(int32)
	case "int64":
		return new(int64)

	// Floating point.
	case "float32":
		return new(float32)
	case "float64":
		return new(float64)

	case "bool":
		return new(bool)
	case "time.Time":
		return new(time.Time)

	// Composite types.
	case "map[string]string":
		return new(map[string]string)
	case "[]string":
		return new([]string)

	default:
		// Fallback: untyped pointer; driver decides.
		return new(any)
	}
}
|
|
|
|
// derefScanTarget returns the value that scan target p points to. It handles
// exactly the pointer kinds newScanTarget hands out, including the *any
// fallback. Anything else -- a nil interface included -- is returned
// unchanged.
func derefScanTarget(p any) any {
	switch ptr := p.(type) {
	// Fallback target: unwrap to whatever was stored in it.
	case *any:
		return *ptr

	case *string:
		return *ptr
	case *bool:
		return *ptr
	case *time.Time:
		return *ptr

	// Unsigned integers.
	case *uint8:
		return *ptr
	case *uint16:
		return *ptr
	case *uint32:
		return *ptr
	case *uint64:
		return *ptr

	// Signed integers.
	case *int32:
		return *ptr
	case *int64:
		return *ptr

	// Floating point.
	case *float32:
		return *ptr
	case *float64:
		return *ptr

	// Composite types.
	case *[]string:
		return *ptr
	case *map[string]string:
		return *ptr
	}

	// Not a known target type: pass the input through as-is.
	return p
}
|