155 changes: 155 additions & 0 deletions cmd/livepeer/starter/events.go
@@ -0,0 +1,155 @@
package starter

import (
	"fmt"
	"net/url"
	"strings"
	"time"

	"github.com/golang/glog"
	lpmon "github.com/livepeer/go-livepeer/monitor"
)

// startEventPublisher configures the outbound event publisher from the
// -eventSink* flags, falling back to the legacy Kafka flags when no sink
// URIs are given. It is a no-op (with a warning) when no sinks are configured.
func startEventPublisher(cfg LivepeerConfig) error {
	sinkList := splitList(*cfg.EventSinkURIs)
	if len(sinkList) == 0 {
		legacyKafkaURI, err := buildLegacyKafkaSink(cfg)
		if err != nil {
			return err
		}
		if legacyKafkaURI != "" {
			sinkList = append(sinkList, legacyKafkaURI)
		}
	}

	if len(sinkList) == 0 {
		glog.Warning("event publisher not started: no sinks configured")
		return nil
	}

	headers, err := parseHeaderList(*cfg.EventSinkHeaders)
	if err != nil {
		return err
	}

	queueDepth := valueOrDefaultInt(cfg.EventSinkQueueDepth, 100)
	batchSize := valueOrDefaultInt(cfg.EventSinkBatchSize, 100)
	flushInterval := valueOrDefaultDuration(cfg.EventSinkFlushInterval, time.Second)

	publisherCfg := lpmon.PublisherConfig{
		GatewayAddress: valueOrDefaultString(cfg.GatewayHost, ""),
		QueueSize:      queueDepth,
		BatchSize:      batchSize,
		FlushInterval:  flushInterval,
		SinkURLs:       sinkList,
		Headers:        headers,
	}

	if err := lpmon.InitEventPublisher(publisherCfg); err != nil {
		return fmt.Errorf("init event publisher: %w", err)
	}

	return nil
}

// splitList splits a comma-, semicolon-, or newline-delimited string into
// trimmed, non-empty entries.
func splitList(raw string) []string {
	if raw == "" {
		return nil
	}
	parts := strings.FieldsFunc(raw, func(r rune) bool {
		switch r {
		case ',', '\n', ';':
			return true
		default:
			return false
		}
	})
	var out []string
	for _, p := range parts {
		p = strings.TrimSpace(p)
		if p == "" {
			continue
		}
		out = append(out, p)
	}
	return out
}
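
// For illustration only (not part of this change), splitList accepts any mix
// of the three delimiters:
//
//	splitList("a,b; c\nd") // => []string{"a", "b", "c", "d"}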

// parseHeaderList parses a delimited list of Key=Value pairs into a header map.
func parseHeaderList(raw string) (map[string]string, error) {
	headers := make(map[string]string)
	for _, entry := range splitList(raw) {
		kv := strings.SplitN(entry, "=", 2)
		if len(kv) != 2 {
			return nil, fmt.Errorf("invalid header %q, expected Key=Value", entry)
		}
		key := strings.TrimSpace(kv[0])
		value := strings.TrimSpace(kv[1])
		if key == "" {
			return nil, fmt.Errorf("invalid header %q: empty key", entry)
		}
		headers[key] = value
	}
	return headers, nil
}
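
// For illustration only (not part of this change; the header values are made up):
//
//	parseHeaderList("Authorization=Bearer abc123; X-Env=prod")
//	// => map[string]string{"Authorization": "Bearer abc123", "X-Env": "prod"}
//	parseHeaderList("Authorization")
//	// => error: invalid header "Authorization", expected Key=Value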

// buildLegacyKafkaSink translates the legacy -kafkaBootstrapServers,
// -kafkaGatewayTopic, -kafkaUser and -kafkaPassword flags into a single
// kafka:// sink URI. It returns "" when the legacy flags are not set.
func buildLegacyKafkaSink(cfg LivepeerConfig) (string, error) {
	if cfg.KafkaBootstrapServers == nil || cfg.KafkaGatewayTopic == nil {
		return "", nil
	}
	brokers := strings.TrimSpace(*cfg.KafkaBootstrapServers)
	topic := strings.TrimSpace(*cfg.KafkaGatewayTopic)
	if brokers == "" || topic == "" {
		return "", nil
	}
	var user, password string
	if cfg.KafkaUsername != nil {
		user = strings.TrimSpace(*cfg.KafkaUsername)
	}
	if cfg.KafkaPassword != nil {
		password = strings.TrimSpace(*cfg.KafkaPassword)
	}

	brokerList := strings.Split(brokers, ",")
	host := strings.TrimSpace(brokerList[0])
	if host == "" {
		return "", fmt.Errorf("invalid Kafka bootstrap server string %q", brokers)
	}

	u := url.URL{Scheme: "kafka", Host: host}
	if user != "" {
		if password != "" {
			u.User = url.UserPassword(user, password)
		} else {
			u.User = url.User(user)
		}
	}
	q := u.Query()
	q.Set("topic", topic)
	q.Set("brokers", brokers)
	u.RawQuery = q.Encode()
	return u.String(), nil
}
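
// For illustration only (not part of this change): with bootstrap servers
// "broker1:9092,broker2:9092" and topic "gateway_logs", the first broker
// becomes the URI host, while the full broker list and the topic travel as
// query parameters; see the test sketch after this file for the exact URI.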

// valueOrDefaultString returns the trimmed value of ptr, or def when ptr is
// nil or blank.
func valueOrDefaultString(ptr *string, def string) string {
	if ptr == nil {
		return def
	}
	if v := strings.TrimSpace(*ptr); v != "" {
		return v
	}
	return def
}

// valueOrDefaultInt returns *ptr when it is positive, or def otherwise.
func valueOrDefaultInt(ptr *int, def int) int {
	if ptr != nil && *ptr > 0 {
		return *ptr
	}
	return def
}

// valueOrDefaultDuration returns *ptr when it is positive, or def otherwise.
func valueOrDefaultDuration(ptr *time.Duration, def time.Duration) time.Duration {
	if ptr != nil && *ptr > 0 {
		return *ptr
	}
	return def
}
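
A minimal test sketch of the legacy fallback, assuming it lives in an events_test.go alongside this file (the test name and all broker, topic, and credential values are illustrative, not part of this diff):

package starter

import "testing"

func TestBuildLegacyKafkaSink_Illustrative(t *testing.T) {
	brokers := "broker1:9092,broker2:9092"
	topic := "gateway_logs"
	user, pass := "user", "pass"

	uri, err := buildLegacyKafkaSink(LivepeerConfig{
		KafkaBootstrapServers: &brokers,
		KafkaGatewayTopic:     &topic,
		KafkaUsername:         &user,
		KafkaPassword:         &pass,
	})
	if err != nil {
		t.Fatal(err)
	}
	// url.Values.Encode emits query keys in sorted order: brokers, then topic.
	want := "kafka://user:pass@broker1:9092?brokers=broker1%3A9092%2Cbroker2%3A9092&topic=gateway_logs"
	if uri != want {
		t.Errorf("buildLegacyKafkaSink() = %q, want %q", uri, want)
	}
}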
5 changes: 5 additions & 0 deletions cmd/livepeer/starter/flags.go
@@ -143,6 +143,11 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig {
	cfg.KafkaUsername = fs.String("kafkaUser", *cfg.KafkaUsername, "Kafka Username")
	cfg.KafkaPassword = fs.String("kafkaPassword", *cfg.KafkaPassword, "Kafka Password")
	cfg.KafkaGatewayTopic = fs.String("kafkaGatewayTopic", *cfg.KafkaGatewayTopic, "Kafka Topic used to send gateway logs")
+	cfg.EventSinkURIs = fs.String("eventSinks", *cfg.EventSinkURIs, "List of outbound event sink URIs (comma/semicolon/newline delimited). The scheme selects the backend, e.g. kafka://, https://, wss://, grpc://, mqtt://")
+	cfg.EventSinkHeaders = fs.String("eventSinkHeader", *cfg.EventSinkHeaders, "Extra headers for HTTP/WebSocket/gRPC sinks, in Key=Value form; applied to every matching sink")
+	cfg.EventSinkQueueDepth = fs.Int("eventSinkQueueDepth", *cfg.EventSinkQueueDepth, "Publisher queue size before new events are dropped (default 100)")
+	cfg.EventSinkBatchSize = fs.Int("eventSinkBatchSize", *cfg.EventSinkBatchSize, "Max events per flush to each sink (default 100)")
+	cfg.EventSinkFlushInterval = fs.Duration("eventSinkFlushInterval", *cfg.EventSinkFlushInterval, "Max buffering time before flushing a partial batch (default 1s)")

	return cfg
}
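
For context (the binary name, host, token, and broker values below are illustrative assumptions, not part of this diff), the new flags might combine like this:

	livepeer -gateway -monitor \
		-eventSinks "https://events.example.com/ingest;kafka://broker1:9092?topic=gateway_logs" \
		-eventSinkHeader "Authorization=Bearer abc123" \
		-eventSinkFlushInterval 2s

Per the flag descriptions above, the Authorization header would apply only to the HTTPS sink; the Kafka sink ignores it.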
26 changes: 0 additions & 26 deletions cmd/livepeer/starter/kafka.go

This file was deleted.

30 changes: 23 additions & 7 deletions cmd/livepeer/starter/starter.go
@@ -177,6 +177,11 @@ type LivepeerConfig struct {
	KafkaUsername          *string
	KafkaPassword          *string
	KafkaGatewayTopic      *string
+	EventSinkURIs          *string
+	EventSinkHeaders       *string
+	EventSinkQueueDepth    *int
+	EventSinkBatchSize     *int
+	EventSinkFlushInterval *time.Duration
	MediaMTXApiPassword    *string
	LiveAIAuthApiKey       *string
	LiveAIHeartbeatURL     *string
@@ -306,6 +311,11 @@ func DefaultLivepeerConfig() LivepeerConfig {
	defaultKafkaUsername := ""
	defaultKafkaPassword := ""
	defaultKafkaGatewayTopic := ""
+	defaultEventSinkURIs := ""
+	defaultEventSinkHeaders := ""
+	defaultEventSinkQueueDepth := 100
+	defaultEventSinkBatchSize := 100
+	defaultEventSinkFlushInterval := time.Second

	return LivepeerConfig{
		// Network & Addresses:
@@ -422,10 +432,15 @@ func DefaultLivepeerConfig() LivepeerConfig {
		TestOrchAvail: &defaultTestOrchAvail,

		// Gateway logs
-		KafkaBootstrapServers: &defaultKafkaBootstrapServers,
-		KafkaUsername:         &defaultKafkaUsername,
-		KafkaPassword:         &defaultKafkaPassword,
-		KafkaGatewayTopic:     &defaultKafkaGatewayTopic,
+		KafkaBootstrapServers:  &defaultKafkaBootstrapServers,
+		KafkaUsername:          &defaultKafkaUsername,
+		KafkaPassword:          &defaultKafkaPassword,
+		KafkaGatewayTopic:      &defaultKafkaGatewayTopic,
+		EventSinkURIs:          &defaultEventSinkURIs,
+		EventSinkHeaders:       &defaultEventSinkHeaders,
+		EventSinkQueueDepth:    &defaultEventSinkQueueDepth,
+		EventSinkBatchSize:     &defaultEventSinkBatchSize,
+		EventSinkFlushInterval: &defaultEventSinkFlushInterval,
	}
}

@@ -445,6 +460,7 @@ func (cfg LivepeerConfig) PrintConfig(w io.Writer) {
		"MediaMTXApiPassword": true,
		"LiveAIAuthApiKey":    true,
		"FVfailGsKey":         true,
+		"EventSinkHeaders":    true,
	}

	for i := 0; i < cfgType.NumField(); i++ {
@@ -730,10 +746,10 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
		lpmon.InitCensus(nodeType, core.LivepeerVersion)
	}

-	// Start Kafka producer
+	// Start event publisher
	if *cfg.Monitor {
-		if err := startKafkaProducer(cfg); err != nil {
-			exit("Error while starting Kafka producer", err)
+		if err := startEventPublisher(cfg); err != nil {
+			exit("Error while starting event publisher", err)
		}
	}

2 changes: 1 addition & 1 deletion core/livepeernode.go
@@ -338,7 +338,7 @@ func (n *LivepeerNode) UpdateNetworkCapabilities(orchNetworkCapabilities []*comm
	n.NetworkCapabilities.Orchestrators = orchNetworkCapabilities

	if lpmon.Enabled {
-		lpmon.SendQueueEventAsync("network_capabilities", orchNetworkCapabilities)
+		lpmon.QueueEvent("network_capabilities", orchNetworkCapabilities)
	}

	return nil
2 changes: 1 addition & 1 deletion discovery/discovery.go
@@ -319,7 +319,7 @@ func (o *orchestratorPool) GetOrchestrators(ctx context.Context, numOrchestrator
				"latency_ms": strconv.FormatInt(o.LocalInfo.Latency.Milliseconds(), 10),
			})
		}
-		monitor.SendQueueEventAsync("discovery_results", discoveryResults)
+		monitor.QueueEvent("discovery_results", discoveryResults)
	}
	clog.Infof(ctx, "Done fetching orch info orchs=%d/%d responses=%d/%d timedOut=%t",
		len(ods), numOrchestrators, nbResp, maxOrchNodes, timedOut)