From 0baf71c923b228c937358bc58e6be3f174cead7e Mon Sep 17 00:00:00 2001 From: Kway Yi Shen Date: Tue, 26 Nov 2024 05:12:45 -0700 Subject: [PATCH] Added logic for creating and managing DAG invocations Signed-off-by: Kway Yi Shen --- .github/configs/wordlist.txt | 3 + cmd/config_knative_trace.json | 5 +- cmd/loader.go | 3 +- data/traces/example/dag_structure.csv | 6 + data/traces/reference/.gitattributes | 1 + data/traces/reference/dag_structure.csv | 3 + docs/configuration.md | 9 +- docs/loader.md | 48 ++++- pkg/common/trace_types.go | 9 + pkg/common/utilities.go | 12 ++ pkg/config/parser.go | 3 + pkg/config/parser_test.go | 6 +- pkg/config/test_config.json | 5 +- pkg/config/test_config_aws.json | 7 +- pkg/driver/trace_driver.go | 101 ++++----- pkg/driver/trace_driver_test.go | 50 +++-- pkg/generator/dag_generation.go | 263 ++++++++++++++++++++++++ pkg/generator/dag_generation_test.go | 137 ++++++++++++ 18 files changed, 601 insertions(+), 70 deletions(-) create mode 100644 data/traces/example/dag_structure.csv create mode 100644 data/traces/reference/dag_structure.csv create mode 100644 pkg/generator/dag_generation.go create mode 100644 pkg/generator/dag_generation_test.go diff --git a/.github/configs/wordlist.txt b/.github/configs/wordlist.txt index 55c9e748d..3c403ef0f 100644 --- a/.github/configs/wordlist.txt +++ b/.github/configs/wordlist.txt @@ -23,6 +23,7 @@ workerNodeNum durations dur ACM +Acyclic addr adservice AdService @@ -153,6 +154,7 @@ Daglis DAGMode datacenter Datacenter +DAGs dataflows dataset david @@ -710,6 +712,7 @@ cgroups noop YAMLs cgo +EnableDAGDataset EnableMetricsScrapping EnableZipkinTracing EndpointPort diff --git a/cmd/config_knative_trace.json b/cmd/config_knative_trace.json index 661640fc4..515169509 100644 --- a/cmd/config_knative_trace.json +++ b/cmd/config_knative_trace.json @@ -24,5 +24,8 @@ "GRPCConnectionTimeoutSeconds": 15, "GRPCFunctionTimeoutSeconds": 900, - "DAGMode": false + "DAGMode": false, + "EnableDAGDataset": true, + 
"Width": 2, + "Depth": 2 } diff --git a/cmd/loader.go b/cmd/loader.go index e8c18e1a0..1dc95e642 100644 --- a/cmd/loader.go +++ b/cmd/loader.go @@ -32,11 +32,12 @@ import ( "strings" "time" + "golang.org/x/exp/slices" + "github.com/vhive-serverless/loader/pkg/common" "github.com/vhive-serverless/loader/pkg/config" "github.com/vhive-serverless/loader/pkg/driver" "github.com/vhive-serverless/loader/pkg/trace" - "golang.org/x/exp/slices" log "github.com/sirupsen/logrus" tracer "github.com/vhive-serverless/vSwarm/utils/tracing/go" diff --git a/data/traces/example/dag_structure.csv b/data/traces/example/dag_structure.csv new file mode 100644 index 000000000..55287135c --- /dev/null +++ b/data/traces/example/dag_structure.csv @@ -0,0 +1,6 @@ +Width,WidthPercentile,Depth,DepthPercentile,TotalNodes,TotalNodes +1,0.00%,1,0.00%,2,0.00% +1,78.66%,1,12.51%,2,55.98% +2,92.13%,2,67.96%,3,79.20% +3,95.24%,3,86.84%,4,86.63% +4,100.00%,4,100.00%,5,100.00% diff --git a/data/traces/reference/.gitattributes b/data/traces/reference/.gitattributes index f087b429e..75a4a2e5d 100644 --- a/data/traces/reference/.gitattributes +++ b/data/traces/reference/.gitattributes @@ -1 +1,2 @@ *.tar.gz filter=lfs diff=lfs merge=lfs -text +dag_structure.csv filter=lfs diff=lfs merge=lfs -text diff --git a/data/traces/reference/dag_structure.csv b/data/traces/reference/dag_structure.csv new file mode 100644 index 000000000..4d6bdf996 --- /dev/null +++ b/data/traces/reference/dag_structure.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4454eacfd79dffef7be37728c442d3f5cdfa183bd219feef8a9036f782806178 +size 76204 diff --git a/docs/configuration.md b/docs/configuration.md index 7382484a8..ad1b3c5ff 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -33,7 +33,10 @@ | MetricScrapingPeriodSeconds | int | > 0 | 15 | Period of Prometheus metrics scrapping | | GRPCConnectionTimeoutSeconds | int | > 0 | 60 | Timeout for establishing a gRPC connection | | 
GRPCFunctionTimeoutSeconds | int | > 0 | 90 | Maximum time given to function to execute[^5] | -| DAGMode | bool | true/false | false | Sequential invocation of all functions one after another | +| DAGMode | bool | true/false | false | Generates DAG workflows iteratively with functions in TracePath [^8]. Frequency and IAT of each DAG follow those of its entry function, while Duration and Memory of each function follow their respective values in TracePath. | +| EnableDAGDataset | bool | true/false | true | Generate width and depth from dag_structure.csv in TracePath[^9] | +| Width | int | > 0 | 2 | Default width of DAG | +| Depth | int | > 0 | 2 | Default depth of DAG | [^1]: To run RPS experiments add suffix `-RPS`. @@ -57,6 +60,10 @@ Lambda; https://aws.amazon.com/about-aws/whats-new/2018/10/aws-lambda-supports-f [^7] It is recommended that the first 10% of cold starts are discarded from the experiment results for low cold start RPS. +[^8]: The generated DAGs consist of unique functions. The shape of each DAG is either set by ```Width``` and ```Depth``` or, when ```EnableDAGDataset``` is true, sampled from dag_structure.csv. + +[^9]: A [data sample](https://github.com/icanforce/Orion-OSDI22/blob/main/Public_Dataset/dag_structure.xlsx) of DAG structures has been created based on past Microsoft Azure traces. Width and Depth are determined based on probabilities of this sample. + --- InVitro can cause failure on cluster manager components. To do so, please configure the `cmd/failure.json`. Make sure diff --git a/docs/loader.md b/docs/loader.md index a5330238f..4ee1bf9ad 100644 --- a/docs/loader.md +++ b/docs/loader.md @@ -179,6 +179,53 @@ For more options, please see the `Makefile`. For instructions on how to use the loader with OpenWhisk go to `openwhisk_setup/README.md`. 
+## Workflow Invocation +Generation of a Directed Acyclic Graph (DAG) workflow is supported by setting `"DAGMode": true` in `cmd/config_knative_trace.json` (as specified in [`docs/configuration.md`](../docs/configuration.md)). + +Before invocation, DAGs will be iteratively generated based on the parameters `Width`, `Depth` and `EnableDAGDataset`, until the remaining functions are insufficient to maintain the desired DAG structure. The remaining functions will be unused for the rest of the experiment. + +An example of the generated workflow can be seen here: + +```bash +Functions available: 20 +Width: 3 +Depth: 4 +EnableDAGDataset: false + +DAG 1: f(0) -> f(1) -> f(3) -> f(5) + \ + \ -> f(2) -> f(4) -> f(6) + \ + \ -> f(7) + +DAG 2: f(8) -> f(9) -> f(12) -> f(15) + \ + \ -> f(10) -> f(13) -> f(16) + \ + \-> f(11) -> f(14) -> f(17) +``` +In the example, a single invocation of DAG 1 will result in 8 total functions invoked, with parallel invocations per branch. Invocation Frequency and IAT of DAGs 1 and 2 will depend on entry functions f(0) and f(8) respectively. + +To obtain [reference traces](https://github.com/vhive-serverless/invitro/blob/main/docs/sampler.md#reference-traces) for DAG execution, use the following commands: +```bash +git lfs pull +tar -xzf data/traces/reference/sampled_150.tar.gz -C data/traces +``` +Microsoft has publicly released Microsoft Azure traces of function invocations from 10/18/2021 to 10/31/2021. From this trace, a [data sample](https://github.com/icanforce/Orion-OSDI22/blob/main/Public_Dataset) of DAG structures, representing the cumulative distribution of width and depth of DAGs during that period, was generated. Probabilities were applied to the data to derive the shape of the DAGs. The file `data/traces/example/dag_structure.csv` provides a simplified sample of the publicly released traces. + +By default, the shape of each DAG is automatically calculated at every iteration using the above-mentioned cumulative distribution. 
+To manually set the shape of the DAGs, change the following parameters in `cmd/config_knative_trace.json`. Note that the number of functions in `TracePath` must be large enough to support the maximum size of the DAG. This ensures all DAGs generated will have the same width and depth. +```bash +"EnableDAGDataset": false, +"Width": <width>, +"Depth": <depth> +``` + +Lastly, start the experiment. This invokes all the generated DAGs with their respective frequencies. +```bash +go run cmd/loader.go --config cmd/config_knative_trace.json +``` + ## Running on Cloud Using Serverless Framework **Currently supported vendors:** AWS @@ -204,7 +251,6 @@ For instructions on how to use the loader with OpenWhisk go to `openwhisk_setup/ ```bash go run cmd/loader.go --config cmd/config_knative_trace.json ``` - --- Note: - Current deployment is via container image. diff --git a/pkg/common/trace_types.go b/pkg/common/trace_types.go index 1faedd9b2..86d65e61c 100644 --- a/pkg/common/trace_types.go +++ b/pkg/common/trace_types.go @@ -24,6 +24,8 @@ package common +import "container/list" + type FunctionInvocationStats struct { HashOwner string HashApp string @@ -103,3 +105,10 @@ type Function struct { Specification *FunctionSpecification } + +type Node struct { + Function *Function + Branches []*list.List + Depth int + DAG string +} diff --git a/pkg/common/utilities.go b/pkg/common/utilities.go index 94f6f3105..c836fa833 100644 --- a/pkg/common/utilities.go +++ b/pkg/common/utilities.go @@ -135,3 +135,15 @@ func SumNumberOfInvocations(withWarmup bool, totalDuration int, functions []*Fun return result } + +func GetName(function *Function) int { + parts := strings.Split(function.Name, "-") + if parts[0] == "test" { + return 0 + } + functionId, err := strconv.Atoi(parts[2]) + if err != nil { + log.Fatal(err) + } + return functionId +} \ No newline at end of file diff --git a/pkg/config/parser.go b/pkg/config/parser.go index c6015d671..060849bea 100644 --- a/pkg/config/parser.go +++ b/pkg/config/parser.go 
@@ -80,6 +80,9 @@ type LoaderConfiguration struct { GRPCConnectionTimeoutSeconds int `json:"GRPCConnectionTimeoutSeconds"` GRPCFunctionTimeoutSeconds int `json:"GRPCFunctionTimeoutSeconds"` DAGMode bool `json:"DAGMode"` + EnableDAGDataset bool `json:"EnableDAGDataset"` + Width int `json:"Width"` + Depth int `json:"Depth"` } func ReadConfigurationFile(path string) LoaderConfiguration { diff --git a/pkg/config/parser_test.go b/pkg/config/parser_test.go index a4abb4058..bab6d8efc 100644 --- a/pkg/config/parser_test.go +++ b/pkg/config/parser_test.go @@ -61,7 +61,11 @@ func TestConfigParser(t *testing.T) { config.MetricScrapingPeriodSeconds != 15 || config.AutoscalingMetric != "concurrency" || config.GRPCConnectionTimeoutSeconds != 15 || - config.GRPCFunctionTimeoutSeconds != 900 { + config.GRPCFunctionTimeoutSeconds != 900 || + config.DAGMode != false || + config.EnableDAGDataset != true || + config.Width != 2 || + config.Depth != 2 { t.Error("Unexpected configuration read.") } diff --git a/pkg/config/test_config.json b/pkg/config/test_config.json index 9e6a65e30..d53ed4562 100644 --- a/pkg/config/test_config.json +++ b/pkg/config/test_config.json @@ -22,5 +22,8 @@ "GRPCConnectionTimeoutSeconds": 15, "GRPCFunctionTimeoutSeconds": 900, - "DAGMode": false + "DAGMode": false, + "EnableDAGDataset": true, + "Width": 2, + "Depth": 2 } diff --git a/pkg/config/test_config_aws.json b/pkg/config/test_config_aws.json index d1fa17f8b..30980e89b 100644 --- a/pkg/config/test_config_aws.json +++ b/pkg/config/test_config_aws.json @@ -21,5 +21,8 @@ "GRPCConnectionTimeoutSeconds": 15, "GRPCFunctionTimeoutSeconds": 900, - "DAGMode": false -} + "DAGMode": false, + "EnableDAGDataset": true, + "Width": 2, + "Depth": 2 +} \ No newline at end of file diff --git a/pkg/driver/trace_driver.go b/pkg/driver/trace_driver.go index 81ce8ef6a..f8129c7c0 100644 --- a/pkg/driver/trace_driver.go +++ b/pkg/driver/trace_driver.go @@ -28,16 +28,17 @@ import ( "container/list" "encoding/json" "fmt" - 
"github.com/vhive-serverless/loader/pkg/config" - "github.com/vhive-serverless/loader/pkg/driver/clients" - "github.com/vhive-serverless/loader/pkg/driver/deployment" - "github.com/vhive-serverless/loader/pkg/driver/failure" "os" "strconv" "sync" "sync/atomic" "time" + "github.com/vhive-serverless/loader/pkg/config" + "github.com/vhive-serverless/loader/pkg/driver/clients" + "github.com/vhive-serverless/loader/pkg/driver/deployment" + "github.com/vhive-serverless/loader/pkg/driver/failure" + log "github.com/sirupsen/logrus" "github.com/vhive-serverless/loader/pkg/common" "github.com/vhive-serverless/loader/pkg/generator" @@ -77,15 +78,6 @@ func (d *Driver) outputFilename(name string) string { return fmt.Sprintf("%s_%s_%d.csv", d.Configuration.LoaderConfiguration.OutputPathPrefix, name, d.Configuration.TraceDuration) } -func DAGCreation(functions []*common.Function) *list.List { - linkedList := list.New() - // Assigning nodes one after another - for _, function := range functions { - linkedList.PushBack(function) - } - return linkedList -} - ///////////////////////////////////////// // DRIVER LOGIC ///////////////////////////////////////// @@ -97,9 +89,9 @@ type InvocationMetadata struct { InvocationID string IatIndex int - SuccessCount *int64 - FailedCount *int64 - + SuccessCount *int64 + FailedCount *int64 + FunctionsInvoked *int64 RecordOutputChannel chan *mc.ExecutionRecord AnnounceDoneWG *sync.WaitGroup AnnounceDoneExe *sync.WaitGroup @@ -127,13 +119,21 @@ func (d *Driver) invokeFunction(metadata *InvocationMetadata) { node := metadata.RootFunction.Front() var record *mc.ExecutionRecord var runtimeSpecifications *common.RuntimeSpecification + var branches []*list.List + var invocationRetries int for node != nil { - function := node.Value.(*common.Function) + function := node.Value.(*common.Node).Function runtimeSpecifications = &function.Specification.RuntimeSpecification[metadata.IatIndex] success, record = d.Invoker.Invoke(function, runtimeSpecifications) + 
if !success && (d.Configuration.LoaderConfiguration.DAGMode && invocationRetries == 0) { + log.Debugf("Invocation with for function %s with ID %s failed. Retrying Invocation", function.Name, metadata.InvocationID) + invocationRetries += 1 + continue + } record.Phase = int(metadata.Phase) + record.Instance = fmt.Sprintf("%s%s", node.Value.(*common.Node).DAG, record.Instance) record.InvocationID = metadata.InvocationID if !d.Configuration.LoaderConfiguration.AsyncMode || record.AsyncResponseID == "" { @@ -142,25 +142,30 @@ func (d *Driver) invokeFunction(metadata *InvocationMetadata) { record.TimeToSubmitMs = record.ResponseTime d.AsyncRecords.Enqueue(record) } - + atomic.AddInt64(metadata.FunctionsInvoked, 1) if !success { log.Errorf("Invocation with for function %s with ID %s failed.", function.Name, metadata.InvocationID) + atomic.AddInt64(metadata.FailedCount, 1) break } + atomic.AddInt64(metadata.SuccessCount, 1) + branches = node.Value.(*common.Node).Branches + for i := 0; i < len(branches); i++ { + newMetadataValue := *metadata + newMetadata := &newMetadataValue + newMetadata.RootFunction = branches[i] + newMetadata.AnnounceDoneWG.Add(1) + go d.invokeFunction(newMetadata) + } node = node.Next() } - if success { - atomic.AddInt64(metadata.SuccessCount, 1) - } else { - atomic.AddInt64(metadata.FailedCount, 1) - } } -func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.WaitGroup, addInvocationsToGroup *sync.WaitGroup, totalSuccessful *int64, totalFailed *int64, totalIssued *int64, recordOutputChannel chan *mc.ExecutionRecord) { +func (d *Driver) functionsDriver(functionLinkedList *list.List, announceFunctionDone *sync.WaitGroup, addInvocationsToGroup *sync.WaitGroup, totalSuccessful *int64, totalFailed *int64, totalIssued *int64, recordOutputChannel chan *mc.ExecutionRecord) { defer announceFunctionDone.Done() - function := list.Front().Value.(*common.Function) + function := functionLinkedList.Front().Value.(*common.Node).Function 
invocationCount := len(function.Specification.IAT) addInvocationsToGroup.Add(invocationCount) @@ -179,6 +184,7 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai var successfulInvocations int64 var failedInvocations int64 + var functionsInvoked int64 var currentPhase = common.ExecutionPhase waitForInvocations := sync.WaitGroup{} @@ -208,14 +214,14 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai if !d.Configuration.TestMode { waitForInvocations.Add(1) - go d.invokeFunction(&InvocationMetadata{ - RootFunction: list, + RootFunction: functionLinkedList, Phase: currentPhase, InvocationID: composeInvocationID(d.Configuration.TraceGranularity, minuteIndex, invocationSinceTheBeginningOfMinute), IatIndex: iatIndex, SuccessCount: &successfulInvocations, FailedCount: &failedInvocations, + FunctionsInvoked: &functionsInvoked, RecordOutputChannel: recordOutputChannel, AnnounceDoneWG: &waitForInvocations, AnnounceDoneExe: addInvocationsToGroup, @@ -232,7 +238,7 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai StartTime: time.Now().UnixNano(), }, } - + functionsInvoked++ successfulInvocations++ } @@ -254,7 +260,7 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai atomic.AddInt64(totalSuccessful, successfulInvocations) atomic.AddInt64(totalFailed, failedInvocations) - atomic.AddInt64(totalIssued, int64(iatIndex)) + atomic.AddInt64(totalIssued, int64(functionsInvoked)) } func (d *Driver) announceWarmupEnd(minuteIndex int, currentPhase *common.ExperimentPhase) { @@ -356,7 +362,6 @@ func (d *Driver) internalRun() { var successfulInvocations int64 var failedInvocations int64 var invocationsIssued int64 - var functionsPerDAG int64 allFunctionsInvoked := sync.WaitGroup{} allIndividualDriversCompleted := sync.WaitGroup{} @@ -367,28 +372,29 @@ func (d *Driver) internalRun() { backgroundProcessesInitializationBarrier.Wait() if 
d.Configuration.LoaderConfiguration.DAGMode { + functions := d.Configuration.Functions + dagLists := generator.GenerateDAGs(d.Configuration.LoaderConfiguration, functions, false) log.Infof("Starting DAG invocation driver\n") - functionLinkedList := DAGCreation(d.Configuration.Functions) - functionsPerDAG = int64(len(d.Configuration.Functions)) - allIndividualDriversCompleted.Add(1) - go d.functionsDriver( - functionLinkedList, - &allIndividualDriversCompleted, - &allFunctionsInvoked, - &successfulInvocations, - &failedInvocations, - &invocationsIssued, - globalMetricsCollector, - ) + for i := range len(dagLists) { + allIndividualDriversCompleted.Add(1) + go d.functionsDriver( + dagLists[i], + &allIndividualDriversCompleted, + &allFunctionsInvoked, + &successfulInvocations, + &failedInvocations, + &invocationsIssued, + globalMetricsCollector, + ) + } } else { log.Infof("Starting function invocation driver\n") - functionsPerDAG = 1 for _, function := range d.Configuration.Functions { allIndividualDriversCompleted.Add(1) - linkedList := list.New() - linkedList.PushBack(function) + functionLinkedList := list.New() + functionLinkedList.PushBack(&common.Node{Function: function, Depth: 0}) go d.functionsDriver( - linkedList, + functionLinkedList, &allIndividualDriversCompleted, &allFunctionsInvoked, &successfulInvocations, @@ -410,8 +416,7 @@ func (d *Driver) internalRun() { d.writeAsyncRecordsToLog(globalMetricsCollector) } - - totalIssuedChannel <- atomic.LoadInt64(&invocationsIssued) * functionsPerDAG + totalIssuedChannel <- atomic.LoadInt64(&invocationsIssued) scraperFinishCh <- 0 // Ask the scraper to finish metrics collection allRecordsWritten.Wait() diff --git a/pkg/driver/trace_driver_test.go b/pkg/driver/trace_driver_test.go index 1e0195b31..f6c13becb 100644 --- a/pkg/driver/trace_driver_test.go +++ b/pkg/driver/trace_driver_test.go @@ -27,13 +27,14 @@ package driver import ( "container/list" "fmt" - "github.com/vhive-serverless/loader/pkg/config" "log" "os" 
"sync" "testing" "time" + "github.com/vhive-serverless/loader/pkg/config" + "github.com/gocarina/gocsv" "github.com/sirupsen/logrus" "github.com/vhive-serverless/loader/pkg/common" @@ -129,7 +130,7 @@ func TestInvokeFunctionFromDriver(t *testing.T) { announceDone := &sync.WaitGroup{} testDriver := createTestDriver([]int{1}) - + var functionsInvoked int64 if !test.forceFail { address, port := "localhost", test.port testDriver.Configuration.Functions[0].Endpoint = fmt.Sprintf("%s:%d", address, port) @@ -139,10 +140,10 @@ func TestInvokeFunctionFromDriver(t *testing.T) { // make sure that the gRPC server is running time.Sleep(2 * time.Second) } - + function := testDriver.Configuration.Functions[0] + node := &common.Node{Function: testDriver.Configuration.Functions[0]} list := list.New() - list.PushBack(testDriver.Configuration.Functions[0]) - function := list.Front().Value.(*common.Function) + list.PushBack(node) function.Specification.RuntimeSpecification = []common.RuntimeSpecification{{ Runtime: 1000, Memory: 128, @@ -154,6 +155,7 @@ func TestInvokeFunctionFromDriver(t *testing.T) { InvocationID: composeInvocationID(common.MinuteGranularity, 0, 0), SuccessCount: &successCount, FailedCount: &failureCount, + FunctionsInvoked: &functionsInvoked, RecordOutputChannel: invocationRecordOutputChannel, AnnounceDoneWG: announceDone, } @@ -163,11 +165,11 @@ func TestInvokeFunctionFromDriver(t *testing.T) { switch test.forceFail { case true: - if !(successCount == 0 && failureCount == 1) { + if !(successCount == 0 && failureCount == 1 && functionsInvoked == 1) { t.Error("The function somehow managed to execute.") } case false: - if !(successCount == 1 && failureCount == 0) { + if !(successCount == 1 && failureCount == 0 && functionsInvoked == 1) { t.Error("The function should not have failed.") } } @@ -187,12 +189,12 @@ func TestInvokeFunctionFromDriver(t *testing.T) { func TestDAGInvocation(t *testing.T) { var successCount int64 = 0 var failureCount int64 = 0 - var 
functionsToInvoke = 4 + var functionsToInvoke int = 3 + var functionsInvoked int64 invocationRecordOutputChannel := make(chan *metric.ExecutionRecord, functionsToInvoke) announceDone := &sync.WaitGroup{} testDriver := createTestDriver([]int{4}) - list := list.New() address, port := "localhost", 8085 function := testDriver.Configuration.Functions[0] function.Endpoint = fmt.Sprintf("%s:%d", address, port) @@ -202,27 +204,47 @@ func TestDAGInvocation(t *testing.T) { Runtime: 1000, Memory: 128, }} - for i := 0; i < functionsToInvoke; i++ { - function = testDriver.Configuration.Functions[0] - list.PushBack(function) + functionList := make([]*common.Function, 3) + for i := 0; i < len(functionList); i++ { + functionList[i] = function + } + originalBranch := []*list.List{ + func() *list.List { + l := list.New() + l.PushBack(&common.Node{Function: functionList[0], Depth: 0}) + l.PushBack(&common.Node{Function: functionList[1], Depth: 1}) + return l + }(), + } + + newBranch := []*list.List{ + func() *list.List { + l := list.New() + l.PushBack(&common.Node{Function: functionList[2], Depth: 1}) + return l + }(), } + rootFunction := originalBranch[0] + rootFunction.Front().Value.(*common.Node).Branches = newBranch time.Sleep(2 * time.Second) metadata := &InvocationMetadata{ - RootFunction: list, + RootFunction: rootFunction, Phase: common.ExecutionPhase, IatIndex: 0, InvocationID: composeInvocationID(common.MinuteGranularity, 0, 0), SuccessCount: &successCount, FailedCount: &failureCount, + FunctionsInvoked: &functionsInvoked, RecordOutputChannel: invocationRecordOutputChannel, AnnounceDoneWG: announceDone, } announceDone.Add(1) testDriver.invokeFunction(metadata) - if !(successCount == 1 && failureCount == 0) { // not 4 invocations, since a workflow is considered as 1 invocation + announceDone.Wait() + if !(successCount == 3 && failureCount == 0) { t.Error("Number of successful and failed invocations not as expected.") } for i := 0; i < functionsToInvoke; i++ { diff --git 
a/pkg/generator/dag_generation.go b/pkg/generator/dag_generation.go new file mode 100644 index 000000000..09e43deeb --- /dev/null +++ b/pkg/generator/dag_generation.go @@ -0,0 +1,263 @@ +package generator + +import ( + "container/list" + "encoding/csv" + "fmt" + "math/rand" + "os" + "strconv" + "strings" + "sync/atomic" + + log "github.com/sirupsen/logrus" + "github.com/vhive-serverless/loader/pkg/common" + "github.com/vhive-serverless/loader/pkg/config" +) + +// Visual Representation for the DAG +func printDAG(DAGWorkflow *list.List) { + DAGNode := DAGWorkflow.Front() + nodeQueue := make([]*list.Element, 0) + nodeQueue = append(nodeQueue, DAGNode) + var printMessage string + var buffer string = "" + var dummyNode *list.Element + var startingNode bool = true + for len(nodeQueue) > 0 { + DAGNode = nodeQueue[0] + nodeQueue = nodeQueue[1:] + functionId := common.GetName(DAGNode.Value.(*common.Node).Function) + if startingNode { + printMessage = "|" + strconv.Itoa(functionId) + for i := 0; i < DAGNode.Value.(*common.Node).Depth; i++ { + buffer += " " + } + printMessage = buffer + printMessage + startingNode = false + } else { + printMessage = printMessage + " -> " + strconv.Itoa(functionId) + } + for i := 0; i < len(DAGNode.Value.(*common.Node).Branches); i++ { + nodeQueue = append(nodeQueue, dummyNode) + copy(nodeQueue[1:], nodeQueue) + nodeQueue[0] = DAGNode.Value.(*common.Node).Branches[i].Front() + } + if DAGNode.Next() == nil { + println(printMessage) + buffer = "" + if len(nodeQueue) > 0 { + startingNode = true + } else { + break + } + } else { + nodeQueue = append(nodeQueue, dummyNode) + copy(nodeQueue[1:], nodeQueue) + nodeQueue[0] = DAGNode.Next() + } + } +} + +// Read the Cumulative Distribution Frequency (CDF) of the widths and depths of a DAG +func generateCDF(file string) [][]float64 { + f, err := os.Open(file) + if err != nil { + log.Fatal(err) + } + defer f.Close() + csvReader := csv.NewReader(f) + records, err := csvReader.ReadAll() + if err != nil { + 
log.Fatal(err) + } + records = records[1:] + cdf := make([][]float64, len(records[0])) + for i := 0; i < len(records[0]); i++ { + cdf[i] = make([]float64, len(records)) + } + for i := 0; i < len(records[0]); i += 2 { + for j := 0; j < len(records); j++ { + cdfProb, _ := strconv.ParseFloat(strings.TrimSuffix(records[j][i+1], "%"), 64) + cdfValue, _ := strconv.ParseFloat(records[j][i], 64) + cdf[i+1][j] = cdfProb + cdf[i][j] = cdfValue + if cdfProb == 100.00 { + cdf[i] = cdf[i][:j+1] + cdf[i+1] = cdf[i+1][:j+1] + break + } + } + } + return cdf +} + +// Generate pseudo-random probabilities and compare it with the given CDF to obtain the depth and width of the DAG +func getDAGStats(cdf [][]float64, maxSize int, numberOfTries int) (int, int) { + var width, depth int + depthProb := rand.Float64() * 100 + widthProb := rand.Float64() * 100 + for i, value := range cdf[1] { + if value >= widthProb { + width = int(cdf[0][i]) + break + } + } + for i, value := range cdf[3] { + if value >= depthProb { + depth = int(cdf[2][i]) + break + } + } + // Re-run DAG Generation if size exceeds number of functions + if maxSize < width*(depth-1)+1 { + if numberOfTries == 10 { + return 1, maxSize + } + width, depth = getDAGStats(cdf, maxSize, numberOfTries+1) + } + return width, depth +} + +func GenerateDAGs(config *config.LoaderConfiguration, functions []*common.Function, test bool) []*list.List { + var width, depth int + var functionIndex int = 0 + var dagIdentity int = 0 + var functionLinkedList *list.List + totalDAGList := []*list.List{} + for { + if config.EnableDAGDataset { + DAGDistribution := generateCDF(fmt.Sprintf("%s/dag_structure.csv", config.TracePath)) + width, depth = getDAGStats(DAGDistribution, len(functions), 0) + } else { + // Sanity checking if max size of DAG exceeds number of functions available + width = config.Width + depth = config.Depth + if width < 1 || depth < 1 { + log.Fatalf("Invalid Width and Depth given") + } + } + if (len(functions) - functionIndex) < 
(depth-1)*width+1 { + log.Infof("DAGs created: %d, Total Functions used: %d, Functions Unused: %d", dagIdentity, functionIndex, len(functions)-functionIndex) + break + } + functionLinkedList, functionIndex = createDAGWorkflow(functions, functionIndex, width, depth, dagIdentity) + dagIdentity++ + if !test { + printDAG(functionLinkedList) + } + totalDAGList = append(totalDAGList, functionLinkedList) + } + return totalDAGList +} + +func createDAGWorkflow(functionList []*common.Function, functionID int, maxWidth int, maxDepth int, dagIdentity int) (*list.List, int) { + DAGList := list.New() + dagIdentifier := fmt.Sprintf("DAG %d,", dagIdentity) + var function *common.Function = functionList[functionID] + if maxDepth == 1 { + DAGList.PushBack(&common.Node{Function: function, Depth: 0, DAG: dagIdentifier}) + return DAGList, functionID + 1 + } + widthList := generateNodeDistribution(maxWidth, maxDepth) + // Implement a FIFO queue for nodes to assign functions and branches to each node. + nodeQueue := []*list.Element{} + for i := 0; i < len(widthList); i++ { + widthList[i] -= 1 + DAGList.PushBack(&common.Node{Depth: -1}) + } + DAGList.Front().Value = &common.Node{Function: function, Depth: 0, DAG: dagIdentifier} + functionID += 1 + nodeQueue = append(nodeQueue, DAGList.Front()) + for len(nodeQueue) > 0 { + listElement := nodeQueue[0] + nodeQueue = nodeQueue[1:] + node := listElement.Value.(*common.Node) + // Checks if the node has reached the maximum depth of the DAG (maxDepth -1) + if node.Depth == maxDepth-1 { + continue + } + child := &common.Node{Function: functionList[functionID], Depth: node.Depth + 1, DAG: dagIdentifier} + functionID += 1 + listElement.Next().Value = child + nodeQueue = append(nodeQueue, listElement.Next()) + // Creating parallel branches from the node, if width of next stage > width of current stage + var nodeList []*list.List + if widthList[node.Depth+1] > 0 { + nodeList, nodeQueue = addBranches(nodeQueue, widthList, node, functionList, 
functionID, dagIdentifier) + functionID += len(nodeList) + } else { + nodeList = []*list.List{} + } + node.Branches = nodeList + } + return DAGList, functionID +} + +func addBranches(nodeQueue []*list.Element, widthList []int, node *common.Node, functionList []*common.Function, functionID int, dagIdentifier string) ([]*list.List, []*list.Element) { + var additionalBranches int + if len(nodeQueue) < 1 || (nodeQueue[0].Value.(*common.Node).Depth > node.Depth) { + additionalBranches = widthList[node.Depth+1] + } else { + additionalBranches = rand.Intn(widthList[node.Depth+1] + 1) + } + for i := node.Depth + 1; i < len(widthList); i++ { + widthList[i] -= additionalBranches + } + nodeList := make([]*list.List, additionalBranches) + for i := 0; i < additionalBranches; i++ { + newBranch := createNewBranch(functionList, node, len(widthList), functionID, dagIdentifier) + functionID += 1 + nodeList[i] = newBranch + nodeQueue = append(nodeQueue, newBranch.Front()) + } + return nodeList, nodeQueue +} + +func createNewBranch(functionList []*common.Function, node *common.Node, maxDepth int, functionID int, dagIdentifier string) *list.List { + DAGBranch := list.New() + // Ensuring that each node is connected to a child until the maximum depth + for i := node.Depth + 1; i < maxDepth; i++ { + DAGBranch.PushBack(&common.Node{Depth: -1}) + } + child := &common.Node{Function: functionList[functionID], Depth: node.Depth + 1, DAG: dagIdentifier} + DAGBranch.Front().Value = child + return DAGBranch +} + +func generateNodeDistribution(maxWidth int, maxDepth int) []int { + // Generating the number of nodes per depth (stage). 
+ widthList := []int{} + widthList = append(widthList, 1) + for i := 1; i < maxDepth-1; i++ { + widthList = append(widthList, (rand.Intn(maxWidth-widthList[i-1]+1) + widthList[i-1])) + } + widthList = append(widthList, maxWidth) + return widthList +} + +func GetMaxInvocation(functionList []*common.Function) []int { + maxInvocation := make([]int, len(functionList[0].InvocationStats.Invocations)) + for _, i := range functionList { + for index, invocation := range i.InvocationStats.Invocations { + maxInvocation[index] = max(maxInvocation[index], invocation) + } + } + return maxInvocation +} + +func GetDAGShape(functionLinkedList *list.List, width *int64, depth int) (int, int) { + nodeElement := functionLinkedList.Front() + for nodeElement != nil { + node := nodeElement.Value.(*common.Node) + if len(node.Branches) != 0 { + for j := 0; j < len(node.Branches); j++ { + atomic.AddInt64(width, 1) + GetDAGShape(node.Branches[j], width, depth) + } + } + nodeElement = nodeElement.Next() + depth += 1 + } + return int(*width), depth +} diff --git a/pkg/generator/dag_generation_test.go b/pkg/generator/dag_generation_test.go new file mode 100644 index 000000000..b6196f517 --- /dev/null +++ b/pkg/generator/dag_generation_test.go @@ -0,0 +1,137 @@ +/* + * MIT License + * + * Copyright (c) 2023 EASL and the vHive community + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package generator + +import ( + "fmt" + "testing" + + "github.com/vhive-serverless/loader/pkg/common" + "github.com/vhive-serverless/loader/pkg/config" +) + +var fakeConfig *config.LoaderConfiguration = &config.LoaderConfiguration{ + Platform: "Knative", + InvokeProtocol: "grpc", + TracePath: "data/traces/example", + OutputPathPrefix: "test", + EnableZipkinTracing: true, + GRPCConnectionTimeoutSeconds: 5, + GRPCFunctionTimeoutSeconds: 15, + DAGMode: true, + EnableDAGDataset: false, + Width: 2, + Depth: 2, +} + +var functions []*common.Function = []*common.Function{ + { + Name: "test-function", + InvocationStats: &common.FunctionInvocationStats{ + Invocations: []int{ + 5, 5, 5, 5, 5, + 5, 5, 5, 5, 5, + 5, 5, 5, 5, 5, + 5, 5, 5, 5, 5, + }, + }, + RuntimeStats: &common.FunctionRuntimeStats{ + Average: 50, + Count: 100, + Minimum: 0, + Maximum: 100, + Percentile0: 0, + Percentile1: 1, + Percentile25: 25, + Percentile50: 50, + Percentile75: 75, + Percentile99: 99, + Percentile100: 100, + }, + MemoryStats: &common.FunctionMemoryStats{ + Average: 5000, + Count: 100, + Percentile1: 100, + Percentile5: 500, + Percentile25: 2500, + Percentile50: 5000, + Percentile75: 7500, + Percentile95: 9500, + Percentile99: 9900, + Percentile100: 10000, + }, + Specification: &common.FunctionSpecification{ + PerMinuteCount: []int{1}, + }, + }, +} + +func TestGenerateSingleDAG(t *testing.T) { + var functionList []*common.Function = make([]*common.Function, 3) + for i := 0; i < len(functionList); i++ 
{ + functionList[i] = functions[0] + } + dagList := GenerateDAGs(fakeConfig, functionList, true)[0] + branch := dagList.Front().Value.(*common.Node).Branches + if dagList.Len() != 2 && len(branch) != 1 { + t.Error("Invalid DAG Generated") + } +} + +func TestGenerateMultipleDAGs(t *testing.T) { + var functionList []*common.Function = make([]*common.Function, 200) + var initialWidth int64 + for i := 0; i < len(functionList); i++ { + functionList[i] = functions[0] + } + fakeConfig.Width = 10 + fakeConfig.Depth = 5 + dagList := GenerateDAGs(fakeConfig, functionList, true) + if len(dagList) < 2 { + t.Error("Failed to create Multiple DAGs") + } + for i := 0; i < len(dagList); i++ { + initialWidth = 1 + width, depth := GetDAGShape(dagList[i], &initialWidth, 0) + if width != fakeConfig.Width || depth != fakeConfig.Depth { + errorMsg := fmt.Sprintf("Invalid DAG Shape: Expected Width = 10, Depth = 5. Got Width = %d, Depth = %d", width, depth) + t.Error(errorMsg) + } + } +} + +func TestGenerateDAGByDataset(t *testing.T) { + var functionList []*common.Function = make([]*common.Function, 10) + for i := 0; i < len(functionList); i++ { + functionList[i] = functions[0] + } + fakeConfig.EnableDAGDataset = true + fakeConfig.TracePath = fmt.Sprintf("../../%s", fakeConfig.TracePath) + + dagList := GenerateDAGs(fakeConfig, functionList, true) + if len(dagList) == 0 { + t.Error("Unable to generate DAGs by Dataset") + } +}