@@ -6,8 +6,13 @@ import (
66
77 "github.com/charmbracelet/log"
88 "github.com/confluentinc/confluent-kafka-go/v2/kafka"
9+ "go.opentelemetry.io/otel"
10+ "go.opentelemetry.io/otel/attribute"
11+ "go.opentelemetry.io/otel/trace"
912)
1013
14+ var tracer = otel .Tracer ("kafka/consumer" )
15+
1116// createConsumer initializes a new Kafka consumer with the configured settings
1217func createConsumer () (* kafka.Consumer , error ) {
1318 log .Info ("Connecting to Kafka" , "brokers" , Brokers )
@@ -55,6 +60,9 @@ func getEarliestOffset(snapshots map[string]*db.WorkspaceSnapshot) int64 {
5560}
5661
5762func setOffsets (ctx context.Context , consumer * kafka.Consumer , partitionWorkspaceMap map [int32 ][]string ) error {
63+ ctx , span := tracer .Start (ctx , "setOffsets" )
64+ defer span .End ()
65+
5866 for partition , workspaceIDs := range partitionWorkspaceMap {
5967 snapshots , err := db .GetLatestWorkspaceSnapshots (ctx , workspaceIDs )
6068 if err != nil {
@@ -67,6 +75,14 @@ func setOffsets(ctx context.Context, consumer *kafka.Consumer, partitionWorkspac
6775 if effectiveOffset > 0 {
6876 effectiveOffset = effectiveOffset + 1
6977 }
78+
79+ span .AddEvent (
80+ "seeking to earliest offset for partition" ,
81+ trace .WithAttributes (
82+ attribute .Int ("partition" , int (partition )),
83+ attribute .Int ("effective_offset" , int (effectiveOffset )),
84+ ),
85+ )
7086 if err := consumer .Seek (kafka.TopicPartition {
7187 Topic : & Topic ,
7288 Partition : partition ,
0 commit comments