From fdb5e726051fde467bd494996e9e61e15318219c Mon Sep 17 00:00:00 2001 From: Minhan Cao Date: Wed, 20 Aug 2025 16:57:17 -0700 Subject: [PATCH] Improved organization of pbench output directory and detection of duplicate stages in nested stages. --- stage/map.go | 28 +++++++++++++++++++++++----- stage/run_recorder.go | 2 +- stage/stage.go | 6 +++++- stage/stage_utils.go | 22 +++++++++++++++++++--- 4 files changed, 48 insertions(+), 10 deletions(-) diff --git a/stage/map.go b/stage/map.go index c08e99ac..e1ac4644 100644 --- a/stage/map.go +++ b/stage/map.go @@ -23,7 +23,7 @@ func ParseStageGraph(startingStage *Stage) (*Stage, Map, error) { stages := make(Map) startingStage, err := ParseStage(startingStage, stages) if err == nil { - err = checkStageLinks(startingStage) + err = checkStageLinks(startingStage, make(map[string]bool), make(map[string]bool)) } if err != nil { return nil, nil, err @@ -35,7 +35,7 @@ func ParseStageGraphFromFile(startingFile string) (*Stage, Map, error) { stages := make(Map) startingStage, err := ParseStageFromFile(startingFile, stages) if err == nil { - err = checkStageLinks(startingStage) + err = checkStageLinks(startingStage, make(map[string]bool), make(map[string]bool)) } if err != nil { return nil, nil, err @@ -137,16 +137,34 @@ func fileNameWithoutPathAndExt(filePath string) string { return filePath[lastPathSeparator+1 : lastDot] } -func checkStageLinks(stage *Stage) error { +func checkStageLinks(stage *Stage, visited map[string]bool, visiting map[string]bool) error { + // Detect cycles + if visiting[stage.Id] { + return fmt.Errorf("cycle detected at stage %s", stage.Id) + } + // Skip already fully-visited stages + if visited[stage.Id] { + return nil + } + + visiting[stage.Id] = true + defer delete(visiting, stage.Id) + visited[stage.Id] = true + nextStageMap := make(map[string]bool) for _, nextStage := range stage.NextStages { + if nextStage == nil { + continue + } if nextStageMap[nextStage.Id] { - return fmt.Errorf("stage %s got duplicated next stages %s", stage.Id, nextStage.Id) + return fmt.Errorf("stage %s has duplicated next stage %s", stage.Id, nextStage.Id) } nextStageMap[nextStage.Id] = true - if err := checkStageLinks(nextStage); err != nil { + + if err := checkStageLinks(nextStage, visited, visiting); err != nil { return err } } + return nil } diff --git a/stage/run_recorder.go b/stage/run_recorder.go index d2a2f10f..c6b1e64d 100644 --- a/stage/run_recorder.go +++ b/stage/run_recorder.go @@ -42,5 +42,5 @@ func (f *FileBasedRunRecorder) RecordQuery(_ context.Context, _ *Stage, result * } func (f *FileBasedRunRecorder) RecordRun(_ context.Context, s *Stage, _ []*QueryResult) { - _ = os.WriteFile(filepath.Join(s.States.OutputPath, s.Id+"_summary.csv"), []byte(f.summaryBuilder.String()), 0644) + _ = os.WriteFile(filepath.Join(s.States.OutputPath, s.States.RunName+"_summary.csv"), []byte(f.summaryBuilder.String()), 0644) } diff --git a/stage/stage.go b/stage/stage.go index f0ec5192..1587ebd2 100644 --- a/stage/stage.go +++ b/stage/stage.go @@ -29,6 +29,8 @@ import ( type Stage struct { // Id is used to uniquely identify a stage. It is usually the file name without its directory path and extension. Id string `json:"-"` + // OutputPath is used to uniquely store the full path to store output for the current stage. + OutputPath string `json:"-"` // The values in Catalog, Schema, and SessionParams are inherited by the descendant stages. Please note that if // they have new values assigned in a stage, those values are NOT applied tn the Presto client until a stage // creates its own client by setting StartOnNewClient = true. @@ -129,6 +131,7 @@ func (s *Stage) Run(ctx context.Context) int { s.States.OutputPath = s.BaseDir } s.States.OutputPath = filepath.Join(s.States.OutputPath, s.States.RunName) + s.OutputPath = filepath.Join(s.States.OutputPath, s.Id) utils.PrepareOutputDirectory(s.States.OutputPath) // also start to write logs to the output directory from this point on. @@ -233,6 +236,7 @@ func (s *Stage) run(ctx context.Context) (returnErr error) { s.setDefaults() s.prepareClient() s.propagateStates() + s.createNextStagesOutputDirectories() preStageErr := s.runShellScripts(ctx, s.PreStageShellScripts) if preStageErr != nil { return fmt.Errorf("pre-stage script execution failed: %w", preStageErr) @@ -524,7 +528,7 @@ func (s *Stage) runQuery(ctx context.Context, query *Query) (result *QueryResult ) if *s.SaveOutput { queryOutputFile, err = os.OpenFile( - filepath.Join(s.States.OutputPath, querySourceStr)+".output", + filepath.Join(s.OutputPath, querySourceStr)+".output", utils.OpenNewFileFlags, 0644) if err != nil { return result, err diff --git a/stage/stage_utils.go b/stage/stage_utils.go index de4782bb..d60837fb 100644 --- a/stage/stage_utils.go +++ b/stage/stage_utils.go @@ -290,7 +290,7 @@ func (s *Stage) saveQueryJsonFile(result *QueryResult) { querySourceStr := s.querySourceString(result) { queryJsonFile, err := os.OpenFile( - filepath.Join(s.States.OutputPath, querySourceStr)+".json", + filepath.Join(s.OutputPath, querySourceStr)+".json", utils.OpenNewFileFlags, 0644) checkErr(err) if err == nil { @@ -302,7 +302,7 @@ func (s *Stage) saveQueryJsonFile(result *QueryResult) { } if result.QueryError != nil { queryErrorFile, err := os.OpenFile( - filepath.Join(s.States.OutputPath, querySourceStr)+".error.json", + filepath.Join(s.OutputPath, querySourceStr)+".error.json", utils.OpenNewFileFlags, 0644) checkErr(err) if err == nil { @@ -332,7 +332,7 @@ func (s *Stage) saveColumnMetadataFile(qr *presto.QueryResults, result *QueryRes } }() columnMetadataFile, ioErr := os.OpenFile( - filepath.Join(s.States.OutputPath, querySourceStr)+".cols.json", + filepath.Join(s.OutputPath, querySourceStr)+".cols.json", utils.OpenNewFileFlags, 0644) if ioErr != nil { return ioErr @@ -372,3 +372,19 @@ func (s *Stage) querySourceString(result *QueryResult) (sourceStr string) { } return } + +func (s *Stage) createNextStagesOutputDirectories() { + // Create parent stage id file path + utils.PrepareOutputDirectory(s.OutputPath) + for _, nextStage := range s.NextStages { + // Construct the output directory path + stageOutputPath := filepath.Join(s.OutputPath, nextStage.Id) + nextStage.OutputPath = stageOutputPath + + // Create the directory + utils.PrepareOutputDirectory(stageOutputPath) + + // Recursively call the function on the child stage + nextStage.createNextStagesOutputDirectories() + } +}