Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 53 additions & 1 deletion cmd/run/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,15 @@ func Run(_ *cobra.Command, args []string) {
} else {
mainStage.States.RunName = strings.ReplaceAll(mainStage.States.RunName, `%t`, mainStage.States.RunStartTime.Format(utils.DirectoryNameTimeFormat))
}
var workloads []string
for _, path := range args {
if st, err := processStagePath(path); err == nil {
mainStage.MergeWith(st)
// Extract workload from this path
workload := extractWorkloadFromStagePath(path)
if workload != "" {
workloads = append(workloads, workload)
}
if defaultRunNameBuilder != nil {
if defaultRunNameBuilder.Len() > 0 {
defaultRunNameBuilder.WriteByte('_')
Expand All @@ -66,12 +72,27 @@ func Run(_ *cobra.Command, args []string) {
os.Exit(-1)
}
}

// Set the workload field (join multiple workloads with comma if there are multiple)
if len(workloads) > 0 {
// Remove duplicates and join
workloadMap := make(map[string]bool)
uniqueWorkloads := []string{}
for _, w := range workloads {
if !workloadMap[w] {
workloadMap[w] = true
uniqueWorkloads = append(uniqueWorkloads, w)
}
}
mainStage.States.Workload = strings.Join(uniqueWorkloads, ",")
}

if defaultRunNameBuilder != nil {
defaultRunNameBuilder.WriteByte('_')
defaultRunNameBuilder.WriteString(mainStage.States.RunStartTime.Format(utils.DirectoryNameTimeFormat))
mainStage.States.RunName = defaultRunNameBuilder.String()
}
log.Info().Str("run_name", mainStage.States.RunName).Send()
log.Info().Str("run_name", mainStage.States.RunName).Str("workload", mainStage.States.Workload).Send()

if _, _, err := stage.ParseStageGraph(mainStage); err != nil {
log.Fatal().Err(err).Msg("failed to parse benchmark stage graph")
Expand Down Expand Up @@ -121,3 +142,34 @@ func processStagePath(path string) (st *stage.Stage, returnErr error) {
return stage.ReadStageFromFile(path)
}
}

// extractWorkloadFromStagePath derives a workload name from a stage file path.
//
// The name of the directory that directly contains the stage file is used as
// the workload. When that directory carries no workload information (it is
// the generic "benchmarks" folder, the current directory, or the filesystem
// root), the file name without its ".json" extension is used instead.
// An empty path yields an empty string so that callers can skip it.
//
// Examples:
//
//	benchmarks/clickbench/clickbench.json -> clickbench
//	benchmarks/tpc-ds/tpc-ds.json         -> tpc-ds
//	benchmarks/custom.json                -> custom
//	imdb.json                             -> imdb
//	/imdb.json                            -> imdb
func extractWorkloadFromStagePath(path string) string {
	if path == "" {
		// filepath.Base("") would return ".", which is not a meaningful workload.
		return ""
	}

	dirName := filepath.Base(filepath.Dir(path))
	switch dirName {
	case "benchmarks", ".", string(filepath.Separator):
		// The containing directory is not workload-specific, so fall back to
		// the file name. TrimSuffix is a no-op when the suffix is absent.
		return strings.TrimSuffix(filepath.Base(path), ".json")
	}

	// Any other directory name is taken as the workload as-is; no allow-list
	// is consulted because unknown directories are accepted too.
	return dirName
}
1 change: 1 addition & 0 deletions stage/influx_run_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (i *InfluxRunRecorder) RecordQuery(ctx context.Context, s *Stage, result *Q
"run_name": s.States.RunName,
"stage_id": result.StageId,
"query_id": result.QueryId,
"workload": s.States.Workload,
}
fields := map[string]interface{}{
"query_index": result.Query.Index,
Expand Down
6 changes: 3 additions & 3 deletions stage/mysql_run_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func NewMySQLRunRecorderWithDb(db *sql.DB) *MySQLRunRecorder {
}

func (m *MySQLRunRecorder) Start(_ context.Context, s *Stage) error {
recordNewRun := `INSERT INTO pbench_runs (run_name, cluster_fqdn, start_time, queries_ran, failed, mismatch, comment)
VALUES (?, ?, ?, 0, 0, 0, ?)`
res, err := m.db.Exec(recordNewRun, s.States.RunName, s.States.ServerFQDN, s.States.RunStartTime, s.States.Comment)
recordNewRun := `INSERT INTO pbench_runs (run_name, cluster_fqdn, start_time, queries_ran, failed, mismatch, comment, workload)
VALUES (?, ?, ?, 0, 0, 0, ?, ?)`
res, err := m.db.Exec(recordNewRun, s.States.RunName, s.States.ServerFQDN, s.States.RunStartTime, s.States.Comment, s.States.Workload)
if err != nil {
log.Error().Err(err).Str("run_name", s.States.RunName).Time("start_time", s.States.RunStartTime).
Msg("failed to add a new run to the MySQL database")
Expand Down
1 change: 1 addition & 0 deletions stage/pbench_runs_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ create table if not exists pbench_runs
hidden tinyint(1) default 0 not null,
comment varchar(255) null,
rand_seed bigint null,
workload varchar(255) null,
constraint pbench_runs_run_id
unique (run_id)
);
Expand Down
1 change: 1 addition & 0 deletions stage/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type SharedStageStates struct {
RandSeedUsed bool
RunStartTime time.Time
RunFinishTime time.Time
Workload string
// OutputPath is where we store the logs, query results, query json files, query column metadata files, etc.
// It should be set by the --output/-o command-line argument. Once set there, its value gets propagated to all the stages.
OutputPath string
Expand Down