diff --git a/cmd/livepeer/starter/events.go b/cmd/livepeer/starter/events.go
new file mode 100644
index 0000000000..810f4aa055
--- /dev/null
+++ b/cmd/livepeer/starter/events.go
@@ -0,0 +1,155 @@
+package starter
+
+import (
+	"fmt"
+	"net/url"
+	"strings"
+	"time"
+
+	"github.com/golang/glog"
+	lpmon "github.com/livepeer/go-livepeer/monitor"
+)
+
+func startEventPublisher(cfg LivepeerConfig) error {
+	sinkList := splitList(*cfg.EventSinkURIs)
+	if len(sinkList) == 0 {
+		legacyKafkaURI, err := buildLegacyKafkaSink(cfg)
+		if err != nil {
+			return err
+		}
+		if legacyKafkaURI != "" {
+			sinkList = append(sinkList, legacyKafkaURI)
+		}
+	}
+
+	if len(sinkList) == 0 {
+		glog.Warning("event publisher not started: no sinks configured")
+		return nil
+	}
+
+	headers, err := parseHeaderList(*cfg.EventSinkHeaders)
+	if err != nil {
+		return err
+	}
+
+	queueDepth := valueOrDefaultInt(cfg.EventSinkQueueDepth, 100)
+	batchSize := valueOrDefaultInt(cfg.EventSinkBatchSize, 100)
+	flushInterval := valueOrDefaultDuration(cfg.EventSinkFlushInterval, time.Second)
+
+	publisherCfg := lpmon.PublisherConfig{
+		GatewayAddress: valueOrDefaultString(cfg.GatewayHost, ""),
+		QueueSize:      queueDepth,
+		BatchSize:      batchSize,
+		FlushInterval:  flushInterval,
+		SinkURLs:       sinkList,
+		Headers:        headers,
+	}
+
+	if err := lpmon.InitEventPublisher(publisherCfg); err != nil {
+		return fmt.Errorf("init event publisher: %w", err)
+	}
+
+	return nil
+}
+
+func splitList(raw string) []string {
+	if raw == "" {
+		return nil
+	}
+	parts := strings.FieldsFunc(raw, func(r rune) bool {
+		switch r {
+		case ',', '\n', ';':
+			return true
+		default:
+			return false
+		}
+	})
+	var out []string
+	for _, p := range parts {
+		p = strings.TrimSpace(p)
+		if p == "" {
+			continue
+		}
+		out = append(out, p)
+	}
+	return out
+}
+
+func parseHeaderList(raw string) (map[string]string, error) {
+	headers := make(map[string]string)
+	for _, entry := range splitList(raw) {
+		kv := strings.SplitN(entry, "=", 2)
+		if len(kv) != 2 {
+			return nil, fmt.Errorf("invalid header %q, expected Key=Value", entry)
+		}
+		key := strings.TrimSpace(kv[0])
+		value := strings.TrimSpace(kv[1])
+		if key == "" {
+			return nil, fmt.Errorf("invalid header %q: empty key", entry)
+		}
+		headers[key] = value
+	}
+	return headers, nil
+}
+
+func buildLegacyKafkaSink(cfg LivepeerConfig) (string, error) {
+	if cfg.KafkaBootstrapServers == nil || cfg.KafkaGatewayTopic == nil {
+		return "", nil
+	}
+	brokers := strings.TrimSpace(*cfg.KafkaBootstrapServers)
+	topic := strings.TrimSpace(*cfg.KafkaGatewayTopic)
+	if brokers == "" || topic == "" {
+		return "", nil
+	}
+	var user, password string
+	if cfg.KafkaUsername != nil {
+		user = strings.TrimSpace(*cfg.KafkaUsername)
+	}
+	if cfg.KafkaPassword != nil {
+		password = strings.TrimSpace(*cfg.KafkaPassword)
+	}
+
+	brokerList := strings.Split(brokers, ",")
+	host := strings.TrimSpace(brokerList[0])
+	if host == "" {
+		return "", fmt.Errorf("invalid Kafka bootstrap server string %q", brokers)
+	}
+
+	u := url.URL{Scheme: "kafka", Host: host}
+	if user != "" {
+		if password != "" {
+			u.User = url.UserPassword(user, password)
+		} else {
+			u.User = url.User(user)
+		}
+	}
+	q := u.Query()
+	q.Set("topic", topic)
+	q.Set("brokers", brokers)
+	u.RawQuery = q.Encode()
+	return u.String(), nil
+}
+
+func valueOrDefaultString(ptr *string, def string) string {
+	if ptr == nil {
+		return def
+	}
+	if v := strings.TrimSpace(*ptr); v != "" {
+		return v
+	}
+	return def
+}
+
+func valueOrDefaultInt(ptr *int, def int) int {
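+	// only positive values count as explicit overrides; zero or negative falls back to def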
+	if ptr != nil && *ptr > 0 {
+		return *ptr
+	}
+	return def
+}
+
+func valueOrDefaultDuration(ptr *time.Duration, def time.Duration) time.Duration {
+	if ptr != nil && *ptr > 0 {
+		return *ptr
+	}
+	return def
+}
diff --git a/cmd/livepeer/starter/flags.go b/cmd/livepeer/starter/flags.go
index ff3395623d..ff320650a6 100644
--- a/cmd/livepeer/starter/flags.go
+++ b/cmd/livepeer/starter/flags.go
@@ -143,6 +143,11 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig {
 	cfg.KafkaUsername = fs.String("kafkaUser", *cfg.KafkaUsername, "Kafka Username")
 	cfg.KafkaPassword = fs.String("kafkaPassword", *cfg.KafkaPassword, "Kafka Password")
 	cfg.KafkaGatewayTopic = fs.String("kafkaGatewayTopic", *cfg.KafkaGatewayTopic, "Kafka Topic used to send gateway logs")
+	cfg.EventSinkURIs = fs.String("eventSinks", *cfg.EventSinkURIs, "List of outbound event sink URIs (comma/semicolon/newline delimited). Scheme selects backend, e.g. kafka://, https://, wss://, grpc://, mqtt://")
+	cfg.EventSinkHeaders = fs.String("eventSinkHeader", *cfg.EventSinkHeaders, "Extra headers for HTTP/WebSocket/gRPC sinks, format Key=Value; applied to every matching sink")
+	cfg.EventSinkQueueDepth = fs.Int("eventSinkQueueDepth", *cfg.EventSinkQueueDepth, "Publisher queue size before new events are dropped (default 100)")
+	cfg.EventSinkBatchSize = fs.Int("eventSinkBatchSize", *cfg.EventSinkBatchSize, "Max events per flush to each sink (default 100)")
+	cfg.EventSinkFlushInterval = fs.Duration("eventSinkFlushInterval", *cfg.EventSinkFlushInterval, "Max buffering time before flushing a partial batch (default 1s)")
 
 	return cfg
 }
diff --git a/cmd/livepeer/starter/kafka.go b/cmd/livepeer/starter/kafka.go
deleted file mode 100644
index 5c2250bc4b..0000000000
--- a/cmd/livepeer/starter/kafka.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package starter
-
-import (
-	"github.com/golang/glog"
-	lpmon "github.com/livepeer/go-livepeer/monitor"
-)
-
-func startKafkaProducer(cfg LivepeerConfig) error {
-	if *cfg.KafkaBootstrapServers == "" || *cfg.KafkaGatewayTopic == "" {
-		glog.Warning("not starting Kafka producer as producer config values aren't present")
-		return nil
-	}
-
-	var gatewayHost = ""
-	if cfg.GatewayHost != nil {
-		gatewayHost = *cfg.GatewayHost
-	}
-
-	return lpmon.InitKafkaProducer(
-		*cfg.KafkaBootstrapServers,
-		*cfg.KafkaUsername,
-		*cfg.KafkaPassword,
-		*cfg.KafkaGatewayTopic,
-		gatewayHost,
-	)
-}
diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go
index 4c7caf27db..002fa00a87 100755
--- a/cmd/livepeer/starter/starter.go
+++ b/cmd/livepeer/starter/starter.go
@@ -177,6 +177,11 @@ type LivepeerConfig struct {
 	KafkaUsername          *string
 	KafkaPassword          *string
 	KafkaGatewayTopic      *string
+	EventSinkURIs          *string
+	EventSinkHeaders       *string
+	EventSinkQueueDepth    *int
+	EventSinkBatchSize     *int
+	EventSinkFlushInterval *time.Duration
 	MediaMTXApiPassword    *string
 	LiveAIAuthApiKey       *string
 	LiveAIHeartbeatURL     *string
@@ -306,6 +311,11 @@ func DefaultLivepeerConfig() LivepeerConfig {
 	defaultKafkaUsername := ""
 	defaultKafkaPassword := ""
 	defaultKafkaGatewayTopic := ""
+	defaultEventSinkURIs := ""
+	defaultEventSinkHeaders := ""
+	defaultEventSinkQueueDepth := 100
+	defaultEventSinkBatchSize := 100
+	defaultEventSinkFlushInterval := time.Second
 
 	return LivepeerConfig{
 		// Network & Addresses:
@@ -422,10 +432,15 @@ func DefaultLivepeerConfig() LivepeerConfig {
 		TestOrchAvail: &defaultTestOrchAvail,
 
 		// Gateway logs
-		KafkaBootstrapServers: &defaultKafkaBootstrapServers,
-		KafkaUsername:         &defaultKafkaUsername,
-		KafkaPassword:         &defaultKafkaPassword,
-		KafkaGatewayTopic:     &defaultKafkaGatewayTopic,
+		KafkaBootstrapServers:  &defaultKafkaBootstrapServers,
+		KafkaUsername:          &defaultKafkaUsername,
+		KafkaPassword:          &defaultKafkaPassword,
+		KafkaGatewayTopic:      &defaultKafkaGatewayTopic,
+		EventSinkURIs:          &defaultEventSinkURIs,
+		EventSinkHeaders:       &defaultEventSinkHeaders,
+		EventSinkQueueDepth:    &defaultEventSinkQueueDepth,
+		EventSinkBatchSize:     &defaultEventSinkBatchSize,
+		EventSinkFlushInterval: &defaultEventSinkFlushInterval,
 	}
 }
 
@@ -445,6 +460,7 @@ func (cfg LivepeerConfig) PrintConfig(w io.Writer) {
 		"MediaMTXApiPassword": true,
 		"LiveAIAuthApiKey":    true,
 		"FVfailGsKey":         true,
+		"EventSinkHeaders":    true,
 	}
 
 	for i := 0; i < cfgType.NumField(); i++ {
@@ -730,10 +746,10 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
 		lpmon.InitCensus(nodeType, core.LivepeerVersion)
 	}
 
-	// Start Kafka producer
+	// Start event publisher
 	if *cfg.Monitor {
-		if err := startKafkaProducer(cfg); err != nil {
-			exit("Error while starting Kafka producer", err)
+		if err := startEventPublisher(cfg); err != nil {
+			exit("Error while starting event publisher", err)
 		}
 	}
 
diff --git a/core/livepeernode.go b/core/livepeernode.go
index fdb501f0ca..6d45c79e9a 100644
--- a/core/livepeernode.go
+++ b/core/livepeernode.go
@@ -338,7 +338,7 @@ func (n *LivepeerNode) UpdateNetworkCapabilities(orchNetworkCapabilities []*comm
 	n.NetworkCapabilities.Orchestrators = orchNetworkCapabilities
 
 	if lpmon.Enabled {
-		lpmon.SendQueueEventAsync("network_capabilities", orchNetworkCapabilities)
+		lpmon.QueueEvent("network_capabilities", orchNetworkCapabilities)
 	}
 
 	return nil
diff --git a/discovery/discovery.go b/discovery/discovery.go
index 0135aa85a9..3ccb3e7984 100644
--- a/discovery/discovery.go
+++ b/discovery/discovery.go
@@ -319,7 +319,7 @@ func (o *orchestratorPool) GetOrchestrators(ctx context.Context, numOrchestrator
 				"latency_ms": strconv.FormatInt(o.LocalInfo.Latency.Milliseconds(), 10),
 			})
 		}
-		monitor.SendQueueEventAsync("discovery_results", discoveryResults)
+		monitor.QueueEvent("discovery_results", discoveryResults)
 	}
 
 	clog.Infof(ctx, "Done fetching orch info orchs=%d/%d responses=%d/%d timedOut=%t", len(ods), numOrchestrators, nbResp, maxOrchNodes, timedOut)
diff --git a/monitor/backend_grpc.go b/monitor/backend_grpc.go
new file mode 100644
index 0000000000..20f628167b
--- /dev/null
+++ b/monitor/backend_grpc.go
@@ -0,0 +1,150 @@
+package monitor
+
+import (
+	"context"
+	"crypto/tls"
+	"encoding/json"
+	"fmt"
+	"net/url"
+	"strings"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/encoding"
+	"google.golang.org/grpc/metadata"
+)
+
+const grpcCodecName = "json"
+
+type jsonCodec struct{}
+
+func (jsonCodec) Name() string {
+	return grpcCodecName
+}
+
+func (jsonCodec) Marshal(v interface{}) ([]byte, error) {
+	return json.Marshal(v)
+}
+
+func (jsonCodec) Unmarshal(data []byte, v interface{}) error {
+	if v == nil {
+		return nil
+	}
+	return json.Unmarshal(data, v)
+}
+
+type grpcBackend struct {
+	conn    *grpc.ClientConn
+	method  string
+	headers map[string]string
+	timeout time.Duration
+}
+
+func init() {
+	encoding.RegisterCodec(jsonCodec{})
+	RegisterBackendFactory("grpc", newGRPCBackend)
+	RegisterBackendFactory("grpcs", newGRPCBackend)
+}
+
+func newGRPCBackend(u *url.URL, opts BackendOptions) (EventBackend, error) {
+	method := strings.TrimSpace(u.Fragment)
+	if method == "" {
+		method = strings.TrimPrefix(u.Path, "/")
+	}
+	if method == "" {
+		return nil, fmt.Errorf("grpc sink %q missing method (set path or fragment)", u.String())
fmt.Errorf("grpc sink %q missing method (set path or fragment)", u.String()) + } + if !strings.Contains(method, ".") && !strings.Contains(method, "/") { + return nil, fmt.Errorf("grpc sink method %q must be fully qualified", method) + } + if !strings.HasPrefix(method, "/") { + method = "/" + method + method = strings.ReplaceAll(method, "//", "/") + } + + timeout := 10 * time.Second + if rawTimeout := u.Query().Get("timeout"); rawTimeout != "" { + dur, err := time.ParseDuration(rawTimeout) + if err != nil { + return nil, fmt.Errorf("invalid timeout %q for grpc sink %s: %w", rawTimeout, u.String(), err) + } + timeout = dur + } + + dialCtx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + dialOpts := []grpc.DialOption{grpc.WithBlock()} + + if strings.EqualFold(u.Scheme, "grpcs") { + tlsCfg := &tls.Config{} + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))) + } else { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + + targetHost := u.Host + if targetHost == "" { + return nil, fmt.Errorf("grpc sink %q missing host", u.String()) + } + + conn, err := grpc.DialContext(dialCtx, targetHost, dialOpts...) + if err != nil { + return nil, fmt.Errorf("dial grpc sink %s: %w", u.String(), err) + } + + return &grpcBackend{ + conn: conn, + method: method, + headers: opts.Headers, + timeout: timeout, + }, nil +} + +func (b *grpcBackend) Start(_ context.Context) error { + return nil +} + +func (b *grpcBackend) Publish(ctx context.Context, batch []EventEnvelope) error { + if len(batch) == 0 { + return nil + } + + callCtx, cancel := context.WithTimeout(ctx, b.timeout) + defer cancel() + + if len(b.headers) > 0 { + pairs := make([]string, 0, len(b.headers)*2) + for k, v := range b.headers { + key := strings.ToLower(k) + pairs = append(pairs, key, v) + } + md := metadata.Pairs(pairs...) 
+		callCtx = metadata.NewOutgoingContext(callCtx, md)
+	}
+
+	var resp json.RawMessage
+	if err := b.conn.Invoke(callCtx, b.method, batch, &resp, grpc.CallContentSubtype(grpcCodecName)); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (b *grpcBackend) Stop(ctx context.Context) error {
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		// best-effort cleanup: close errors during shutdown are not
+		// actionable, so they are deliberately ignored
+		_ = b.conn.Close()
+	}()
+
+	select {
+	case <-done:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
diff --git a/monitor/backend_http.go b/monitor/backend_http.go
new file mode 100644
index 0000000000..af1005980e
--- /dev/null
+++ b/monitor/backend_http.go
@@ -0,0 +1,100 @@
+package monitor
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/url"
+	"strings"
+	"time"
+)
+
+type httpBackend struct {
+	client  *http.Client
+	target  string
+	headers map[string]string
+	method  string
+}
+
+func init() {
+	RegisterBackendFactory("http", newHTTPBackend)
+	RegisterBackendFactory("https", newHTTPBackend)
+}
+
+func newHTTPBackend(u *url.URL, opts BackendOptions) (EventBackend, error) {
+	query := u.Query()
+
+	method := strings.ToUpper(strings.TrimSpace(query.Get("method")))
+	if method == "" {
+		method = http.MethodPost
+	}
+
+	timeout := 10 * time.Second
+	if rawTimeout := strings.TrimSpace(query.Get("timeout")); rawTimeout != "" {
+		dur, err := time.ParseDuration(rawTimeout)
+		if err != nil {
+			return nil, fmt.Errorf("invalid timeout %q for http sink %s: %w", rawTimeout, u.String(), err)
+		}
+		timeout = dur
+	}
+
+	// remove internal configuration parameters
+	query.Del("method")
+	query.Del("timeout")
+	cleaned := *u
+	cleaned.RawQuery = query.Encode()
+
+	headers := make(map[string]string, len(opts.Headers))
+	for k, v := range opts.Headers {
+		headers[k] = v
+	}
+
+	return &httpBackend{
+		client:  &http.Client{Timeout: timeout},
+		target:  cleaned.String(),
+		headers: headers,
+		method:  method,
+	}, nil
+}
+
+func (b *httpBackend) Start(_ context.Context) error {
+	return nil
+}
+
+func (b *httpBackend) Publish(ctx context.Context, batch []EventEnvelope) error {
+	if len(batch) == 0 {
+		return nil
+	}
+
+	body, err := json.Marshal(batch)
+	if err != nil {
+		return err
+	}
+
+	req, err := http.NewRequestWithContext(ctx, b.method, b.target, bytes.NewReader(body))
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	for k, v := range b.headers {
+		req.Header.Set(k, v)
+	}
+
+	resp, err := b.client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
+		return nil
+	}
+
+	return fmt.Errorf("http sink %s returned status %d", b.target, resp.StatusCode)
+}
+
+func (b *httpBackend) Stop(_ context.Context) error {
+	return nil
+}
diff --git a/monitor/backend_kafka.go b/monitor/backend_kafka.go
new file mode 100644
index 0000000000..7bc4e0b818
--- /dev/null
+++ b/monitor/backend_kafka.go
@@ -0,0 +1,285 @@
+package monitor
+
+import (
+	"context"
+	"crypto/tls"
+	"encoding/json"
+	"fmt"
+	"net/url"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/golang/glog"
+	"github.com/segmentio/kafka-go"
+	"github.com/segmentio/kafka-go/sasl/plain"
+)
+
+const (
+	KafkaBatchInterval  = 1 * time.Second
+	KafkaRequestTimeout = 60 * time.Second
+	KafkaBatchSize      = 100
+	KafkaChannelSize    = 100
+)
+
+type gatewayEvent struct {
+	ID        *string     `json:"id,omitempty"`
+	Type      *string     `json:"type"`
+	Timestamp *string     `json:"timestamp"`
+	Gateway   *string     `json:"gateway,omitempty"`
+	Data      interface{} `json:"data"`
+}
+
+type kafkaProducer struct {
+	writer         *kafka.Writer
+	topic          string
+	events         chan gatewayEvent
+	gatewayAddress string
+}
+
+type kafkaBackend struct {
+	producer *kafkaProducer
+	wg       sync.WaitGroup
+}
+
+func init() {
+	RegisterBackendFactory("kafka", newKafkaBackend)
+}
+
+func newKafkaBackend(u *url.URL, opts BackendOptions) (EventBackend, error) {
+	query := u.Query()
+
+	topic := strings.TrimSpace(query.Get("topic"))
+	if topic == "" {
+		return nil, fmt.Errorf("kafka sink %q missing topic", u.String())
+	}
+
+	brokers := splitAndTrim(query.Get("brokers"), ",")
+	if len(brokers) == 0 {
+		host := strings.TrimSpace(u.Host)
+		if host == "" {
+			return nil, fmt.Errorf("kafka sink %q missing brokers", u.String())
+		}
+		brokers = []string{host}
+	}
+
+	username := ""
+	password := ""
+	if u.User != nil {
+		username = u.User.Username()
+		password, _ = u.User.Password()
+	}
+
+	producer, err := newKafkaProducer(brokers, username, password, topic, opts.GatewayAddress)
+	if err != nil {
+		return nil, err
+	}
+
+	return &kafkaBackend{producer: producer}, nil
+}
+
+func newKafkaProducer(brokers []string, user, password, topic, gatewayAddress string) (*kafkaProducer, error) {
+	if len(brokers) == 0 {
+		return nil, fmt.Errorf("kafka producer requires at least one broker")
+	}
+
+	dialer := &kafka.Dialer{Timeout: KafkaRequestTimeout, DualStack: true}
+
+	if user != "" {
+		dialer.TLS = &tls.Config{MinVersion: tls.VersionTLS12}
+		dialer.SASLMechanism = &plain.Mechanism{Username: user, Password: password}
+	}
+
+	writer := kafka.NewWriter(kafka.WriterConfig{
+		Brokers:  brokers,
+		Topic:    topic,
+		Balancer: kafka.CRC32Balancer{},
+		Dialer:   dialer,
+	})
+
+	return &kafkaProducer{
+		writer:         writer,
+		topic:          topic,
+		events:         make(chan gatewayEvent, KafkaChannelSize),
+		gatewayAddress: gatewayAddress,
+	}, nil
+}
+
+func (b *kafkaBackend) Start(ctx context.Context) error {
+	b.wg.Add(1)
+	go func() {
+		defer b.wg.Done()
+		b.producer.run(ctx)
+	}()
+	return nil
+}
+
+func (b *kafkaBackend) Publish(_ context.Context, batch []EventEnvelope) error {
+	for _, evt := range batch {
+		payload, err := decodePayload(evt.Payload)
+		if err != nil {
+			glog.Errorf("kafka backend payload decode error for event %s: %v", evt.ID, err)
+			payload = string(evt.Payload)
+		}
+
+		id := evt.ID
+		ts := evt.Timestamp
+		tsMillis := ts.UnixMilli()
+		tsStr := strconv.FormatInt(tsMillis, 10)
+
+		kafkaEvt := gatewayEvent{
+			Data:      payload,
+			Timestamp: stringPtr(tsStr),
+		}
+
+		if id != "" {
+			kafkaEvt.ID = stringPtr(id)
+		}
+
+		if evt.Type != "" {
+			kafkaEvt.Type = stringPtr(evt.Type)
+		}
+
+		if b.producer.gatewayAddress != "" {
+			kafkaEvt.Gateway = stringPtr(b.producer.gatewayAddress)
+		}
+
+		b.producer.enqueue(kafkaEvt)
+	}
+	return nil
+}
+
+func (b *kafkaBackend) Stop(ctx context.Context) error {
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		b.wg.Wait()
+	}()
+
+	select {
+	case <-done:
+	case <-ctx.Done():
+	}
+
+	return b.producer.close()
+}
+
+func (p *kafkaProducer) enqueue(evt gatewayEvent) {
+	select {
+	case p.events <- evt:
+	default:
+		if evt.Type != nil {
+			glog.Warningf("kafka producer event queue is full, dropping event %q", *evt.Type)
+		} else {
+			glog.Warning("kafka producer event queue is full, dropping event with unknown type")
+		}
+	}
+}
+
+func (p *kafkaProducer) run(ctx context.Context) {
+	ticker := time.NewTicker(KafkaBatchInterval)
+	defer ticker.Stop()
+
+	eventsBatch := make([]kafka.Message, 0, KafkaBatchSize)
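+
+	// flush forwards whatever is buffered and reuses the slice's backing array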
+	flush := func() {
+		if len(eventsBatch) == 0 {
+			return
+		}
+		p.sendBatch(eventsBatch)
+		eventsBatch = eventsBatch[:0]
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			flush()
+			return
+		case event, ok := <-p.events:
+			if !ok {
+				flush()
+				return
+			}
+
+			value, err := json.Marshal(event)
+			if err != nil {
+				glog.Errorf("error while marshalling gateway event to Kafka, err=%v", err)
+				continue
+			}
+
+			key := ""
+			if event.ID != nil {
+				key = *event.ID
+			}
+			if key == "" {
+				key = strconv.FormatInt(time.Now().UnixNano(), 10)
+			}
+
+			msg := kafka.Message{Key: []byte(key), Value: value}
+			eventsBatch = append(eventsBatch, msg)
+
+			if len(eventsBatch) >= KafkaBatchSize {
+				flush()
+			}
+		case <-ticker.C:
+			flush()
+		}
+	}
+}
+
+func (p *kafkaProducer) sendBatch(eventsBatch []kafka.Message) {
+	if len(eventsBatch) == 0 {
+		return
+	}
+
+	kafkaWriteRetries := 3
+	var writeErr error
+	for i := 0; i < kafkaWriteRetries; i++ {
+		writeErr = p.writer.WriteMessages(context.Background(), eventsBatch...)
+		if writeErr == nil {
+			return
+		}
+		glog.Warningf("error while sending gateway log batch to Kafka, retrying, topic=%s, try=%d, err=%v", p.topic, i, writeErr)
+	}
+	if writeErr != nil {
+		glog.Errorf("error while sending gateway log batch to Kafka, the gateway logs are lost, err=%v", writeErr)
+	}
+}
+
+func (p *kafkaProducer) close() error {
+	return p.writer.Close()
+}
+
+func decodePayload(raw json.RawMessage) (interface{}, error) {
+	if raw == nil {
+		return nil, nil
+	}
+	if len(raw) == 0 {
+		return nil, nil
+	}
+	var decoded interface{}
+	if err := json.Unmarshal(raw, &decoded); err != nil {
+		return nil, err
+	}
+	return decoded, nil
+}
+
+func splitAndTrim(raw, sep string) []string {
+	if raw == "" {
+		return nil
+	}
+	parts := strings.Split(raw, sep)
+	out := make([]string, 0, len(parts))
+	for _, p := range parts {
+		p = strings.TrimSpace(p)
+		if p != "" {
+			out = append(out, p)
+		}
+	}
+	return out
+}
+
+func stringPtr(s string) *string {
+	return &s
+}
diff --git a/monitor/backend_mqtt.go b/monitor/backend_mqtt.go
new file mode 100644
index 0000000000..651fddbdbf
--- /dev/null
+++ b/monitor/backend_mqtt.go
@@ -0,0 +1,262 @@
+package monitor
+
+import (
+	"context"
+	"crypto/tls"
+	"encoding/json"
+	"fmt"
+	"io"
+	"math/rand"
+	"net"
+	"net/url"
+	"strconv"
+	"strings"
+	"time"
+)
+
+type mqttBackend struct {
+	address   string
+	topic     string
+	username  string
+	password  string
+	useTLS    bool
+	keepAlive uint16
+	qos       byte
+	retain    bool
+	timeout   time.Duration
+}
+
+func init() {
+	RegisterBackendFactory("mqtt", newMQTTBackend)
+	RegisterBackendFactory("mqtts", newMQTTBackend)
+}
+
+func newMQTTBackend(u *url.URL, _ BackendOptions) (EventBackend, error) {
+	if u.Host == "" {
+		return nil, fmt.Errorf("mqtt sink %q missing host", u.String())
+	}
+
+	topic := strings.TrimPrefix(u.Path, "/")
+	if topic == "" {
+		topic = u.Query().Get("topic")
+	}
+	if topic == "" {
+		return nil, fmt.Errorf("mqtt sink %q missing topic", u.String())
+	}
+
+	keepAlive := uint16(60)
+	if rawKA := u.Query().Get("keepalive"); rawKA != "" {
+		val, err := strconv.Atoi(rawKA)
+		if err != nil || val < 0 || val > 0xFFFF {
+			return nil, fmt.Errorf("invalid keepalive %q for mqtt sink %s", rawKA, u.String())
+		}
+		keepAlive = uint16(val)
+	}
+
+	qos := byte(0)
+	if rawQoS := u.Query().Get("qos"); rawQoS != "" {
+		val, err := strconv.Atoi(rawQoS)
+		if err != nil || val != 0 {
+			return nil, fmt.Errorf("unsupported qos %q for mqtt sink %s (only QoS 0 supported)", rawQoS, u.String())
+		}
+	}
+
+	retain := false
u.Query().Get("retain"); rawRetain != "" { + retain = rawRetain == "1" || strings.EqualFold(rawRetain, "true") + } + + timeout := 5 * time.Second + if rawTimeout := u.Query().Get("timeout"); rawTimeout != "" { + dur, err := time.ParseDuration(rawTimeout) + if err != nil { + return nil, fmt.Errorf("invalid timeout %q for mqtt sink %s: %w", rawTimeout, u.String(), err) + } + timeout = dur + } + + username := "" + password := "" + if u.User != nil { + username = u.User.Username() + password, _ = u.User.Password() + } + + useTLS := strings.EqualFold(u.Scheme, "mqtts") || u.Query().Get("tls") == "1" || strings.EqualFold(u.Query().Get("tls"), "true") + + return &mqttBackend{ + address: u.Host, + topic: topic, + username: username, + password: password, + useTLS: useTLS, + keepAlive: keepAlive, + qos: qos, + retain: retain, + timeout: timeout, + }, nil +} + +func (b *mqttBackend) Start(_ context.Context) error { + return nil +} + +func (b *mqttBackend) Publish(ctx context.Context, batch []EventEnvelope) error { + if len(batch) == 0 { + return nil + } + + payload, err := json.Marshal(batch) + if err != nil { + return err + } + + dialer := &net.Dialer{Timeout: b.timeout} + if deadline, ok := ctx.Deadline(); ok { + dialer.Deadline = deadline + } + + var conn net.Conn + if b.useTLS { + tlsConfig := &tls.Config{ServerName: hostOnly(b.address)} + conn, err = tls.DialWithDialer(dialer, "tcp", b.address, tlsConfig) + } else { + conn, err = dialer.DialContext(ctx, "tcp", b.address) + } + if err != nil { + return err + } + defer conn.Close() + + clientID := fmt.Sprintf("livepeer-%06d", rand.Intn(999999)) + + if err := sendConnectPacket(conn, clientID, b.username, b.password, b.keepAlive); err != nil { + return err + } + + if err := awaitConnAck(conn, b.timeout); err != nil { + return err + } + + if err := sendPublishPacket(conn, b.topic, payload, b.qos, b.retain); err != nil { + return err + } + + sendDisconnectPacket(conn) + return nil +} + +func (b *mqttBackend) Stop(_ context.Context) error { + return nil +} + +func hostOnly(address string) string { + if idx := strings.LastIndex(address, ":"); idx != -1 { + return address[:idx] + } + return address +} + +func sendConnectPacket(conn net.Conn, clientID, username, password string, keepAlive uint16) error { + var payload []byte + payload = appendString(payload, clientID) + + flags := byte(0) + if username != "" { + flags |= 0x80 + payload = appendString(payload, username) + } + if password != "" { + flags |= 0x40 + payload = appendString(payload, password) + } + + variableHeader := []byte{ + 0x00, 0x04, 'M', 'Q', 'T', 'T', + 0x04, // protocol level 4 + flags, + byte(keepAlive >> 8), byte(keepAlive & 0xFF), + } + + remainingLength := len(variableHeader) + len(payload) + packet := []byte{0x10} + packet = append(packet, encodeRemainingLength(remainingLength)...) + packet = append(packet, variableHeader...) + packet = append(packet, payload...) 
+	_, err := conn.Write(packet)
+	return err
+}
+
+func awaitConnAck(conn net.Conn, timeout time.Duration) error {
+	if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
+		return err
+	}
+	header := make([]byte, 2)
+	if _, err := io.ReadFull(conn, header); err != nil {
+		return err
+	}
+	if header[0]>>4 != 0x02 {
+		return fmt.Errorf("unexpected MQTT packet 0x%X", header[0])
+	}
+	remLen := int(header[1])
+	buf := make([]byte, remLen)
+	if _, err := io.ReadFull(conn, buf); err != nil {
+		return err
+	}
+	if remLen < 2 || buf[1] != 0 {
+		return fmt.Errorf("mqtt connection refused: %v", buf)
+	}
+	return nil
+}
+
+func sendPublishPacket(conn net.Conn, topic string, payload []byte, qos byte, retain bool) error {
+	fixedHeader := byte(0x30)
+	if retain {
+		fixedHeader |= 0x01
+	}
+	fixedHeader |= qos << 1
+
+	var packet []byte
+	packet = append(packet, fixedHeader)
+
+	variableHeader := appendString(nil, topic)
+	if qos > 0 {
+		// Packet Identifier set to 1 for simplicity
+		variableHeader = append(variableHeader, 0x00, 0x01)
+	}
+
+	remainingLength := len(variableHeader) + len(payload)
+	packet = append(packet, encodeRemainingLength(remainingLength)...)
+	packet = append(packet, variableHeader...)
+	packet = append(packet, payload...)
+
+	_, err := conn.Write(packet)
+	return err
+}
+
+func sendDisconnectPacket(conn net.Conn) {
+	_, _ = conn.Write([]byte{0xE0, 0x00})
+}
+
+func appendString(dst []byte, s string) []byte {
+	dst = append(dst, byte(len(s)>>8), byte(len(s)))
+	dst = append(dst, s...)
+	return dst
+}
+
+func encodeRemainingLength(length int) []byte {
+	var encoded []byte
+	for {
+		digit := byte(length % 128)
+		length /= 128
+		if length > 0 {
+			digit |= 0x80
+		}
+		encoded = append(encoded, digit)
+		if length == 0 {
+			break
+		}
+	}
+	return encoded
+}
diff --git a/monitor/backend_websocket.go b/monitor/backend_websocket.go
new file mode 100644
index 0000000000..337bc8153e
--- /dev/null
+++ b/monitor/backend_websocket.go
@@ -0,0 +1,99 @@
+package monitor
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net"
+	"net/http"
+	"net/url"
+	"time"
+
+	"golang.org/x/net/websocket"
+)
+
+type websocketBackend struct {
+	target  string
+	origin  string
+	headers http.Header
+	timeout time.Duration
+}
+
+func init() {
+	RegisterBackendFactory("ws", newWebSocketBackend)
+	RegisterBackendFactory("wss", newWebSocketBackend)
+}
+
+func newWebSocketBackend(u *url.URL, opts BackendOptions) (EventBackend, error) {
+	query := u.Query()
+
+	origin := query.Get("origin")
+	if origin == "" {
+		origin = fmt.Sprintf("http://%s", u.Host)
+	}
+
+	timeout := 10 * time.Second
+	if rawTimeout := query.Get("timeout"); rawTimeout != "" {
+		dur, err := time.ParseDuration(rawTimeout)
+		if err != nil {
+			return nil, fmt.Errorf("invalid timeout %q for websocket sink %s: %w", rawTimeout, u.String(), err)
+		}
+		timeout = dur
+	}
+
+	query.Del("origin")
+	query.Del("timeout")
+	cleaned := *u
+	cleaned.RawQuery = query.Encode()
+
+	header := make(http.Header, len(opts.Headers))
+	for k, v := range opts.Headers {
+		header.Set(k, v)
+	}
+
+	return &websocketBackend{
+		target:  cleaned.String(),
+		origin:  origin,
+		headers: header,
+		timeout: timeout,
+	}, nil
+}
+
+func (b *websocketBackend) Start(_ context.Context) error {
+	return nil
+}
+
+func (b *websocketBackend) Publish(ctx context.Context, batch []EventEnvelope) error {
+	if len(batch) == 0 {
+		return nil
+	}
+
+	payload, err := json.Marshal(batch)
+	if err != nil {
+		return err
+	}
+
+	cfg, err := websocket.NewConfig(b.target, b.origin)
+	if err != nil {
+		return err
+	}
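+	// forward the shared sink headers during the WebSocket handshake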
+	cfg.Header = b.headers.Clone()
+
+	dialer := &net.Dialer{Timeout: b.timeout}
+	if deadline, ok := ctx.Deadline(); ok {
+		dialer.Deadline = deadline
+	}
+	cfg.Dialer = dialer
+
+	conn, err := websocket.DialConfig(cfg)
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+
+	return websocket.Message.Send(conn, payload)
+}
+
+func (b *websocketBackend) Stop(_ context.Context) error {
+	return nil
+}
diff --git a/monitor/kafka.go b/monitor/kafka.go
deleted file mode 100644
index 4629d0a14d..0000000000
--- a/monitor/kafka.go
+++ /dev/null
@@ -1,177 +0,0 @@
-package monitor
-
-import (
-	"context"
-	"crypto/tls"
-	"encoding/json"
-	"fmt"
-	"time"
-
-	"github.com/golang/glog"
-	"github.com/google/uuid"
-	"github.com/segmentio/kafka-go"
-	"github.com/segmentio/kafka-go/sasl/plain"
-)
-
-const (
-	KafkaBatchInterval  = 1 * time.Second
-	KafkaRequestTimeout = 60 * time.Second
-	KafkaBatchSize      = 100
-	KafkaChannelSize    = 100
-)
-
-type KafkaProducer struct {
-	writer         *kafka.Writer
-	topic          string
-	events         chan GatewayEvent
-	gatewayAddress string
-}
-
-type GatewayEvent struct {
-	ID        *string     `json:"id,omitempty"`
-	Type      *string     `json:"type"`
-	Timestamp *string     `json:"timestamp"`
-	Gateway   *string     `json:"gateway,omitempty"`
-	Data      interface{} `json:"data"`
-}
-
-type PipelineStatus struct {
-	Pipeline             string      `json:"pipeline"`
-	StartTime            float64     `json:"start_time"`
-	LastParamsUpdateTime float64     `json:"last_params_update_time"`
-	LastParams           interface{} `json:"last_params"`
-	LastParamsHash       string      `json:"last_params_hash"`
-	InputFPS             float64     `json:"input_fps"`
-	OutputFPS            float64     `json:"output_fps"`
-	LastInputTime        float64     `json:"last_input_time"`
-	LastOutputTime       float64     `json:"last_output_time"`
-	RestartCount         int         `json:"restart_count"`
-	LastRestartTime      float64     `json:"last_restart_time"`
-	LastRestartLogs      []string    `json:"last_restart_logs"`
-	LastError            *string     `json:"last_error"`
-	StreamID             *string     `json:"stream_id"`
-}
-
-var kafkaProducer *KafkaProducer
-
-func InitKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress string) error {
-	producer, err := newKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress)
-	if err != nil {
-		return err
-	}
-	kafkaProducer = producer
-	go producer.processEvents()
-	return nil
-}
-
-func newKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress string) (*KafkaProducer, error) {
-	dialer := &kafka.Dialer{
-		Timeout:   KafkaRequestTimeout,
-		DualStack: true,
-	}
-
-	if user != "" && password != "" {
-		tls := &tls.Config{
-			MinVersion: tls.VersionTLS12,
-		}
-		sasl := &plain.Mechanism{
-			Username: user,
-			Password: password,
-		}
-		dialer.SASLMechanism = sasl
-		dialer.TLS = tls
-	}
-
-	writer := kafka.NewWriter(kafka.WriterConfig{
-		Brokers:  []string{bootstrapServers},
-		Topic:    topic,
-		Balancer: kafka.CRC32Balancer{},
-		Dialer:   dialer,
-	})
-
-	return &KafkaProducer{
-		writer:         writer,
-		topic:          topic,
-		events:         make(chan GatewayEvent, KafkaChannelSize),
-		gatewayAddress: gatewayAddress,
-	}, nil
-}
-
-func (p *KafkaProducer) processEvents() {
-	ticker := time.NewTicker(KafkaBatchInterval)
-	defer ticker.Stop()
-
-	var eventsBatch []kafka.Message
-
-	for {
-		select {
-		case event := <-p.events:
-			value, err := json.Marshal(event)
-			if err != nil {
-				glog.Errorf("error while marshalling gateway log to Kafka, err=%v", err)
-				continue
-			}
-
-			msg := kafka.Message{
-				Key:   []byte(*event.ID),
-				Value: value,
-			}
-			eventsBatch = append(eventsBatch, msg)
-
-			// Send batch if it reaches the defined size
-			if len(eventsBatch) >= KafkaBatchSize {
-				p.sendBatch(eventsBatch)
-				eventsBatch = nil
-			}
-
-		case <-ticker.C:
-			if len(eventsBatch) > 0 {
-				p.sendBatch(eventsBatch)
-				eventsBatch = nil
-			}
-		}
-	}
-}
-
-func (p *KafkaProducer) sendBatch(eventsBatch []kafka.Message) {
-	// We retry sending messages to Kafka in case of a failure
-	kafkaWriteRetries := 3
-	var writeErr error
-	for i := 0; i < kafkaWriteRetries; i++ {
-		writeErr = p.writer.WriteMessages(context.Background(), eventsBatch...)
-		if writeErr == nil {
-			return
-		}
-		glog.Warningf("error while sending gateway log batch to Kafka, retrying, topic=%s, try=%d, err=%v", p.topic, i, writeErr)
-	}
-	if writeErr != nil {
-		glog.Errorf("error while sending gateway log batch to Kafka, the gateway logs are lost, err=%v", writeErr)
-	}
-}
-
-func SendQueueEventAsync(eventType string, data interface{}) {
-	if kafkaProducer == nil {
-		return
-	}
-
-	randomID := uuid.New().String()
-	timestampMs := time.Now().UnixMilli()
-
-	event := GatewayEvent{
-		ID:        stringPtr(randomID),
-		Gateway:   stringPtr(kafkaProducer.gatewayAddress),
-		Type:      &eventType,
-		Timestamp: stringPtr(fmt.Sprint(timestampMs)),
-		Data:      data,
-	}
-
-	select {
-	case kafkaProducer.events <- event:
-	default:
-		glog.Warningf("kafka producer event queue is full, dropping event %q", eventType)
-	}
-}
-
-func stringPtr(s string) *string {
-	return &s
-}
diff --git a/monitor/publisher.go b/monitor/publisher.go
new file mode 100644
index 0000000000..8edff45cdc
--- /dev/null
+++ b/monitor/publisher.go
@@ -0,0 +1,322 @@
+package monitor
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/url"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/golang/glog"
+	"github.com/google/uuid"
+)
+
+type EventEnvelope struct {
+	ID        string          `json:"id"`
+	Type      string          `json:"type"`
+	Timestamp time.Time       `json:"timestamp"`
+	Gateway   string          `json:"gateway,omitempty"`
+	Payload   json.RawMessage `json:"payload"`
+}
+
+type PublisherConfig struct {
+	GatewayAddress string
+	SinkURLs       []string
+	Headers        map[string]string
+	QueueSize      int
+	BatchSize      int
+	FlushInterval  time.Duration
+}
+
+type BackendOptions struct {
+	Headers        map[string]string
+	GatewayAddress string
+}
+
+type EventBackend interface {
+	Start(ctx context.Context) error
+	Publish(ctx context.Context, batch []EventEnvelope) error
+	Stop(ctx context.Context) error
+}
+
+type BackendFactory func(u *url.URL, opts BackendOptions) (EventBackend, error)
+
+type backendEntry struct {
+	name    string
+	backend EventBackend
+}
+
+type publisher struct {
+	ctx           context.Context
+	cancel        context.CancelFunc
+	queue         chan EventEnvelope
+	batchSize     int
+	flushInterval time.Duration
+	gateway       string
+	backends      []backendEntry
+	wg            sync.WaitGroup
+}
+
+var (
+	publisherMu      sync.RWMutex
+	activePublisher  *publisher
+	backendFactories = make(map[string]BackendFactory)
+	factoriesMu      sync.RWMutex
+)
+
+func RegisterBackendFactory(scheme string, factory BackendFactory) {
+	factoriesMu.Lock()
+	defer factoriesMu.Unlock()
+	backendFactories[strings.ToLower(scheme)] = factory
+}
+
+func InitEventPublisher(cfg PublisherConfig) error {
+	if len(cfg.SinkURLs) == 0 {
+		return fmt.Errorf("event publisher requires at least one sink URL")
+	}
+
+	factoriesMu.RLock()
+	defer factoriesMu.RUnlock()
+	if len(backendFactories) == 0 {
+		return fmt.Errorf("no event backend factories registered")
+	}
+
+	pub, err := newPublisher(cfg)
+	if err != nil {
+		return err
+	}
+
+	publisherMu.Lock()
+	if activePublisher != nil {
+		go func(old *publisher) {
+			if err := old.Stop(context.Background()); err != nil {
+				glog.Errorf("event publisher shutdown error: %v", err)
+			}
+		}(activePublisher)
+	}
+	activePublisher = pub
+	publisherMu.Unlock()
+
+	return nil
+}
+
+func ShutdownEventPublisher(ctx context.Context) error {
+	publisherMu.Lock()
+	pub := activePublisher
+	activePublisher = nil
+	publisherMu.Unlock()
+	if pub == nil {
+		return nil
+	}
+	return pub.Stop(ctx)
+}
+
+func QueueEvent(eventType string, payload interface{}) {
+	publisherMu.RLock()
+	pub := activePublisher
+	publisherMu.RUnlock()
+
+	if pub == nil {
+		return
+	}
+
+	envelope, err := pub.buildEnvelope(eventType, payload)
+	if err != nil {
+		glog.Errorf("event publisher failed to encode payload for %s: %v", eventType, err)
+		return
+	}
+
+	select {
+	case pub.queue <- envelope:
+	default:
+		glog.Warningf("event publisher queue full, dropping event %q", eventType)
+	}
+}
+
+func newPublisher(cfg PublisherConfig) (*publisher, error) {
+	queueSize := cfg.QueueSize
+	if queueSize <= 0 {
+		queueSize = 100
+	}
+	batchSize := cfg.BatchSize
+	if batchSize <= 0 {
+		batchSize = 100
+	}
+	flushInterval := cfg.FlushInterval
+	if flushInterval <= 0 {
+		flushInterval = time.Second
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	pub := &publisher{
+		ctx:           ctx,
+		cancel:        cancel,
+		queue:         make(chan EventEnvelope, queueSize),
+		batchSize:     batchSize,
+		flushInterval: flushInterval,
+		gateway:       cfg.GatewayAddress,
+	}
+
+	opts := BackendOptions{
+		Headers:        cfg.Headers,
+		GatewayAddress: cfg.GatewayAddress,
+	}
+
+	for _, raw := range cfg.SinkURLs {
+		raw = strings.TrimSpace(raw)
+		if raw == "" {
+			continue
+		}
+		u, err := url.Parse(raw)
+		if err != nil {
+			cancel()
+			return nil, fmt.Errorf("parse sink url %q: %w", raw, err)
+		}
+		factory, ok := backendFactories[strings.ToLower(u.Scheme)]
+		if !ok {
+			cancel()
+			return nil, fmt.Errorf("no backend registered for scheme %q", u.Scheme)
+		}
+		backend, err := factory(u, opts)
+		if err != nil {
+			cancel()
+			return nil, fmt.Errorf("init backend %q: %w", raw, err)
+		}
+		if err := backend.Start(ctx); err != nil {
+			cancel()
+			backend.Stop(context.Background())
+			return nil, fmt.Errorf("start backend %q: %w", raw, err)
+		}
+		pub.backends = append(pub.backends, backendEntry{name: raw, backend: backend})
+	}
+
+	if len(pub.backends) == 0 {
+		cancel()
+		return nil, fmt.Errorf("no valid event sinks configured")
+	}
+
+	pub.wg.Add(1)
+	go pub.run()
+
+	return pub, nil
+}
+
+func (p *publisher) Stop(ctx context.Context) error {
+	p.cancel()
+	// the queue is deliberately not closed here: a concurrent QueueEvent may
+	// still send on it, and sending on a closed channel would panic; run()
+	// exits via p.ctx instead
+
+	stopped := make(chan struct{})
+	go func() {
+		defer close(stopped)
+		p.wg.Wait()
+	}()
+
+	select {
+	case <-stopped:
+	case <-ctx.Done():
+	}
+
+	var errs []string
+	for _, entry := range p.backends {
+		if err := entry.backend.Stop(ctx); err != nil {
+			errs = append(errs, fmt.Sprintf("%s: %v", entry.name, err))
+		}
+	}
+	if len(errs) > 0 {
+		return fmt.Errorf("backend shutdown errors: %s", strings.Join(errs, "; "))
+	}
+	return nil
+}
+
+func (p *publisher) run() {
+	defer p.wg.Done()
+
+	timer := time.NewTimer(p.flushInterval)
+	defer timer.Stop()
+
+	batch := make([]EventEnvelope, 0, p.batchSize)
+
+	flush := func() {
+		if len(batch) == 0 {
+			return
+		}
+		copyBatch := make([]EventEnvelope, len(batch))
+		copy(copyBatch, batch)
+		for _, entry := range p.backends {
+			if err := entry.backend.Publish(p.ctx, copyBatch); err != nil {
+				glog.Errorf("event publisher backend %s publish error: %v", entry.name, err)
+			}
+		}
+		batch = batch[:0]
+	}
+
+	for {
+		select {
+		case <-p.ctx.Done():
+			flush()
+			return
+		case evt, ok := <-p.queue:
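+			// defensive: the queue is never closed in the current code, but guard anyway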
+			if !ok {
+				flush()
+				return
+			}
+			batch = append(batch, evt)
+			if len(batch) >= p.batchSize {
+				flush()
+				if !timer.Stop() {
+					<-timer.C
+				}
+				timer.Reset(p.flushInterval)
+			}
+		case <-timer.C:
+			flush()
+			timer.Reset(p.flushInterval)
+		}
+	}
+}
+
+func (p *publisher) buildEnvelope(eventType string, payload interface{}) (EventEnvelope, error) {
+	raw, err := normalizePayload(payload)
+	if err != nil {
+		return EventEnvelope{}, err
+	}
+	return EventEnvelope{
+		ID:        uuid.NewString(),
+		Type:      eventType,
+		Timestamp: time.Now().UTC(),
+		Gateway:   p.gateway,
+		Payload:   raw,
+	}, nil
+}
+
+func normalizePayload(payload interface{}) (json.RawMessage, error) {
+	if payload == nil {
+		return json.RawMessage([]byte("null")), nil
+	}
+
+	switch v := payload.(type) {
+	case json.RawMessage:
+		clone := make([]byte, len(v))
+		copy(clone, v)
+		return json.RawMessage(clone), nil
+	case []byte:
+		if json.Valid(v) {
+			clone := make([]byte, len(v))
+			copy(clone, v)
+			return json.RawMessage(clone), nil
+		}
+		b, err := json.Marshal(v)
+		if err != nil {
+			return nil, err
+		}
+		return json.RawMessage(b), nil
+	default:
+		b, err := json.Marshal(v)
+		if err != nil {
+			return nil, err
+		}
+		return json.RawMessage(b), nil
+	}
+}
diff --git a/server/ai_live_video.go b/server/ai_live_video.go
index 12e1898c5d..67a29935cd 100644
--- a/server/ai_live_video.go
+++ b/server/ai_live_video.go
@@ -156,7 +156,7 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
 			// no error, all done, let's leave
 			if monitor.Enabled && firstSegment {
 				firstSegment = false
-				monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{
+				monitor.QueueEvent("stream_trace", map[string]interface{}{
 					"type":      "gateway_send_first_ingest_segment",
 					"timestamp": time.Now().UnixMilli(),
 					"stream_id": params.liveParams.streamID,
@@ -335,7 +335,7 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
 				delayMs := time.Since(params.liveParams.startTime).Milliseconds()
 				if monitor.Enabled {
 					monitor.AIFirstSegmentDelay(delayMs, params.liveParams.sess.OrchestratorInfo)
-					monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{
+					monitor.QueueEvent("stream_trace", map[string]interface{}{
 						"type":      "gateway_receive_first_processed_segment",
 						"timestamp": time.Now().UnixMilli(),
 						"stream_id": params.liveParams.streamID,
@@ -353,7 +353,7 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
 				if segmentsReceived == 3 && monitor.Enabled {
 					// We assume that after receiving 3 segments, the runner started successfully
 					// and we should be able to start the playback
-					monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{
+					monitor.QueueEvent("stream_trace", map[string]interface{}{
 						"type":      "gateway_receive_few_processed_segments",
 						"timestamp": time.Now().UnixMilli(),
 						"stream_id": params.liveParams.streamID,
@@ -538,7 +538,7 @@ func startControlPublish(ctx context.Context, control *url.URL, params aiRequest
 	reportUpdate := func(data []byte) {
 		// send the param update to kafka
-		monitor.SendQueueEventAsync("ai_stream_events", map[string]interface{}{
+		monitor.QueueEvent("ai_stream_events", map[string]interface{}{
 			"type":       "params_update",
 			"stream_id":  params.liveParams.streamID,
 			"request_id": params.liveParams.requestID,
@@ -759,7 +759,7 @@ func startEventsSubscribe(ctx context.Context, url *url.URL, params aiRequestPar
 			StreamStatusStore.Store(streamId, event)
 		}
 
-		monitor.SendQueueEventAsync(queueEventType, event)
+		monitor.QueueEvent(queueEventType, event)
 	}
 	}()
@@ -853,7 +853,7 @@ func LiveErrorEventSender(ctx context.Context, streamID string, event map[string
 		ev := maps.Clone(event)
 		ev["capability"] = clog.GetVal(ctx, "capability")
 		ev["message"] = err.Error()
-		monitor.SendQueueEventAsync("ai_stream_events", ev)
+		monitor.QueueEvent("ai_stream_events", ev)
 	}
 }
diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go
index f368f9feb8..3d9a96e831 100644
--- a/server/ai_mediaserver.go
+++ b/server/ai_mediaserver.go
@@ -572,7 +572,7 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
 		GatewayStatus.Clear(streamID)
 		GatewayStatus.StoreKey(streamID, "whep_url", whepURL)
 
-		monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{
+		monitor.QueueEvent("stream_trace", map[string]interface{}{
 			"type":      "gateway_receive_stream_request",
 			"timestamp": streamRequestTime,
 			"stream_id": streamID,
@@ -647,7 +647,7 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
 			ms := media.MediaSegmenter{Workdir: ls.LivepeerNode.WorkDir, MediaMTXClient: mediaMTXClient}
 			ms.RunSegmentation(segmenterCtx, mediaMTXInputURL, ssr.Read)
 			sendErrorEvent(errors.New("mediamtx ingest disconnected"))
-			monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{
+			monitor.QueueEvent("stream_trace", map[string]interface{}{
 				"type":      "gateway_ingest_stream_closed",
 				"timestamp": time.Now().UnixMilli(),
 				"stream_id": streamID,
@@ -726,7 +726,7 @@ func processStream(ctx context.Context, params aiRequestParams, req worker.GenLi
 			err = errors.New("unknown swap reason")
 		}
 		// report the swap
-		monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{
+		monitor.QueueEvent("stream_trace", map[string]interface{}{
 			"type":       "orchestrator_swap",
 			"stream_id":  params.liveParams.streamID,
 			"request_id": params.liveParams.requestID,
@@ -1043,7 +1043,7 @@ func (ls *LivepeerServer) CreateWhip(server *media.WHIPServer) http.Handler {
 		GatewayStatus.Clear(streamID)
 		GatewayStatus.StoreKey(streamID, "whep_url", whepURL)
 
-		monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{
+		monitor.QueueEvent("stream_trace", map[string]interface{}{
 			"type":      "gateway_receive_stream_request",
 			"timestamp": streamRequestTime,
 			"stream_id": streamID,
@@ -1070,7 +1070,7 @@ func (ls *LivepeerServer) CreateWhip(server *media.WHIPServer) http.Handler {
 				err = errors.New("whip disconnected")
 			}
 			sendErrorEvent(err)
-			monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{
+			monitor.QueueEvent("stream_trace", map[string]interface{}{
 				"type":      "gateway_ingest_stream_closed",
 				"timestamp": time.Now().UnixMilli(),
 				"stream_id": streamID,
@@ -1169,7 +1169,7 @@ func runStats(ctx context.Context, whipConn *media.WHIPConnection, streamID stri
 			"stats": stats,
 		})
 
-		monitor.SendQueueEventAsync("stream_ingest_metrics", map[string]interface{}{
+		monitor.QueueEvent("stream_ingest_metrics", map[string]interface{}{
 			"timestamp":   time.Now().UnixMilli(),
 			"stream_id":   streamID,
 			"pipeline_id": pipelineID,
diff --git a/server/ai_process.go b/server/ai_process.go
index cc50b380dd..74f60487eb 100644
--- a/server/ai_process.go
+++ b/server/ai_process.go
@@ -1561,7 +1561,7 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface
 		if monitor.Enabled {
 			monitor.AIRequestError(errMsg, monitor.ToPipeline(capName), modelID, nil)
 		}
-		monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{
+		monitor.QueueEvent("stream_trace", map[string]interface{}{
 			"type":      "gateway_no_orchestrators_available",
 			"timestamp": time.Now().UnixMilli(),
 			"stream_id": params.liveParams.streamID,
diff --git a/server/segment_rpc.go b/server/segment_rpc.go
index 58b030958a..3f2d964e05 100644
--- a/server/segment_rpc.go
+++ b/server/segment_rpc.go
@@ -874,7 +874,7 @@ func genPayment(ctx context.Context, sess *BroadcastSession, numTickets int) (st
 	requestID := clog.GetVal(ctx, "request_id")
 	capability := clog.GetVal(ctx, "capability")
 
-	monitor.SendQueueEventAsync("create_new_payment", map[string]string{
+	monitor.QueueEvent("create_new_payment", map[string]string{
 		"clientIP":   clientIP,
 		"requestID":  requestID,
 		"capability": capability,
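
Note for reviewers: because the publisher resolves sinks by URL scheme through RegisterBackendFactory, additional backends can be plugged in without modifying this PR. Below is a minimal sketch, not part of the diff, of a hypothetical stdout:// backend useful for verifying sink wiring locally; the stdoutBackend type and the "stdout" scheme are illustrative, while EventBackend, BackendOptions, and EventEnvelope come from monitor/publisher.go above.

	package monitor

	import (
		"context"
		"encoding/json"
		"fmt"
		"net/url"
		"os"
	)

	// stdoutBackend writes each batch as JSON lines to stdout; handy for
	// checking what the publisher emits before pointing at a real collector.
	type stdoutBackend struct{}

	func init() {
		// "stdout" is a hypothetical scheme used only for this sketch
		RegisterBackendFactory("stdout", func(_ *url.URL, _ BackendOptions) (EventBackend, error) {
			return stdoutBackend{}, nil
		})
	}

	func (stdoutBackend) Start(context.Context) error { return nil }

	func (stdoutBackend) Publish(_ context.Context, batch []EventEnvelope) error {
		enc := json.NewEncoder(os.Stdout)
		for _, evt := range batch {
			// one JSON object per line, in envelope order
			if err := enc.Encode(evt); err != nil {
				return fmt.Errorf("encode event %s: %w", evt.ID, err)
			}
		}
		return nil
	}

	func (stdoutBackend) Stop(context.Context) error { return nil }

With such a backend compiled in, a gateway could be started with, for example, -monitor -eventSinks "stdout://,kafka://user:pass@broker:9092?topic=gateway_events"; each comma-separated URI is matched to a backend factory by its scheme, and every configured sink receives the same batches.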