Skip to content

Commit 3851d22

Browse files
committed
add change sets
1 parent 9d37028 commit 3851d22

File tree

21 files changed

+2114
-723
lines changed

21 files changed

+2114
-723
lines changed

apps/workspace-engine/pkg/changeset/changeset.go

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ type ChangeSet[T any] struct {
2424
IsInitialLoad bool
2525
Changes []Change[T]
2626
mutex sync.Mutex
27+
keyFunc func(T) string
28+
changeMap map[string]Change[T] // for deduplication when keyFunc is provided
2729
}
2830

2931
func NewChangeSet[T any]() *ChangeSet[T] {
@@ -32,6 +34,16 @@ func NewChangeSet[T any]() *ChangeSet[T] {
3234
}
3335
}
3436

37+
// NewChangeSetWithDedup creates a changeset that deduplicates entries by key
38+
// The keyFunc should return a unique identifier for each entity
39+
func NewChangeSetWithDedup[T any](keyFunc func(T) string) *ChangeSet[T] {
40+
return &ChangeSet[T]{
41+
Changes: make([]Change[T], 0),
42+
keyFunc: keyFunc,
43+
changeMap: make(map[string]Change[T]),
44+
}
45+
}
46+
3547
func (cs *ChangeSet[T]) Lock() {
3648
cs.mutex.Lock()
3749
}
@@ -44,13 +56,51 @@ func (cs *ChangeSet[T]) Record(changeType ChangeType, entity T) {
4456
cs.mutex.Lock()
4557
defer cs.mutex.Unlock()
4658

47-
cs.Changes = append(cs.Changes, Change[T]{
48-
Type: changeType,
49-
Entity: entity,
50-
Timestamp: time.Now(),
51-
})
59+
change := Change[T]{
60+
Type: changeType,
61+
Entity: entity,
62+
Timestamp: time.Now(),
63+
}
64+
65+
// If deduplication is enabled, use the map
66+
if cs.keyFunc != nil {
67+
key := cs.keyFunc(entity)
68+
cs.changeMap[key] = change
69+
} else {
70+
// No deduplication, just append
71+
cs.Changes = append(cs.Changes, change)
72+
}
73+
}
74+
75+
// Finalize converts the internal map to the Changes slice (for dedup mode)
76+
// Call this before accessing Changes when using deduplication
77+
func (cs *ChangeSet[T]) Finalize() {
78+
cs.mutex.Lock()
79+
defer cs.mutex.Unlock()
80+
81+
if cs.keyFunc != nil && len(cs.changeMap) > 0 {
82+
cs.Changes = make([]Change[T], 0, len(cs.changeMap))
83+
for _, change := range cs.changeMap {
84+
cs.Changes = append(cs.Changes, change)
85+
}
86+
}
5287
}
5388

5489
func (cs *ChangeSet[T]) Process() *Processor[T] {
90+
// Auto-finalize if using deduplication
91+
if cs.keyFunc != nil {
92+
cs.Finalize()
93+
}
5594
return NewProcessor(cs)
5695
}
96+
97+
// Count returns the number of changes (handles dedup mode automatically)
98+
func (cs *ChangeSet[T]) Count() int {
99+
cs.mutex.Lock()
100+
defer cs.mutex.Unlock()
101+
102+
if cs.keyFunc != nil {
103+
return len(cs.changeMap)
104+
}
105+
return len(cs.Changes)
106+
}

