// Command server runs the CDP bulker -- Kafka consumer that batches events // into ClickHouse (and other warehouses, when configured). package main import ( "context" "encoding/json" "errors" "log" "net/http" "os" "os/signal" "syscall" "time" "github.com/go-chi/chi/v5" "go.uber.org/zap" "github.com/dbiz/cdp/ingestion/bulker/internal/batcher" "github.com/dbiz/cdp/ingestion/bulker/internal/config" "github.com/dbiz/cdp/ingestion/bulker/internal/consumer" "github.com/dbiz/cdp/ingestion/bulker/internal/model" "github.com/dbiz/cdp/ingestion/bulker/internal/writer" ) func main() { if err := run(); err != nil { log.Fatal(err) } } func run() error { cfg, err := config.Load() if err != nil { return err } logger, err := newLogger(cfg.LogLevel) if err != nil { return err } defer func() { _ = logger.Sync() }() ctx, cancel := context.WithCancel(context.Background()) defer cancel() // ---- ClickHouse ------------------------------------------------------- ch, err := writer.New(ctx, cfg.ClickHouseAddr, cfg.ClickHouseDB, cfg.ClickHouseUser, cfg.ClickHousePassword, cfg.ClickHouseSecure) if err != nil { return err } defer func() { _ = ch.Close() }() // ---- Batcher --------------------------------------------------------- b := batcher.New(cfg.BatchSize, cfg.BatchInterval, func(ctx context.Context, evs []*model.IngestedEvent) error { _, err := ch.WriteEvents(ctx, evs) return err }, logger) go b.Run(ctx) // ---- Consumer -------------------------------------------------------- cons, err := consumer.New(consumer.Config{ Brokers: cfg.KafkaBrokers, Group: cfg.KafkaGroup, IngestTopic: cfg.KafkaTopicIngest, DLQTopic: cfg.KafkaTopicDLQ, }, b, ch, logger) if err != nil { return err } defer cons.Close() consumerErr := make(chan error, 1) go func() { consumerErr <- cons.Run(ctx) }() // ---- HTTP (health) --------------------------------------------------- r := chi.NewRouter() r.Get("/health", func(w http.ResponseWriter, _ *http.Request) { writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) }) r.Get("/ready", func(w http.ResponseWriter, _ *http.Request) { writeJSON(w, http.StatusOK, map[string]string{"status": "ready"}) }) srv := &http.Server{ Addr: cfg.HTTPAddr, Handler: r, ReadHeaderTimeout: 5 * time.Second, } httpErr := make(chan error, 1) go func() { logger.Info("bulker http listening", zap.String("addr", cfg.HTTPAddr)) if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { httpErr <- err } }() // ---- Signals --------------------------------------------------------- sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) select { case <-sigCh: logger.Info("shutdown signal received") case err := <-consumerErr: logger.Error("consumer stopped unexpectedly", zap.Error(err)) case err := <-httpErr: logger.Error("http stopped unexpectedly", zap.Error(err)) } shutCtx, shutCancel := context.WithTimeout(context.Background(), cfg.ShutdownTimeout) defer shutCancel() cancel() // stop consumer + batcher _ = srv.Shutdown(shutCtx) _ = b.FlushNow(shutCtx) return nil } func writeJSON(w http.ResponseWriter, status int, body any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(body) } func newLogger(level string) (*zap.Logger, error) { lvl, err := zap.ParseAtomicLevel(level) if err != nil { lvl = zap.NewAtomicLevelAt(zap.InfoLevel) } cfg := zap.NewProductionConfig() cfg.Level = lvl cfg.EncoderConfig.TimeKey = "ts" cfg.EncoderConfig.MessageKey = "msg" return cfg.Build() }