Skip to content

Commit fdb5e72

Browse files
committed
Improved organization of pbench output directory and detection of duplicate stages in nested stages.
1 parent 8fa7a6f commit fdb5e72

File tree

4 files changed

+48
-10
lines changed

4 files changed

+48
-10
lines changed

stage/map.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func ParseStageGraph(startingStage *Stage) (*Stage, Map, error) {
2323
stages := make(Map)
2424
startingStage, err := ParseStage(startingStage, stages)
2525
if err == nil {
26-
err = checkStageLinks(startingStage)
26+
err = checkStageLinks(startingStage, make(map[string]bool), make(map[string]bool))
2727
}
2828
if err != nil {
2929
return nil, nil, err
@@ -35,7 +35,7 @@ func ParseStageGraphFromFile(startingFile string) (*Stage, Map, error) {
3535
stages := make(Map)
3636
startingStage, err := ParseStageFromFile(startingFile, stages)
3737
if err == nil {
38-
err = checkStageLinks(startingStage)
38+
err = checkStageLinks(startingStage, make(map[string]bool), make(map[string]bool))
3939
}
4040
if err != nil {
4141
return nil, nil, err
@@ -137,16 +137,34 @@ func fileNameWithoutPathAndExt(filePath string) string {
137137
return filePath[lastPathSeparator+1 : lastDot]
138138
}
139139

140-
func checkStageLinks(stage *Stage) error {
140+
func checkStageLinks(stage *Stage, visited map[string]bool, visiting map[string]bool) error {
141+
// Detect cycles
142+
if visiting[stage.Id] {
143+
return fmt.Errorf("cycle detected at stage %s", stage.Id)
144+
}
145+
// Skip already fully-visited stages
146+
if visited[stage.Id] {
147+
return nil
148+
}
149+
150+
visiting[stage.Id] = true
151+
defer delete(visiting, stage.Id)
152+
visited[stage.Id] = true
153+
141154
nextStageMap := make(map[string]bool)
142155
for _, nextStage := range stage.NextStages {
156+
if nextStage == nil {
157+
continue
158+
}
143159
if nextStageMap[nextStage.Id] {
144-
return fmt.Errorf("stage %s got duplicated next stages %s", stage.Id, nextStage.Id)
160+
return fmt.Errorf("stage %s has duplicated next stage %s", stage.Id, nextStage.Id)
145161
}
146162
nextStageMap[nextStage.Id] = true
147-
if err := checkStageLinks(nextStage); err != nil {
163+
164+
if err := checkStageLinks(nextStage, visited, visiting); err != nil {
148165
return err
149166
}
150167
}
168+
151169
return nil
152170
}

stage/run_recorder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,5 +42,5 @@ func (f *FileBasedRunRecorder) RecordQuery(_ context.Context, _ *Stage, result *
4242
}
4343

4444
func (f *FileBasedRunRecorder) RecordRun(_ context.Context, s *Stage, _ []*QueryResult) {
45-
_ = os.WriteFile(filepath.Join(s.States.OutputPath, s.Id+"_summary.csv"), []byte(f.summaryBuilder.String()), 0644)
45+
_ = os.WriteFile(filepath.Join(s.States.OutputPath, s.States.RunName+"_summary.csv"), []byte(f.summaryBuilder.String()), 0644)
4646
}

stage/stage.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
type Stage struct {
3030
// Id is used to uniquely identify a stage. It is usually the file name without its directory path and extension.
3131
Id string `json:"-"`
32+
// OutputPath is the full path of the directory in which the current stage stores its output.
33+
OutputPath string `json:"-"`
3234
// The values in Catalog, Schema, and SessionParams are inherited by the descendant stages. Please note that if
3335
// they have new values assigned in a stage, those values are NOT applied to the Presto client until a stage
3436
// creates its own client by setting StartOnNewClient = true.
@@ -129,6 +131,7 @@ func (s *Stage) Run(ctx context.Context) int {
129131
s.States.OutputPath = s.BaseDir
130132
}
131133
s.States.OutputPath = filepath.Join(s.States.OutputPath, s.States.RunName)
134+
s.OutputPath = filepath.Join(s.States.OutputPath, s.Id)
132135
utils.PrepareOutputDirectory(s.States.OutputPath)
133136

134137
// also start to write logs to the output directory from this point on.
@@ -233,6 +236,7 @@ func (s *Stage) run(ctx context.Context) (returnErr error) {
233236
s.setDefaults()
234237
s.prepareClient()
235238
s.propagateStates()
239+
s.createNextStagesOutputDirectories()
236240
preStageErr := s.runShellScripts(ctx, s.PreStageShellScripts)
237241
if preStageErr != nil {
238242
return fmt.Errorf("pre-stage script execution failed: %w", preStageErr)
@@ -524,7 +528,7 @@ func (s *Stage) runQuery(ctx context.Context, query *Query) (result *QueryResult
524528
)
525529
if *s.SaveOutput {
526530
queryOutputFile, err = os.OpenFile(
527-
filepath.Join(s.States.OutputPath, querySourceStr)+".output",
531+
filepath.Join(s.OutputPath, querySourceStr)+".output",
528532
utils.OpenNewFileFlags, 0644)
529533
if err != nil {
530534
return result, err

stage/stage_utils.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ func (s *Stage) saveQueryJsonFile(result *QueryResult) {
290290
querySourceStr := s.querySourceString(result)
291291
{
292292
queryJsonFile, err := os.OpenFile(
293-
filepath.Join(s.States.OutputPath, querySourceStr)+".json",
293+
filepath.Join(s.OutputPath, querySourceStr)+".json",
294294
utils.OpenNewFileFlags, 0644)
295295
checkErr(err)
296296
if err == nil {
@@ -302,7 +302,7 @@ func (s *Stage) saveQueryJsonFile(result *QueryResult) {
302302
}
303303
if result.QueryError != nil {
304304
queryErrorFile, err := os.OpenFile(
305-
filepath.Join(s.States.OutputPath, querySourceStr)+".error.json",
305+
filepath.Join(s.OutputPath, querySourceStr)+".error.json",
306306
utils.OpenNewFileFlags, 0644)
307307
checkErr(err)
308308
if err == nil {
@@ -332,7 +332,7 @@ func (s *Stage) saveColumnMetadataFile(qr *presto.QueryResults, result *QueryRes
332332
}
333333
}()
334334
columnMetadataFile, ioErr := os.OpenFile(
335-
filepath.Join(s.States.OutputPath, querySourceStr)+".cols.json",
335+
filepath.Join(s.OutputPath, querySourceStr)+".cols.json",
336336
utils.OpenNewFileFlags, 0644)
337337
if ioErr != nil {
338338
return ioErr
@@ -372,3 +372,19 @@ func (s *Stage) querySourceString(result *QueryResult) (sourceStr string) {
372372
}
373373
return
374374
}
375+
376+
func (s *Stage) createNextStagesOutputDirectories() {
377+
// Create the output directory for this (parent) stage
378+
utils.PrepareOutputDirectory(s.OutputPath)
379+
for _, nextStage := range s.NextStages {
380+
// Construct the output directory path
381+
stageOutputPath := filepath.Join(s.OutputPath, nextStage.Id)
382+
nextStage.OutputPath = stageOutputPath
383+
384+
// Create the directory
385+
utils.PrepareOutputDirectory(stageOutputPath)
386+
387+
// Recursively call the function on the child stage
388+
nextStage.createNextStagesOutputDirectories()
389+
}
390+
}

0 commit comments

Comments
 (0)