Skip to content

Commit e2c97ed

Browse files
authored
Merge pull request #9141 from starius/goroutines2
fn: add goroutine manager
2 parents 35759de + 9eb405e commit e2c97ed

File tree

2 files changed

+198
-0
lines changed

2 files changed

+198
-0
lines changed

fn/goroutine_manager.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package fn
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
)
8+
9+
// ErrStopping is returned when trying to add a new goroutine while stopping.
10+
var ErrStopping = errors.New("can not add goroutine, stopping")
11+
12+
// GoroutineManager is used to launch goroutines until context expires or the
13+
// manager is stopped. The Stop method blocks until all started goroutines stop.
14+
type GoroutineManager struct {
15+
wg sync.WaitGroup
16+
mu sync.Mutex
17+
ctx context.Context
18+
cancel func()
19+
}
20+
21+
// NewGoroutineManager constructs and returns a new instance of
22+
// GoroutineManager.
23+
func NewGoroutineManager(ctx context.Context) *GoroutineManager {
24+
ctx, cancel := context.WithCancel(ctx)
25+
26+
return &GoroutineManager{
27+
ctx: ctx,
28+
cancel: cancel,
29+
}
30+
}
31+
32+
// Go starts a new goroutine if the manager is not stopping.
33+
func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
34+
// Calling wg.Add(1) and wg.Wait() when wg's counter is 0 is a race
35+
// condition, since it is not clear should Wait() block or not. This
36+
// kind of race condition is detected by Go runtime and results in a
37+
// crash if running with `-race`. To prevent this, whole Go method is
38+
// protected with a mutex. The call to wg.Wait() inside Stop() can still
39+
// run in parallel with Go, but in that case g.ctx is in expired state,
40+
// because cancel() was called in Stop, so Go returns before wg.Add(1)
41+
// call.
42+
g.mu.Lock()
43+
defer g.mu.Unlock()
44+
45+
if g.ctx.Err() != nil {
46+
return ErrStopping
47+
}
48+
49+
g.wg.Add(1)
50+
go func() {
51+
defer g.wg.Done()
52+
f(g.ctx)
53+
}()
54+
55+
return nil
56+
}
57+
58+
// Stop prevents new goroutines from being added and waits for all running
59+
// goroutines to finish.
60+
func (g *GoroutineManager) Stop() {
61+
g.mu.Lock()
62+
g.cancel()
63+
g.mu.Unlock()
64+
65+
// Wait for all goroutines to finish. Note that this wg.Wait() call is
66+
// safe, since it can't run in parallel with wg.Add(1) call in Go, since
67+
// we just cancelled the context and even if Go call starts running here
68+
// after acquiring the mutex, it would see that the context has expired
69+
// and return ErrStopping instead of calling wg.Add(1).
70+
g.wg.Wait()
71+
}
72+
73+
// Done returns a channel which is closed when either the context passed to
74+
// NewGoroutineManager expires or when Stop is called.
75+
func (g *GoroutineManager) Done() <-chan struct{} {
76+
return g.ctx.Done()
77+
}

fn/goroutine_manager_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package fn
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
// TestGoroutineManager tests that the GoroutineManager starts goroutines until
12+
// ctx expires. It also makes sure it fails to start new goroutines after the
13+
// context expired and the GoroutineManager is in the process of waiting for
14+
// already started goroutines in the Stop method.
15+
func TestGoroutineManager(t *testing.T) {
16+
t.Parallel()
17+
18+
m := NewGoroutineManager(context.Background())
19+
20+
taskChan := make(chan struct{})
21+
22+
require.NoError(t, m.Go(func(ctx context.Context) {
23+
<-taskChan
24+
}))
25+
26+
t1 := time.Now()
27+
28+
// Close taskChan in 1s, causing the goroutine to stop.
29+
time.AfterFunc(time.Second, func() {
30+
close(taskChan)
31+
})
32+
33+
m.Stop()
34+
stopDelay := time.Since(t1)
35+
36+
// Make sure Stop was waiting for the goroutine to stop.
37+
require.Greater(t, stopDelay, time.Second)
38+
39+
// Make sure new goroutines do not start after Stop.
40+
require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping)
41+
42+
// When Stop() is called, the internal context expires and m.Done() is
43+
// closed. Test this.
44+
select {
45+
case <-m.Done():
46+
default:
47+
t.Errorf("Done() channel must be closed at this point")
48+
}
49+
}
50+
51+
// TestGoroutineManagerContextExpires tests the effect of context expiry.
52+
func TestGoroutineManagerContextExpires(t *testing.T) {
53+
t.Parallel()
54+
55+
ctx, cancel := context.WithCancel(context.Background())
56+
57+
m := NewGoroutineManager(ctx)
58+
59+
require.NoError(t, m.Go(func(ctx context.Context) {
60+
<-ctx.Done()
61+
}))
62+
63+
// The Done channel of the manager should not be closed, so the
64+
// following call must block.
65+
select {
66+
case <-m.Done():
67+
t.Errorf("Done() channel must not be closed at this point")
68+
default:
69+
}
70+
71+
cancel()
72+
73+
// The Done channel of the manager should be closed, so the following
74+
// call must not block.
75+
select {
76+
case <-m.Done():
77+
default:
78+
t.Errorf("Done() channel must be closed at this point")
79+
}
80+
81+
// Make sure new goroutines do not start after context expiry.
82+
require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping)
83+
84+
// Stop will wait for all goroutines to stop.
85+
m.Stop()
86+
}
87+
88+
// TestGoroutineManagerStress starts many goroutines while calling Stop. It
89+
// is needed to make sure the GoroutineManager does not crash if this happen.
90+
// If the mutex was not used, it would crash because of a race condition between
91+
// wg.Add(1) and wg.Wait().
92+
func TestGoroutineManagerStress(t *testing.T) {
93+
t.Parallel()
94+
95+
m := NewGoroutineManager(context.Background())
96+
97+
stopChan := make(chan struct{})
98+
99+
time.AfterFunc(1*time.Millisecond, func() {
100+
m.Stop()
101+
close(stopChan)
102+
})
103+
104+
// Starts 100 goroutines sequentially. Sequential order is needed to
105+
// keep wg.counter low (0 or 1) to increase probability of race
106+
// condition to be caught if it exists. If mutex is removed in the
107+
// implementation, this test crashes under `-race`.
108+
for i := 0; i < 100; i++ {
109+
taskChan := make(chan struct{})
110+
err := m.Go(func(ctx context.Context) {
111+
close(taskChan)
112+
})
113+
// If goroutine was started, wait for its completion.
114+
if err == nil {
115+
<-taskChan
116+
}
117+
}
118+
119+
// Wait for Stop to complete.
120+
<-stopChan
121+
}

0 commit comments

Comments
 (0)