diff --git a/cmd/loader.go b/cmd/loader.go index bb69b80c..f17b69e7 100644 --- a/cmd/loader.go +++ b/cmd/loader.go @@ -51,6 +51,7 @@ var ( verbosity = flag.String("verbosity", "info", "Logging verbosity - choose from [info, debug, trace]") iatGeneration = flag.Bool("iatGeneration", false, "Generate iats only or run invocations as well") iatFromFile = flag.Bool("generated", false, "True if iats were already generated") + dryRun = flag.Bool("dryRun", false, "Dry run mode - do not deploy functions or generate invocations") ) func init() { @@ -104,6 +105,10 @@ func main() { log.Fatal("Unsupported platform!") } + if cfg.Platform == "Knative" || cfg.Platform == "Knative-RPS" { + common.CheckCPULimit(cfg.CPULimit) + } + if !strings.HasSuffix(cfg.Platform, "-RPS") { runTraceMode(&cfg, *iatFromFile, *iatGeneration) } else { @@ -200,6 +205,10 @@ func runTraceMode(cfg *config.LoaderConfiguration, readIATFromFile bool, justGen Functions: functions, }) + if *dryRun { + return + } + log.Infof("Using %s as a service YAML specification file.\n", experimentDriver.Configuration.YAMLPath) experimentDriver.RunExperiment(justGenerateIAT, readIATFromFile) diff --git a/docs/loader.md b/docs/loader.md index a5330238..a14f4b59 100644 --- a/docs/loader.md +++ b/docs/loader.md @@ -119,6 +119,8 @@ $ go run cmd/loader.go --config cmd/config_knative_trace.json Additionally, one can specify log verbosity argument as `--verbosity [info, debug, trace]`. The default value is `info`. +To execute in a dry run mode without generating any load, set the `--dry-run` flag to `true`. This is useful for testing and validating configurations without executing actual requests. + For to configure the workload for load generator, please refer to `docs/configuration.md`. There are a couple of constants that should not be exposed to the users. They can be examined and changed diff --git a/docs/multi_loader.md b/docs/multi_loader.md new file mode 100644 index 00000000..02af5984 --- /dev/null +++ b/docs/multi_loader.md @@ -0,0 +1,107 @@ +# Multi-Loader + +A wrapper around loader to run multiple experiments at once with additional features like validation, dry-run, log collection + +## Prerequisites +As a wrapper around loader, multi-loader requires the initial cluster setup to be completed. See [vHive Loader to create a cluster](https://github.com/vhive-serverless/invitro/blob/main/docs/loader.md#create-a-cluster) + +## Configuration +### Multi-Loader Configuration +| Parameter name | Data type | Possible values | Default value | Description | +|---------------------|--------------------|-----------------|---------------|------------------------------------------------------------| +| Studies | []LoaderStudy | N/A | N/A | A list of loader studies with their respective configurations. See [LoaderStudy](#loaderstudy) | +| BaseConfigPath | string | "tools/multi_loader/base_loader_config.json" | N/A | Path to the base configuration file | +| PreScript | string | any bash command | "" | (Optional) A global script that runs once before all experiments | +| PostScript | string | any bash command | "" | (Optional) A global script that runs once after all experiments | + +### LoaderStudy +| Parameter name | Data type | Possible values | Default value | Description | +|-----------------------|------------------------|-------------------------------|---------------|--------------------------------------------------------------------| +| Config | map[string]interface{} | Any field in [LoaderConfiguration](https://github.com/vhive-serverless/invitro/blob/main/docs/configuration.md#loader-configuration-file-format) | N/A | The configuration for each loader experiment which overrides configurations in baseLoaderConfig | +| Name | string | N/A | N/A | The name of the loader experiment | +| TracesDir | string | N/A | N/A | Directory containing the traces for the experiment | +| TracesFormat | string | "data/traces/example_{}" | N/A | Format of the trace files **The format string "{}" is required** | +| TraceValues | []interface{} | ["any", 0, 1.1] | N/A | Values of the trace files Replaces the "{}" in TraceFormat | +| OutputDir | string | any | data/out/{Name} | (Optional) Output directory for experiment results | +| Verbosity | string | "info", "debug", "trace" | "info" | (Optional) Verbosity level for logging the experiment | +| IatGeneration | bool | true, false | false | (Optional) Whether to Generate iats only and skip invocations | +| Generated | bool | true, false | false | (Optional) if iats were already generated | +| PreScript | string | any bash Command | "" | (Optional) Local script that runs this specific experiment | +| PostScript | string | any bash Command | "" | (Optional) Local script that runs this specific experiment | + +> **_Important_**: Only one of the following is required: +> 1. `TracesDir`, or +> 2. `TracesFormat` and `TraceValues`, or +> 3. `TracePath` within the `LoaderExperiment`'s `Config` field +> +> If more than one is defined, the order of precedence is as follows: +> 1. `TracesDir`, +> 2. `TracesFormat` and `TraceValues`, +> 3. `TracePath` + +> **_Note_**: +> The `Config` field follows the same structure as the [LoaderConfiguration](https://github.com/vhive-serverless/invitro/blob/main/docs/configuration.md#loader-configuration-file-format). +> Any field defined in `Config` will override the corresponding value from the configuration in `BaseConfigPath`, but only for that specific experiment. +> For example, if `BaseConfigPath` has `ExperimentDuration` set to 5 minutes, and you define `ExperimentDuration` as 10 minutes in `Config`, that particular experiment will run for 10 minutes instead. + +## Command Flags + +The multi-loader accepts the almost the same command-line flags as loader. + +> **_Note_**: These flags will subsequently be used during the execution of loader.go for **every experiment**. If you would like to define these flag for specific experiments only, define it in [LoaderStudy](#loaderstudy) + +Available flags: + +- **`--multiLoaderConfig`** *(default: `tools/multi_loader/multi_loader_config.json`)*: + Specifies the path to the multi-loader configuration file. This file contains settings and parameters that define how the multi-loader operates [see above](#multi-loader-configuration) + +- **`--verbosity`** *(default: `info`)*: + Sets the logging verbosity level. You can choose from the following options: + - `info`: Standard information messages. + - `debug`: Detailed debugging messages. + - `trace`: Extremely verbose logging, including detailed execution traces. + +- **`--iatGeneration`** *(default: `false`)*: + If set to `true`, the multi-loader will generate inter-arrival times (IATs) only and skip the invocation of actual workloads. This is useful for scenarios where you want to analyze or generate IATs without executing the associated load. + +- **`--generated`** *(default: `false`)*: + Indicates whether IATs have already been generated. If set to `true`, the multi-loader will use the existing IATs instead of generating new ones. + + +## Multi-loader Overall Flow + +1. **Initialization** + - Flags for configuration file path, verbosity, IAT generation, and execution mode are parsed + - Logger is initialized based on verbosity level + +3. **Experiment Execution Flow** + - The multi-loader runner is instantiated with the provided configuration path. + - A dry run is executed to validate the setup for all studies: + - If any dry run fails, the execution terminates. + - If all dry runs succeed, proceed to actual runs: + - Global pre-scripts are executed. + - Each experiment undergoes the following steps: + 1. **Pre-Execution Setup** + - Experiment-specific pre-scripts are executed. + - Necessary directories and folders are created. + - Each sub-experiment is unpacked and prepared + 2. **Experiment Invocation** + - The loader is executed with generated configurations and related flags + 3. **Post-Execution Steps** + - Experiment-specific post-scripts are executed + - Cleanup tasks are performed + +4. **Completion** + - Global post-scripts are executed. + - Run Make Clean + +### How the Dry Run Works + +The dry run mode executes the loader with the `--dryRun` flag set to true after the unpacking of experiments defined in the multi-loader configurations. + +In this mode, the loader performs the following actions: + +- **Configuration Validation**: It verifies the experiment configurations without deploying any functions or generating invocations. +- **Error Handling**: If a fatal error occurs at any point, the experiment will halt immediately. + +The purpose is to ensure that your configurations are correct and to identify any potential issues before actual execution. \ No newline at end of file diff --git a/pkg/common/constants.go b/pkg/common/constants.go index 20d1e27c..bf455791 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -97,3 +97,11 @@ const ( AwsRegion = "us-east-1" AwsTraceFuncRepositoryName = "invitro_trace_function_aws" ) + +// CPULimits +const ( + CPULimit1vCPU string = "1vCPU" + CPULimitGCP string = "GCP" +) + +var ValidCPULimits = []string{CPULimit1vCPU, CPULimitGCP} diff --git a/pkg/common/utilities.go b/pkg/common/utilities.go index 94f6f310..13f30be6 100644 --- a/pkg/common/utilities.go +++ b/pkg/common/utilities.go @@ -25,11 +25,15 @@ package common import ( + "encoding/json" "hash/fnv" "log" "math/rand" + "os/exec" "strconv" "strings" + + logger "github.com/sirupsen/logrus" ) type Pair struct { @@ -135,3 +139,41 @@ func SumNumberOfInvocations(withWarmup bool, totalDuration int, functions []*Fun return result } + +func DeepCopy[T any](a T) (T, error) { + var b T + byt, err := json.Marshal(a) + if err != nil { + return b, err + } + err = json.Unmarshal(byt, &b) + return b, err +} + +func RunScript(command string) { + if command == "" { + return + } + logger.Info("Running command ", command) + cmd, err := exec.Command("/bin/sh", command).Output() + if err != nil { + log.Fatal(err) + } + logger.Info(string(cmd)) +} + +func ParseLogType(logString string) string { + logTypeArr := strings.Split(logString, "level=") + if len(logTypeArr) > 1 { + return strings.Split(logTypeArr[1], " ")[0] + } + return "info" +} + +func ParseLogMessage(logString string) string { + message := strings.Split(logString, "msg=") + if len(message) > 1 { + return message[1][1 : len(message[1])-1] + } + return logString +} \ No newline at end of file diff --git a/pkg/common/validators.go b/pkg/common/validators.go new file mode 100644 index 00000000..7341b546 --- /dev/null +++ b/pkg/common/validators.go @@ -0,0 +1,45 @@ +package common + +import ( + "bytes" + "net" + "os" + "os/exec" + "slices" + + log "github.com/sirupsen/logrus" +) + +func CheckNode(node string) { + if !IsValidIP(node) { + log.Fatal("Invalid IP address for node ", node) + } + cmd := exec.Command("ssh", "-oStrictHostKeyChecking=no", "-p", "22", node, "exit") + // -oStrictHostKeyChecking=no -p 22 + out, err := cmd.CombinedOutput() + if bytes.Contains(out, []byte("Permission denied")) || err != nil { + log.Error(string(out)) + log.Fatal("Failed to connect to node ", node) + } +} + +func CheckPath(path string) { + if (path) == "" { + return + } + _, err := os.Stat(path) + if err != nil { + log.Fatal(err) + } +} + +func IsValidIP(ip string) bool { + parsedIP := net.ParseIP(ip) + return parsedIP != nil +} + +func CheckCPULimit(cpuLimit string) { + if !slices.Contains(ValidCPULimits, cpuLimit) { + log.Fatal("Invalid CPU Limit ", cpuLimit) + } +} diff --git a/tools/multi_loader/base_loader_config.json b/tools/multi_loader/base_loader_config.json new file mode 100644 index 00000000..209e3043 --- /dev/null +++ b/tools/multi_loader/base_loader_config.json @@ -0,0 +1,23 @@ +{ + "Seed": 42, + "Platform": "Knative", + "InvokeProtocol": "grpc", + "YAMLSelector": "container", + "EndpointPort": 80, + "BusyLoopOnSandboxStartup": false, + "TracePath": "data/traces/example", + "Granularity": "minute", + "OutputPathPrefix": "data/out/experiment", + "IATDistribution": "exponential", + "CPULimit": "1vCPU", + "ExperimentDuration": 5, + "WarmupDuration": 0, + "IsPartiallyPanic": false, + "EnableZipkinTracing": false, + "EnableMetricsScrapping": false, + "MetricScrapingPeriodSeconds": 15, + "AutoscalingMetric": "concurrency", + "GRPCConnectionTimeoutSeconds": 15, + "GRPCFunctionTimeoutSeconds": 900, + "DAGMode": false +} \ No newline at end of file diff --git a/tools/multi_loader/common/constants.go b/tools/multi_loader/common/constants.go new file mode 100644 index 00000000..2645c501 --- /dev/null +++ b/tools/multi_loader/common/constants.go @@ -0,0 +1,15 @@ +package common + +const ( + TraceFormatString = "{}" +) + +// Multi-loader possible collectable metrics +const ( + Activator string = "activator" + AutoScaler string = "autoscaler" + TOP string = "top" + Prometheus string = "prometheus" +) + +var ValidCollectableMetrics = []string{Activator, AutoScaler, TOP, Prometheus} diff --git a/tools/multi_loader/common/utils.go b/tools/multi_loader/common/utils.go new file mode 100644 index 00000000..a97ea49a --- /dev/null +++ b/tools/multi_loader/common/utils.go @@ -0,0 +1,40 @@ +package common + +import ( + "encoding/json" + "os" + + log "github.com/sirupsen/logrus" + + "github.com/vhive-serverless/loader/pkg/config" + "github.com/vhive-serverless/loader/tools/multi_loader/types" +) + +func ReadMultiLoaderConfigurationFile(path string) types.MultiLoaderConfiguration { + byteValue, err := os.ReadFile(path) + if err != nil { + log.Fatal(err) + } + + var config types.MultiLoaderConfiguration + err = json.Unmarshal(byteValue, &config) + if err != nil { + log.Fatal(err) + } + + return config +} + +func DeterminePlatformFromConfig(multiLoaderConfig types.MultiLoaderConfiguration) string { + // Determine platform + baseConfigByteValue, err := os.ReadFile(multiLoaderConfig.BaseConfigPath) + if err != nil { + log.Fatal(err) + } + var loaderConfig config.LoaderConfiguration + // Unmarshal base configuration + if err = json.Unmarshal(baseConfigByteValue, &loaderConfig); err != nil { + log.Fatal(err) + } + return loaderConfig.Platform +} diff --git a/tools/multi_loader/common/validators.go b/tools/multi_loader/common/validators.go new file mode 100644 index 00000000..1b818b91 --- /dev/null +++ b/tools/multi_loader/common/validators.go @@ -0,0 +1,52 @@ +package common + +import ( + "path" + "slices" + "strings" + + log "github.com/sirupsen/logrus" + "github.com/vhive-serverless/loader/pkg/common" + "github.com/vhive-serverless/loader/tools/multi_loader/types" +) + +// Check general multi-loader configuration that applies to all platforms +func CheckMultiLoaderConfig(multiLoaderConfig types.MultiLoaderConfiguration) { + log.Info("Checking multi-loader configuration") + // Check if all paths are valid + common.CheckPath(multiLoaderConfig.BaseConfigPath) + // Check each study + if len(multiLoaderConfig.Studies) == 0 { + log.Fatal("No study found in configuration file") + } + for _, study := range multiLoaderConfig.Studies { + // Check trace directory + // if configs does not have TracePath or OutputPathPreix, either TracesDir or (TracesFormat and TraceValues) should be defined along with OutputDir + if study.TracesDir == "" && (study.TracesFormat == "" || len(study.TraceValues) == 0) { + if _, ok := study.Config["TracePath"]; !ok { + log.Fatal("Missing one of TracesDir, TracesFormat & TraceValues, Config.TracePath in multi_loader_config ", study.Name) + } + } + if study.TracesFormat != "" { + // check if trace format contains TRACE_FORMAT_STRING + if !strings.Contains(study.TracesFormat, TraceFormatString) { + log.Fatal("Invalid TracesFormat in multi_loader_config ", study.Name, ". Missing ", TraceFormatString, " in format") + } + } + if study.OutputDir == "" { + if _, ok := study.Config["OutputPathPrefix"]; !ok { + log.Warn("Missing one of OutputDir or Config.OutputPathPrefix in multi_loader_config ", study.Name) + // set default output directory + study.OutputDir = path.Join("data", "out", study.Name) + log.Warn("Setting default output directory to ", study.OutputDir) + } + } + } + log.Info("All experiments configs are valid") +} + +func CheckCollectableMetrics(metrics string) { + if !slices.Contains(ValidCollectableMetrics, metrics) { + log.Fatal("Invalid metrics ", metrics) + } +} diff --git a/tools/multi_loader/multi_loader.go b/tools/multi_loader/multi_loader.go new file mode 100644 index 00000000..c9ba571e --- /dev/null +++ b/tools/multi_loader/multi_loader.go @@ -0,0 +1,62 @@ +package main + +import ( + "flag" + "os" + "time" + + log "github.com/sirupsen/logrus" + "github.com/vhive-serverless/loader/tools/multi_loader/runner" +) + +var ( + multiLoaderConfigPath = flag.String("multiLoaderConfig", "tools/multi_loader/multi_loader_config.json", "Path to multi loader configuration file") + verbosity = flag.String("verbosity", "info", "Logging verbosity - choose from [info, debug, trace]") + iatGeneration = flag.Bool("iatGeneration", false, "Generate iats only and skip invocations") + generated = flag.Bool("generated", false, "If iats were already generated") +) + +func init() { + flag.Parse() + initLogger() +} + +func initLogger() { + log.SetFormatter(&log.TextFormatter{ + TimestampFormat: time.StampMilli, + FullTimestamp: true, + }) + log.SetOutput(os.Stdout) + + switch *verbosity { + case "debug": + log.SetLevel(log.DebugLevel) + case "trace": + log.SetLevel(log.TraceLevel) + default: + log.SetLevel(log.InfoLevel) + } +} + +func main() { + log.Info("Starting multiloader") + // Create multi loader driver + multiLoaderDriver, err := runner.NewMultiLoaderRunner(*multiLoaderConfigPath, *verbosity, *iatGeneration, *generated) + if err != nil { + log.Fatalf("Failed to create multi loader driver: %v", err) + } + // Dry run + multiLoaderDriver.RunDryRun() + + // Check if dry run was successful + if !multiLoaderDriver.DryRunSuccess { + log.Fatal("Dry run failed. Exiting...") + } + + // Actual run + log.Info("Running experiments") + multiLoaderDriver.RunActual() + + // Finish + log.Info("All experiments completed") +} diff --git a/tools/multi_loader/multi_loader_config.json b/tools/multi_loader/multi_loader_config.json new file mode 100644 index 00000000..55e4491b --- /dev/null +++ b/tools/multi_loader/multi_loader_config.json @@ -0,0 +1,24 @@ +{ + "Studies": [ + { + "Name": "experiment1", + "Config": { + "ExperimentDuration": 1 + }, + "IatGeneration": false, + "Generated": false, + "Verbosity": "info", + "TracesDir": "", + "TracesFormat": "data/traces/multi-exp-test/example_{}_test", + "TraceValues": [ + 1 + ], + "OutputDir": "data/out/multi-test", + "PreScript": "", + "PostScript": "" + } + ], + "PreScript": "", + "PostScript": "", + "BaseConfigPath": "tools/multi_loader/base_loader_config.json" +} \ No newline at end of file diff --git a/tools/multi_loader/runner/multi_loader_runner.go b/tools/multi_loader/runner/multi_loader_runner.go new file mode 100644 index 00000000..a183100d --- /dev/null +++ b/tools/multi_loader/runner/multi_loader_runner.go @@ -0,0 +1,384 @@ +package runner + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "os" + "os/exec" + "path" + "strconv" + "strings" + "time" + + "github.com/vhive-serverless/loader/pkg/common" + "github.com/vhive-serverless/loader/pkg/config" + ml_common "github.com/vhive-serverless/loader/tools/multi_loader/common" + "github.com/vhive-serverless/loader/tools/multi_loader/types" + + log "github.com/sirupsen/logrus" +) + +const ( + LOADER_PATH = "cmd/loader.go" + TIME_FORMAT = "Jan_02_1504" + EXPERIMENT_TEMP_CONFIG_PATH = "tools/multi_loader/current_running_config.json" + NUM_OF_RETRIES = 2 +) + +type MultiLoaderRunner struct { + MultiLoaderConfig types.MultiLoaderConfiguration + NodeGroup types.NodeGroup + DryRunSuccess bool + Verbosity string + IatGeneration bool + Generated bool + DryRun bool + Platform string +} + +// init multi loader runner +func NewMultiLoaderRunner(configPath string, verbosity string, iatGeneration bool, generated bool) (*MultiLoaderRunner, error) { + multiLoaderConfig := ml_common.ReadMultiLoaderConfigurationFile(configPath) + + // validate configuration + ml_common.CheckMultiLoaderConfig(multiLoaderConfig) + + // determine platform + platform := ml_common.DeterminePlatformFromConfig(multiLoaderConfig) + + runner := MultiLoaderRunner{ + MultiLoaderConfig: multiLoaderConfig, + DryRunSuccess: true, + Verbosity: verbosity, + IatGeneration: iatGeneration, + Generated: generated, + DryRun: false, + Platform: platform, + } + + return &runner, nil +} + +func (d *MultiLoaderRunner) RunDryRun() { + log.Info("Running dry run") + d.DryRun = true + d.run() +} + +func (d *MultiLoaderRunner) RunActual() { + log.Info("Running actual experiments") + d.DryRun = false + d.run() +} + +func (d *MultiLoaderRunner) run() { + // Run global prescript + common.RunScript(d.MultiLoaderConfig.PreScript) + // Iterate over studies and run them + for _, study := range d.MultiLoaderConfig.Studies { + log.Info("Setting up experiment: ", study.Name) + // Run pre script + common.RunScript(study.PreScript) + + // Unpack study to a list of studies with different loader configs + sparseExperiments := d.unpackStudy(study) + + // Iterate over sparse experiments, prepare and run + for _, experiment := range sparseExperiments { + if d.DryRun { + log.Info("Dry Running: ", experiment.Name) + } + // Prepare experiment: merge with base config, create output dir and write merged config to temp file + d.prepareExperiment(experiment) + + err := d.runExperiment(experiment) + + // Perform cleanup + d.performCleanup() + + // Check if should continue this study + if err != nil { + log.Info("Experiment failed: ", experiment.Name, ". Skipping remaining experiments in study...") + break + } + } + // Run post script + common.RunScript(study.PostScript) + if len(sparseExperiments) > 1 && !d.DryRun { + log.Info("All experiments for ", study.Name, " completed") + } + } + // Run global postscript + common.RunScript(d.MultiLoaderConfig.PostScript) +} + +/** +* As a study can have multiple experiments, this function will unpack the study +* but first by duplicating the study to multiple studies with different values +* in the config field. Those values will override the base loader config later + */ +func (d *MultiLoaderRunner) unpackStudy(experiment types.LoaderStudy) []types.LoaderStudy { + log.Info("Unpacking experiment ", experiment.Name) + var experiments []types.LoaderStudy + + // if user specified a trace directory + if experiment.TracesDir != "" { + experiments = d.unpackFromTraceDir(experiment) + // user define trace format and values instead of directory + } else if experiment.TracesFormat != "" && len(experiment.TraceValues) > 0 { + experiments = d.unpackFromTraceValues(experiment) + } else { + // Theres only one experiment in the study + experiments = d.unpackSingleExperiment(experiment) + } + + return experiments +} + +func (d *MultiLoaderRunner) unpackFromTraceDir(study types.LoaderStudy) []types.LoaderStudy { + var experiments []types.LoaderStudy + files, err := os.ReadDir(study.TracesDir) + if err != nil { + log.Fatal(err) + } + + for _, file := range files { + newExperiment := d.duplicateStudy(study, file.Name()) + newExperiment.Config["TracePath"] = path.Join(study.TracesDir, file.Name()) + newExperiment.Name += "_" + file.Name() + experiments = append(experiments, newExperiment) + } + return experiments +} + +func (d *MultiLoaderRunner) unpackFromTraceValues(study types.LoaderStudy) []types.LoaderStudy { + var experiments []types.LoaderStudy + for _, traceValue := range study.TraceValues { + tracePath := strings.Replace(study.TracesFormat, ml_common.TraceFormatString, fmt.Sprintf("%v", traceValue), -1) + fileName := path.Base(tracePath) + newExperiment := d.duplicateStudy(study, fileName) + newExperiment.Config["TracePath"] = tracePath + newExperiment.Name += "_" + fileName + experiments = append(experiments, newExperiment) + } + return experiments +} + +func (d *MultiLoaderRunner) unpackSingleExperiment(study types.LoaderStudy) []types.LoaderStudy { + var experiments []types.LoaderStudy + pathDir := "" + if study.Config["OutputPathPrefix"] != nil { + pathDir = path.Dir(study.Config["OutputPathPrefix"].(string)) + } else { + pathDir = study.OutputDir + } + study.OutputDir = pathDir + newExperiment := d.duplicateStudy(study, study.Name) + experiments = append(experiments, newExperiment) + return experiments +} + +/** +* Creates a deepcopy of a given study and updates relevant fields to utilise the provided new filename + */ +func (d *MultiLoaderRunner) duplicateStudy(study types.LoaderStudy, newFileName string) types.LoaderStudy { + newStudy, err := common.DeepCopy(study) + if err != nil { + log.Fatal(err) + } + + dryRunAdditionalPath := "" + if d.DryRun { + dryRunAdditionalPath = "dry_run" + } + newStudy.Config["OutputPathPrefix"] = path.Join( + study.OutputDir, + study.Name, + dryRunAdditionalPath, + time.Now().Format(TIME_FORMAT)+"_"+newFileName, + newFileName, + ) + d.addCommandFlags(newStudy) + return newStudy +} + +func (d *MultiLoaderRunner) addCommandFlags(study types.LoaderStudy) { + // Add flags to experiment config + if study.Verbosity == "" { + study.Verbosity = d.Verbosity + } + if !study.IatGeneration { + study.IatGeneration = d.IatGeneration + } + if !study.Generated { + study.Generated = d.Generated + } +} + +/** +* Prepare experiment by merging with base config, creating output directory and writing experiment config to temp file + */ +func (d *MultiLoaderRunner) prepareExperiment(experiment types.LoaderStudy) { + log.Info("Preparing ", experiment.Name) + // Merge base configs with experiment configs + experimentConfig := d.mergeConfigurations(d.MultiLoaderConfig.BaseConfigPath, experiment) + + // Create output directory + outputDir := path.Dir(experimentConfig.OutputPathPrefix) + + if err := os.MkdirAll(outputDir, 0755); err != nil { + log.Fatal(err) + } + // Write experiment configs to temp file + d.writeExperimentConfigToTempFile(experimentConfig, EXPERIMENT_TEMP_CONFIG_PATH) +} + +/** +* Merge base configs with partial loader configs + */ +func (d *MultiLoaderRunner) mergeConfigurations(baseConfigPath string, experiment types.LoaderStudy) config.LoaderConfiguration { + // Read base configuration + baseConfigByteValue, err := os.ReadFile(baseConfigPath) + if err != nil { + log.Fatal(err) + } + log.Debug("Experiment configuration ", experiment.Config) + + var mergedConfig config.LoaderConfiguration + // Unmarshal base configuration + if err = json.Unmarshal(baseConfigByteValue, &mergedConfig); err != nil { + log.Fatal(err) + } + + log.Debug("Base configuration ", mergedConfig) + + // merge experiment config onto base config + experimentConfigBytes, _ := json.Marshal(experiment.Config) + if err = json.Unmarshal(experimentConfigBytes, &mergedConfig); err != nil { + log.Fatal(err) + } + log.Debug("Merged configuration ", mergedConfig) + + return mergedConfig +} + +func (d *MultiLoaderRunner) writeExperimentConfigToTempFile(experimentConfig config.LoaderConfiguration, fileWritePath string) { + experimentConfigBytes, _ := json.Marshal(experimentConfig) + err := os.WriteFile(fileWritePath, experimentConfigBytes, 0644) + if err != nil { + log.Fatal(err) + } +} + +func (d *MultiLoaderRunner) runExperiment(experiment types.LoaderStudy) error { + log.Info("Running ", experiment.Name) + log.Debug("Experiment configuration ", experiment.Config) + + // Create the log file + logFilePath := path.Join(path.Dir(experiment.Config["OutputPathPrefix"].(string)), "loader.log") + logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + log.Fatal(err) + } + defer logFile.Close() + + for i := 0; i < NUM_OF_RETRIES; i++ { + // Run loader.go with experiment configs + if err := d.executeLoaderCommand(experiment, logFile); err != nil { + log.Error(err) + log.Error("Experiment failed: ", experiment.Name) + logFile.WriteString("Experiment failed: " + experiment.Name + ". Error: " + err.Error() + "\n") + if i == 0 && !d.DryRun { + log.Info("Retrying experiment ", experiment.Name) + logFile.WriteString("==================================RETRYING==================================\n") + experiment.Verbosity = "debug" + } else { + // Experiment failed set dry run flag to false + d.DryRunSuccess = false + log.Error("Check log file for more information: ", logFilePath) + // should not continue with experiment + return err + } + continue + } else { + break + } + } + log.Info("Completed ", experiment.Name) + return nil +} + +func (d *MultiLoaderRunner) executeLoaderCommand(experiment types.LoaderStudy, logFile *os.File) error { + cmd := exec.Command("go", "run", LOADER_PATH, + "--config="+EXPERIMENT_TEMP_CONFIG_PATH, + "--verbosity="+experiment.Verbosity, + "--iatGeneration="+strconv.FormatBool(experiment.IatGeneration), + "--generated="+strconv.FormatBool(experiment.Generated), + "--dryRun="+strconv.FormatBool(d.DryRun)) + + stdout, _ := cmd.StdoutPipe() + stderr, _ := cmd.StderrPipe() + + if err := cmd.Start(); err != nil { + return err + } + + go d.logLoaderStdOutput(stdout, logFile) + go d.logLoaderStdError(stderr, logFile) + + return cmd.Wait() +} + +func (d *MultiLoaderRunner) logLoaderStdOutput(stdPipe io.ReadCloser, logFile *os.File) { + scanner := bufio.NewScanner(stdPipe) + for scanner.Scan() { + m := scanner.Text() + // write to log file + logFile.WriteString(m + "\n") + + // Log key information + if m == "" { + continue + } + logType := common.ParseLogType(m) + message := common.ParseLogMessage(m) + + switch logType { + case "debug": + log.Debug(message) + case "trace": + log.Trace(message) + default: + if strings.Contains(message, "Number of successful invocations:") || strings.Contains(message, "Number of failed invocations:") { + log.Info(strings.ReplaceAll(strings.ReplaceAll(message, "\\t", " "), "\\n", "")) + } + } + } +} + +func (d *MultiLoaderRunner) logLoaderStdError(stdPipe io.ReadCloser, logFile *os.File) { + scanner := bufio.NewScanner(stdPipe) + for scanner.Scan() { + m := scanner.Text() + // write to log file + logFile.WriteString(m + "\n") + + if m == "" { + continue + } + log.Error(m) + } +} + +func (d *MultiLoaderRunner) performCleanup() { + log.Info("Runnning Cleanup") + // Run make clean + if err := exec.Command("make", "clean").Run(); err != nil { + log.Error(err) + } + log.Info("Cleanup completed") + // Remove temp file + os.Remove(EXPERIMENT_TEMP_CONFIG_PATH) +} diff --git a/tools/multi_loader/runner/multi_loader_runner_test.go b/tools/multi_loader/runner/multi_loader_runner_test.go new file mode 100644 index 00000000..a914eafe --- /dev/null +++ b/tools/multi_loader/runner/multi_loader_runner_test.go @@ -0,0 +1,237 @@ +package runner + +import ( + "os" + "path/filepath" + "strings" + "testing" + + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + ml_common "github.com/vhive-serverless/loader/tools/multi_loader/common" + "github.com/vhive-serverless/loader/tools/multi_loader/types" +) + +var ( + multiLoaderTestConfigPath string + configPath string +) + +func init() { + wd, _ := os.Getwd() + multiLoaderTestConfigPath = filepath.Join(wd, "./test_configs/test_multi_loader_config.json") + configPath = filepath.Join(wd, "./test_configs/base_loader_config.json") + log.Info("Test config path: ", multiLoaderTestConfigPath) + log.Info("Test config path: ", configPath) +} + +func TestUnpackExperiment(t *testing.T) { + // helper func to validate unpacked experiments + validateUnpackedExperiment := func(t *testing.T, experimentConfig []types.LoaderStudy, studyConfig types.LoaderStudy, expectedNames []string, expectedOutputPrefixes []string) { + if len(experimentConfig) != len(expectedNames) { + t.Errorf("Expected %d sub-experiments, got %d", len(expectedNames), len(experimentConfig)) + return + } + + for i, subExp := range experimentConfig { + // check name + if subExp.Name != expectedNames[i] { + t.Errorf("Expected subexperiment name '%s', got '%s'", expectedNames[i], subExp.Name) + } + + // validate selected configs + if subExp.Config["ExperimentDuration"] != studyConfig.Config["ExperimentDuration"] { + t.Errorf("Expected ExperimentDuration %v, got %v", studyConfig.Config["ExperimentDuration"], subExp.Config["ExperimentDuration"]) + } + if subExp.OutputDir != studyConfig.OutputDir { + t.Errorf("Expected OutputDir '%s', got '%s'", studyConfig.OutputDir, subExp.OutputDir) + } + + // check OutputPathPrefix if needed + if len(expectedOutputPrefixes) > 0 { + if outputPathPrefix, ok := subExp.Config["OutputPathPrefix"].(string); !(ok && strings.HasSuffix(outputPathPrefix, expectedOutputPrefixes[i])) { + t.Errorf("Expected OutputPathPrefix '%s', got '%s'", expectedOutputPrefixes[i], subExp.Config["OutputPathPrefix"]) + } + } + } + } + + // create multiloader + multiLoader, err := NewMultiLoaderRunner(multiLoaderTestConfigPath, "info", false, false) + if err != nil { + t.Fatalf("Failed to create multi-loader driver: %v", err) + } + + t.Run("Unpack using TracesDir (Success)", func(t *testing.T) { + // Set TracesDir to test directory + multiLoader.MultiLoaderConfig.Studies[0].TracesDir = "./test_multi_trace" + + for _, experiment := range multiLoader.MultiLoaderConfig.Studies { + subExperiments := multiLoader.unpackStudy(experiment) + expectedNames := []string{"test-experiment_example_1_test", "test-experiment_example_2_test", "test-experiment_example_3.1_test"} + expectedOutputPrefixes := []string{"example_1_test", "example_2_test", "example_3.1_test"} + validateUnpackedExperiment(t, subExperiments, experiment, expectedNames, expectedOutputPrefixes) + } + }) + + t.Run("Unpack using TracesDir (Failure: Incorrect Dir)", func(t *testing.T) { + expectFatal(t, func() { + multiLoader.MultiLoaderConfig.Studies[0].TracesDir = "./test_multi_trace_incorrect" + for _, experiment := range multiLoader.MultiLoaderConfig.Studies { + _ = multiLoader.unpackStudy(experiment) + } + }) + }) + + t.Run("Unpack using TraceFormat and TraceValues", func(t *testing.T) { + multiLoader.MultiLoaderConfig.Studies[0].TracesDir = "" + + for _, experiment := range multiLoader.MultiLoaderConfig.Studies { + subExperiments := multiLoader.unpackStudy(experiment) + expectedNames := make([]string, len(experiment.TraceValues)) + for i, traceValue := range experiment.TraceValues { + expectedNames[i] = experiment.Name + "_" + traceValue.(string) + } + validateUnpackedExperiment(t, subExperiments, experiment, expectedNames, nil) + } + }) + + t.Run("Unpack using tracePath", func(t *testing.T) { + multiLoader.MultiLoaderConfig.Studies[0].TracesDir = "" + multiLoader.MultiLoaderConfig.Studies[0].TracesFormat = "" + multiLoader.MultiLoaderConfig.Studies[0].TraceValues = nil + + for _, experiment := range multiLoader.MultiLoaderConfig.Studies { + subExperiments := multiLoader.unpackStudy(experiment) + expectedNames := []string{experiment.Name} + validateUnpackedExperiment(t, subExperiments, experiment, expectedNames, nil) + } + }) +} + +func TestPrepareExperiment(t *testing.T) { + // Create a new multi-loader driver with the test config path + multiLoader, err := NewMultiLoaderRunner(multiLoaderTestConfigPath, "info", false, false) + if err != nil { + t.Fatalf("Failed to create multi-loader driver: %v", err) + } + + subExperiment := types.LoaderStudy{ + Name: "example_1", + Config: map[string]interface{}{ + "ExperimentDuration": 10, + "TracePath": "./test_multi_trace/example_1_test", + "OutputPathPrefix": "./test_output/example_1_test", + }, + } + + if err := os.MkdirAll(filepath.Dir(EXPERIMENT_TEMP_CONFIG_PATH), 0755); err != nil { + t.Fatalf("Failed to create temp config directory: %v", err) + } + multiLoader.prepareExperiment(subExperiment) + + // Check that the output directory and config file were created + outputDir := "./test_output" + tempConfigPath := EXPERIMENT_TEMP_CONFIG_PATH + + // Verify the output directory exists + if _, err := os.Stat(outputDir); os.IsNotExist(err) { + t.Errorf("Expected output directory '%s' to be created, but it was not", outputDir) + } + + // Verify the temporary config file exists + if _, err := os.Stat(tempConfigPath); os.IsNotExist(err) { + t.Errorf("Expected temp config file '%s' to be created, but it was not", tempConfigPath) + } + + // Clean up created files and directories + os.RemoveAll("./tools") + os.RemoveAll(outputDir) +} + +// Test mergeConfigurations method +func TestMergeConfig(t *testing.T) { + // Create a new multi-loader driver with the test config path + multiLoader, err := NewMultiLoaderRunner(multiLoaderTestConfigPath, "info", false, false) + if err != nil { + t.Fatalf("Failed to create multi-loader driver: %v", err) + } + experiment := types.LoaderStudy{ + Name: "example_1", + Config: map[string]interface{}{ + "ExperimentDuration": 10, + "TracePath": "./test_multi_trace/example_1_test", + "OutputPathPrefix": "./test_output/example_1_test", + }, + } + outputConfig := multiLoader.mergeConfigurations("./test_configs/test_base_loader_config.json", experiment) + // Check if the configurations are merged + if outputConfig.TracePath != "./test_multi_trace/example_1_test" { + t.Errorf("Expected TracePath to be './test_multi_trace/example_1_test', got %v", experiment.Config["TracePath"]) + } + if outputConfig.OutputPathPrefix != "./test_output/example_1_test" { + t.Errorf("Expected OutputPathPrefix to be './test_output/example_1_test', got %v", experiment.Config["OutputPathPrefix"]) + } + if outputConfig.ExperimentDuration != 10 { + t.Errorf("Expected ExperimentDuration to be 10, got %v", experiment.Config["ExperimentDuration"]) + } +} + +func TestMultiConfigValidator(t *testing.T) { + // Create a new multi-loader driver with the test config path + multiLoader, err := NewMultiLoaderRunner(multiLoaderTestConfigPath, "info", false, false) + if err != nil { + t.Fatalf("Failed to create multi-loader driver: %v", err) + } + t.Run("CheckMultiLoaderConfig (Success)", func(t *testing.T) { + // Check if all paths are valid + ml_common.CheckMultiLoaderConfig(multiLoader.MultiLoaderConfig) + }) + + t.Run("CheckMultiLoaderConfig (Failure: No Study)", func(t *testing.T) { + expectFatal(t, func() { + temp := multiLoader.MultiLoaderConfig.Studies + multiLoader.MultiLoaderConfig.Studies = nil + ml_common.CheckMultiLoaderConfig(multiLoader.MultiLoaderConfig) + multiLoader.MultiLoaderConfig.Studies = temp + }) + }) + + t.Run("CheckMultiLoaderConfig (Failure: Missing TracesDir, TracesFormat, TraceValues)", func(t *testing.T) { + expectFatal(t, func() { + multiLoader.MultiLoaderConfig.Studies[0].TracesDir = "" + multiLoader.MultiLoaderConfig.Studies[0].TracesFormat = "" + multiLoader.MultiLoaderConfig.Studies[0].TraceValues = nil + ml_common.CheckMultiLoaderConfig(multiLoader.MultiLoaderConfig) + }) + }) + + t.Run("CheckMultiLoaderConfig (Failure: Invalid TracesFormat)", func(t *testing.T) { + expectFatal(t, func() { + multiLoader.MultiLoaderConfig.Studies[0].TracesFormat = "invalid_format" + ml_common.CheckMultiLoaderConfig(multiLoader.MultiLoaderConfig) + }) + }) + + t.Run("CheckMultiLoaderConfig (Failure: Missing TracesValues)", func(t *testing.T) { + expectFatal(t, func() { + multiLoader.MultiLoaderConfig.Studies[0].TraceValues = nil + multiLoader.MultiLoaderConfig.Studies[0].TracesDir = "" + multiLoader.MultiLoaderConfig.Studies[0].TracesFormat = "example_{}_test" + ml_common.CheckMultiLoaderConfig(multiLoader.MultiLoaderConfig) + }) + }) +} + +func expectFatal(t *testing.T, funcToTest func()) { + fatal := false + originalExitFunc := log.StandardLogger().ExitFunc + + // replace logrus exit function + log.StandardLogger().ExitFunc = func(int) { fatal = true } + + funcToTest() + // restore original state + log.StandardLogger().ExitFunc = originalExitFunc + assert.True(t, fatal, "Expected log.Fatal to be called") +} diff --git a/tools/multi_loader/runner/test_configs/test_base_loader_config.json b/tools/multi_loader/runner/test_configs/test_base_loader_config.json new file mode 100644 index 00000000..b20768b1 --- /dev/null +++ b/tools/multi_loader/runner/test_configs/test_base_loader_config.json @@ -0,0 +1,23 @@ +{ + "Seed": 42, + "Platform": "Test", + "InvokeProtocol": "grpc", + "YAMLSelector": "container", + "EndpointPort": 80, + "BusyLoopOnSandboxStartup": false, + "TracePath": "data/traces/example", + "Granularity": "minute", + "OutputPathPrefix": "data/out/experiment", + "IATDistribution": "exponential", + "CPULimit": "1vCPU", + "ExperimentDuration": 5, + "WarmupDuration": 0, + "IsPartiallyPanic": false, + "EnableZipkinTracing": false, + "EnableMetricsScrapping": false, + "MetricScrapingPeriodSeconds": 15, + "AutoscalingMetric": "concurrency", + "GRPCConnectionTimeoutSeconds": 15, + "GRPCFunctionTimeoutSeconds": 900, + "DAGMode": false +} \ No newline at end of file diff --git a/tools/multi_loader/runner/test_configs/test_multi_loader_config.json b/tools/multi_loader/runner/test_configs/test_multi_loader_config.json new file mode 100644 index 00000000..9138a040 --- /dev/null +++ b/tools/multi_loader/runner/test_configs/test_multi_loader_config.json @@ -0,0 +1,20 @@ +{ + "Studies": [ + { + "Name": "test-experiment", + "Config": { + "ExperimentDuration": 1 + }, + "IatGeneration": false, + "Generated": false, + "Verbosity": "info", + "TracesFormat": "data/traces/{}", + "TraceValues": [ + "example", + "example2" + ], + "OutputDir": "data/out/multi-test" + } + ], + "BaseConfigPath": "./test_configs/test_base_loader_config.json" +} \ No newline at end of file diff --git a/tools/multi_loader/types/config_types.go b/tools/multi_loader/types/config_types.go new file mode 100644 index 00000000..7c9ce36a --- /dev/null +++ b/tools/multi_loader/types/config_types.go @@ -0,0 +1,27 @@ +package types + +type MultiLoaderConfiguration struct { + Studies []LoaderStudy `json:"Studies"` + BaseConfigPath string `json:"BaseConfigPath"` + // Optional + PreScript string `json:"PreScript"` + PostScript string `json:"PostScript"` +} + +type LoaderStudy struct { + Name string `json:"Name"` + Config map[string]interface{} `json:"Config"` + // A combination of format and values or just dir should be specified + TracesDir string `json:"TracesDir"` + + TracesFormat string `json:"TracesFormat"` + TraceValues []interface{} `json:"TraceValues"` + + // Optional + OutputDir string `json:"OutputDir"` + Verbosity string `json:"Verbosity"` + IatGeneration bool `json:"IatGeneration"` + Generated bool `json:"Generated"` + PreScript string `json:"PreScript"` + PostScript string `json:"PostScript"` +} diff --git a/tools/multi_loader/types/node_types.go b/tools/multi_loader/types/node_types.go new file mode 100644 index 00000000..96b11bfb --- /dev/null +++ b/tools/multi_loader/types/node_types.go @@ -0,0 +1,9 @@ +package types + +type NodeGroup struct { + MasterNode string + AutoScalerNode string + ActivatorNode string + LoaderNode string + WorkerNodes []string +}