@@ -2,19 +2,68 @@ package kafka
22
33import (
44 "context"
5+ "fmt"
56 "workspace-engine/pkg/db"
67
7- "github.com/spaolacci/murmur3"
8+ "go.opentelemetry.io/otel"
9+ "go.opentelemetry.io/otel/attribute"
10+ "go.opentelemetry.io/otel/trace"
811)
912
13+ var tracer = otel .Tracer ("workspace/kafka/state" )
14+
1015// PartitionForWorkspace computes which partition a workspace ID should be routed to
11- // using Murmur3 hash (Kafka-compatible partitioning )
16+ // using Murmur2 hash (matching Kafka's default partitioner and kafkajs )
1217func PartitionForWorkspace (workspaceID string , numPartitions int32 ) int32 {
13- h := murmur3 . Sum32 ([]byte (workspaceID ))
18+ h := murmur2 ([]byte (workspaceID ))
1419 positive := int32 (h & 0x7fffffff ) // mask sign bit like Kafka
1520 return positive % numPartitions
1621}
1722
23+ // murmur2 implements the Murmur2 hash algorithm used by Kafka's default partitioner
24+ func murmur2 (data []byte ) uint32 {
25+ const (
26+ seed uint32 = 0x9747b28c
27+ m uint32 = 0x5bd1e995
28+ r = 24
29+ )
30+
31+ h := seed ^ uint32 (len (data ))
32+ length := len (data )
33+
34+ for length >= 4 {
35+ k := uint32 (data [0 ]) | uint32 (data [1 ])<< 8 | uint32 (data [2 ])<< 16 | uint32 (data [3 ])<< 24
36+
37+ k *= m
38+ k ^= k >> r
39+ k *= m
40+
41+ h *= m
42+ h ^= k
43+
44+ data = data [4 :]
45+ length -= 4
46+ }
47+
48+ switch length {
49+ case 3 :
50+ h ^= uint32 (data [2 ]) << 16
51+ fallthrough
52+ case 2 :
53+ h ^= uint32 (data [1 ]) << 8
54+ fallthrough
55+ case 1 :
56+ h ^= uint32 (data [0 ])
57+ h *= m
58+ }
59+
60+ h ^= h >> 13
61+ h *= m
62+ h ^= h >> 15
63+
64+ return h
65+ }
66+
1867// FilterWorkspaceIDsForPartition filters the given workspaceIDs and returns only those
1968// that would be routed to the specified partition out of numPartitions.
2069func FilterWorkspaceIDsForPartition (workspaceIDs []string , targetPartition int32 , numPartitions int32 ) []string {
@@ -30,6 +79,13 @@ func FilterWorkspaceIDsForPartition(workspaceIDs []string, targetPartition int32
3079type WorkspaceIDDiscoverer func (ctx context.Context , targetPartition int32 , numPartitions int32 ) ([]string , error )
3180
3281func GetAssignedWorkspaceIDs (ctx context.Context , assignedPartitions []int32 , numPartitions int32 ) (map [int32 ][]string , error ) {
82+ ctx , span := tracer .Start (ctx , "GetAssignedWorkspaceIDs" )
83+ defer span .End ()
84+
85+ span .SetAttributes (attribute .Int ("assigned.partitions.count" , len (assignedPartitions )))
86+ span .SetAttributes (attribute .Int ("num.partitions" , int (numPartitions )))
87+ span .SetAttributes (attribute .String ("assigned.partitions" , fmt .Sprintf ("%+v" , assignedPartitions )))
88+
3389 workspaceIDs , err := db .GetAllWorkspaceIDs (ctx )
3490 if err != nil {
3591 return nil , err
@@ -44,6 +100,12 @@ func GetAssignedWorkspaceIDs(ctx context.Context, assignedPartitions []int32, nu
44100 for _ , workspaceID := range workspaceIDs {
45101 partition := PartitionForWorkspace (workspaceID , numPartitions )
46102 if assignedSet [partition ] {
103+ span .AddEvent ("workspace ID discovered" ,
104+ trace .WithAttributes (
105+ attribute .String ("workspaceID" , workspaceID ),
106+ attribute .Int ("partition" , int (partition )),
107+ ),
108+ )
47109 result [partition ] = append (result [partition ], workspaceID )
48110 }
49111 }
0 commit comments