114 lines
3.6 KiB
Go
114 lines
3.6 KiB
Go
package repo
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"sync"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/dbiz/cdp/ingestion/ingest/internal/apperr"
)
|
|
|
|
// SchemaRepo records the data type observed for each (workspace, event_type,
// field) triple. The ingest hot path calls GetType per field to detect type
// conflicts, so the implementation fronts Postgres with an in-memory cache:
// misses fall through to PG and the resolved answer -- including the
// "not seen yet" empty string -- is memoised.
//
// Cache coherence: nothing here expires or evicts cache entries, negative
// ones included, and the only refresh is the write-through in UpsertField.
// Once a triple is cached on one ingest instance (even as ""), later writes of
// that triple by other instances are invisible to GetType on this instance.
// That staleness is accepted because the registry is append-only.
type SchemaRepo interface {
	// GetType returns the data type recorded for the triple, or "" if the
	// field has never been seen.
	GetType(ctx context.Context, workspaceID, eventType, field string) (string, error)

	// UpsertField records dataType for a newly seen triple, or re-confirms an
	// existing one. A type already recorded for the triple is NOT replaced, so
	// callers must not assume dataType is now the stored type.
	UpsertField(ctx context.Context, workspaceID, eventType, field, dataType string) error
}
|
|
|
|
type schemaRepo struct {
|
|
db *pgxpool.Pool
|
|
cache *schemaCache
|
|
}
|
|
|
|
func NewSchemaRepo(db *pgxpool.Pool) SchemaRepo {
|
|
return &schemaRepo{
|
|
db: db,
|
|
cache: newSchemaCache(),
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// cache
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// schemaCache memoises resolved field types keyed by (workspace, event type,
// field). Reads take the read lock, so concurrent lookups can overlap.
//
// NOTE(review): nothing in this file evicts entries, and negative ("") results
// are stored too, so the map grows with every distinct triple ever looked up.
// If field names come from untrusted payloads, consider bounding it.
type schemaCache struct {
	mu sync.RWMutex // guards data

	// data maps a triple's key to its resolved type. A present key with an
	// empty value means "looked up, never seen"; an absent key means "not
	// looked up yet".
	data map[string]string
}
|
|
|
|
func newSchemaCache() *schemaCache {
|
|
return &schemaCache{data: make(map[string]string, 256)}
|
|
}
|
|
|
|
func (c *schemaCache) key(ws, et, field string) string {
|
|
return ws + "|" + et + "|" + field
|
|
}
|
|
|
|
func (c *schemaCache) get(ws, et, field string) (string, bool) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
v, ok := c.data[c.key(ws, et, field)]
|
|
return v, ok
|
|
}
|
|
|
|
func (c *schemaCache) set(ws, et, field, dataType string) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.data[c.key(ws, et, field)] = dataType
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// repo methods
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func (r *schemaRepo) GetType(ctx context.Context, workspaceID, eventType, field string) (string, error) {
|
|
if v, ok := r.cache.get(workspaceID, eventType, field); ok {
|
|
return v, nil
|
|
}
|
|
|
|
const q = `
|
|
SELECT data_type FROM schema_fields
|
|
WHERE workspace_id = $1::uuid AND event_type = $2 AND field = $3`
|
|
var t string
|
|
err := r.db.QueryRow(ctx, q, workspaceID, eventType, field).Scan(&t)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
// negative cache: avoid hammering PG for fields that don't exist yet.
|
|
r.cache.set(workspaceID, eventType, field, "")
|
|
return "", nil
|
|
}
|
|
if err != nil {
|
|
return "", apperr.Internal(fmt.Errorf("schema get: %w", err))
|
|
}
|
|
r.cache.set(workspaceID, eventType, field, t)
|
|
return t, nil
|
|
}
|
|
|
|
func (r *schemaRepo) UpsertField(ctx context.Context, workspaceID, eventType, field, dataType string) error {
|
|
const q = `
|
|
INSERT INTO schema_fields (workspace_id, event_type, field, data_type)
|
|
VALUES ($1::uuid, $2, $3, $4)
|
|
ON CONFLICT (workspace_id, event_type, field) DO UPDATE
|
|
SET last_seen_at = now(),
|
|
sample_count = schema_fields.sample_count + 1`
|
|
if _, err := r.db.Exec(ctx, q, workspaceID, eventType, field, dataType); err != nil {
|
|
return apperr.Internal(fmt.Errorf("schema upsert: %w", err))
|
|
}
|
|
// Write-through: keep the local cache consistent with what we just stored.
|
|
r.cache.set(workspaceID, eventType, field, dataType)
|
|
return nil
|
|
}
|