diff --git a/.github/configs/wordlist.txt b/.github/configs/wordlist.txt index c24bc022f..f0cbc79a3 100644 --- a/.github/configs/wordlist.txt +++ b/.github/configs/wordlist.txt @@ -151,6 +151,7 @@ Ctrl currencyservice Daglis DAGMode +DAGTracePath datacenter Datacenter dataflows diff --git a/cmd/config.json b/cmd/config.json index 3fbfa0f83..85de66dd7 100644 --- a/cmd/config.json +++ b/cmd/config.json @@ -21,5 +21,6 @@ "GRPCConnectionTimeoutSeconds": 15, "GRPCFunctionTimeoutSeconds": 900, - "DAGMode": false + "DAGMode": false, + "DAGTracePath": "data/traces/sampled_150/20" } \ No newline at end of file diff --git a/cmd/loader.go b/cmd/loader.go index 3c062762e..e09e208ba 100644 --- a/cmd/loader.go +++ b/cmd/loader.go @@ -27,10 +27,12 @@ package main import ( "flag" "fmt" - "golang.org/x/exp/slices" "os" + "os/exec" "time" + "golang.org/x/exp/slices" + "github.com/vhive-serverless/loader/pkg/common" "github.com/vhive-serverless/loader/pkg/config" "github.com/vhive-serverless/loader/pkg/driver" @@ -101,6 +103,14 @@ func main() { log.Fatal("Unsupported platform! Supported platforms are [Knative, OpenWhisk, AWSLambda, Dirigent]") } + if cfg.DAGMode { + cfg.TracePath = cfg.DAGTracePath + _, err := os.Stat(cfg.DAGTracePath) + if err != nil { + getTrace() + } + } + runTraceMode(&cfg, *iatGeneration, *generated) } @@ -188,3 +198,17 @@ func runTraceMode(cfg *config.LoaderConfiguration, iatOnly bool, generated bool) experimentDriver.RunExperiment(iatOnly, generated) } + +func getTrace() bool { + _, err := exec.Command("git", "lfs", "pull").CombinedOutput() + if err != nil { + log.Warnf("Failed to get traces") + return false + } + _, err = exec.Command("tar", "-xzf", "data/traces/reference/sampled_150.tar.gz", "-C", "data/traces").CombinedOutput() + if err != nil { + log.Warnf("Failed to extract sample") + return false + } + return true +} diff --git a/data/traces/example/dag_structure.xlsx b/data/traces/example/dag_structure.xlsx new file mode 100644 index 000000000..eb243876e Binary files /dev/null and b/data/traces/example/dag_structure.xlsx differ diff --git a/docs/configuration.md b/docs/configuration.md index adb76bc28..e18cacad0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -19,7 +19,8 @@ | MetricScrapingPeriodSeconds | int | > 0 | 15 | Period of Prometheus metrics scrapping | | GRPCConnectionTimeoutSeconds | int | > 0 | 60 | Timeout for establishing a gRPC connection | | GRPCFunctionTimeoutSeconds | int | > 0 | 90 | Maximum time given to function to execute[^4] | -| DAGMode | bool | true/false | false | Sequential invocation of all functions one after another | +| DAGMode | bool | true/false | false | Parallel invocation of DAG with each function acting as entry function | +| DAGTracePath | string | string | data/traces/sampled_150/20 | Folder with Azure trace dimensions used for DAG Invocation. | [^1]: The second granularity feature interprets each column of the trace as a second, rather than as a minute, and generates IAT for each second. This feature is useful for fine-grained and precise invocation scheduling in experiments involving stable low load. diff --git a/go.mod b/go.mod index 20e73a13d..17f7c4956 100644 --- a/go.mod +++ b/go.mod @@ -35,8 +35,13 @@ require ( github.com/go-pdf/fpdf v0.9.0 // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect github.com/kr/fs v0.1.0 // indirect + github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect github.com/pkg/sftp v1.13.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/richardlehane/mscfb v1.0.4 // indirect + github.com/richardlehane/msoleps v1.0.3 // indirect + github.com/xuri/efp v0.0.0-20231025114914-d1ff6096ae53 // indirect + github.com/xuri/nfp v0.0.0-20230919160717-d98342af3f05 // indirect go.opentelemetry.io/otel/exporters/zipkin v1.8.0 // indirect golang.org/x/crypto v0.23.0 // indirect golang.org/x/image v0.14.0 // indirect @@ -46,6 +51,7 @@ require ( require ( github.com/containerd/containerd v1.6.13 // indirect github.com/openzipkin/zipkin-go v0.4.0 // indirect + github.com/xuri/excelize/v2 v2.8.1 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.33.0 go.opentelemetry.io/otel v1.11.1 // indirect go.opentelemetry.io/otel/sdk v1.8.0 // indirect diff --git a/go.sum b/go.sum index 54c19da35..e2276a2b4 100644 --- a/go.sum +++ b/go.sum @@ -124,6 +124,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw= +github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -145,6 +147,11 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/richardlehane/mscfb v1.0.4 h1:WULscsljNPConisD5hR0+OyZjwK46Pfyr6mPu5ZawpM= +github.com/richardlehane/mscfb v1.0.4/go.mod h1:YzVpcZg9czvAuhk9T+a3avCpcFPMUWm7gK3DypaEsUk= +github.com/richardlehane/msoleps v1.0.1/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg= +github.com/richardlehane/msoleps v1.0.3 h1:aznSZzrwYRl3rLKRT3gUk9am7T/mLNSnJINvN0AQoVM= +github.com/richardlehane/msoleps v1.0.3/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sfreiberg/simplessh v0.0.0-20220719182921-185eafd40485 h1:ZMBZ2DKX1sScUSo9ZUwGI7jCMukslPNQNfZaw9vVyfY= @@ -167,6 +174,12 @@ github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20230926064847-68cc9b github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= +github.com/xuri/efp v0.0.0-20231025114914-d1ff6096ae53 h1:Chd9DkqERQQuHpXjR/HSV1jLZA6uaoiwwH3vSuF3IW0= +github.com/xuri/efp v0.0.0-20231025114914-d1ff6096ae53/go.mod h1:ybY/Jr0T0GTCnYjKqmdwxyxn2BQf2RcQIIvex5QldPI= +github.com/xuri/excelize/v2 v2.8.1 h1:pZLMEwK8ep+CLIUWpWmvW8IWE/yxqG0I1xcN6cVMGuQ= +github.com/xuri/excelize/v2 v2.8.1/go.mod h1:oli1E4C3Pa5RXg1TBXn4ENCXDV5JUMlBluUhG7c+CEE= +github.com/xuri/nfp v0.0.0-20230919160717-d98342af3f05 h1:qhbILQo1K3mphbwKh1vNm4oGezE1eF9fQWmNiIpSfI4= +github.com/xuri/nfp v0.0.0-20230919160717-d98342af3f05/go.mod h1:WwHg+CVyzlv/TX9xqBFXEZAuxOPxn2k1GNHwG41IIUQ= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.33.0 h1:z6rnla1Asjzn0FrhohzIbDi4bxbtc6EMmQ7f5ZPn+pA= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.33.0/go.mod h1:y/SlJpJQPd2UzfBCj0E9Flk9FDCtTyqUmaCB41qFrWI= diff --git a/pkg/common/trace_types.go b/pkg/common/trace_types.go index 05bf3b06c..08ca542ff 100644 --- a/pkg/common/trace_types.go +++ b/pkg/common/trace_types.go @@ -24,6 +24,8 @@ package common +import "container/list" + type FunctionInvocationStats struct { HashOwner string HashApp string @@ -98,3 +100,9 @@ type Function struct { Specification *FunctionSpecification } + +type Node struct { + Function *Function + Branches []*list.List + Depth int +} diff --git a/pkg/config/parser.go b/pkg/config/parser.go index ee78140e4..64f75e0b5 100644 --- a/pkg/config/parser.go +++ b/pkg/config/parser.go @@ -56,6 +56,7 @@ type LoaderConfiguration struct { GRPCConnectionTimeoutSeconds int `json:"GRPCConnectionTimeoutSeconds"` GRPCFunctionTimeoutSeconds int `json:"GRPCFunctionTimeoutSeconds"` DAGMode bool `json:"DAGMode"` + DAGTracePath string `json:"DAGTracePath"` } func ReadConfigurationFile(path string) LoaderConfiguration { diff --git a/pkg/driver/trace_driver.go b/pkg/driver/trace_driver.go index c9e61b037..c0309cd87 100644 --- a/pkg/driver/trace_driver.go +++ b/pkg/driver/trace_driver.go @@ -30,8 +30,10 @@ import ( "encoding/json" "fmt" "math" + "math/rand" "os" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -43,6 +45,7 @@ import ( "github.com/vhive-serverless/loader/pkg/generator" mc "github.com/vhive-serverless/loader/pkg/metric" "github.com/vhive-serverless/loader/pkg/trace" + "github.com/xuri/excelize/v2" ) type DriverConfiguration struct { @@ -100,13 +103,217 @@ func (d *Driver) runCSVWriter(records chan interface{}, filename string, writerD writerDone.Done() } -func DAGCreation(functions []*common.Function) *list.List { - linkedList := list.New() - // Assigning nodes one after another - for _, function := range functions { - linkedList.PushBack(function) +func createDAGWorkflow(functionList []*common.Function, function *common.Function, maxWidth int, maxDepth int) *list.List { + // Generating the number of nodes per depth (stage). + widthList := []int{} + widthList = append(widthList, 1) + // Using -1 as indicator for dummy node + dummyNode := &common.Node{Depth: -1} + node := &common.Node{Function: function, Depth: 0} + DAGList := list.New() + if maxDepth == 1 { + DAGList.PushBack(node) + return DAGList } - return linkedList + for i := 1; i < maxDepth-1; i++ { + widthList = append(widthList, (rand.Intn(maxWidth-widthList[i-1]+1) + widthList[i-1])) + } + widthList = append(widthList, maxWidth) + // Implement a FIFO queue for nodes to assign functions and branches to each node. + nodeQueue := []*list.Element{} + functionId := getName(function) + var additionalBranches int + var listElement *list.Element + functionId = (functionId + 1) % len(functionList) + for i := 0; i < len(widthList); i++ { + widthList[i] -= 1 + DAGList.PushBack(dummyNode) + } + DAGList.Front().Value = node + nodeQueue = append(nodeQueue, DAGList.Front()) + for len(nodeQueue) > 0 { + listElement = nodeQueue[0] + nodeQueue = nodeQueue[1:] + node = listElement.Value.(*common.Node) + // Checks if the node has reached the maximum depth of the DAG (maxDepth -1) + if node.Depth == maxDepth-1 { + continue + } + child := &common.Node{Function: functionList[functionId], Depth: node.Depth + 1} + functionId = (functionId + 1) % len(functionList) + listElement.Next().Value = child + nodeQueue = append(nodeQueue, listElement.Next()) + + // Creating parallel branches from the node, if width of next stage > width of current stage + if widthList[node.Depth+1] > 0 { + if len(nodeQueue) < 1 || (nodeQueue[0].Value.(*common.Node).Depth > node.Depth) { + additionalBranches = widthList[node.Depth+1] + } else { + additionalBranches = rand.Intn(widthList[node.Depth+1] + 1) + } + for i := node.Depth + 1; i < len(widthList); i++ { + widthList[i] -= additionalBranches + } + } else { + additionalBranches = 0 + } + + nodeList := make([]*list.List, additionalBranches) + // Populate additional branches of the node with dummy nodes, and add them to the queue to be assigned + for i := 0; i < additionalBranches; i++ { + DAGBranch := list.New() + // Ensuring that each node is connected to a child until the maximum depth + for i := node.Depth + 1; i < maxDepth; i++ { + DAGBranch.PushBack(dummyNode) + } + child = &common.Node{Function: functionList[functionId], Depth: node.Depth + 1} + functionId = (functionId + 1) % len(functionList) + DAGBranch.Front().Value = child + nodeList[i] = DAGBranch + nodeQueue = append(nodeQueue, DAGBranch.Front()) + } + node.Branches = nodeList + } + return DAGList +} + +func printDAG(DAGWorkflow *list.List) { + DAGNode := DAGWorkflow.Front() + nodeQueue := make([]*list.Element, 0) + nodeQueue = append(nodeQueue, DAGNode) + var printMessage string + var buffer string = "" + var dummyNode *list.Element + var startingNode bool = true + for len(nodeQueue) > 0 { + DAGNode = nodeQueue[0] + nodeQueue = nodeQueue[1:] + functionId := getName(DAGNode.Value.(*common.Node).Function) + if startingNode { + printMessage = "|" + strconv.Itoa(functionId) + for i := 0; i < DAGNode.Value.(*common.Node).Depth; i++ { + buffer += " " + } + printMessage = buffer + printMessage + startingNode = false + } else { + printMessage = printMessage + " -> " + strconv.Itoa(functionId) + } + for i := 0; i < len(DAGNode.Value.(*common.Node).Branches); i++ { + nodeQueue = append(nodeQueue, dummyNode) + copy(nodeQueue[1:], nodeQueue) + nodeQueue[0] = DAGNode.Value.(*common.Node).Branches[i].Front() + } + if DAGNode.Next() == nil { + println(printMessage) + buffer = "" + if len(nodeQueue) > 0 { + startingNode = true + } else { + break + } + } else { + nodeQueue = append(nodeQueue, dummyNode) + copy(nodeQueue[1:], nodeQueue) + nodeQueue[0] = DAGNode.Next() + } + } + +} + +func getMaxInvocation(functionList []*common.Function) []int { + maxInvocation := make([]int, len(functionList[0].InvocationStats.Invocations)) + for _, i := range functionList { + for index, invocation := range i.InvocationStats.Invocations { + maxInvocation[index] = max(maxInvocation[index], invocation) + } + } + return maxInvocation + +} + +func getName(function *common.Function) int { + parts := strings.Split(function.Name, "-") + if parts[0] == "test" { + return 0 + } + functionId, err := strconv.Atoi(parts[2]) + if err != nil { + log.Fatal(err) + } + return functionId +} + +// Read the Cumulative Distribution Frequency (CDF) of the widths and depths of a DAG +func generateCDF(file string) [][]float64 { + sheetName := "data" + f, err := excelize.OpenFile(file) + if err != nil { + log.Fatal(err) + } + defer func() { + if err := f.Close(); err != nil { + fmt.Println(err) + } + }() + // Removing the first 2 header rows + _ = f.RemoveRow(sheetName, 1) + _ = f.RemoveRow(sheetName, 1) + cols, _ := f.GetCols(sheetName) + + cdf := make([][]float64, len(cols)) + for i := range cols { + cdf[i] = make([]float64, len(cols[i])) + } + for i := 0; i < 6; i += 2 { + for j := 0; j < len(cols[i]); j++ { + cdfProb, _ := strconv.ParseFloat(strings.TrimSuffix(cols[i+1][j], "%"), 64) + cdfValue, _ := strconv.ParseFloat(cols[i][j], 64) + cdfProb = math.Round(cdfProb*100) / 100 + cdf[i+1][j] = cdfProb + cdf[i][j] = cdfValue + if cdfProb == 100.00 { + cdf[i] = cdf[i][:j+1] + cdf[i+1] = cdf[i+1][:j+1] + break + } + + } + } + return cdf +} + +// Generate pseudo-random probabilities and compare it with the given CDF to obtain the depth and width of the DAG +func getDAGStats(cdf [][]float64, maxSize int) (int, int) { + var width int + var depth int + depthProb := math.Round((rand.Float64() * 10000)) / 100 + widthProb := math.Round((rand.Float64() * 10000)) / 100 + for i, value := range cdf[1] { + if value == widthProb { + width = int(cdf[0][i]) + break + } + if value > widthProb { + width = int(cdf[0][i-1]) + break + } + } + for i, value := range cdf[3] { + if value == depthProb { + depth = int(cdf[2][i]) + break + } + if value > depthProb { + depth = int(cdf[2][i-1]) + break + } + } + // Re-run DAG Generation if size exceeds number of functions + if maxSize < width*(depth-1) { + width, depth = getDAGStats(cdf, maxSize) + } + return width, depth } ///////////////////////////////////////// @@ -185,6 +392,7 @@ type InvocationMetadata struct { SuccessCount *int64 FailedCount *int64 FailedCountByMinute []int64 + FunctionsInvoked *int64 RecordOutputChannel chan interface{} AnnounceDoneWG *sync.WaitGroup @@ -214,8 +422,11 @@ func (d *Driver) invokeFunction(metadata *InvocationMetadata) { node := metadata.RootFunction.Front() var record *mc.ExecutionRecord var runtimeSpecifications *common.RuntimeSpecification + var branches []*list.List + var invocationRetries int + var numberOfFunctionsInvoked int64 for node != nil { - function := node.Value.(*common.Function) + function := node.Value.(*common.Node).Function runtimeSpecifications = &function.Specification.RuntimeSpecification[metadata.MinuteIndex][metadata.InvocationIndex] switch d.Configuration.LoaderConfiguration.Platform { case "Knative": @@ -246,16 +457,31 @@ func (d *Driver) invokeFunction(metadata *InvocationMetadata) { default: log.Fatal("Unsupported platform.") } + if !success && (d.Configuration.LoaderConfiguration.DAGMode && invocationRetries == 0) { + log.Debugf("Invocation failed at minute: %d for %s. Retrying Invocation", metadata.MinuteIndex, function.Name) + invocationRetries += 1 + continue + } record.Phase = int(metadata.Phase) record.InvocationID = composeInvocationID(d.Configuration.TraceGranularity, metadata.MinuteIndex, metadata.InvocationIndex) metadata.RecordOutputChannel <- record - + numberOfFunctionsInvoked += 1 if !success { log.Debugf("Invocation failed at minute: %d for %s", metadata.MinuteIndex, function.Name) break } + branches = node.Value.(*common.Node).Branches + for i := 0; i < len(branches); i++ { + newMetadataValue := *metadata + newMetadata := &newMetadataValue + newMetadata.RootFunction = branches[i] + newMetadata.AnnounceDoneWG.Add(1) + atomic.AddInt64(metadata.SuccessCount, -1) + go d.invokeFunction(newMetadata) + } node = node.Next() } + atomic.AddInt64(metadata.FunctionsInvoked, numberOfFunctionsInvoked) if success { atomic.AddInt64(metadata.SuccessCount, 1) } else { @@ -264,11 +490,11 @@ func (d *Driver) invokeFunction(metadata *InvocationMetadata) { } } -func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.WaitGroup, +func (d *Driver) functionsDriver(functionLinkedList *list.List, announceFunctionDone *sync.WaitGroup, addInvocationsToGroup *sync.WaitGroup, readOpenWhiskMetadata *sync.Mutex, totalSuccessful *int64, - totalFailed *int64, totalIssued *int64, recordOutputChannel chan interface{}) { + totalFailed *int64, totalIssued *int64, entriesWritten *int64, recordOutputChannel chan interface{}) { - function := list.Front().Value.(*common.Function) + function := functionLinkedList.Front().Value.(*common.Node).Function numberOfInvocations := 0 for i := 0; i < len(function.InvocationStats.Invocations); i++ { numberOfInvocations += function.InvocationStats.Invocations[i] @@ -284,6 +510,7 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai var failedInvocations int64 var failedInvocationByMinute = make([]int64, totalTraceDuration) var numberOfIssuedInvocations int64 + var functionsInvoked int64 var currentPhase = common.ExecutionPhase waitForInvocations := sync.WaitGroup{} @@ -343,13 +570,14 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai waitForInvocations.Add(1) go d.invokeFunction(&InvocationMetadata{ - RootFunction: list, + RootFunction: functionLinkedList, Phase: currentPhase, MinuteIndex: minuteIndex, InvocationIndex: invocationIndex, SuccessCount: &successfulInvocations, FailedCount: &failedInvocations, FailedCountByMinute: failedInvocationByMinute, + FunctionsInvoked: &functionsInvoked, RecordOutputChannel: recordOutputChannel, AnnounceDoneWG: &waitForInvocations, AnnounceDoneExe: addInvocationsToGroup, @@ -364,7 +592,7 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai InvocationID: composeInvocationID(d.Configuration.TraceGranularity, minuteIndex, invocationIndex), StartTime: time.Now().UnixNano(), } - + functionsInvoked++ successfulInvocations++ } numberOfIssuedInvocations++ @@ -375,11 +603,11 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai waitForInvocations.Wait() log.Debugf("All the invocations for function %s have been completed.\n", function.Name) - announceFunctionDone.Done() - atomic.AddInt64(totalSuccessful, successfulInvocations) atomic.AddInt64(totalFailed, failedInvocations) atomic.AddInt64(totalIssued, numberOfIssuedInvocations) + atomic.AddInt64(entriesWritten, functionsInvoked) + announceFunctionDone.Done() } func (d *Driver) proceedToNextMinute(function *common.Function, minuteIndex *int, invocationIndex *int, startOfMinute *time.Time, @@ -560,7 +788,7 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { var successfulInvocations int64 var failedInvocations int64 var invocationsIssued int64 - var functionsPerDAG int64 + var entriesWritten int64 readOpenWhiskMetadata := sync.Mutex{} allFunctionsInvoked := sync.WaitGroup{} allIndividualDriversCompleted := sync.WaitGroup{} @@ -571,19 +799,28 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { if !iatOnly { log.Info("Generating IAT and runtime specifications for all the functions") + maxInvocation := getMaxInvocation(d.Configuration.Functions) for i, function := range d.Configuration.Functions { - // Equalising all the InvocationStats to the first function - if d.Configuration.LoaderConfiguration.DAGMode { - function.InvocationStats.Invocations = d.Configuration.Functions[0].InvocationStats.Invocations - } spec := d.SpecificationGenerator.GenerateInvocationData( function, d.Configuration.IATDistribution, d.Configuration.ShiftIAT, d.Configuration.TraceGranularity, ) - d.Configuration.Functions[i].Specification = spec + // Overwriting the runtime specification to account for maximum possible invocations + if d.Configuration.LoaderConfiguration.DAGMode { + originalInvocation := function.InvocationStats.Invocations + function.InvocationStats.Invocations = maxInvocation + spec := d.SpecificationGenerator.GenerateInvocationData( + function, + d.Configuration.IATDistribution, + d.Configuration.ShiftIAT, + d.Configuration.TraceGranularity, + ) + function.InvocationStats.Invocations = originalInvocation + function.Specification.RuntimeSpecification = spec.RuntimeSpecification + } } } @@ -605,34 +842,40 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { if d.Configuration.LoaderConfiguration.DAGMode { log.Infof("Starting DAG invocation driver\n") - functionLinkedList := DAGCreation(d.Configuration.Functions) - functionsPerDAG = int64(len(d.Configuration.Functions)) - allIndividualDriversCompleted.Add(1) - go d.functionsDriver( - functionLinkedList, - &allIndividualDriversCompleted, - &allFunctionsInvoked, - &readOpenWhiskMetadata, - &successfulInvocations, - &failedInvocations, - &invocationsIssued, - globalMetricsCollector, - ) + for _, function := range d.Configuration.Functions { + + DAGDistribution := generateCDF("data/traces/example/dag_structure.xlsx") + width, depth := getDAGStats(DAGDistribution, len(d.Configuration.Functions)) + functionLinkedList := createDAGWorkflow(d.Configuration.Functions, function, width, depth) + printDAG(functionLinkedList) + allIndividualDriversCompleted.Add(1) + go d.functionsDriver( + functionLinkedList, + &allIndividualDriversCompleted, + &allFunctionsInvoked, + &readOpenWhiskMetadata, + &successfulInvocations, + &failedInvocations, + &invocationsIssued, + &entriesWritten, + globalMetricsCollector, + ) + } } else { log.Infof("Starting function invocation driver\n") - functionsPerDAG = 1 for _, function := range d.Configuration.Functions { allIndividualDriversCompleted.Add(1) - linkedList := list.New() - linkedList.PushBack(function) + functionLinkedList := list.New() + functionLinkedList.PushBack(&common.Node{Function: function, Depth: 0}) go d.functionsDriver( - linkedList, + functionLinkedList, &allIndividualDriversCompleted, &allFunctionsInvoked, &readOpenWhiskMetadata, &successfulInvocations, &failedInvocations, &invocationsIssued, + &entriesWritten, globalMetricsCollector, ) } @@ -641,7 +884,7 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { if atomic.LoadInt64(&successfulInvocations)+atomic.LoadInt64(&failedInvocations) != 0 { log.Debugf("Waiting for all the invocations record to be written.\n") - totalIssuedChannel <- atomic.LoadInt64(&invocationsIssued) * functionsPerDAG + totalIssuedChannel <- atomic.LoadInt64(&entriesWritten) scraperFinishCh <- 0 // Ask the scraper to finish metrics collection allRecordsWritten.Wait() diff --git a/pkg/driver/trace_driver_test.go b/pkg/driver/trace_driver_test.go index ee8515bca..e3926cb17 100644 --- a/pkg/driver/trace_driver_test.go +++ b/pkg/driver/trace_driver_test.go @@ -123,7 +123,7 @@ func TestInvokeFunctionFromDriver(t *testing.T) { testDriver := createTestDriver() var failureCountByMinute = make([]int64, testDriver.Configuration.TraceDuration) - + var functionsInvoked int64 if !test.forceFail { address, port := "localhost", test.port testDriver.Configuration.Functions[0].Endpoint = fmt.Sprintf("%s:%d", address, port) @@ -133,10 +133,10 @@ func TestInvokeFunctionFromDriver(t *testing.T) { // make sure that the gRPC server is running time.Sleep(2 * time.Second) } - + function := testDriver.Configuration.Functions[0] + node := &common.Node{Function: testDriver.Configuration.Functions[0]} list := list.New() - list.PushBack(testDriver.Configuration.Functions[0]) - function := list.Front().Value.(*common.Function) + list.PushBack(node) for i := 0; i < len(function.Specification.RuntimeSpecification); i++ { function.Specification.RuntimeSpecification[i] = make([]common.RuntimeSpecification, 3) } @@ -152,6 +152,7 @@ func TestInvokeFunctionFromDriver(t *testing.T) { SuccessCount: &successCount, FailedCount: &failureCount, FailedCountByMinute: failureCountByMinute, + FunctionsInvoked: &functionsInvoked, RecordOutputChannel: invocationRecordOutputChannel, AnnounceDoneWG: announceDone, } @@ -184,7 +185,8 @@ func TestInvokeFunctionFromDriver(t *testing.T) { func TestDAGInvocation(t *testing.T) { var successCount int64 = 0 var failureCount int64 = 0 - var functionsToInvoke int = 4 + var functionsToInvoke int = 3 + var functionsInvoked int64 invocationRecordOutputChannel := make(chan interface{}, functionsToInvoke) announceDone := &sync.WaitGroup{} @@ -203,7 +205,12 @@ func TestDAGInvocation(t *testing.T) { Runtime: 1000, Memory: 128, } - for i := 0; i < functionsToInvoke; i++ { + functionList := make([]*common.Function, 3) + for i := 0; i < len(functionList); i++ { + functionList[i] = function + } + rootFunction := createDAGWorkflow(functionList, functionList[0], 2, 2) + for i := 0; i < len(functionList); i++ { function = testDriver.Configuration.Functions[0] list.PushBack(function) } @@ -211,20 +218,22 @@ func TestDAGInvocation(t *testing.T) { time.Sleep(2 * time.Second) metadata := &InvocationMetadata{ - RootFunction: list, + RootFunction: rootFunction, Phase: common.ExecutionPhase, MinuteIndex: 0, InvocationIndex: 2, SuccessCount: &successCount, FailedCount: &failureCount, FailedCountByMinute: failureCountByMinute, + FunctionsInvoked: &functionsInvoked, RecordOutputChannel: invocationRecordOutputChannel, AnnounceDoneWG: announceDone, } announceDone.Add(1) testDriver.invokeFunction(metadata) - if !(successCount == 1 && failureCount == 0) { + announceDone.Wait() + if !(functionsInvoked == 3 && failureCount == 0) { t.Error("The DAG invocation has failed.") } for i := 0; i < functionsToInvoke; i++ {