280 lines
6.9 KiB
Go
280 lines
6.9 KiB
Go
// Package writer wraps the ClickHouse client for batch inserts.
//
// We use the native clickhouse-go v2 client. One PrepareBatch / Append / Send
// cycle per (table, batch). All map values are stringified before insertion --
// the ClickHouse schema uses Map(String, String), which keeps the table flat
// and avoids column explosion. Analytics queries cast on read.
|
|
package writer
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strconv"
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2"
|
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
|
|
|
"github.com/dbiz/cdp/ingestion/bulker/internal/model"
|
|
)
|
|
|
|
type ClickHouse struct {
|
|
conn driver.Conn
|
|
db string
|
|
}
|
|
|
|
// New opens a ClickHouse connection. `secure` enables TLS. The wire protocol
|
|
// is auto-selected from the port: 8123/8443 (HTTP interface) use HTTP, the
|
|
// native default otherwise.
|
|
func New(ctx context.Context, addr, db, user, password string, secure bool) (*ClickHouse, error) {
|
|
opts := &clickhouse.Options{
|
|
Addr: []string{addr},
|
|
Protocol: protocolFromAddr(addr),
|
|
Auth: clickhouse.Auth{
|
|
Database: db,
|
|
Username: user,
|
|
Password: password,
|
|
},
|
|
Settings: clickhouse.Settings{
|
|
"async_insert": 0,
|
|
"wait_for_async_insert": 0,
|
|
},
|
|
}
|
|
if secure {
|
|
opts.TLS = &tls.Config{MinVersion: tls.VersionTLS12}
|
|
}
|
|
conn, err := clickhouse.Open(opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("clickhouse open: %w", err)
|
|
}
|
|
if err := conn.Ping(ctx); err != nil {
|
|
return nil, fmt.Errorf("clickhouse ping: %w", err)
|
|
}
|
|
return &ClickHouse{conn: conn, db: db}, nil
|
|
}
|
|
|
|
func protocolFromAddr(addr string) clickhouse.Protocol {
|
|
// 8443 = HTTPS interface, 8123 = HTTP interface (both speak the HTTP wire).
|
|
// Everything else (9000/9440/...) speaks the native protocol.
|
|
switch port := portOf(addr); port {
|
|
case "8123", "8443":
|
|
return clickhouse.HTTP
|
|
default:
|
|
return clickhouse.Native
|
|
}
|
|
}
|
|
|
|
// portOf returns whatever follows the last ':' in addr, or "" when addr
// contains no ':' at all. The result is not validated as a number.
func portOf(addr string) string {
	lastColon := -1
	for i := 0; i < len(addr); i++ {
		if addr[i] == ':' {
			lastColon = i
		}
	}
	if lastColon < 0 {
		return ""
	}
	return addr[lastColon+1:]
}
|
|
|
|
// Close closes the underlying driver connection and returns its error, if any.
func (c *ClickHouse) Close() error {
	return c.conn.Close()
}
|
|
|
|
// WriteEvents fans out a mixed-type batch into the per-type tables.
|
|
// Returns the number of rows successfully inserted across all tables.
|
|
func (c *ClickHouse) WriteEvents(ctx context.Context, events []*model.IngestedEvent) (int, error) {
|
|
if len(events) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
// Bucket by event type so each insert hits one table.
|
|
buckets := map[string][]*model.IngestedEvent{}
|
|
for _, e := range events {
|
|
buckets[e.Type] = append(buckets[e.Type], e)
|
|
}
|
|
|
|
total := 0
|
|
for t, evs := range buckets {
|
|
var err error
|
|
switch t {
|
|
case "track":
|
|
err = c.writeTrack(ctx, evs)
|
|
case "identify":
|
|
err = c.writeIdentify(ctx, evs)
|
|
case "page", "screen":
|
|
err = c.writePage(ctx, evs)
|
|
case "group":
|
|
err = c.writeGroup(ctx, evs)
|
|
default:
|
|
// alias / unknown types -- write to track for now
|
|
err = c.writeTrack(ctx, evs)
|
|
}
|
|
if err != nil {
|
|
return total, fmt.Errorf("write %s: %w", t, err)
|
|
}
|
|
total += len(evs)
|
|
}
|
|
return total, nil
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// per-table batch inserts
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func (c *ClickHouse) writeTrack(ctx context.Context, evs []*model.IngestedEvent) error {
|
|
batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_track")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, e := range evs {
|
|
err := batch.Append(
|
|
e.WorkspaceID, e.SourceID, e.MessageID,
|
|
e.AnonymousID, e.UserID, e.Event,
|
|
e.Timestamp, e.SentAt, e.ReceivedAt,
|
|
mapToStr(e.Properties), mapToStr(e.Context),
|
|
e.IP, e.UserAgent,
|
|
libraryName(e.Context), libraryVersion(e.Context),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return batch.Send()
|
|
}
|
|
|
|
func (c *ClickHouse) writeIdentify(ctx context.Context, evs []*model.IngestedEvent) error {
|
|
batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_identify")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, e := range evs {
|
|
err := batch.Append(
|
|
e.WorkspaceID, e.SourceID, e.MessageID,
|
|
e.AnonymousID, e.UserID,
|
|
e.Timestamp, e.SentAt, e.ReceivedAt,
|
|
mapToStr(e.Traits), mapToStr(e.Context),
|
|
e.IP, e.UserAgent,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return batch.Send()
|
|
}
|
|
|
|
func (c *ClickHouse) writePage(ctx context.Context, evs []*model.IngestedEvent) error {
|
|
batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_page")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, e := range evs {
|
|
path, _ := e.Properties["path"].(string)
|
|
url, _ := e.Properties["url"].(string)
|
|
referrer, _ := e.Properties["referrer"].(string)
|
|
err := batch.Append(
|
|
e.WorkspaceID, e.SourceID, e.MessageID,
|
|
e.AnonymousID, e.UserID, e.Name, e.Category,
|
|
e.Timestamp, e.SentAt, e.ReceivedAt,
|
|
mapToStr(e.Properties), mapToStr(e.Context),
|
|
e.IP, e.UserAgent,
|
|
referrer, path, url,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return batch.Send()
|
|
}
|
|
|
|
func (c *ClickHouse) writeGroup(ctx context.Context, evs []*model.IngestedEvent) error {
|
|
batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_group")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, e := range evs {
|
|
err := batch.Append(
|
|
e.WorkspaceID, e.SourceID, e.MessageID,
|
|
e.AnonymousID, e.UserID, e.GroupID,
|
|
e.Timestamp, e.SentAt, e.ReceivedAt,
|
|
mapToStr(e.Traits), mapToStr(e.Context),
|
|
e.IP, e.UserAgent,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return batch.Send()
|
|
}
|
|
|
|
// WriteDLQ inserts records from the DLQ topic.
|
|
func (c *ClickHouse) WriteDLQ(ctx context.Context, recs []*model.DLQRecord) error {
|
|
if len(recs) == 0 {
|
|
return nil
|
|
}
|
|
batch, err := c.conn.PrepareBatch(ctx, "INSERT INTO events_dlq")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, r := range recs {
|
|
if err := batch.Append(
|
|
r.WorkspaceID, r.SourceID, r.MessageID, r.ReceivedAt,
|
|
r.Reason, r.Field, r.RawPayload,
|
|
); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return batch.Send()
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// mapToStr converts a map[string]any into the Map(String, String) shape
|
|
// ClickHouse expects. Non-string values are JSON-encoded.
|
|
func mapToStr(in map[string]any) map[string]string {
|
|
if in == nil {
|
|
return map[string]string{}
|
|
}
|
|
out := make(map[string]string, len(in))
|
|
for k, v := range in {
|
|
out[k] = anyToStr(v)
|
|
}
|
|
return out
|
|
}
|
|
|
|
// anyToStr renders a single value as the string stored in a
// Map(String, String) column.
//
//   - nil becomes "".
//   - string is returned unchanged.
//   - float64 is formatted without an exponent, using the fewest digits that
//     round-trip.
//   - int, int64 and bool use their strconv decimal / "true"/"false" forms.
//   - anything else is JSON-encoded.
//
// If JSON encoding fails (for example an unsupported type or value), the
// value is rendered with fmt.Sprint rather than being silently replaced by
// an empty string.
func anyToStr(v any) string {
	switch x := v.(type) {
	case nil:
		return ""
	case string:
		return x
	case float64:
		return strconv.FormatFloat(x, 'f', -1, 64)
	case int:
		return strconv.Itoa(x)
	case int64:
		return strconv.FormatInt(x, 10)
	case bool:
		return strconv.FormatBool(x)
	default:
		b, err := json.Marshal(v)
		if err != nil {
			// Keep some representation of the value instead of dropping it.
			return fmt.Sprint(v)
		}
		return string(b)
	}
}
|
|
|
|
// libraryName returns the "library_name" entry of ctx when it is a string,
// and "" when ctx is nil, the key is absent, or the value is not a string.
func libraryName(ctx map[string]any) string {
	// Indexing a nil map is safe, and a failed assertion leaves name as "".
	name, _ := ctx["library_name"].(string)
	return name
}
|
|
// libraryVersion returns the "library_version" entry of ctx when it is a
// string, and "" when ctx is nil, the key is absent, or the value is not a
// string.
func libraryVersion(ctx map[string]any) string {
	// Indexing a nil map is safe, and a failed assertion leaves version as "".
	version, _ := ctx["library_version"].(string)
	return version
}
|