210 lines
5.8 KiB
Go
210 lines
5.8 KiB
Go
package handler
|
|
|
|
import (
	"bytes"
	"encoding/json"
	"errors"
	"io"
	"net"
	"net/http"
	"strconv"
	"strings"

	"github.com/go-playground/validator/v10"
	"go.uber.org/zap"

	"github.com/dbiz/cdp/ingestion/ingest/internal/apperr"
	"github.com/dbiz/cdp/ingestion/ingest/internal/middleware"
	"github.com/dbiz/cdp/ingestion/ingest/internal/model"
	"github.com/dbiz/cdp/ingestion/ingest/internal/service"
)
|
|
|
|
type EventHandler struct {
|
|
svc *service.IngestService
|
|
val *validator.Validate
|
|
log *zap.Logger
|
|
}
|
|
|
|
func NewEventHandler(svc *service.IngestService, log *zap.Logger) *EventHandler {
|
|
return &EventHandler{
|
|
svc: svc,
|
|
val: validator.New(),
|
|
log: log,
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Routes
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// Single-event endpoints. They differ only in the `type` applied to the body
// when it does not carry one, so they all funnel into one handler.
|
|
|
|
func (h *EventHandler) Track(w http.ResponseWriter, r *http.Request) {
|
|
h.handleSingle(w, r, model.EventTypeTrack)
|
|
}
|
|
|
|
func (h *EventHandler) Identify(w http.ResponseWriter, r *http.Request) {
|
|
h.handleSingle(w, r, model.EventTypeIdentify)
|
|
}
|
|
|
|
func (h *EventHandler) Page(w http.ResponseWriter, r *http.Request) {
|
|
h.handleSingle(w, r, model.EventTypePage)
|
|
}
|
|
|
|
func (h *EventHandler) Group(w http.ResponseWriter, r *http.Request) {
|
|
h.handleSingle(w, r, model.EventTypeGroup)
|
|
}
|
|
|
|
func (h *EventHandler) Alias(w http.ResponseWriter, r *http.Request) {
|
|
h.handleSingle(w, r, model.EventTypeAlias)
|
|
}
|
|
|
|
func (h *EventHandler) Screen(w http.ResponseWriter, r *http.Request) {
|
|
h.handleSingle(w, r, model.EventTypeScreen)
|
|
}
|
|
|
|
func (h *EventHandler) Batch(w http.ResponseWriter, r *http.Request) {
|
|
body, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
h.writeErr(w, apperr.PayloadTooLarge("payload too large"))
|
|
return
|
|
}
|
|
|
|
var env model.BatchEnvelope
|
|
if err := json.Unmarshal(body, &env); err != nil {
|
|
h.writeErr(w, apperr.BadRequest("invalid json body", "", err))
|
|
return
|
|
}
|
|
if err := h.val.Struct(&env); err != nil {
|
|
h.writeErr(w, apperr.BadRequest("validation failed", firstField(err), err))
|
|
return
|
|
}
|
|
|
|
ictx := h.ingestCtx(r, body)
|
|
results := h.svc.IngestBatch(r.Context(), ictx, env.Batch)
|
|
|
|
// Per-event status -- 200 OK, with an array of {message_id, ok, error}.
|
|
type item struct {
|
|
MessageID string `json:"messageId"`
|
|
OK bool `json:"ok"`
|
|
Error string `json:"error,omitempty"`
|
|
Field string `json:"field,omitempty"`
|
|
}
|
|
out := make([]item, len(env.Batch))
|
|
for i, e := range env.Batch {
|
|
it := item{MessageID: e.MessageID, OK: true}
|
|
if results[i] != nil {
|
|
it.OK = false
|
|
if ae, ok := apperr.As(results[i]); ok {
|
|
it.Error = ae.Message
|
|
it.Field = ae.Field
|
|
} else {
|
|
it.Error = "internal error"
|
|
}
|
|
}
|
|
out[i] = it
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"results": out})
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Health / Ready
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func (h *EventHandler) Health(w http.ResponseWriter, r *http.Request) {
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
|
}
|
|
|
|
func (h *EventHandler) Ready(w http.ResponseWriter, r *http.Request) {
|
|
// Liveness is enough for k8s readiness in this scaffold. Wire in real
|
|
// dependency checks (PG ping, Kafka ping) when needed.
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ready"})
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// shared helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func (h *EventHandler) handleSingle(w http.ResponseWriter, r *http.Request, t model.EventType) {
|
|
body, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
h.writeErr(w, apperr.PayloadTooLarge("payload too large"))
|
|
return
|
|
}
|
|
|
|
var raw model.RawEvent
|
|
if err := json.NewDecoder(bytes.NewReader(body)).Decode(&raw); err != nil {
|
|
h.writeErr(w, apperr.BadRequest("invalid json body", "", err))
|
|
return
|
|
}
|
|
if raw.Type == "" {
|
|
raw.Type = t
|
|
}
|
|
if err := h.val.Struct(&raw); err != nil {
|
|
h.writeErr(w, apperr.BadRequest("validation failed", firstField(err), err))
|
|
return
|
|
}
|
|
|
|
ictx := h.ingestCtx(r, body)
|
|
if err := h.svc.Ingest(r.Context(), ictx, &raw); err != nil {
|
|
h.writeErr(w, err)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]bool{"ok": true})
|
|
}
|
|
|
|
func (h *EventHandler) ingestCtx(r *http.Request, body []byte) service.IngestContext {
|
|
wk := middleware.WriteKeyFromCtx(r.Context())
|
|
return service.IngestContext{
|
|
WorkspaceID: wk.WorkspaceID,
|
|
SourceID: wk.SourceID,
|
|
IP: clientIP(r),
|
|
UserAgent: r.UserAgent(),
|
|
RawBody: body,
|
|
}
|
|
}
|
|
|
|
func (h *EventHandler) writeErr(w http.ResponseWriter, err error) {
|
|
if ae, ok := apperr.As(err); ok {
|
|
if ae.RetryAfter > 0 {
|
|
w.Header().Set("Retry-After", strconv.Itoa(ae.RetryAfter))
|
|
}
|
|
writeJSON(w, ae.Code, errorResponse{Error: ae.Message, Field: ae.Field})
|
|
if ae.Err != nil {
|
|
h.log.Warn("request error",
|
|
zap.Int("code", ae.Code),
|
|
zap.String("msg", ae.Message),
|
|
zap.Error(ae.Err))
|
|
}
|
|
return
|
|
}
|
|
h.log.Error("unhandled error", zap.Error(err))
|
|
writeJSON(w, http.StatusInternalServerError, errorResponse{Error: "internal server error"})
|
|
}
|
|
|
|
// errorResponse is the JSON body written for a failed request.
type errorResponse struct {
	// Error is the message shown to the client; it is always present.
	Error string `json:"error"`
	// Field names the offending input field and is omitted when empty.
	Field string `json:"field,omitempty"`
}
|
|
|
|
// writeJSON serializes body as JSON and writes it to w with the given status
// and a Content-Type of application/json.
//
// The payload is encoded into a buffer before any header is sent. If encoding
// fails, the response becomes a 500 with a generic error body rather than the
// requested status followed by an empty payload.
func writeJSON(w http.ResponseWriter, status int, body any) {
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(body); err != nil {
		status = http.StatusInternalServerError
		buf.Reset()
		buf.WriteString(`{"error":"internal server error"}` + "\n")
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	// The status line is already committed at this point; a failed write
	// means the client is gone and there is nothing left to report to it.
	_, _ = w.Write(buf.Bytes())
}
|
|
|
|
func firstField(err error) string {
|
|
var verrs validator.ValidationErrors
|
|
if errors.As(err, &verrs) && len(verrs) > 0 {
|
|
return verrs[0].Field()
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// clientIP returns the best-effort address of the originating client.
//
// When X-Forwarded-For is present its first (leftmost) entry is used, since
// the header may hold a comma-separated chain of addresses. Otherwise the
// host part of r.RemoteAddr is returned, without the port; if RemoteAddr is
// not in host:port form it is returned unchanged.
//
// The header is client-supplied, so the result is informational only.
//
// NOTE(review): middleware keeps its own small copy of this helper (no shared
// types, by design) -- confirm it applies the same normalization.
func clientIP(r *http.Request) string {
	if fwd := r.Header.Get("X-Forwarded-For"); fwd != "" {
		first, _, _ := strings.Cut(fwd, ",")
		if ip := strings.TrimSpace(first); ip != "" {
			return ip
		}
	}
	if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
		return host
	}
	return r.RemoteAddr
}
|