From eb570ac0c0d653e4f9eeccc6935e9a3331375e5d Mon Sep 17 00:00:00 2001 From: Kway Yi Shen Date: Wed, 22 May 2024 09:17:44 -0600 Subject: [PATCH] Adding logic for creating and managing DAG invocations Signed-off-by: Kway Yi Shen --- .github/configs/wordlist.txt | 3 + cmd/config_knative_trace.json | 5 +- cmd/loader.go | 3 +- data/traces/example/dag_structure.csv | 6 + data/traces/reference/.gitattributes | 1 + data/traces/reference/dag_structure.csv | 3 + docs/configuration.md | 9 +- docs/loader.md | 48 ++++- pkg/common/trace_types.go | 9 + pkg/common/utilities.go | 12 ++ pkg/config/parser.go | 4 + pkg/config/parser_test.go | 7 +- pkg/config/test_config.json | 6 +- pkg/config/test_config_aws.json | 8 +- pkg/driver/trace_driver.go | 116 +++++++----- pkg/driver/trace_driver_test.go | 43 +++-- pkg/generator/dag_generation.go | 241 ++++++++++++++++++++++++ pkg/generator/dag_generation_test.go | 100 ++++++++++ 18 files changed, 560 insertions(+), 64 deletions(-) create mode 100644 data/traces/example/dag_structure.csv create mode 100644 data/traces/reference/dag_structure.csv create mode 100644 pkg/generator/dag_generation.go create mode 100644 pkg/generator/dag_generation_test.go diff --git a/.github/configs/wordlist.txt b/.github/configs/wordlist.txt index 0db0a3eae..8e8076abb 100644 --- a/.github/configs/wordlist.txt +++ b/.github/configs/wordlist.txt @@ -23,6 +23,7 @@ workerNodeNum durations dur ACM +Acyclic addr adservice AdService @@ -153,6 +154,7 @@ Daglis DAGMode datacenter Datacenter +DAGs dataflows dataset david @@ -710,6 +712,7 @@ cgroups noop YAMLs cgo +EnableDAGDataset EnableMetricsScrapping EnableZipkinTracing EndpointPort diff --git a/cmd/config_knative_trace.json b/cmd/config_knative_trace.json index 661640fc4..515169509 100644 --- a/cmd/config_knative_trace.json +++ b/cmd/config_knative_trace.json @@ -24,5 +24,8 @@ "GRPCConnectionTimeoutSeconds": 15, "GRPCFunctionTimeoutSeconds": 900, - "DAGMode": false + "DAGMode": false, + "EnableDAGDataset": true, + "Width": 2, + "Depth": 2 } diff --git a/cmd/loader.go b/cmd/loader.go index bb69b80c5..1b8bfa9b8 100644 --- a/cmd/loader.go +++ b/cmd/loader.go @@ -31,11 +31,12 @@ import ( "strings" "time" + "golang.org/x/exp/slices" + "github.com/vhive-serverless/loader/pkg/common" "github.com/vhive-serverless/loader/pkg/config" "github.com/vhive-serverless/loader/pkg/driver" "github.com/vhive-serverless/loader/pkg/trace" - "golang.org/x/exp/slices" log "github.com/sirupsen/logrus" tracer "github.com/vhive-serverless/vSwarm/utils/tracing/go" diff --git a/data/traces/example/dag_structure.csv b/data/traces/example/dag_structure.csv new file mode 100644 index 000000000..0b97432cf --- /dev/null +++ b/data/traces/example/dag_structure.csv @@ -0,0 +1,6 @@ +Width,Width - Percentile,Depth,Depth - Percentile,Total Nodes,Total Nodes +1,0.00%,1,0.00%,2,0.00% +1,78.66%,1,12.51%,2,55.98% +2,92.13%,2,67.96%,3,79.20% +3,95.24%,3,86.84%,4,86.63% +4,100.00%,4,100.00%,5,100.00% diff --git a/data/traces/reference/.gitattributes b/data/traces/reference/.gitattributes index f087b429e..75a4a2e5d 100644 --- a/data/traces/reference/.gitattributes +++ b/data/traces/reference/.gitattributes @@ -1 +1,2 @@ *.tar.gz filter=lfs diff=lfs merge=lfs -text +dag_structure.csv filter=lfs diff=lfs merge=lfs -text diff --git a/data/traces/reference/dag_structure.csv b/data/traces/reference/dag_structure.csv new file mode 100644 index 000000000..ae5935843 --- /dev/null +++ b/data/traces/reference/dag_structure.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:664d4d4a1710c292fdf468bc3872653c39f21c97a89e73af5488c4a34c17eb10 +size 76212 diff --git a/docs/configuration.md b/docs/configuration.md index ceb9f31fb..bbd339164 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -30,7 +30,10 @@ | MetricScrapingPeriodSeconds | int | > 0 | 15 | Period of Prometheus metrics scrapping | | GRPCConnectionTimeoutSeconds | int | > 0 | 60 | Timeout for establishing a gRPC connection | | GRPCFunctionTimeoutSeconds | int | > 0 | 90 | Maximum time given to function to execute[^5] | -| DAGMode | bool | true/false | false | Sequential invocation of all functions one after another | +| DAGMode | bool | true/false | false | Generates DAG workflows iteratively with functions in TracePath [^6]. Frequency and IAT of the DAG follows their respective entry function, while Duration and Memory of each function will follow their respective values in TracePath. | +| EnableDAGDataset | bool | true/false | true | Generate width and depth from data/traces/example/dag_structure.csv[^7] | +| Width | int | > 0 | 2 | Default width of DAG | +| Depth | int | > 0 | 2 | Default depth of DAG | [^1]: To run RPS experiments add suffix `-RPS`. @@ -50,6 +53,10 @@ this [table](https://cloud.google.com/functions/pricing#compute_time) for Google [^5]: Function can execute for at most 15 minutes as in AWS Lambda; https://aws.amazon.com/about-aws/whats-new/2018/10/aws-lambda-supports-functions-that-can-run-up-to-15-minutes/ +[^6]: The generated DAGs consist of unique functions. The shape of each DAG is determined either ```Width,Depth``` or calculated based on ```EnableDAGDAtaset```. + +[^7]: A [data sample](https://github.com/icanforce/Orion-OSDI22/blob/main/Public_Dataset/dag_structure.xlsx) of DAG structures has been created based on past Microsoft Azure traces. Width and Depth are determined based on probabilities of this sample. + --- InVitro can cause failure on cluster manager components. To do so, please configure the `cmd/failure.json`. Make sure diff --git a/docs/loader.md b/docs/loader.md index a5330238f..4ee1bf9ad 100644 --- a/docs/loader.md +++ b/docs/loader.md @@ -179,6 +179,53 @@ For more options, please see the `Makefile`. For instructions on how to use the loader with OpenWhisk go to `openwhisk_setup/README.md`. +## Workflow Invocation +Generation of a Directed Acyclic Graph (DAG) workflow is supported by setting `"DAGMode: true"` in `cmd/config_knative_trace.json` (as specified in [`docs/configuration.md`](../docs/configuration.md)). + +Before invocation, DAGs will be iteratively generated based on the parameters: `width`,`depth`,`EnableDAGDataset`, until the remaining functions are insufficient to maintain the desired DAG structure. The remaining functions will be unused for the rest of the experiment. + +An example of the generated workflow can be seen here: + +```bash +Functions available: 20 +Width: 3 +Depth: 4 +EnableDAGDataset: false + +DAG 1: f(0) -> f(1) -> f(3) -> f(5) + \ + \ -> f(2) -> f(4) -> f(6) + \ + \ -> f(7) + +DAG 2: f(8) -> f(9) -> f(12) -> f(15) + \ + \ -> f(10) -> f(13) -> f(16) + \ + \-> f(11) -> f(14) -> f(17) +``` +In the example, a single invocation of DAG 1 will result in 8 total functions invoked, with parallel invocations per branch. Invocation Frequency and IAT of DAGs 1 and 2 will depend on entry functions f(0) and f(8) respectively. + +To obtain [reference traces](https://github.com/vhive-serverless/invitro/blob/main/docs/sampler.md#reference-traces) for DAG execution, use the following command: +```bash +git lfs pull +tar -xzf data/traces/reference/sampled_150.tar.gz -C data/traces +``` +Microsoft has publicly released Microsoft Azure traces of function invocations from 10/18/2021 to 10/31/2021. From this trace, a [data sample](https://github.com/icanforce/Orion-OSDI22/blob/main/Public_Dataset) of DAG structures, representing the cumulative distribution of width and depth of DAGs during that period, was generated. Probabilities were applied to the data to derive the shape of the DAGs. The file `data/traces/example/dag_structure.csv` provides a simplified sample of the publicly released traces. + +By default, the shape of the DAGs are automatically calculated at every iteration using the above mentioned cumulative distribution. +To manually set the shape of the DAGs, change the following parameters in `cmd/config_knative_trace.json`. Note that the number of functions in `TracePath` must be large enough to support the maximum size of the DAG. This ensures all DAGs generated will have the same width and depth. +```bash +"EnableDAGDataset": false, +"Width": , +"Depth": +``` + +Lastly, start the experiment. This invokes all the generated DAGs with their respective frequencies. +```bash +go run cmd/loader.go --config cmd/config_knative_trace.json +``` + ## Running on Cloud Using Serverless Framework **Currently supported vendors:** AWS @@ -204,7 +251,6 @@ For instructions on how to use the loader with OpenWhisk go to `openwhisk_setup/ ```bash go run cmd/loader.go --config cmd/config_knative_trace.json ``` - --- Note: - Current deployment is via container image. diff --git a/pkg/common/trace_types.go b/pkg/common/trace_types.go index 1faedd9b2..86d65e61c 100644 --- a/pkg/common/trace_types.go +++ b/pkg/common/trace_types.go @@ -24,6 +24,8 @@ package common +import "container/list" + type FunctionInvocationStats struct { HashOwner string HashApp string @@ -103,3 +105,10 @@ type Function struct { Specification *FunctionSpecification } + +type Node struct { + Function *Function + Branches []*list.List + Depth int + DAG string +} diff --git a/pkg/common/utilities.go b/pkg/common/utilities.go index 94f6f3105..c836fa833 100644 --- a/pkg/common/utilities.go +++ b/pkg/common/utilities.go @@ -135,3 +135,15 @@ func SumNumberOfInvocations(withWarmup bool, totalDuration int, functions []*Fun return result } + +func GetName(function *Function) int { + parts := strings.Split(function.Name, "-") + if parts[0] == "test" { + return 0 + } + functionId, err := strconv.Atoi(parts[2]) + if err != nil { + log.Fatal(err) + } + return functionId +} \ No newline at end of file diff --git a/pkg/config/parser.go b/pkg/config/parser.go index c6015d671..26433ba68 100644 --- a/pkg/config/parser.go +++ b/pkg/config/parser.go @@ -80,6 +80,10 @@ type LoaderConfiguration struct { GRPCConnectionTimeoutSeconds int `json:"GRPCConnectionTimeoutSeconds"` GRPCFunctionTimeoutSeconds int `json:"GRPCFunctionTimeoutSeconds"` DAGMode bool `json:"DAGMode"` + EnableDAGDataset bool `json:"EnableDAGDataset"` + DAGEntryFunction int `json:"DAGEntryFunction"` + Width int `json:"Width"` + Depth int `json:"Depth"` } func ReadConfigurationFile(path string) LoaderConfiguration { diff --git a/pkg/config/parser_test.go b/pkg/config/parser_test.go index a4abb4058..b205aadcb 100644 --- a/pkg/config/parser_test.go +++ b/pkg/config/parser_test.go @@ -61,7 +61,12 @@ func TestConfigParser(t *testing.T) { config.MetricScrapingPeriodSeconds != 15 || config.AutoscalingMetric != "concurrency" || config.GRPCConnectionTimeoutSeconds != 15 || - config.GRPCFunctionTimeoutSeconds != 900 { + config.GRPCFunctionTimeoutSeconds != 900 || + config.DAGMode != false || + config.EnableDAGDataset != true || + config.DAGEntryFunction != 0 || + config.Width != 2 || + config.Depth != 2 { t.Error("Unexpected configuration read.") } diff --git a/pkg/config/test_config.json b/pkg/config/test_config.json index 9e6a65e30..e7ffcbff4 100644 --- a/pkg/config/test_config.json +++ b/pkg/config/test_config.json @@ -22,5 +22,9 @@ "GRPCConnectionTimeoutSeconds": 15, "GRPCFunctionTimeoutSeconds": 900, - "DAGMode": false + "DAGMode": false, + "EnableDAGDataset": true, + "DAGEntryFunction": 0, + "Width": 2, + "Depth": 2 } diff --git a/pkg/config/test_config_aws.json b/pkg/config/test_config_aws.json index d1fa17f8b..5abd1dbcd 100644 --- a/pkg/config/test_config_aws.json +++ b/pkg/config/test_config_aws.json @@ -21,5 +21,9 @@ "GRPCConnectionTimeoutSeconds": 15, "GRPCFunctionTimeoutSeconds": 900, - "DAGMode": false -} + "DAGMode": false, + "EnableDAGDataset": true, + "DAGEntryFunction": 0, + "Width": 2, + "Depth": 2 +} \ No newline at end of file diff --git a/pkg/driver/trace_driver.go b/pkg/driver/trace_driver.go index fe83fd42f..bdf951d08 100644 --- a/pkg/driver/trace_driver.go +++ b/pkg/driver/trace_driver.go @@ -29,10 +29,6 @@ import ( "encoding/csv" "encoding/json" "fmt" - "github.com/vhive-serverless/loader/pkg/config" - "github.com/vhive-serverless/loader/pkg/driver/clients" - "github.com/vhive-serverless/loader/pkg/driver/deployment" - "github.com/vhive-serverless/loader/pkg/driver/failure" "math" "os" "strconv" @@ -40,6 +36,11 @@ import ( "sync/atomic" "time" + "github.com/vhive-serverless/loader/pkg/config" + "github.com/vhive-serverless/loader/pkg/driver/clients" + "github.com/vhive-serverless/loader/pkg/driver/deployment" + "github.com/vhive-serverless/loader/pkg/driver/failure" + "github.com/gocarina/gocsv" log "github.com/sirupsen/logrus" "github.com/vhive-serverless/loader/pkg/common" @@ -93,15 +94,6 @@ func (d *Driver) runCSVWriter(records chan interface{}, filename string, writerD writerDone.Done() } -func DAGCreation(functions []*common.Function) *list.List { - linkedList := list.New() - // Assigning nodes one after another - for _, function := range functions { - linkedList.PushBack(function) - } - return linkedList -} - ///////////////////////////////////////// // METRICS SCRAPPERS ///////////////////////////////////////// @@ -178,6 +170,7 @@ type InvocationMetadata struct { SuccessCount *int64 FailedCount *int64 FailedCountByMinute []int64 + FunctionsInvoked *int64 RecordOutputChannel chan interface{} AnnounceDoneWG *sync.WaitGroup @@ -207,22 +200,40 @@ func (d *Driver) invokeFunction(metadata *InvocationMetadata) { node := metadata.RootFunction.Front() var record *mc.ExecutionRecord var runtimeSpecifications *common.RuntimeSpecification + var branches []*list.List + var invocationRetries int + var numberOfFunctionsInvoked int64 for node != nil { - function := node.Value.(*common.Function) + function := node.Value.(*common.Node).Function runtimeSpecifications = &function.Specification.RuntimeSpecification[metadata.MinuteIndex][metadata.InvocationIndex] - success, record = d.Invoker.Invoke(function, runtimeSpecifications) + if !success && (d.Configuration.LoaderConfiguration.DAGMode && invocationRetries == 0) { + log.Debugf("Invocation failed at minute: %d for %s. Retrying Invocation", metadata.MinuteIndex, function.Name) + invocationRetries += 1 + continue + } record.Phase = int(metadata.Phase) + record.Instance = fmt.Sprintf("%s%s", node.Value.(*common.Node).DAG, record.Instance) record.InvocationID = composeInvocationID(d.Configuration.TraceGranularity, metadata.MinuteIndex, metadata.InvocationIndex) metadata.RecordOutputChannel <- record - + numberOfFunctionsInvoked += 1 if !success { log.Debugf("Invocation failed at minute: %d for %s", metadata.MinuteIndex, function.Name) break } + branches = node.Value.(*common.Node).Branches + for i := 0; i < len(branches); i++ { + newMetadataValue := *metadata + newMetadata := &newMetadataValue + newMetadata.RootFunction = branches[i] + newMetadata.AnnounceDoneWG.Add(1) + atomic.AddInt64(metadata.SuccessCount, -1) + go d.invokeFunction(newMetadata) + } node = node.Next() } + atomic.AddInt64(metadata.FunctionsInvoked, numberOfFunctionsInvoked) if success { atomic.AddInt64(metadata.SuccessCount, 1) } else { @@ -231,11 +242,11 @@ func (d *Driver) invokeFunction(metadata *InvocationMetadata) { } } -func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.WaitGroup, +func (d *Driver) functionsDriver(functionLinkedList *list.List, announceFunctionDone *sync.WaitGroup, addInvocationsToGroup *sync.WaitGroup, readOpenWhiskMetadata *sync.Mutex, totalSuccessful *int64, - totalFailed *int64, totalIssued *int64, recordOutputChannel chan interface{}) { + totalFailed *int64, totalIssued *int64, entriesWritten *int64, recordOutputChannel chan interface{}) { - function := list.Front().Value.(*common.Function) + function := functionLinkedList.Front().Value.(*common.Node).Function numberOfInvocations := 0 for i := 0; i < len(function.InvocationStats.Invocations); i++ { numberOfInvocations += function.InvocationStats.Invocations[i] @@ -251,6 +262,7 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai var failedInvocations int64 var failedInvocationByMinute = make([]int64, totalTraceDuration) var numberOfIssuedInvocations int64 + var functionsInvoked int64 var currentPhase = common.ExecutionPhase waitForInvocations := sync.WaitGroup{} @@ -310,13 +322,14 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai waitForInvocations.Add(1) go d.invokeFunction(&InvocationMetadata{ - RootFunction: list, + RootFunction: functionLinkedList, Phase: currentPhase, MinuteIndex: minuteIndex, InvocationIndex: invocationIndex, SuccessCount: &successfulInvocations, FailedCount: &failedInvocations, FailedCountByMinute: failedInvocationByMinute, + FunctionsInvoked: &functionsInvoked, RecordOutputChannel: recordOutputChannel, AnnounceDoneWG: &waitForInvocations, AnnounceDoneExe: addInvocationsToGroup, @@ -331,7 +344,7 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai InvocationID: composeInvocationID(d.Configuration.TraceGranularity, minuteIndex, invocationIndex), StartTime: time.Now().UnixNano(), } - + functionsInvoked++ successfulInvocations++ } numberOfIssuedInvocations++ @@ -347,6 +360,7 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai atomic.AddInt64(totalSuccessful, successfulInvocations) atomic.AddInt64(totalFailed, failedInvocations) atomic.AddInt64(totalIssued, numberOfIssuedInvocations) + atomic.AddInt64(entriesWritten, functionsInvoked) } func (d *Driver) proceedToNextMinute(function *common.Function, minuteIndex *int, invocationIndex *int, startOfMinute *time.Time, @@ -527,7 +541,7 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { var successfulInvocations int64 var failedInvocations int64 var invocationsIssued int64 - var functionsPerDAG int64 + var entriesWritten int64 readOpenWhiskMetadata := sync.Mutex{} allFunctionsInvoked := sync.WaitGroup{} allIndividualDriversCompleted := sync.WaitGroup{} @@ -538,19 +552,28 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { if !iatOnly { log.Info("Generating IAT and runtime specifications for all the functions") + maxInvocation := generator.GetMaxInvocation(d.Configuration.Functions) for i, function := range d.Configuration.Functions { - // Equalising all the InvocationStats to the first function - if d.Configuration.LoaderConfiguration.DAGMode { - function.InvocationStats.Invocations = d.Configuration.Functions[0].InvocationStats.Invocations - } spec := d.SpecificationGenerator.GenerateInvocationData( function, d.Configuration.IATDistribution, d.Configuration.ShiftIAT, d.Configuration.TraceGranularity, ) - d.Configuration.Functions[i].Specification = spec + // Overwriting the runtime specification to account for maximum possible invocations + if d.Configuration.LoaderConfiguration.DAGMode { + originalInvocation := function.InvocationStats.Invocations + function.InvocationStats.Invocations = maxInvocation + spec := d.SpecificationGenerator.GenerateInvocationData( + function, + d.Configuration.IATDistribution, + d.Configuration.ShiftIAT, + d.Configuration.TraceGranularity, + ) + function.InvocationStats.Invocations = originalInvocation + function.Specification.RuntimeSpecification = spec.RuntimeSpecification + } } } @@ -571,35 +594,38 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { } if d.Configuration.LoaderConfiguration.DAGMode { + functions := d.Configuration.Functions + dagLists := generator.GenerateDAGs(d.Configuration.LoaderConfiguration, functions) log.Infof("Starting DAG invocation driver\n") - functionLinkedList := DAGCreation(d.Configuration.Functions) - functionsPerDAG = int64(len(d.Configuration.Functions)) - allIndividualDriversCompleted.Add(1) - go d.functionsDriver( - functionLinkedList, - &allIndividualDriversCompleted, - &allFunctionsInvoked, - &readOpenWhiskMetadata, - &successfulInvocations, - &failedInvocations, - &invocationsIssued, - globalMetricsCollector, - ) + for i := range len(dagLists) { + allIndividualDriversCompleted.Add(1) + go d.functionsDriver( + dagLists[i], + &allIndividualDriversCompleted, + &allFunctionsInvoked, + &readOpenWhiskMetadata, + &successfulInvocations, + &failedInvocations, + &invocationsIssued, + &entriesWritten, + globalMetricsCollector, + ) + } } else { log.Infof("Starting function invocation driver\n") - functionsPerDAG = 1 for _, function := range d.Configuration.Functions { allIndividualDriversCompleted.Add(1) - linkedList := list.New() - linkedList.PushBack(function) + functionLinkedList := list.New() + functionLinkedList.PushBack(&common.Node{Function: function, Depth: 0}) go d.functionsDriver( - linkedList, + functionLinkedList, &allIndividualDriversCompleted, &allFunctionsInvoked, &readOpenWhiskMetadata, &successfulInvocations, &failedInvocations, &invocationsIssued, + &entriesWritten, globalMetricsCollector, ) } @@ -608,7 +634,7 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { if atomic.LoadInt64(&successfulInvocations)+atomic.LoadInt64(&failedInvocations) != 0 { log.Debugf("Waiting for all the invocations record to be written.\n") - totalIssuedChannel <- atomic.LoadInt64(&invocationsIssued) * functionsPerDAG + totalIssuedChannel <- atomic.LoadInt64(&entriesWritten) scraperFinishCh <- 0 // Ask the scraper to finish metrics collection allRecordsWritten.Wait() diff --git a/pkg/driver/trace_driver_test.go b/pkg/driver/trace_driver_test.go index 61d498080..4b11befae 100644 --- a/pkg/driver/trace_driver_test.go +++ b/pkg/driver/trace_driver_test.go @@ -135,7 +135,7 @@ func TestInvokeFunctionFromDriver(t *testing.T) { testDriver := createTestDriver() var failureCountByMinute = make([]int64, testDriver.Configuration.TraceDuration) - + var functionsInvoked int64 if !test.forceFail { address, port := "localhost", test.port testDriver.Configuration.Functions[0].Endpoint = fmt.Sprintf("%s:%d", address, port) @@ -145,10 +145,10 @@ func TestInvokeFunctionFromDriver(t *testing.T) { // make sure that the gRPC server is running time.Sleep(2 * time.Second) } - + function := testDriver.Configuration.Functions[0] + node := &common.Node{Function: testDriver.Configuration.Functions[0]} list := list.New() - list.PushBack(testDriver.Configuration.Functions[0]) - function := list.Front().Value.(*common.Function) + list.PushBack(node) for i := 0; i < len(function.Specification.RuntimeSpecification); i++ { function.Specification.RuntimeSpecification[i] = make([]common.RuntimeSpecification, 3) } @@ -164,6 +164,7 @@ func TestInvokeFunctionFromDriver(t *testing.T) { SuccessCount: &successCount, FailedCount: &failureCount, FailedCountByMinute: failureCountByMinute, + FunctionsInvoked: &functionsInvoked, RecordOutputChannel: invocationRecordOutputChannel, AnnounceDoneWG: announceDone, } @@ -196,13 +197,13 @@ func TestInvokeFunctionFromDriver(t *testing.T) { func TestDAGInvocation(t *testing.T) { var successCount int64 = 0 var failureCount int64 = 0 - var functionsToInvoke int = 4 + var functionsToInvoke int = 3 + var functionsInvoked int64 invocationRecordOutputChannel := make(chan interface{}, functionsToInvoke) announceDone := &sync.WaitGroup{} testDriver := createTestDriver() var failureCountByMinute = make([]int64, testDriver.Configuration.TraceDuration) - list := list.New() address, port := "localhost", 8085 function := testDriver.Configuration.Functions[0] function.Endpoint = fmt.Sprintf("%s:%d", address, port) @@ -215,28 +216,48 @@ func TestDAGInvocation(t *testing.T) { Runtime: 1000, Memory: 128, } - for i := 0; i < functionsToInvoke; i++ { - function = testDriver.Configuration.Functions[0] - list.PushBack(function) + functionList := make([]*common.Function, 3) + for i := 0; i < len(functionList); i++ { + functionList[i] = function + } + originalBranch := []*list.List{ + func() *list.List { + l := list.New() + l.PushBack(&common.Node{Function: functionList[0], Depth: 0}) + l.PushBack(&common.Node{Function: functionList[1], Depth: 1}) + return l + }(), + } + + newBranch := []*list.List{ + func() *list.List { + l := list.New() + l.PushBack(&common.Node{Function: functionList[2], Depth: 1}) + return l + }(), } + rootFunction := originalBranch[0] + rootFunction.Front().Value.(*common.Node).Branches = newBranch time.Sleep(2 * time.Second) metadata := &InvocationMetadata{ - RootFunction: list, + RootFunction: rootFunction, Phase: common.ExecutionPhase, MinuteIndex: 0, InvocationIndex: 2, SuccessCount: &successCount, FailedCount: &failureCount, FailedCountByMinute: failureCountByMinute, + FunctionsInvoked: &functionsInvoked, RecordOutputChannel: invocationRecordOutputChannel, AnnounceDoneWG: announceDone, } announceDone.Add(1) testDriver.invokeFunction(metadata) - if !(successCount == 1 && failureCount == 0) { + announceDone.Wait() + if !(functionsInvoked == 3 && failureCount == 0) { t.Error("The DAG invocation has failed.") } for i := 0; i < functionsToInvoke; i++ { diff --git a/pkg/generator/dag_generation.go b/pkg/generator/dag_generation.go new file mode 100644 index 000000000..c56050113 --- /dev/null +++ b/pkg/generator/dag_generation.go @@ -0,0 +1,241 @@ +package generator + +import ( + "container/list" + "encoding/csv" + "fmt" + "math/rand" + "os" + "strconv" + "strings" + + log "github.com/sirupsen/logrus" + "github.com/vhive-serverless/loader/pkg/common" + "github.com/vhive-serverless/loader/pkg/config" +) + +// Visual Representation for the DAG +func printDAG(DAGWorkflow *list.List) { + DAGNode := DAGWorkflow.Front() + nodeQueue := make([]*list.Element, 0) + nodeQueue = append(nodeQueue, DAGNode) + var printMessage string + var buffer string = "" + var dummyNode *list.Element + var startingNode bool = true + for len(nodeQueue) > 0 { + DAGNode = nodeQueue[0] + nodeQueue = nodeQueue[1:] + functionId := common.GetName(DAGNode.Value.(*common.Node).Function) + if startingNode { + printMessage = "|" + strconv.Itoa(functionId) + for i := 0; i < DAGNode.Value.(*common.Node).Depth; i++ { + buffer += " " + } + printMessage = buffer + printMessage + startingNode = false + } else { + printMessage = printMessage + " -> " + strconv.Itoa(functionId) + } + for i := 0; i < len(DAGNode.Value.(*common.Node).Branches); i++ { + nodeQueue = append(nodeQueue, dummyNode) + copy(nodeQueue[1:], nodeQueue) + nodeQueue[0] = DAGNode.Value.(*common.Node).Branches[i].Front() + } + if DAGNode.Next() == nil { + println(printMessage) + buffer = "" + if len(nodeQueue) > 0 { + startingNode = true + } else { + break + } + } else { + nodeQueue = append(nodeQueue, dummyNode) + copy(nodeQueue[1:], nodeQueue) + nodeQueue[0] = DAGNode.Next() + } + } +} + +// Read the Cumulative Distribution Frequency (CDF) of the widths and depths of a DAG +func generateCDF(file string) [][]float64 { + f, err := os.Open(file) + if err != nil { + log.Fatal(err) + } + defer f.Close() + csvReader := csv.NewReader(f) + records, err := csvReader.ReadAll() + if err != nil { + log.Fatal(err) + } + records = records[1:] + cdf := make([][]float64, len(records[0])) + for i := 0; i < len(records[0]); i++ { + cdf[i] = make([]float64, len(records)) + } + for i := 0; i < len(records[0]); i += 2 { + for j := 0; j < len(records); j++ { + cdfProb, _ := strconv.ParseFloat(strings.TrimSuffix(records[j][i+1], "%"), 64) + cdfValue, _ := strconv.ParseFloat(records[j][i], 64) + cdf[i+1][j] = cdfProb + cdf[i][j] = cdfValue + if cdfProb == 100.00 { + cdf[i] = cdf[i][:j+1] + cdf[i+1] = cdf[i+1][:j+1] + break + } + } + } + return cdf +} + +// Generate pseudo-random probabilities and compare it with the given CDF to obtain the depth and width of the DAG +func getDAGStats(cdf [][]float64, maxSize int, numberOfTries int) (int, int) { + var width, depth int + depthProb := rand.Float64() * 100 + widthProb := rand.Float64() * 100 + for i, value := range cdf[1] { + if value >= widthProb { + width = int(cdf[0][i]) + break + } + } + for i, value := range cdf[3] { + if value >= depthProb { + depth = int(cdf[2][i]) + break + } + } + // Re-run DAG Generation if size exceeds number of functions + if maxSize < width*(depth-1)+1 { + if numberOfTries == 10 { + return 1, maxSize + } + width, depth = getDAGStats(cdf, maxSize, numberOfTries+1) + } + return width, depth +} + +func GenerateDAGs(config *config.LoaderConfiguration, functions []*common.Function) []*list.List { + var width, depth int + var functionIndex int = 0 + var dagIdentity int = 0 + var functionLinkedList *list.List + totalDAGList := []*list.List{} + for { + if config.EnableDAGDataset { + DAGDistribution := generateCDF(fmt.Sprintf("%s/dag_structure.csv", config.TracePath)) + width, depth = getDAGStats(DAGDistribution, len(functions), 0) + } else { + // Sanity checking if max size of DAG exceeds number of functions available + width = config.Width + depth = config.Depth + } + if (len(functions) - functionIndex) < (depth-1)*width+1 { + log.Infof("DAGs created: %d, Total Functions used: %d, Functions Unused: %d", dagIdentity, functionIndex, len(functions)-functionIndex) + break + } + functionLinkedList, functionIndex = createDAGWorkflow(functions, functions[functionIndex], width, depth, dagIdentity) + dagIdentity++ + printDAG(functionLinkedList) + totalDAGList = append(totalDAGList, functionLinkedList) + } + return totalDAGList +} + +func createDAGWorkflow(functionList []*common.Function, function *common.Function, maxWidth int, maxDepth int, dagIdentity int) (*list.List, int) { + DAGList := list.New() + dagIdentifier := fmt.Sprintf("DAG %d,", dagIdentity) + var functionID int = common.GetName(function) + if maxDepth == 1 { + DAGList.PushBack(&common.Node{Function: function, Depth: 0, DAG: dagIdentifier}) + return DAGList, functionID + 1 + } + widthList := generateNodeDistribution(maxWidth, maxDepth) + // Implement a FIFO queue for nodes to assign functions and branches to each node. + nodeQueue := []*list.Element{} + for i := 0; i < len(widthList); i++ { + widthList[i] -= 1 + DAGList.PushBack(&common.Node{Depth: -1}) + } + DAGList.Front().Value = &common.Node{Function: function, Depth: 0, DAG: dagIdentifier} + functionID += 1 + nodeQueue = append(nodeQueue, DAGList.Front()) + for len(nodeQueue) > 0 { + listElement := nodeQueue[0] + nodeQueue = nodeQueue[1:] + node := listElement.Value.(*common.Node) + // Checks if the node has reached the maximum depth of the DAG (maxDepth -1) + if node.Depth == maxDepth-1 { + continue + } + child := &common.Node{Function: functionList[functionID], Depth: node.Depth + 1, DAG: dagIdentifier} + functionID += 1 + listElement.Next().Value = child + nodeQueue = append(nodeQueue, listElement.Next()) + // Creating parallel branches from the node, if width of next stage > width of current stage + var nodeList []*list.List + if widthList[node.Depth+1] > 0 { + nodeList, nodeQueue = addBranches(nodeQueue, widthList, node, functionList, functionID, dagIdentifier) + functionID += len(nodeList) + } else { + nodeList = []*list.List{} + } + node.Branches = nodeList + } + return DAGList, functionID +} + +func addBranches(nodeQueue []*list.Element, widthList []int, node *common.Node, functionList []*common.Function, functionID int, dagIdentifier string) ([]*list.List, []*list.Element) { + var additionalBranches int + if len(nodeQueue) < 1 || (nodeQueue[0].Value.(*common.Node).Depth > node.Depth) { + additionalBranches = widthList[node.Depth+1] + } else { + additionalBranches = rand.Intn(widthList[node.Depth+1] + 1) + } + for i := node.Depth + 1; i < len(widthList); i++ { + widthList[i] -= additionalBranches + } + nodeList := make([]*list.List, additionalBranches) + for i := 0; i < additionalBranches; i++ { + newBranch := createNewBranch(functionList, node, len(widthList), functionID, dagIdentifier) + functionID += 1 + nodeList[i] = newBranch + nodeQueue = append(nodeQueue, newBranch.Front()) + } + return nodeList, nodeQueue +} + +func createNewBranch(functionList []*common.Function, node *common.Node, maxDepth int, functionID int, dagIdentifier string) *list.List { + DAGBranch := list.New() + // Ensuring that each node is connected to a child until the maximum depth + for i := node.Depth + 1; i < maxDepth; i++ { + DAGBranch.PushBack(&common.Node{Depth: -1}) + } + child := &common.Node{Function: functionList[functionID], Depth: node.Depth + 1, DAG: dagIdentifier} + DAGBranch.Front().Value = child + return DAGBranch +} + +func generateNodeDistribution(maxWidth int, maxDepth int) []int { + // Generating the number of nodes per depth (stage). + widthList := []int{} + widthList = append(widthList, 1) + for i := 1; i < maxDepth-1; i++ { + widthList = append(widthList, (rand.Intn(maxWidth-widthList[i-1]+1) + widthList[i-1])) + } + widthList = append(widthList, maxWidth) + return widthList +} + +func GetMaxInvocation(functionList []*common.Function) []int { + maxInvocation := make([]int, len(functionList[0].InvocationStats.Invocations)) + for _, i := range functionList { + for index, invocation := range i.InvocationStats.Invocations { + maxInvocation[index] = max(maxInvocation[index], invocation) + } + } + return maxInvocation +} diff --git a/pkg/generator/dag_generation_test.go b/pkg/generator/dag_generation_test.go new file mode 100644 index 000000000..d81c7b4a9 --- /dev/null +++ b/pkg/generator/dag_generation_test.go @@ -0,0 +1,100 @@ +/* + * MIT License + * + * Copyright (c) 2023 EASL and the vHive community + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package generator + +import ( + "testing" + + "github.com/vhive-serverless/loader/pkg/common" + "github.com/vhive-serverless/loader/pkg/config" +) + +var fakeConfig *config.LoaderConfiguration = &config.LoaderConfiguration{ + Platform: "Knative", + InvokeProtocol: "grpc", + TracePath: "data/traces/example", + OutputPathPrefix: "test", + EnableZipkinTracing: true, + GRPCConnectionTimeoutSeconds: 5, + GRPCFunctionTimeoutSeconds: 15, + DAGMode: true, + EnableDAGDataset: false, + Width: 2, + Depth: 2, +} + +var functions []*common.Function = []*common.Function{ + { + Name: "test-function", + InvocationStats: &common.FunctionInvocationStats{ + Invocations: []int{ + 5, 5, 5, 5, 5, + 5, 5, 5, 5, 5, + 5, 5, 5, 5, 5, + 5, 5, 5, 5, 5, + }, + }, + RuntimeStats: &common.FunctionRuntimeStats{ + Average: 50, + Count: 100, + Minimum: 0, + Maximum: 100, + Percentile0: 0, + Percentile1: 1, + Percentile25: 25, + Percentile50: 50, + Percentile75: 75, + Percentile99: 99, + Percentile100: 100, + }, + MemoryStats: &common.FunctionMemoryStats{ + Average: 5000, + Count: 100, + Percentile1: 100, + Percentile5: 500, + Percentile25: 2500, + Percentile50: 5000, + Percentile75: 7500, + Percentile95: 9500, + Percentile99: 9900, + Percentile100: 10000, + }, + Specification: &common.FunctionSpecification{ + RuntimeSpecification: make([][]common.RuntimeSpecification, 1), + }, + }, +} +var functionList []*common.Function = make([]*common.Function, 3) + +func TestGenerateDAG(t *testing.T) { + for i := 0; i < len(functionList); i++ { + functionList[i] = functions[0] + } + dagList := GenerateDAGs(fakeConfig, functionList)[0] + branch := dagList.Front().Value.(*common.Node).Branches + if dagList.Len() != 2 && len(branch) != 1 { + t.Error("Invalid DAG Generated") + } +}