40 lines
1.1 KiB
Go
40 lines
1.1 KiB
Go
package handler
|
|
|
|
import (
|
|
"net/http"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/dbiz/cdp/ingestion/ingest/internal/live"
|
|
)
|
|
|
|
// LiveHandler streams Kafka events over SSE for the console's Live page.
|
|
//
|
|
// Note: this endpoint is unauthenticated in the scaffold. Wire it behind the
|
|
// console session / a workspace token before exposing it publicly.
|
|
type LiveHandler struct {
|
|
stream *live.Streamer
|
|
log *zap.Logger
|
|
}
|
|
|
|
func NewLiveHandler(s *live.Streamer, log *zap.Logger) *LiveHandler {
|
|
return &LiveHandler{stream: s, log: log}
|
|
}
|
|
|
|
// Stream handles GET /live/events. Optional query params:
|
|
//
|
|
// ?workspace_id=... filter by workspace
|
|
// ?source_id=... filter by source
|
|
// ?type=track filter by event type (track|identify|page|group|...)
|
|
func (h *LiveHandler) Stream(w http.ResponseWriter, r *http.Request) {
|
|
q := r.URL.Query()
|
|
flt := live.Filter{
|
|
WorkspaceID: q.Get("workspace_id"),
|
|
SourceID: q.Get("source_id"),
|
|
EventType: q.Get("type"),
|
|
}
|
|
if err := h.stream.Stream(r.Context(), w, flt); err != nil {
|
|
h.log.Warn("live stream ended", zap.Error(err))
|
|
}
|
|
}
|