Skip to content

Commit c737de6

Browse files
authored
cmd: apply pprof flags in Before so all commands can be profiled (#3266)
1 parent 9661763 commit c737de6

File tree

2 files changed

+54
-66
lines changed

2 files changed

+54
-66
lines changed

flow/cmd/worker.go

Lines changed: 4 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,8 @@ package cmd
33
import (
44
"context"
55
"fmt"
6-
"log"
76
"log/slog"
8-
"net/http"
9-
//nolint:gosec
10-
_ "net/http/pprof"
117
"os"
12-
"runtime"
13-
"time"
148

159
"go.temporal.io/sdk/client"
1610
temporalotel "go.temporal.io/sdk/contrib/opentelemetry"
@@ -31,10 +25,8 @@ type WorkerSetupOptions struct {
3125
TemporalNamespace string
3226
TemporalMaxConcurrentActivities int
3327
TemporalMaxConcurrentWorkflowTasks int
34-
EnableProfiling bool
3528
EnableOtelMetrics bool
3629
UseMaintenanceTaskQueue bool
37-
PprofPort int // Port for pprof HTTP server
3830
}
3931

4032
type WorkerSetupResponse struct {
@@ -51,35 +43,7 @@ func (w *WorkerSetupResponse) Close(ctx context.Context) {
5143
}
5244
}
5345

54-
func setupPprof(opts *WorkerSetupOptions) {
55-
// Set default pprof port if not specified
56-
pprofPort := opts.PprofPort
57-
58-
// Enable mutex and block profiling
59-
runtime.SetMutexProfileFraction(5)
60-
runtime.SetBlockProfileRate(5)
61-
62-
// Start HTTP server with pprof endpoints
63-
go func() {
64-
pprofAddr := fmt.Sprintf(":%d", pprofPort)
65-
slog.Info("Starting pprof HTTP server on " + pprofAddr)
66-
server := &http.Server{
67-
Addr: pprofAddr,
68-
ReadTimeout: 1 * time.Minute,
69-
WriteTimeout: 11 * time.Minute,
70-
}
71-
72-
if err := server.ListenAndServe(); err != nil {
73-
log.Fatalf("Failed to start pprof HTTP server: %v", err)
74-
}
75-
}()
76-
}
77-
7846
func WorkerSetup(ctx context.Context, opts *WorkerSetupOptions) (*WorkerSetupResponse, error) {
79-
if opts.EnableProfiling {
80-
setupPprof(opts)
81-
}
82-
8347
conn, err := internal.GetCatalogConnectionPoolFromEnv(ctx)
8448
if err != nil {
8549
return nil, fmt.Errorf("unable to create catalog connection pool: %w", err)
@@ -115,11 +79,10 @@ func WorkerSetup(ctx context.Context, opts *WorkerSetupOptions) (*WorkerSetupRes
11579
}
11680
taskQueue := internal.PeerFlowTaskQueueName(queueId)
11781
slog.Info(
118-
fmt.Sprintf("Creating temporal worker for queue %v: %v workflow workers %v activity workers",
119-
taskQueue,
120-
opts.TemporalMaxConcurrentWorkflowTasks,
121-
opts.TemporalMaxConcurrentActivities,
122-
),
82+
"Creating temporal worker",
83+
slog.String("taskQueue", taskQueue),
84+
slog.Int("workflowConcurrency", opts.TemporalMaxConcurrentWorkflowTasks),
85+
slog.Int("activityConcurrency", opts.TemporalMaxConcurrentActivities),
12386
)
12487
w := worker.New(c, taskQueue, worker.Options{
12588
EnableSessionWorker: true,

flow/main.go

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,17 @@ package main
22

33
import (
44
"context"
5+
"fmt"
56
"log"
67
"log/slog"
8+
"net/http"
9+
//nolint:gosec
10+
_ "net/http/pprof"
711
"os"
812
"os/signal"
913
"runtime"
1014
"syscall"
15+
"time"
1116

1217
"github.com/urfave/cli/v3"
1318
"go.temporal.io/sdk/worker"
@@ -42,7 +47,7 @@ func main() {
4247
Sources: cli.EnvVars("ENABLE_OTEL_METRICS"),
4348
}
4449

45-
pprofPortFlag := &cli.IntFlag{
50+
pprofPortFlag := &cli.Uint16Flag{
4651
Name: "pprof-port",
4752
Value: 6060,
4853
Usage: "Port for pprof HTTP server",
@@ -133,20 +138,45 @@ func main() {
133138

134139
app := &cli.Command{
135140
Name: "PeerDB Flows CLI",
141+
Before: func(ctx context.Context, clicmd *cli.Command) (context.Context, error) {
142+
if clicmd.Bool(profilingFlag.Name) {
143+
// Enable mutex and block profiling
144+
runtime.SetMutexProfileFraction(5)
145+
runtime.SetBlockProfileRate(5)
146+
pprofPort := clicmd.Uint16(pprofPortFlag.Name)
147+
pprofAddr := fmt.Sprintf(":%d", pprofPort)
148+
149+
// Start HTTP server with pprof endpoints
150+
go func() {
151+
slog.Info("Starting pprof HTTP server", slog.String("address", pprofAddr))
152+
153+
server := &http.Server{
154+
Addr: pprofAddr,
155+
ReadTimeout: 1 * time.Minute,
156+
WriteTimeout: 11 * time.Minute,
157+
}
158+
if err := server.ListenAndServe(); err != nil {
159+
log.Fatalf("Failed to start pprof HTTP server: %v", err)
160+
}
161+
}()
162+
}
163+
return nil, nil
164+
},
165+
Flags: []cli.Flag{
166+
profilingFlag,
167+
pprofPortFlag,
168+
},
136169
Commands: []*cli.Command{
137170
{
138171
Name: "worker",
139172
Action: func(ctx context.Context, clicmd *cli.Command) error {
140-
temporalHostPort := clicmd.String("temporal-host-port")
141173
res, err := cmd.WorkerSetup(ctx, &cmd.WorkerSetupOptions{
142-
TemporalHostPort: temporalHostPort,
143-
EnableProfiling: clicmd.Bool("enable-profiling"),
144-
EnableOtelMetrics: clicmd.Bool("enable-otel-metrics"),
145-
TemporalNamespace: clicmd.String("temporal-namespace"),
146-
TemporalMaxConcurrentActivities: clicmd.Int("temporal-max-concurrent-activities"),
147-
TemporalMaxConcurrentWorkflowTasks: clicmd.Int("temporal-max-concurrent-workflow-tasks"),
174+
TemporalHostPort: clicmd.String(temporalHostPortFlag.Name),
175+
TemporalNamespace: clicmd.String(temporalNamespaceFlag.Name),
176+
TemporalMaxConcurrentActivities: clicmd.Int(temporalMaxConcurrentActivitiesFlag.Name),
177+
TemporalMaxConcurrentWorkflowTasks: clicmd.Int(temporalMaxConcurrentWorkflowTasksFlag.Name),
148178
UseMaintenanceTaskQueue: clicmd.Bool(useMaintenanceTaskQueueFlag.Name),
149-
PprofPort: clicmd.Int(pprofPortFlag.Name),
179+
EnableOtelMetrics: clicmd.Bool(otelMetricsFlag.Name),
150180
})
151181
if err != nil {
152182
return err
@@ -156,23 +186,20 @@ func main() {
156186
},
157187
Flags: []cli.Flag{
158188
temporalHostPortFlag,
159-
profilingFlag,
160-
otelMetricsFlag,
161-
pprofPortFlag,
162189
temporalNamespaceFlag,
163190
temporalMaxConcurrentActivitiesFlag,
164191
temporalMaxConcurrentWorkflowTasksFlag,
165192
useMaintenanceTaskQueueFlag,
193+
otelMetricsFlag,
166194
},
167195
},
168196
{
169197
Name: "snapshot-worker",
170198
Action: func(ctx context.Context, clicmd *cli.Command) error {
171-
temporalHostPort := clicmd.String("temporal-host-port")
172199
res, err := cmd.SnapshotWorkerMain(ctx, &cmd.SnapshotWorkerOptions{
173-
EnableOtelMetrics: clicmd.Bool("enable-otel-metrics"),
174-
TemporalHostPort: temporalHostPort,
175-
TemporalNamespace: clicmd.String("temporal-namespace"),
200+
TemporalHostPort: clicmd.String(temporalHostPortFlag.Name),
201+
TemporalNamespace: clicmd.String(temporalNamespaceFlag.Name),
202+
EnableOtelMetrics: clicmd.Bool(otelMetricsFlag.Name),
176203
})
177204
if err != nil {
178205
return err
@@ -181,21 +208,21 @@ func main() {
181208
return res.Worker.Run(worker.InterruptCh())
182209
},
183210
Flags: []cli.Flag{
184-
otelMetricsFlag,
185211
temporalHostPortFlag,
186212
temporalNamespaceFlag,
213+
otelMetricsFlag,
187214
},
188215
},
189216
{
190217
Name: "api",
191218
Flags: []cli.Flag{
192-
&cli.UintFlag{
219+
&cli.Uint16Flag{
193220
Name: "port",
194221
Aliases: []string{"p"},
195222
Value: 8110,
196223
},
197224
// gateway port is the port that the grpc-gateway listens on
198-
&cli.UintFlag{
225+
&cli.Uint16Flag{
199226
Name: "gateway-port",
200227
Value: 8111,
201228
},
@@ -204,13 +231,11 @@ func main() {
204231
otelMetricsFlag,
205232
},
206233
Action: func(ctx context.Context, clicmd *cli.Command) error {
207-
temporalHostPort := clicmd.String("temporal-host-port")
208-
209234
return cmd.APIMain(ctx, &cmd.APIServerParams{
210-
Port: uint16(clicmd.Uint("port")),
211-
TemporalHostPort: temporalHostPort,
212-
GatewayPort: uint16(clicmd.Uint("gateway-port")),
213-
TemporalNamespace: clicmd.String("temporal-namespace"),
235+
Port: clicmd.Uint16("port"),
236+
GatewayPort: clicmd.Uint16("gateway-port"),
237+
TemporalHostPort: clicmd.String(temporalHostPortFlag.Name),
238+
TemporalNamespace: clicmd.String(temporalNamespaceFlag.Name),
214239
EnableOtelMetrics: clicmd.Bool(otelMetricsFlag.Name),
215240
})
216241
},

0 commit comments

Comments
 (0)