71 lines
2.1 KiB
Go
71 lines
2.1 KiB
Go
package repo
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"github.com/dbiz/cdp/data-layer/api/internal/apperr"
|
|
"github.com/dbiz/cdp/data-layer/api/internal/model"
|
|
)
|
|
|
|
// ProfileRepo reads the unified-profile table owned by cdp-ingestion.
|
|
//
|
|
// Assumed schema (TODO: align with cdp-ingestion once that migration lands):
|
|
//
|
|
// profiles (
|
|
// id UUID,
|
|
// workspace_id UUID,
|
|
// user_id TEXT,
|
|
// anonymous_ids TEXT[],
|
|
// traits JSONB,
|
|
// first_seen_at TIMESTAMPTZ,
|
|
// last_seen_at TIMESTAMPTZ
|
|
// )
|
|
type ProfileRepo struct {
|
|
pg *pgxpool.Pool
|
|
}
|
|
|
|
func NewProfileRepo(pg *pgxpool.Pool) *ProfileRepo { return &ProfileRepo{pg: pg} }
|
|
|
|
const selectProfileByID = `
|
|
SELECT id, workspace_id, user_id, anonymous_ids, traits, first_seen_at, last_seen_at
|
|
FROM profiles
|
|
WHERE workspace_id = $1 AND id = $2
|
|
`
|
|
|
|
func (r *ProfileRepo) GetByID(ctx context.Context, workspaceID, profileID string) (*model.Profile, error) {
|
|
row := r.pg.QueryRow(ctx, selectProfileByID, workspaceID, profileID)
|
|
var p model.Profile
|
|
var traitsRaw []byte
|
|
if err := row.Scan(&p.ID, &p.WorkspaceID, &p.UserID, &p.AnonymousIDs, &traitsRaw, &p.FirstSeenAt, &p.LastSeenAt); err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, apperr.NotFound("profile not found")
|
|
}
|
|
return nil, apperr.Internal(err)
|
|
}
|
|
if len(traitsRaw) > 0 {
|
|
if err := json.Unmarshal(traitsRaw, &p.Traits); err != nil {
|
|
return nil, apperr.Internal(err)
|
|
}
|
|
}
|
|
return &p, nil
|
|
}
|
|
|
|
// GetUserIDForProfile resolves a profile UUID back to its primary user_id so
|
|
// the timeline query can target ClickHouse events on that key.
|
|
func (r *ProfileRepo) GetUserIDForProfile(ctx context.Context, workspaceID, profileID string) (string, error) {
|
|
const q = `SELECT user_id FROM profiles WHERE workspace_id = $1 AND id = $2`
|
|
var uid string
|
|
if err := r.pg.QueryRow(ctx, q, workspaceID, profileID).Scan(&uid); err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return "", apperr.NotFound("profile not found")
|
|
}
|
|
return "", apperr.Internal(err)
|
|
}
|
|
return uid, nil
|
|
}
|