apps/workspace-engine/pkg/changeset/changeset_test.go

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,285 @@ func TestChangeSet_OrderingPreserved(t *testing.T) {
144144
}
145145
}
146146
}
147+
148+
// Test that deduplication uses the latest change type
149+
func TestChangeSetWithDedup_LatestChangeTypeWins(t *testing.T) {
150+
type testEntity struct {
151+
ID string
152+
Name string
153+
}
154+
155+
cs := NewChangeSetWithDedup(func(e testEntity) string {
156+
return e.ID
157+
})
158+
159+
entity := testEntity{ID: "resource-1", Name: "Test Resource"}
160+
161+
// Record multiple changes for the same entity
162+
cs.Record(ChangeTypeCreate, entity)
163+
cs.Record(ChangeTypeUpdate, entity)
164+
cs.Record(ChangeTypeTaint, entity)
165+
cs.Record(ChangeTypeDelete, entity)
166+
167+
// Finalize to convert map to slice
168+
cs.Finalize()
169+
170+
// Verify only one change exists
171+
if len(cs.Changes) != 1 {
172+
t.Fatalf("expected 1 change after deduplication, got %d", len(cs.Changes))
173+
}
174+
175+
// Verify it's the last change type (Delete)
176+
change := cs.Changes[0]
177+
if change.Type != ChangeTypeDelete {
178+
t.Errorf("expected ChangeType %v (last recorded), got %v", ChangeTypeDelete, change.Type)
179+
}
180+
if change.Entity.ID != "resource-1" {
181+
t.Errorf("expected entity ID 'resource-1', got %v", change.Entity.ID)
182+
}
183+
}
184+
185+
// Test that deduplication with different entities keeps all
186+
func TestChangeSetWithDedup_DifferentEntities(t *testing.T) {
187+
type testEntity struct {
188+
ID string
189+
Name string
190+
}
191+
192+
cs := NewChangeSetWithDedup(func(e testEntity) string {
193+
return e.ID
194+
})
195+
196+
// Record changes for different entities
197+
cs.Record(ChangeTypeCreate, testEntity{ID: "resource-1", Name: "Resource 1"})
198+
cs.Record(ChangeTypeCreate, testEntity{ID: "resource-2", Name: "Resource 2"})
199+
cs.Record(ChangeTypeCreate, testEntity{ID: "resource-3", Name: "Resource 3"})
200+
201+
cs.Finalize()
202+
203+
// Verify all three changes exist
204+
if len(cs.Changes) != 3 {
205+
t.Fatalf("expected 3 changes, got %d", len(cs.Changes))
206+
}
207+
208+
// Verify all are create types
209+
for i, change := range cs.Changes {
210+
if change.Type != ChangeTypeCreate {
211+
t.Errorf("change %d: expected ChangeType %v, got %v", i, ChangeTypeCreate, change.Type)
212+
}
213+
}
214+
}
215+
216+
// Test that deduplication with mixed entities works correctly
217+
func TestChangeSetWithDedup_MixedEntities(t *testing.T) {
218+
type testEntity struct {
219+
ID string
220+
Name string
221+
}
222+
223+
cs := NewChangeSetWithDedup(func(e testEntity) string {
224+
return e.ID
225+
})
226+
227+
// Record changes for multiple entities with some duplicates
228+
cs.Record(ChangeTypeCreate, testEntity{ID: "resource-1", Name: "Resource 1"})
229+
cs.Record(ChangeTypeCreate, testEntity{ID: "resource-2", Name: "Resource 2"})
230+
cs.Record(ChangeTypeUpdate, testEntity{ID: "resource-1", Name: "Resource 1 Updated"}) // Duplicate
231+
cs.Record(ChangeTypeCreate, testEntity{ID: "resource-3", Name: "Resource 3"})
232+
cs.Record(ChangeTypeTaint, testEntity{ID: "resource-2", Name: "Resource 2"}) // Duplicate
233+
cs.Record(ChangeTypeDelete, testEntity{ID: "resource-1", Name: "Resource 1"}) // Duplicate again
234+
235+
cs.Finalize()
236+
237+
// Verify only 3 changes exist (one per unique ID)
238+
if len(cs.Changes) != 3 {
239+
t.Fatalf("expected 3 changes after deduplication, got %d", len(cs.Changes))
240+
}
241+
242+
// Build a map to verify the final state of each entity
243+
finalStates := make(map[string]ChangeType)
244+
for _, change := range cs.Changes {
245+
finalStates[change.Entity.ID] = change.Type
246+
}
247+
248+
// Verify the latest change type for each entity
249+
expectedStates := map[string]ChangeType{
250+
"resource-1": ChangeTypeDelete, // Last was Delete
251+
"resource-2": ChangeTypeTaint, // Last was Taint
252+
"resource-3": ChangeTypeCreate, // Only had Create
253+
}
254+
255+
for id, expectedType := range expectedStates {
256+
actualType, exists := finalStates[id]
257+
if !exists {
258+
t.Errorf("expected entity %s to exist in final changes", id)
259+
continue
260+
}
261+
if actualType != expectedType {
262+
t.Errorf("entity %s: expected final ChangeType %v, got %v", id, expectedType, actualType)
263+
}
264+
}
265+
}
266+
267+
// Test Count() method with deduplication
268+
func TestChangeSetWithDedup_Count(t *testing.T) {
269+
type testEntity struct {
270+
ID string
271+
}
272+
273+
cs := NewChangeSetWithDedup(func(e testEntity) string {
274+
return e.ID
275+
})
276+
277+
// Record multiple changes for the same entities
278+
cs.Record(ChangeTypeCreate, testEntity{ID: "1"})
279+
cs.Record(ChangeTypeCreate, testEntity{ID: "2"})
280+
cs.Record(ChangeTypeUpdate, testEntity{ID: "1"}) // Duplicate
281+
cs.Record(ChangeTypeCreate, testEntity{ID: "3"})
282+
cs.Record(ChangeTypeTaint, testEntity{ID: "2"}) // Duplicate
283+
284+
// Count should reflect deduplicated count (3 unique entities)
285+
count := cs.Count()
286+
if count != 3 {
287+
t.Errorf("expected Count() to return 3, got %d", count)
288+
}
289+
}
290+
291+
// Test Process() auto-finalizes deduplication
292+
func TestChangeSetWithDedup_ProcessAutoFinalizes(t *testing.T) {
293+
type testEntity struct {
294+
ID string
295+
Name string
296+
}
297+
298+
cs := NewChangeSetWithDedup(func(e testEntity) string {
299+
return e.ID
300+
})
301+
302+
// Record multiple changes for the same entity
303+
cs.Record(ChangeTypeCreate, testEntity{ID: "resource-1", Name: "Resource 1"})
304+
cs.Record(ChangeTypeUpdate, testEntity{ID: "resource-1", Name: "Resource 1 Updated"})
305+
cs.Record(ChangeTypeDelete, testEntity{ID: "resource-1", Name: "Resource 1"})
306+
307+
// Process should auto-finalize
308+
processor := cs.Process()
309+
if processor == nil {
310+
t.Fatal("expected non-nil processor")
311+
}
312+
313+
// Verify Changes slice was populated
314+
if len(cs.Changes) != 1 {
315+
t.Errorf("expected 1 change after Process(), got %d", len(cs.Changes))
316+
}
317+
318+
// Verify it's the last change
319+
if cs.Changes[0].Type != ChangeTypeDelete {
320+
t.Errorf("expected ChangeType %v, got %v", ChangeTypeDelete, cs.Changes[0].Type)
321+
}
322+
}
323+
324+
// Test concurrent recording with deduplication
325+
func TestChangeSetWithDedup_ConcurrentRecord(t *testing.T) {
326+
type testEntity struct {
327+
ID string
328+
Value int
329+
}
330+
331+
cs := NewChangeSetWithDedup(func(e testEntity) string {
332+
return e.ID
333+
})
334+
335+
numGoroutines := 50
336+
changesPerGoroutine := 20
337+
numUniqueEntities := 10
338+
339+
var wg sync.WaitGroup
340+
wg.Add(numGoroutines)
341+
342+
// Launch multiple goroutines that record changes
343+
// Each goroutine cycles through the same set of entity IDs
344+
for i := 0; i < numGoroutines; i++ {
345+
go func(goroutineID int) {
346+
defer wg.Done()
347+
for j := 0; j < changesPerGoroutine; j++ {
348+
entityID := j % numUniqueEntities // Cycle through entity IDs
349+
entity := testEntity{
350+
ID: string(rune('A' + entityID)),
351+
Value: goroutineID*1000 + j,
352+
}
353+
changeType := []ChangeType{
354+
ChangeTypeCreate,
355+
ChangeTypeUpdate,
356+
ChangeTypeTaint,
357+
ChangeTypeDelete,
358+
}[j%4]
359+
cs.Record(changeType, entity)
360+
}
361+
}(i)
362+
}
363+
364+
wg.Wait()
365+
366+
// Finalize to get final state
367+
cs.Finalize()
368+
369+
// Should have exactly numUniqueEntities entries (deduplicated)
370+
if len(cs.Changes) != numUniqueEntities {
371+
t.Errorf("expected %d unique changes after deduplication, got %d", numUniqueEntities, len(cs.Changes))
372+
}
373+
}
374+
375+
// Test that regular changeset (without dedup) still works
376+
func TestChangeSet_NoDedup(t *testing.T) {
377+
type testEntity struct {
378+
ID string
379+
}
380+
381+
cs := NewChangeSet[testEntity]()
382+
383+
// Record multiple changes for the same entity
384+
cs.Record(ChangeTypeCreate, testEntity{ID: "resource-1"})
385+
cs.Record(ChangeTypeUpdate, testEntity{ID: "resource-1"})
386+
cs.Record(ChangeTypeDelete, testEntity{ID: "resource-1"})
387+
388+
// All changes should be present (no deduplication)
389+
if len(cs.Changes) != 3 {
390+
t.Errorf("expected 3 changes without deduplication, got %d", len(cs.Changes))
391+
}
392+
}
393+
394+
// Test timestamp is updated on each duplicate
395+
func TestChangeSetWithDedup_TimestampUpdated(t *testing.T) {
396+
type testEntity struct {
397+
ID string
398+
}
399+
400+
cs := NewChangeSetWithDedup(func(e testEntity) string {
401+
return e.ID
402+
})
403+
404+
entity := testEntity{ID: "resource-1"}
405+
406+
// Record first change
407+
cs.Record(ChangeTypeCreate, entity)
408+
time.Sleep(10 * time.Millisecond)
409+
410+
// Record second change for same entity
411+
cs.Record(ChangeTypeUpdate, entity)
412+
413+
cs.Finalize()
414+
415+
// Verify the timestamp is from the second record
416+
if len(cs.Changes) != 1 {
417+
t.Fatalf("expected 1 change, got %d", len(cs.Changes))
418+
}
419+
420+
change := cs.Changes[0]
421+
now := time.Now()
422+
timeDiff := now.Sub(change.Timestamp)
423+
424+
// Timestamp should be very recent (within 1 second)
425+
if timeDiff > time.Second {
426+
t.Errorf("timestamp seems to be from first record, not updated on second record")
427+
}
428+
}

apps/workspace-engine/pkg/events/handler/deployment/deployment.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ func HandleDeploymentCreated(
2222
if err := ws.Deployments().Upsert(ctx, deployment); err != nil {
2323
return err
2424
}
25-
ws.ReleaseManager().TaintDeploymentsReleaseTargets(deployment.Id)
2625

2726
return nil
2827
}
@@ -40,7 +39,6 @@ func HandleDeploymentUpdated(
4039
if err := ws.Deployments().Upsert(ctx, deployment); err != nil {
4140
return err
4241
}
43-
ws.ReleaseManager().TaintDeploymentsReleaseTargets(deployment.Id)
4442

4543
return nil
4644
}
@@ -56,7 +54,6 @@ func HandleDeploymentDeleted(
5654
}
5755

5856
ws.Deployments().Remove(ctx, deployment.Id)
59-
ws.ReleaseManager().TaintDeploymentsReleaseTargets(deployment.Id)
6057

6158
return nil
6259
}

0 commit comments

Comments
 (0)