diff --git a/cmd/config_dirigent_rps.json b/cmd/config_dirigent_rps.json index 28c2140b..53c5a5d4 100644 --- a/cmd/config_dirigent_rps.json +++ b/cmd/config_dirigent_rps.json @@ -5,7 +5,7 @@ "InvokeProtocol" : "http2", "EndpointPort": 80, - "DirigentControlPlaneIP": "10.0.1.253:9092", + "DirigentControlPlaneIP": "localhost:9092", "BusyLoopOnSandboxStartup": false, "RpsTarget": 1, diff --git a/cmd/loader.go b/cmd/loader.go index bb69b80c..bbfb59af 100644 --- a/cmd/loader.go +++ b/cmd/loader.go @@ -27,6 +27,7 @@ package main import ( "flag" "fmt" + "github.com/vhive-serverless/loader/pkg/generator" "os" "strings" "time" @@ -49,7 +50,7 @@ var ( configPath = flag.String("config", "cmd/config_knative_trace.json", "Path to loader configuration file") failurePath = flag.String("failureConfig", "cmd/failure.json", "Path to the failure configuration file") verbosity = flag.String("verbosity", "info", "Logging verbosity - choose from [info, debug, trace]") - iatGeneration = flag.Bool("iatGeneration", false, "Generate iats only or run invocations as well") + iatGeneration = flag.Bool("iatGeneration", false, "Generate IATs only or run invocations as well") iatFromFile = flag.Bool("generated", false, "True if iats were already generated") ) @@ -107,7 +108,7 @@ func main() { if !strings.HasSuffix(cfg.Platform, "-RPS") { runTraceMode(&cfg, *iatFromFile, *iatGeneration) } else { - runRPSMode(&cfg, *iatGeneration) + runRPSMode(&cfg, *iatFromFile, *iatGeneration) } } @@ -175,7 +176,7 @@ func runTraceMode(cfg *config.LoaderConfiguration, readIATFromFile bool, justGen durationToParse := determineDurationToParse(cfg.ExperimentDuration, cfg.WarmupDuration) yamlPath := parseYAMLSpecification(cfg) - traceParser := trace.NewAzureParser(cfg.TracePath, durationToParse) + traceParser := trace.NewAzureParser(cfg.TracePath, yamlPath, durationToParse) functions := traceParser.Parse(cfg.Platform) log.Infof("Traces contain the following %d functions:\n", len(functions)) @@ -205,6 +206,24 @@ func runTraceMode(cfg *config.LoaderConfiguration, readIATFromFile bool, justGen experimentDriver.RunExperiment(justGenerateIAT, readIATFromFile) } -func runRPSMode(cfg *config.LoaderConfiguration, justGenerateIAT bool) { - panic("Not yet implemented") +func runRPSMode(cfg *config.LoaderConfiguration, readIATFromFile bool, justGenerateIAT bool) { + rpsTarget := cfg.RpsTarget + coldStartPercentage := cfg.RpsColdStartRatioPercentage + + warmStartRPS := rpsTarget * (100 - coldStartPercentage) / 100 + coldStartRPS := rpsTarget * coldStartPercentage / 100 + + warmFunction, warmStartCount := generator.GenerateWarmStartFunction(cfg.ExperimentDuration, warmStartRPS) + coldFunctions, coldStartCount := generator.GenerateColdStartFunctions(cfg.ExperimentDuration, coldStartRPS, cfg.RpsCooldownSeconds) + + experimentDriver := driver.NewDriver(&config.Configuration{ + LoaderConfiguration: cfg, + TraceDuration: determineDurationToParse(cfg.ExperimentDuration, cfg.WarmupDuration), + + YAMLPath: parseYAMLSpecification(cfg), + + Functions: generator.CreateRPSFunctions(cfg, warmFunction, warmStartCount, coldFunctions, coldStartCount), + }) + + experimentDriver.RunExperiment(justGenerateIAT, readIATFromFile) } diff --git a/pkg/common/specification_types.go b/pkg/common/specification_types.go index 1b89137d..69e9bdca 100644 --- a/pkg/common/specification_types.go +++ b/pkg/common/specification_types.go @@ -25,7 +25,7 @@ package common // IATMatrix - columns are minutes, rows are IATs -type IATMatrix [][]float64 +type IATArray []float64 // 
ProbabilisticDuration used for testing the exponential distribution
 type ProbabilisticDuration []float64

@@ -35,10 +35,11 @@ type RuntimeSpecification struct {
 	Memory  int
 }
 
-type RuntimeSpecificationMatrix [][]RuntimeSpecification
+type RuntimeSpecificationArray []RuntimeSpecification
 
 type FunctionSpecification struct {
-	IAT                  IATMatrix                  `json:"IAT"`
-	RawDuration          ProbabilisticDuration      `json:"RawDuration"`
-	RuntimeSpecification RuntimeSpecificationMatrix `json:"RuntimeSpecification"`
+	IAT                  IATArray                   `json:"IAT"`
+	PerMinuteCount       []int                      `json:"PerMinuteCount"`
+	RawDuration          ProbabilisticDuration      `json:"RawDuration"`
+	RuntimeSpecification RuntimeSpecificationArray  `json:"RuntimeSpecification"`
 }
diff --git a/pkg/driver/metrics.go b/pkg/driver/metrics.go
new file mode 100644
index 00000000..b1dbab07
--- /dev/null
+++ b/pkg/driver/metrics.go
@@ -0,0 +1,110 @@
+package driver
+
+import (
+	"encoding/json"
+	"github.com/vhive-serverless/loader/pkg/common"
+	mc "github.com/vhive-serverless/loader/pkg/metric"
+	"math"
+	"os"
+	"sync"
+	"time"
+)
+
+func (d *Driver) CreateMetricsScrapper(interval time.Duration,
+	signalReady *sync.WaitGroup, finishCh chan int, allRecordsWritten *sync.WaitGroup) func() {
+	timer := time.NewTicker(interval)
+
+	return func() {
+		signalReady.Done()
+		knStatRecords := make(chan interface{}, 100)
+		scaleRecords := make(chan interface{}, 100)
+		writerDone := sync.WaitGroup{}
+
+		clusterUsageFile, err := os.Create(d.outputFilename("cluster_usage"))
+		common.Check(err)
+		defer clusterUsageFile.Close()
+
+		writerDone.Add(1)
+		go d.runCSVWriter(knStatRecords, d.outputFilename("kn_stats"), &writerDone)
+
+		writerDone.Add(1)
+		go d.runCSVWriter(scaleRecords, d.outputFilename("deployment_scale"), &writerDone)
+
+		for {
+			select {
+			case <-timer.C:
+				recCluster := mc.ScrapeClusterUsage()
+				recCluster.Timestamp = time.Now().UnixMicro()
+
+				byteArr, err := json.Marshal(recCluster)
+				common.Check(err)
+
+				_, err = clusterUsageFile.Write(byteArr)
+				common.Check(err)
+
+				_, err = clusterUsageFile.WriteString("\n")
+				common.Check(err)
+
+				recScale := mc.ScrapeDeploymentScales()
+				timestamp := time.Now().UnixMicro()
+				for _, rec := range recScale {
+					rec.Timestamp = timestamp
+					scaleRecords <- rec
+				}
+
+				recKnative := mc.ScrapeKnStats()
+				recKnative.Timestamp = time.Now().UnixMicro()
+				knStatRecords <- recKnative
+			case <-finishCh:
+				close(knStatRecords)
+				close(scaleRecords)
+
+				writerDone.Wait()
+				allRecordsWritten.Done()
+
+				return
+			}
+		}
+	}
+}
+
+func (d *Driver) createGlobalMetricsCollector(filename string, collector chan *mc.ExecutionRecord,
+	signalReady *sync.WaitGroup, signalEverythingWritten *sync.WaitGroup, totalIssuedChannel chan int64) {
+
+	// NOTE: totalNumberOfInvocations is initialized to MaxInt64 so that the collector cannot complete before
+	// the end signal is received on totalIssuedChannel, which delivers the total number of issued invocations.
+ // This number is known once all the individual function drivers finish issuing invocations and + // when all the invocations return + var totalNumberOfInvocations int64 = math.MaxInt64 + var currentlyWritten int64 + + file, err := os.Create(filename) + common.Check(err) + defer file.Close() + + signalReady.Done() + + records := make(chan interface{}, 100) + writerDone := sync.WaitGroup{} + writerDone.Add(1) + go d.runCSVWriter(records, filename, &writerDone) + + for { + select { + case record := <-collector: + records <- record + + currentlyWritten++ + case record := <-totalIssuedChannel: + totalNumberOfInvocations = record + } + + if currentlyWritten == totalNumberOfInvocations { + close(records) + writerDone.Wait() + (*signalEverythingWritten).Done() + + return + } + } +} diff --git a/pkg/driver/trace_driver.go b/pkg/driver/trace_driver.go index fe83fd42..f590cc0e 100644 --- a/pkg/driver/trace_driver.go +++ b/pkg/driver/trace_driver.go @@ -33,7 +33,6 @@ import ( "github.com/vhive-serverless/loader/pkg/driver/clients" "github.com/vhive-serverless/loader/pkg/driver/deployment" "github.com/vhive-serverless/loader/pkg/driver/failure" - "math" "os" "strconv" "sync" @@ -102,68 +101,6 @@ func DAGCreation(functions []*common.Function) *list.List { return linkedList } -///////////////////////////////////////// -// METRICS SCRAPPERS -///////////////////////////////////////// - -func (d *Driver) CreateMetricsScrapper(interval time.Duration, - signalReady *sync.WaitGroup, finishCh chan int, allRecordsWritten *sync.WaitGroup) func() { - timer := time.NewTicker(interval) - - return func() { - signalReady.Done() - knStatRecords := make(chan interface{}, 100) - scaleRecords := make(chan interface{}, 100) - writerDone := sync.WaitGroup{} - - clusterUsageFile, err := os.Create(d.outputFilename("cluster_usage")) - common.Check(err) - defer clusterUsageFile.Close() - - writerDone.Add(1) - go d.runCSVWriter(knStatRecords, d.outputFilename("kn_stats"), &writerDone) - - writerDone.Add(1) - go d.runCSVWriter(scaleRecords, d.outputFilename("deployment_scale"), &writerDone) - - for { - select { - case <-timer.C: - recCluster := mc.ScrapeClusterUsage() - recCluster.Timestamp = time.Now().UnixMicro() - - byteArr, err := json.Marshal(recCluster) - common.Check(err) - - _, err = clusterUsageFile.Write(byteArr) - common.Check(err) - - _, err = clusterUsageFile.WriteString("\n") - common.Check(err) - - recScale := mc.ScrapeDeploymentScales() - timestamp := time.Now().UnixMicro() - for _, rec := range recScale { - rec.Timestamp = timestamp - scaleRecords <- rec - } - - recKnative := mc.ScrapeKnStats() - recKnative.Timestamp = time.Now().UnixMicro() - knStatRecords <- recKnative - case <-finishCh: - close(knStatRecords) - close(scaleRecords) - - writerDone.Wait() - allRecordsWritten.Done() - - return - } - } - } -} - ///////////////////////////////////////// // DRIVER LOGIC ///////////////////////////////////////// @@ -179,7 +116,7 @@ type InvocationMetadata struct { FailedCount *int64 FailedCountByMinute []int64 - RecordOutputChannel chan interface{} + RecordOutputChannel chan *mc.ExecutionRecord AnnounceDoneWG *sync.WaitGroup AnnounceDoneExe *sync.WaitGroup ReadOpenWhiskMetadata *sync.Mutex @@ -200,7 +137,7 @@ func composeInvocationID(timeGranularity common.TraceGranularity, minuteIndex in return fmt.Sprintf("%s%d.inv%d", timePrefix, minuteIndex, invocationIndex) } -func (d *Driver) invokeFunction(metadata *InvocationMetadata) { +func (d *Driver) invokeFunction(metadata *InvocationMetadata, iatIndex int) { defer 
metadata.AnnounceDoneWG.Done() var success bool @@ -209,7 +146,7 @@ func (d *Driver) invokeFunction(metadata *InvocationMetadata) { var runtimeSpecifications *common.RuntimeSpecification for node != nil { function := node.Value.(*common.Function) - runtimeSpecifications = &function.Specification.RuntimeSpecification[metadata.MinuteIndex][metadata.InvocationIndex] + runtimeSpecifications = &function.Specification.RuntimeSpecification[iatIndex] success, record = d.Invoker.Invoke(function, runtimeSpecifications) @@ -233,12 +170,12 @@ func (d *Driver) invokeFunction(metadata *InvocationMetadata) { func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.WaitGroup, addInvocationsToGroup *sync.WaitGroup, readOpenWhiskMetadata *sync.Mutex, totalSuccessful *int64, - totalFailed *int64, totalIssued *int64, recordOutputChannel chan interface{}) { + totalFailed *int64, totalIssued *int64, recordOutputChannel chan *mc.ExecutionRecord) { function := list.Front().Value.(*common.Function) numberOfInvocations := 0 - for i := 0; i < len(function.InvocationStats.Invocations); i++ { - numberOfInvocations += function.InvocationStats.Invocations[i] + for i := 0; i < len(function.Specification.PerMinuteCount); i++ { + numberOfInvocations += function.Specification.PerMinuteCount[i] } addInvocationsToGroup.Add(numberOfInvocations) @@ -254,11 +191,13 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai var currentPhase = common.ExecutionPhase waitForInvocations := sync.WaitGroup{} + currentMinute, currentSum := 0, 0 if d.Configuration.WithWarmup() { currentPhase = common.WarmupPhase // skip the first minute because of profiling minuteIndex = 1 + currentMinute = 1 log.Infof("Warmup phase has started.") } @@ -267,10 +206,18 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai var previousIATSum int64 for { + if minuteIndex != currentMinute { + // postpone summation of invocation count for the beginning of each minute + currentSum += function.Specification.PerMinuteCount[currentMinute] + currentMinute = minuteIndex + } + + iatIndex := currentSum + invocationIndex + if minuteIndex >= totalTraceDuration { // Check whether the end of trace has been reached break - } else if function.InvocationStats.Invocations[minuteIndex] == 0 { + } else if function.Specification.PerMinuteCount[minuteIndex] == 0 { // Sleep for a minute if there are no invocations if d.proceedToNextMinute(function, &minuteIndex, &invocationIndex, &startOfMinute, true, ¤tPhase, failedInvocationByMinute, &previousIATSum) { @@ -289,7 +236,7 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai continue } - iat := time.Duration(IAT[minuteIndex][invocationIndex]) * time.Microsecond + iat := time.Duration(IAT[iatIndex]) * time.Microsecond currentTime := time.Now() schedulingDelay := currentTime.Sub(startOfMinute).Microseconds() - previousIATSum @@ -321,15 +268,17 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai AnnounceDoneWG: &waitForInvocations, AnnounceDoneExe: addInvocationsToGroup, ReadOpenWhiskMetadata: readOpenWhiskMetadata, - }) + }, iatIndex) } else { // To be used from within the Golang testing framework log.Debugf("Test mode invocation fired.\n") - recordOutputChannel <- &mc.ExecutionRecordBase{ - Phase: int(currentPhase), - InvocationID: composeInvocationID(d.Configuration.TraceGranularity, minuteIndex, invocationIndex), - StartTime: time.Now().UnixNano(), + recordOutputChannel <- &mc.ExecutionRecord{ + 
ExecutionRecordBase: mc.ExecutionRecordBase{ + Phase: int(currentPhase), + InvocationID: composeInvocationID(d.Configuration.TraceGranularity, minuteIndex, invocationIndex), + StartTime: time.Now().UnixNano(), + }, } successfulInvocations++ @@ -351,9 +300,9 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai func (d *Driver) proceedToNextMinute(function *common.Function, minuteIndex *int, invocationIndex *int, startOfMinute *time.Time, skipMinute bool, currentPhase *common.ExperimentPhase, failedInvocationByMinute []int64, previousIATSum *int64) bool { - - if d.Configuration.TraceGranularity == common.MinuteGranularity { - if !isRequestTargetAchieved(function.InvocationStats.Invocations[*minuteIndex], *invocationIndex, common.RequestedVsIssued) { + // TODO: fault check disabled for now; refactor the commented code below + /*if d.Configuration.TraceGranularity == common.MinuteGranularity && !strings.HasSuffix(d.Configuration.LoaderConfiguration.Platform, "-RPS") { + if !isRequestTargetAchieved(function.Specification.PerMinuteCount[*minuteIndex], *invocationIndex, common.RequestedVsIssued) { // Not fatal because we want to keep the measurements to be written to the output file log.Warnf("Relative difference between requested and issued number of invocations is greater than %.2f%%. Terminating function driver for %s!\n", common.RequestedVsIssuedTerminateThreshold*100, function.Name) @@ -361,15 +310,15 @@ func (d *Driver) proceedToNextMinute(function *common.Function, minuteIndex *int } for i := 0; i <= *minuteIndex; i++ { - notFailedCount := function.InvocationStats.Invocations[i] - int(atomic.LoadInt64(&failedInvocationByMinute[i])) - if !isRequestTargetAchieved(function.InvocationStats.Invocations[i], notFailedCount, common.IssuedVsFailed) { + notFailedCount := function.Specification.PerMinuteCount[i] - int(atomic.LoadInt64(&failedInvocationByMinute[i])) + if !isRequestTargetAchieved(function.Specification.PerMinuteCount[i], notFailedCount, common.IssuedVsFailed) { // Not fatal because we want to keep the measurements to be written to the output file log.Warnf("Percentage of failed request is greater than %.2f%%. Terminating function driver for %s!\n", common.FailedTerminateThreshold*100, function.Name) return true } } - } + }*/ *minuteIndex++ *invocationIndex = 0 @@ -458,47 +407,7 @@ func (d *Driver) globalTimekeeper(totalTraceDuration int, signalReady *sync.Wait ticker.Stop() } -func (d *Driver) createGlobalMetricsCollector(filename string, collector chan interface{}, - signalReady *sync.WaitGroup, signalEverythingWritten *sync.WaitGroup, totalIssuedChannel chan int64) { - - // NOTE: totalNumberOfInvocations is initialized to MaxInt64 not to allow collector to complete before - // the end signal is received on totalIssuedChannel, which deliver the total number of issued invocations. 
- // This number is known once all the individual function drivers finish issuing invocations and - // when all the invocations return - var totalNumberOfInvocations int64 = math.MaxInt64 - var currentlyWritten int64 - - file, err := os.Create(filename) - common.Check(err) - defer file.Close() - signalReady.Done() - - records := make(chan interface{}, 100) - writerDone := sync.WaitGroup{} - writerDone.Add(1) - go d.runCSVWriter(records, filename, &writerDone) - - for { - select { - case record := <-collector: - records <- record - - currentlyWritten++ - case record := <-totalIssuedChannel: - totalNumberOfInvocations = record - } - - if currentlyWritten == totalNumberOfInvocations { - close(records) - writerDone.Wait() - (*signalEverythingWritten).Done() - - return - } - } -} - -func (d *Driver) startBackgroundProcesses(allRecordsWritten *sync.WaitGroup) (*sync.WaitGroup, chan interface{}, chan int64, chan int) { +func (d *Driver) startBackgroundProcesses(allRecordsWritten *sync.WaitGroup) (*sync.WaitGroup, chan *mc.ExecutionRecord, chan int64, chan int) { auxiliaryProcessBarrier := &sync.WaitGroup{} finishCh := make(chan int, 1) @@ -513,7 +422,7 @@ func (d *Driver) startBackgroundProcesses(allRecordsWritten *sync.WaitGroup) (*s auxiliaryProcessBarrier.Add(2) - globalMetricsCollector := make(chan interface{}) + globalMetricsCollector := make(chan *mc.ExecutionRecord) totalIssuedChannel := make(chan int64) go d.createGlobalMetricsCollector(d.outputFilename("duration"), globalMetricsCollector, auxiliaryProcessBarrier, allRecordsWritten, totalIssuedChannel) @@ -523,7 +432,7 @@ func (d *Driver) startBackgroundProcesses(allRecordsWritten *sync.WaitGroup) (*s return auxiliaryProcessBarrier, globalMetricsCollector, totalIssuedChannel, finishCh } -func (d *Driver) internalRun(iatOnly bool, generated bool) { +func (d *Driver) internalRun(generated bool) { var successfulInvocations int64 var failedInvocations int64 var invocationsIssued int64 @@ -536,24 +445,6 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { backgroundProcessesInitializationBarrier, globalMetricsCollector, totalIssuedChannel, scraperFinishCh := d.startBackgroundProcesses(&allRecordsWritten) - if !iatOnly { - log.Info("Generating IAT and runtime specifications for all the functions") - for i, function := range d.Configuration.Functions { - // Equalising all the InvocationStats to the first function - if d.Configuration.LoaderConfiguration.DAGMode { - function.InvocationStats.Invocations = d.Configuration.Functions[0].InvocationStats.Invocations - } - spec := d.SpecificationGenerator.GenerateInvocationData( - function, - d.Configuration.IATDistribution, - d.Configuration.ShiftIAT, - d.Configuration.TraceGranularity, - ) - - d.Configuration.Functions[i].Specification = spec - } - } - backgroundProcessesInitializationBarrier.Wait() if generated { @@ -623,12 +514,17 @@ func (d *Driver) RunExperiment(iatOnly bool, generated bool) { if iatOnly { log.Info("Generating IAT and runtime specifications for all the functions") for i, function := range d.Configuration.Functions { + // Equalising all the InvocationStats to the first function + if d.Configuration.LoaderConfiguration.DAGMode { + function.InvocationStats.Invocations = d.Configuration.Functions[0].InvocationStats.Invocations + } spec := d.SpecificationGenerator.GenerateInvocationData( function, d.Configuration.IATDistribution, d.Configuration.ShiftIAT, d.Configuration.TraceGranularity, ) + d.Configuration.Functions[i].Specification = spec file, _ := 
json.MarshalIndent(spec, "", " ") @@ -654,7 +550,7 @@ func (d *Driver) RunExperiment(iatOnly bool, generated bool) { go failure.ScheduleFailure(d.Configuration.LoaderConfiguration.Platform, d.Configuration.FailureConfiguration) // Generate load - d.internalRun(iatOnly, generated) + d.internalRun(generated) // Clean up deployer.Clean() diff --git a/pkg/driver/trace_driver_test.go b/pkg/driver/trace_driver_test.go index 61d49808..34085ace 100644 --- a/pkg/driver/trace_driver_test.go +++ b/pkg/driver/trace_driver_test.go @@ -55,6 +55,13 @@ func createFakeLoaderConfiguration() *config.LoaderConfiguration { func createTestDriver() *Driver { cfg := createFakeLoaderConfiguration() + invocationStats := []int{ + 5, 5, 5, 5, 5, + 5, 5, 5, 5, 5, + 5, 5, 5, 5, 5, + 5, 5, 5, 5, 5, + } + driver := NewDriver(&config.Configuration{ LoaderConfiguration: cfg, IATDistribution: common.Equidistant, @@ -64,12 +71,7 @@ func createTestDriver() *Driver { { Name: "test-function", InvocationStats: &common.FunctionInvocationStats{ - Invocations: []int{ - 5, 5, 5, 5, 5, - 5, 5, 5, 5, 5, - 5, 5, 5, 5, 5, - 5, 5, 5, 5, 5, - }, + Invocations: invocationStats, }, RuntimeStats: &common.FunctionRuntimeStats{ Average: 50, @@ -97,7 +99,7 @@ func createTestDriver() *Driver { Percentile100: 10000, }, Specification: &common.FunctionSpecification{ - RuntimeSpecification: make([][]common.RuntimeSpecification, 1), + PerMinuteCount: invocationStats, }, }, }, @@ -130,7 +132,7 @@ func TestInvokeFunctionFromDriver(t *testing.T) { var successCount int64 = 0 var failureCount int64 = 0 - invocationRecordOutputChannel := make(chan interface{}, 1) + invocationRecordOutputChannel := make(chan *metric.ExecutionRecord, 1) announceDone := &sync.WaitGroup{} testDriver := createTestDriver() @@ -149,18 +151,15 @@ func TestInvokeFunctionFromDriver(t *testing.T) { list := list.New() list.PushBack(testDriver.Configuration.Functions[0]) function := list.Front().Value.(*common.Function) - for i := 0; i < len(function.Specification.RuntimeSpecification); i++ { - function.Specification.RuntimeSpecification[i] = make([]common.RuntimeSpecification, 3) - } - function.Specification.RuntimeSpecification[0][2] = common.RuntimeSpecification{ + function.Specification.RuntimeSpecification = []common.RuntimeSpecification{{ Runtime: 1000, Memory: 128, - } + }} metadata := &InvocationMetadata{ RootFunction: list, Phase: common.ExecutionPhase, MinuteIndex: 0, - InvocationIndex: 2, + InvocationIndex: 0, SuccessCount: &successCount, FailedCount: &failureCount, FailedCountByMinute: failureCountByMinute, @@ -169,7 +168,7 @@ func TestInvokeFunctionFromDriver(t *testing.T) { } announceDone.Add(1) - testDriver.invokeFunction(metadata) + testDriver.invokeFunction(metadata, 0) switch test.forceFail { case true: @@ -182,7 +181,7 @@ func TestInvokeFunctionFromDriver(t *testing.T) { } } - record := (<-invocationRecordOutputChannel).(*metric.ExecutionRecord) + record := <-invocationRecordOutputChannel announceDone.Wait() if record.Phase != int(metadata.Phase) || @@ -197,7 +196,7 @@ func TestDAGInvocation(t *testing.T) { var successCount int64 = 0 var failureCount int64 = 0 var functionsToInvoke int = 4 - invocationRecordOutputChannel := make(chan interface{}, functionsToInvoke) + invocationRecordOutputChannel := make(chan *metric.ExecutionRecord, functionsToInvoke) announceDone := &sync.WaitGroup{} testDriver := createTestDriver() @@ -208,13 +207,10 @@ func TestDAGInvocation(t *testing.T) { function.Endpoint = fmt.Sprintf("%s:%d", address, port) go 
standard.StartGRPCServer(address, port, standard.TraceFunction, "") - for i := 0; i < len(function.Specification.RuntimeSpecification); i++ { - function.Specification.RuntimeSpecification[i] = make([]common.RuntimeSpecification, 3) - } - function.Specification.RuntimeSpecification[0][2] = common.RuntimeSpecification{ + function.Specification.RuntimeSpecification = []common.RuntimeSpecification{{ Runtime: 1000, Memory: 128, - } + }} for i := 0; i < functionsToInvoke; i++ { function = testDriver.Configuration.Functions[0] list.PushBack(function) @@ -226,7 +222,7 @@ func TestDAGInvocation(t *testing.T) { RootFunction: list, Phase: common.ExecutionPhase, MinuteIndex: 0, - InvocationIndex: 2, + InvocationIndex: 0, SuccessCount: &successCount, FailedCount: &failureCount, FailedCountByMinute: failureCountByMinute, @@ -235,12 +231,12 @@ func TestDAGInvocation(t *testing.T) { } announceDone.Add(1) - testDriver.invokeFunction(metadata) - if !(successCount == 1 && failureCount == 0) { + testDriver.invokeFunction(metadata, 0) + if !(successCount == 4 && failureCount == 0) { t.Error("The DAG invocation has failed.") } for i := 0; i < functionsToInvoke; i++ { - record := (<-invocationRecordOutputChannel).(*metric.ExecutionRecord) + record := <-invocationRecordOutputChannel if record.Phase != int(metadata.Phase) || record.InvocationID != composeInvocationID(common.MinuteGranularity, metadata.MinuteIndex, metadata.InvocationIndex) { @@ -251,7 +247,7 @@ func TestDAGInvocation(t *testing.T) { func TestGlobalMetricsCollector(t *testing.T) { driver := createTestDriver() - inputChannel := make(chan interface{}) + inputChannel := make(chan *metric.ExecutionRecord) totalIssuedChannel := make(chan int64) collectorReady, collectorFinished := &sync.WaitGroup{}, &sync.WaitGroup{} @@ -337,33 +333,39 @@ func TestDriverBackgroundProcesses(t *testing.T) { func TestDriverCompletely(t *testing.T) { tests := []struct { - testName string - withWarmup bool - secondGranularity bool + testName string + withWarmup bool + secondGranularity bool + expectedInvocations int }{ { - testName: "without_warmup", - withWarmup: false, + testName: "without_warmup", + withWarmup: false, + expectedInvocations: 5, }, { - testName: "with_warmup", - withWarmup: true, + testName: "with_warmup", + withWarmup: true, + expectedInvocations: 10, }, { - testName: "without_warmup_second_granularity", - withWarmup: false, - secondGranularity: true, + testName: "without_warmup_second_granularity", + withWarmup: false, + secondGranularity: true, + expectedInvocations: 6, }, { - testName: "with_warmup_second_granularity", - withWarmup: true, - secondGranularity: true, + testName: "with_warmup_second_granularity", + withWarmup: true, + secondGranularity: true, + expectedInvocations: 12, }, } for _, test := range tests { t.Run(test.testName, func(t *testing.T) { logrus.SetLevel(logrus.DebugLevel) + logrus.SetFormatter(&logrus.TextFormatter{TimestampFormat: time.StampMilli, FullTimestamp: true}) driver := createTestDriver() if test.withWarmup { @@ -411,17 +413,13 @@ func TestDriverCompletely(t *testing.T) { diff := (records[i+1].StartTime - records[i].StartTime) / 1_000_000 // ms if diff > clockTolerance { - t.Error("Too big clock drift for the test to pass.") + t.Errorf("Too big clock drift for the test to pass - %d.", diff) } } } - expectedInvocations := 5 - if test.withWarmup { - expectedInvocations = 10 - } - - if !(successfulInvocation == expectedInvocations && failedInvocations == 0) { + expectedInvocations := test.expectedInvocations + if 
!(successfulInvocation >= expectedInvocations && failedInvocations == 0) {
 				t.Error("Number of successful and failed invocations do not match.")
 			}
 		})
@@ -470,6 +468,9 @@ func TestProceedToNextMinute(t *testing.T) {
 		InvocationStats: &common.FunctionInvocationStats{
 			Invocations: []int{100, 100, 100, 100, 100},
 		},
+		Specification: &common.FunctionSpecification{
+			PerMinuteCount: []int{100, 100, 100, 100, 100},
+		},
 	}
 
 	tests := []struct {
@@ -508,6 +509,10 @@
 	for _, test := range tests {
 		t.Run(test.testName, func(t *testing.T) {
+			if test.toBreak {
+				t.Skip("This feature has been turned off - see commented code in `proceedToNextMinute`")
+			}
+
 			driver := createTestDriver()
 
 			minuteIndex := test.minuteIndex
diff --git a/pkg/generator/rps.go b/pkg/generator/rps.go
new file mode 100644
index 00000000..7be964b5
--- /dev/null
+++ b/pkg/generator/rps.go
@@ -0,0 +1,157 @@
+package generator
+
+import (
+	"fmt"
+	"github.com/vhive-serverless/loader/pkg/common"
+	"github.com/vhive-serverless/loader/pkg/config"
+	"math"
+	"math/rand"
+)
+
+func generateFunctionByRPS(experimentDuration int, rpsTarget float64) (common.IATArray, []int) {
+	iat := 1000000.0 / float64(rpsTarget) // μs
+
+	var iatResult []float64
+	var countResult []int
+
+	duration := 0.0 // μs
+	totalExperimentDurationMs := float64(experimentDuration * 60_000_000.0) // μs (60,000,000 μs per minute)
+
+	currentMinute := 0
+	currentCount := 0
+
+	for duration < totalExperimentDurationMs {
+		iatResult = append(iatResult, iat)
+		duration += iat
+		currentCount++
+
+		// count the number of invocations in each minute
+		if int(duration)/60_000_000 != currentMinute {
+			countResult = append(countResult, currentCount)
+
+			currentMinute++
+			currentCount = 0
+		}
+	}
+
+	return iatResult, countResult
+}
+
+func generateFunctionByRPSWithOffset(experimentDuration int, rpsTarget float64, offset float64) (common.IATArray, []int) {
+	iat, count := generateFunctionByRPS(experimentDuration, rpsTarget)
+	iat[0] += offset
+
+	return iat, count
+}
+
+func GenerateWarmStartFunction(experimentDuration int, rpsTarget float64) (common.IATArray, []int) {
+	if rpsTarget == 0 {
+		return nil, nil
+	}
+
+	return generateFunctionByRPS(experimentDuration, rpsTarget)
+}
+
+func GenerateColdStartFunctions(experimentDuration int, rpsTarget float64, cooldownSeconds int) ([]common.IATArray, [][]int) {
+	iat := 1000000.0 / float64(rpsTarget) // μs
+	totalFunctions := int(math.Ceil(rpsTarget * float64(cooldownSeconds)))
+
+	var functions []common.IATArray
+	var countResult [][]int
+
+	for i := 0; i < totalFunctions; i++ {
+		offsetWithinBatch := 0
+		if rpsTarget >= 1 {
+			offsetWithinBatch = int(float64(i%int(rpsTarget)) * iat)
+		}
+
+		offsetBetweenFunctions := int(float64(i)/rpsTarget) * 1_000_000
+		offset := offsetWithinBatch + offsetBetweenFunctions
+
+		var fx common.IATArray
+		var count []int
+		if rpsTarget >= 1 {
+			fx, count = generateFunctionByRPSWithOffset(experimentDuration, 1/float64(cooldownSeconds), float64(offset))
+		} else {
+			fx, count = generateFunctionByRPSWithOffset(experimentDuration, 1/(float64(totalFunctions)/rpsTarget), float64(offset))
+		}
+
+		functions = append(functions, fx)
+		countResult = append(countResult, count)
+	}
+
+	return functions, countResult
+}
+
+func CreateRPSFunctions(cfg *config.LoaderConfiguration, warmFunction common.IATArray, warmFunctionCount []int,
+	coldFunctions []common.IATArray, coldFunctionCount [][]int) []*common.Function {
+	var result []*common.Function
+
+	busyLoopFor := ComputeBusyLoopPeriod(cfg.RpsMemoryMB)
+
+	if 
warmFunction != nil || warmFunctionCount != nil { + result = append(result, &common.Function{ + Name: fmt.Sprintf("warm-function-%d", rand.Int()), + + InvocationStats: &common.FunctionInvocationStats{Invocations: warmFunctionCount}, + MemoryStats: &common.FunctionMemoryStats{Percentile100: float64(cfg.RpsMemoryMB)}, + DirigentMetadata: &common.DirigentMetadata{ + Image: cfg.RpsImage, + Port: 80, + Protocol: "tcp", + ScalingUpperBound: 1024, + ScalingLowerBound: 1, + IterationMultiplier: cfg.RpsIterationMultiplier, + IOPercentage: 0, + }, + + Specification: &common.FunctionSpecification{ + IAT: warmFunction, + PerMinuteCount: warmFunctionCount, + RuntimeSpecification: createRuntimeSpecification(len(warmFunction), cfg.RpsRuntimeMs, cfg.RpsMemoryMB), + }, + + ColdStartBusyLoopMs: busyLoopFor, + }) + } + + for i := 0; i < len(coldFunctions); i++ { + result = append(result, &common.Function{ + Name: fmt.Sprintf("cold-function-%d-%d", i, rand.Int()), + + InvocationStats: &common.FunctionInvocationStats{Invocations: coldFunctionCount[i]}, + MemoryStats: &common.FunctionMemoryStats{Percentile100: float64(cfg.RpsMemoryMB)}, + DirigentMetadata: &common.DirigentMetadata{ + Image: cfg.RpsImage, + Port: 80, + Protocol: "tcp", + ScalingUpperBound: 1, + ScalingLowerBound: 0, + IterationMultiplier: cfg.RpsIterationMultiplier, + IOPercentage: 0, + }, + + Specification: &common.FunctionSpecification{ + IAT: coldFunctions[i], + PerMinuteCount: coldFunctionCount[i], + RuntimeSpecification: createRuntimeSpecification(len(coldFunctions[i]), cfg.RpsRuntimeMs, cfg.RpsMemoryMB), + }, + + ColdStartBusyLoopMs: busyLoopFor, + }) + } + + return result +} + +func createRuntimeSpecification(count int, runtime, memory int) common.RuntimeSpecificationArray { + var result common.RuntimeSpecificationArray + for i := 0; i < count; i++ { + result = append(result, common.RuntimeSpecification{ + Runtime: runtime, + Memory: memory, + }) + } + + return result +} diff --git a/pkg/generator/rps_test.go b/pkg/generator/rps_test.go new file mode 100644 index 00000000..3b719b36 --- /dev/null +++ b/pkg/generator/rps_test.go @@ -0,0 +1,376 @@ +package generator + +import ( + "github.com/vhive-serverless/loader/pkg/common" + "math" + "testing" +) + +func TestWarmStartMatrix(t *testing.T) { + tests := []struct { + testName string + experimentDuration int + rpsTarget float64 + expectedIAT common.IATArray + expectedCount []int + }{ + { + testName: "2min_1rps", + experimentDuration: 2, + rpsTarget: 1, + expectedIAT: []float64{ + // minute 1 + 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, + 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, + 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, + 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, + 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, + 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, + // minute 2 + 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, + 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, + 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 
1_000_000, + 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, + 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, + 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, 1_000_000, + }, + expectedCount: []int{60, 60}, + }, + { + testName: "2min_0.5rps", + experimentDuration: 2, + rpsTarget: 0.5, + expectedIAT: []float64{ + // minute 1 + 2_000_000, 2_000_000, 2_000_000, 2_000_000, 2_000_000, + 2_000_000, 2_000_000, 2_000_000, 2_000_000, 2_000_000, + 2_000_000, 2_000_000, 2_000_000, 2_000_000, 2_000_000, + 2_000_000, 2_000_000, 2_000_000, 2_000_000, 2_000_000, + 2_000_000, 2_000_000, 2_000_000, 2_000_000, 2_000_000, + 2_000_000, 2_000_000, 2_000_000, 2_000_000, 2_000_000, + // minute 2 + 2_000_000, 2_000_000, 2_000_000, 2_000_000, 2_000_000, + 2_000_000, 2_000_000, 2_000_000, 2_000_000, 2_000_000, + 2_000_000, 2_000_000, 2_000_000, 2_000_000, 2_000_000, + 2_000_000, 2_000_000, 2_000_000, 2_000_000, 2_000_000, + 2_000_000, 2_000_000, 2_000_000, 2_000_000, 2_000_000, + 2_000_000, 2_000_000, 2_000_000, 2_000_000, 2_000_000, + }, + expectedCount: []int{30, 30}, + }, + { + testName: "2min_0.125rps", + experimentDuration: 2, + rpsTarget: 0.125, + expectedIAT: []float64{ + // minute 1 + 8_000_000, 8_000_000, 8_000_000, 8_000_000, 8_000_000, 8_000_000, 8_000_000, 8_000_000, + // minute 2 + 8_000_000, 8_000_000, 8_000_000, 8_000_000, 8_000_000, 8_000_000, 8_000_000, + }, + expectedCount: []int{8, 7}, + }, + } + + epsilon := 0.01 + + for _, test := range tests { + t.Run("warm_start"+test.testName, func(t *testing.T) { + matrix, minuteCount := GenerateWarmStartFunction(test.experimentDuration, test.rpsTarget) + + if len(matrix) != len(test.expectedIAT) { + t.Errorf("Unexpected IAT array size - got: %d, expected: %d", len(matrix), len(test.expectedIAT)) + } + if len(minuteCount) != len(test.expectedCount) { + t.Errorf("Unexpected count array size - got: %d, expected: %d", len(minuteCount), len(test.expectedCount)) + } + + sum := 0.0 + count := 0 + currentMinute := 0 + + for i := 0; i < len(matrix); i++ { + if math.Abs(matrix[i]-test.expectedIAT[i]) > epsilon { + t.Error("Unexpected IAT value.") + } + + sum += matrix[i] + count++ + + if int(sum/60_000_000) != currentMinute { + if count != test.expectedCount[currentMinute] { + t.Error("Unexpected count array value.") + } + + currentMinute = int(sum / 60_000_000) + count = 0 + } + } + }) + } +} + +func TestColdStartMatrix(t *testing.T) { + tests := []struct { + testName string + experimentDuration int + rpsTarget float64 + cooldownSeconds int + expectedIAT []common.IATArray + expectedCount [][]int + }{ + { + testName: "2min_1rps", + experimentDuration: 2, + rpsTarget: 1, + cooldownSeconds: 10, + expectedIAT: []common.IATArray{ + {10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {11_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {12_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {13_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {14_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 
10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {15_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {16_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {17_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {18_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {19_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + }, + expectedCount: [][]int{ + {6, 6}, + {6, 6}, + {6, 6}, + {6, 6}, + {6, 6}, + {6, 6}, + {6, 6}, + {6, 6}, + {6, 6}, + {6, 6}, + }, + }, + { + testName: "1min_0.25rps", + experimentDuration: 1, + rpsTarget: 0.25, + cooldownSeconds: 10, + expectedIAT: []common.IATArray{ + {12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000}, + {16_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000}, + {20_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000}, + }, + expectedCount: [][]int{ + {5}, + {5}, + {5}, + }, + }, + { + testName: "2min_0.25rps", + experimentDuration: 2, + rpsTarget: 0.25, + cooldownSeconds: 10, + expectedIAT: []common.IATArray{ + {12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000}, + {16_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000}, + {20_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000}, + }, + expectedCount: [][]int{ + {5, 5}, + {5, 5}, + {5, 5}, + }, + }, + { + testName: "1min_0.33rps", + experimentDuration: 1, + rpsTarget: 1.0 / 3, + cooldownSeconds: 10, + expectedIAT: []common.IATArray{ + {12_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000}, + {15_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000}, + {18_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000}, + {21_000_000, 12_000_000, 12_000_000, 12_000_000, 12_000_000}, + }, + expectedCount: [][]int{ + {5}, + {5}, + {5}, + {5}, + }, + }, + { + testName: "1min_5rps", + experimentDuration: 1, + rpsTarget: 5, + cooldownSeconds: 10, + expectedIAT: []common.IATArray{ + {10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {10_200_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {10_400_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {10_600_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {10_800_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + + {11_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {11_200_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {11_400_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {11_600_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {11_800_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + + {12_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {12_200_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {12_400_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {12_600_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 
10_000_000}, + {12_800_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + + {13_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {13_200_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {13_400_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {13_600_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {13_800_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + + {14_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {14_200_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {14_400_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {14_600_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {14_800_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + + {15_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {15_200_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {15_400_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {15_600_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {15_800_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + + {16_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {16_200_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {16_400_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {16_600_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {16_800_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + + {17_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {17_200_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {17_400_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {17_600_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {17_800_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + + {18_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {18_200_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {18_400_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {18_600_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {18_800_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + + {19_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {19_200_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {19_400_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {19_600_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + {19_800_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000, 10_000_000}, + }, + expectedCount: [][]int{ + {6}, {6}, {6}, {6}, {6}, + {6}, {6}, {6}, {6}, {6}, + {6}, {6}, {6}, {6}, {6}, + {6}, {6}, {6}, {6}, {6}, + {6}, {6}, {6}, {6}, {6}, + + {6}, {6}, {6}, {6}, {6}, + {6}, {6}, {6}, {6}, {6}, + {6}, {6}, {6}, {6}, {6}, + {6}, {6}, {6}, {6}, {6}, + {6}, {6}, {6}, {6}, {6}, + }, + }, + { + testName: "1min_5rps_cooldown5s", + experimentDuration: 1, + rpsTarget: 5, + cooldownSeconds: 5, + expectedIAT: []common.IATArray{ + {5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {5_200_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {5_400_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 
5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {5_600_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {5_800_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + + {6_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {6_200_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {6_400_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {6_600_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {6_800_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + + {7_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {7_200_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {7_400_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {7_600_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {7_800_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + + {8_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {8_200_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {8_400_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {8_600_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {8_800_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + + {9_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {9_200_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {9_400_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {9_600_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + {9_800_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000, 5_000_000}, + }, + expectedCount: [][]int{ + {12}, {12}, {12}, {12}, {12}, + {12}, {12}, {12}, {12}, {12}, + {12}, {12}, {12}, {12}, {12}, + {12}, {12}, {12}, {12}, {12}, + {12}, {12}, {12}, {12}, {12}, + }, + }, + } + + epsilon := 0.01 + + for _, test := range tests { + t.Run("cold_start_"+test.testName, func(t *testing.T) { + matrix, minuteCounts := GenerateColdStartFunctions(test.experimentDuration, test.rpsTarget, test.cooldownSeconds) + + if len(matrix) != len(test.expectedIAT) { + 
t.Errorf("Unexpected number of functions - got: %d, expected: %d", len(matrix), len(test.expectedIAT)) + } + if len(minuteCounts) != len(test.expectedCount) { + t.Errorf("Unexpected count array size - got: %d, expected: %d", len(minuteCounts), len(test.expectedCount)) + } + + for fIndex := 0; fIndex < len(matrix); fIndex++ { + sum := 0.0 + count := 0 + currentMinute := 0 + + if len(matrix[fIndex]) != len(test.expectedIAT[fIndex]) { + t.Errorf("Unexpected length of function %d IAT array - got: %d, expected: %d", fIndex, len(matrix[fIndex]), len(test.expectedIAT[fIndex])) + } + + for i := 0; i < len(matrix[fIndex]); i++ { + if math.Abs(matrix[fIndex][i]-test.expectedIAT[fIndex][i]) > epsilon { + t.Errorf("Unexpected value fx %d val %d - got: %f; expected: %f", fIndex, i, matrix[fIndex][i], test.expectedIAT[fIndex][i]) + } + + if currentMinute > len(test.expectedCount[fIndex]) { + t.Errorf("Invalid expected count array size for function with index %d", fIndex) + } + + if matrix[fIndex][i] >= 0 { + sum += matrix[fIndex][i] + } + count++ + + if int(sum/60_000_000) != currentMinute { + if count != test.expectedCount[fIndex][currentMinute] { + t.Errorf("Unexpected count array value fx %d; min %d - got: %d; expected: %d", fIndex, currentMinute, count, test.expectedCount[fIndex][currentMinute]) + } + + currentMinute = int(sum / 60_000_000) + count = 0 + } + } + } + }) + } +} diff --git a/pkg/generator/specification.go b/pkg/generator/specification.go index fb2bf84b..d6601e4d 100644 --- a/pkg/generator/specification.go +++ b/pkg/generator/specification.go @@ -48,15 +48,23 @@ func NewSpecificationGenerator(seed int64) *SpecificationGenerator { ////////////////////////////////////////////////// // generateIATPerGranularity generates IAT for one minute based on given number of invocations and the given distribution -func (s *SpecificationGenerator) generateIATPerGranularity(numberOfInvocations int, iatDistribution common.IatDistribution, shiftIAT bool, granularity common.TraceGranularity) ([]float64, float64) { +func (s *SpecificationGenerator) generateIATPerGranularity(minuteIndex int, numberOfInvocations int, iatDistribution common.IatDistribution, shiftIAT bool, granularity common.TraceGranularity) ([]float64, float64) { if numberOfInvocations == 0 { return []float64{}, 0.0 } var iatResult []float64 + + endIndex := numberOfInvocations totalDuration := 0.0 // total non-scaled duration - for i := 0; i < numberOfInvocations; i++ { + if minuteIndex == 0 { + iatResult = []float64{0.0} + endIndex = numberOfInvocations - 1 + } + + // -1 because the first invocation happens at the beginning of minute + for i := 0; i < endIndex; i++ { var iat float64 switch iatDistribution { @@ -118,51 +126,50 @@ func (s *SpecificationGenerator) generateIATPerGranularity(numberOfInvocations i finalIAT := append([]float64{beginningIAT}, iatResult[i+1:]...) finalIAT = append(finalIAT, iatResult[:i]...) iatResult = append(finalIAT, endIAT) - } else { - iatResult = append([]float64{0.0}, iatResult...) } return iatResult, totalDuration } // GenerateIAT generates IAT according to the given distribution. 
Number of minutes is the length of invocationsPerMinute array -func (s *SpecificationGenerator) generateIAT(invocationsPerMinute []int, iatDistribution common.IatDistribution, shiftIAT bool, granularity common.TraceGranularity) (common.IATMatrix, common.ProbabilisticDuration) { - var IAT [][]float64 +func (s *SpecificationGenerator) generateIAT(invocationsPerMinute []int, iatDistribution common.IatDistribution, + shiftIAT bool, granularity common.TraceGranularity) (common.IATArray, []int, common.ProbabilisticDuration) { + + var IAT []float64 + var perMinuteCount []int var nonScaledDuration []float64 numberOfMinutes := len(invocationsPerMinute) for i := 0; i < numberOfMinutes; i++ { - minuteIAT, duration := s.generateIATPerGranularity(invocationsPerMinute[i], iatDistribution, shiftIAT, granularity) + minuteIAT, duration := s.generateIATPerGranularity(i, invocationsPerMinute[i], iatDistribution, shiftIAT, granularity) - IAT = append(IAT, minuteIAT) + IAT = append(IAT, minuteIAT...) + perMinuteCount = append(perMinuteCount, len(minuteIAT)) nonScaledDuration = append(nonScaledDuration, duration) } - return IAT, nonScaledDuration + return IAT, perMinuteCount, nonScaledDuration } func (s *SpecificationGenerator) GenerateInvocationData(function *common.Function, iatDistribution common.IatDistribution, shiftIAT bool, granularity common.TraceGranularity) *common.FunctionSpecification { invocationsPerMinute := function.InvocationStats.Invocations // Generating IAT - iat, rawDuration := s.generateIAT(invocationsPerMinute, iatDistribution, shiftIAT, granularity) + iat, perMinuteCount, rawDuration := s.generateIAT(invocationsPerMinute, iatDistribution, shiftIAT, granularity) // Generating runtime specifications - var runtimeMatrix common.RuntimeSpecificationMatrix - for i := 0; i < len(invocationsPerMinute); i++ { - var row []common.RuntimeSpecification - - for j := 0; j < invocationsPerMinute[i]; j++ { - row = append(row, s.generateExecutionSpecs(function)) + var runtimeArray common.RuntimeSpecificationArray + for i := 0; i < len(perMinuteCount); i++ { + for j := 0; j < perMinuteCount[i]; j++ { + runtimeArray = append(runtimeArray, s.generateExecutionSpecs(function)) } - - runtimeMatrix = append(runtimeMatrix, row) } return &common.FunctionSpecification{ IAT: iat, + PerMinuteCount: perMinuteCount, RawDuration: rawDuration, - RuntimeSpecification: runtimeMatrix, + RuntimeSpecification: runtimeArray, } } @@ -171,7 +178,7 @@ func (s *SpecificationGenerator) GenerateInvocationData(function *common.Functio ////////////////////////////////////////////////// // Choose a random number in between. Not thread safe. 
-func (s *SpecificationGenerator) randIntBetween(min, max float64) int {
+func randIntBetween(gen *rand.Rand, min, max float64) int {
 	intMin, intMax := int(min), int(max)
 
 	if intMax < intMin {
@@ -181,7 +188,7 @@ func (s *SpecificationGenerator) randIntBetween(min, max float64) int {
 	if intMax == intMin {
 		return intMin
 	} else {
-		return s.specRand.Intn(intMax-intMin) + intMin
+		return gen.Intn(intMax-intMin) + intMin
 	}
 }
 
@@ -194,47 +201,47 @@ func (s *SpecificationGenerator) determineExecutionSpecSeedQuantiles() (float64,
 	return runQtl, memQtl
 }
 
-// Should be called only when specRand is locked with its mutex
-func (s *SpecificationGenerator) generateExecuteSpec(runQtl float64, runStats *common.FunctionRuntimeStats) (runtime int) {
+// GenerateExecuteSpec is not thread safe as it could cause non-repeatable spec generation
+func GenerateExecuteSpec(gen *rand.Rand, runQtl float64, runStats *common.FunctionRuntimeStats) (runtime int) {
 	switch {
 	case runQtl == 0:
 		runtime = int(runStats.Percentile0)
 	case runQtl <= 0.01:
-		runtime = s.randIntBetween(runStats.Percentile0, runStats.Percentile1)
+		runtime = randIntBetween(gen, runStats.Percentile0, runStats.Percentile1)
 	case runQtl <= 0.25:
-		runtime = s.randIntBetween(runStats.Percentile1, runStats.Percentile25)
+		runtime = randIntBetween(gen, runStats.Percentile1, runStats.Percentile25)
 	case runQtl <= 0.50:
-		runtime = s.randIntBetween(runStats.Percentile25, runStats.Percentile50)
+		runtime = randIntBetween(gen, runStats.Percentile25, runStats.Percentile50)
 	case runQtl <= 0.75:
-		runtime = s.randIntBetween(runStats.Percentile50, runStats.Percentile75)
+		runtime = randIntBetween(gen, runStats.Percentile50, runStats.Percentile75)
 	case runQtl <= 0.99:
-		runtime = s.randIntBetween(runStats.Percentile75, runStats.Percentile99)
+		runtime = randIntBetween(gen, runStats.Percentile75, runStats.Percentile99)
 	case runQtl < 1:
-		runtime = s.randIntBetween(runStats.Percentile99, runStats.Percentile100)
+		runtime = randIntBetween(gen, runStats.Percentile99, runStats.Percentile100)
 	}
 
 	return runtime
 }
 
-// Should be called only when specRand is locked with its mutex
-func (s *SpecificationGenerator) generateMemorySpec(memQtl float64, memStats *common.FunctionMemoryStats) (memory int) {
+// GenerateMemorySpec is not thread safe as it could cause non-repeatable spec generation
+func GenerateMemorySpec(gen *rand.Rand, memQtl float64, memStats *common.FunctionMemoryStats) (memory int) {
 	switch {
 	case memQtl <= 0.01:
 		memory = int(memStats.Percentile1)
 	case memQtl <= 0.05:
-		memory = s.randIntBetween(memStats.Percentile1, memStats.Percentile5)
+		memory = randIntBetween(gen, memStats.Percentile1, memStats.Percentile5)
 	case memQtl <= 0.25:
-		memory = s.randIntBetween(memStats.Percentile5, memStats.Percentile25)
+		memory = randIntBetween(gen, memStats.Percentile5, memStats.Percentile25)
 	case memQtl <= 0.50:
-		memory = s.randIntBetween(memStats.Percentile25, memStats.Percentile50)
+		memory = randIntBetween(gen, memStats.Percentile25, memStats.Percentile50)
 	case memQtl <= 0.75:
-		memory = s.randIntBetween(memStats.Percentile50, memStats.Percentile75)
+		memory = randIntBetween(gen, memStats.Percentile50, memStats.Percentile75)
 	case memQtl <= 0.95:
-		memory = s.randIntBetween(memStats.Percentile75, memStats.Percentile95)
+		memory = randIntBetween(gen, memStats.Percentile75, memStats.Percentile95)
 	case memQtl <= 0.99:
-		memory = s.randIntBetween(memStats.Percentile95, memStats.Percentile99)
+		memory = randIntBetween(gen, memStats.Percentile95, memStats.Percentile99)
 	case memQtl < 1:
-		memory = s.randIntBetween(memStats.Percentile99, memStats.Percentile100)
+		memory = randIntBetween(gen, memStats.Percentile99, memStats.Percentile100)
 	}
 
 	return memory
@@ -247,8 +254,8 @@ func (s *SpecificationGenerator) generateExecutionSpecs(function *common.Functio
 	}
 
 	runQtl, memQtl := s.determineExecutionSpecSeedQuantiles()
-	runtime := common.MinOf(common.MaxExecTimeMilli, common.MaxOf(common.MinExecTimeMilli, s.generateExecuteSpec(runQtl, runStats)))
-	memory := common.MinOf(common.MaxMemQuotaMib, common.MaxOf(common.MinMemQuotaMib, s.generateMemorySpec(memQtl, memStats)))
+	runtime := common.MinOf(common.MaxExecTimeMilli, common.MaxOf(common.MinExecTimeMilli, GenerateExecuteSpec(s.specRand, runQtl, runStats)))
+	memory := common.MinOf(common.MaxMemQuotaMib, common.MaxOf(common.MinMemQuotaMib, GenerateMemorySpec(s.specRand, memQtl, memStats)))
 
 	return common.RuntimeSpecification{
 		Runtime: runtime,
diff --git a/pkg/generator/specification_test.go b/pkg/generator/specification_test.go
index 1ed65a55..72784619 100644
--- a/pkg/generator/specification_test.go
+++ b/pkg/generator/specification_test.go
@@ -78,7 +78,7 @@ func TestSerialGenerateIAT(t *testing.T) {
 		iatDistribution  common.IatDistribution
 		shiftIAT         bool
 		granularity      common.TraceGranularity
-		expectedPoints   [][]float64 // μs
+		expectedPoints   []float64 // μs
 		testDistribution bool
 	}{
 		{
@@ -87,7 +87,7 @@ func TestSerialGenerateIAT(t *testing.T) {
 			iatDistribution:  common.Equidistant,
 			shiftIAT:         false,
 			granularity:      common.MinuteGranularity,
-			expectedPoints:   [][]float64{},
+			expectedPoints:   []float64{},
 			testDistribution: false,
 		},
 		{
@@ -96,16 +96,16 @@ func TestSerialGenerateIAT(t *testing.T) {
 			iatDistribution:  common.Exponential,
 			shiftIAT:         false,
 			granularity:      common.MinuteGranularity,
-			expectedPoints:   [][]float64{},
+			expectedPoints:   []float64{},
 			testDistribution: false,
 		},
 		{
 			testName:         "no_invocations_exponential_shift",
 			invocations:      []int{5},
 			iatDistribution:  common.Exponential,
-			shiftIAT:         true,
+			shiftIAT:         false,
 			granularity:      common.MinuteGranularity,
-			expectedPoints:   [][]float64{},
+			expectedPoints:   []float64{},
 			testDistribution: false,
 		},
 		{
@@ -114,16 +114,16 @@ func TestSerialGenerateIAT(t *testing.T) {
 			iatDistribution:  common.Exponential,
 			shiftIAT:         false,
 			granularity:      common.MinuteGranularity,
-			expectedPoints:   [][]float64{{0, 60000000}},
+			expectedPoints:   []float64{0},
 			testDistribution: false,
 		},
 		{
 			testName:         "one_invocations_exponential_shift",
 			invocations:      []int{1},
 			iatDistribution:  common.Exponential,
-			shiftIAT:         true,
+			shiftIAT:         false,
 			granularity:      common.MinuteGranularity,
-			expectedPoints:   [][]float64{{11689078.788397, 48310921.211603}},
+			expectedPoints:   []float64{0},
 			testDistribution: false,
 		},
 		{
@@ -132,15 +132,12 @@ func TestSerialGenerateIAT(t *testing.T) {
 			iatDistribution:  common.Equidistant,
 			shiftIAT:         false,
 			granularity:      common.MinuteGranularity,
-			expectedPoints: [][]float64{
-				{
-					0,
-					12000000,
-					12000000,
-					12000000,
-					12000000,
-					12000000,
-				},
+			expectedPoints: []float64{
+				0,
+				12000000,
+				12000000,
+				12000000,
+				12000000,
 			},
 			testDistribution: false,
 		},
@@ -150,90 +147,37 @@ func TestSerialGenerateIAT(t *testing.T) {
 			iatDistribution:  common.Equidistant,
 			shiftIAT:         false,
 			granularity:      common.MinuteGranularity,
-			expectedPoints: [][]float64{
-				{
-					// min 1
-					0,
-					12000000,
-					12000000,
-					12000000,
-					12000000,
-					12000000,
-				},
-				{
-					// min 2
-					0,
-					12000000,
-					12000000,
-					12000000,
-					12000000,
-					12000000,
-				},
-				{
-					// min 3
-					0,
-					12000000,
-					12000000,
-					12000000,
-					12000000,
-					12000000,
-				},
-				{
-					// min 4
-					0,
-					12000000,
-					12000000,
-					12000000,
-					12000000,
-					12000000,
-				},
-				{
-					// min 5
-					0,
-					12000000,
-					12000000,
-					12000000,
-					12000000,
-					12000000,
-				},
-			},
-			testDistribution: false,
-		},
-		{
-			testName:        "1min_25ipm_uniform_shift",
-			invocations:     []int{25},
-			iatDistribution: common.Uniform,
-			shiftIAT:        true,
-			granularity:     common.MinuteGranularity,
-			expectedPoints: [][]float64{
-				{
-					1193000.964808,
-					622524.819620,
-					2161625.000293,
-					2467158.610498,
-					3161216.965226,
-					120925.338482,
-					3461650.068734,
-					3681772.563419,
-					3591929.298027,
-					3062124.611863,
-					3223056.707367,
-					3042558.740794,
-					2099765.805752,
-					375008.683565,
-					3979289.345154,
-					1636869.797787,
-					1169442.102841,
-					2380243.616007,
-					2453428.612640,
-					1704231.066313,
-					42074.939233,
-					3115643.026141,
-					3460047.444726,
-					2849475.331077,
-					3187546.011741,
-					1757390.527891,
-				},
+			expectedPoints: []float64{
+				// min 1
+				0,
+				12000000,
+				12000000,
+				12000000,
+				12000000,
+				// min 2
+				12000000,
+				12000000,
+				12000000,
+				12000000,
+				12000000,
+				// min 3
+				12000000,
+				12000000,
+				12000000,
+				12000000,
+				12000000,
+				// min 4
+				12000000,
+				12000000,
+				12000000,
+				12000000,
+				12000000,
+				// min 5
+				12000000,
+				12000000,
+				12000000,
+				12000000,
+				12000000,
 			},
 			testDistribution: false,
 		},
@@ -246,82 +190,6 @@ func TestSerialGenerateIAT(t *testing.T) {
 			expectedPoints:   nil,
 			testDistribution: true,
 		},
-		{
-			testName:         "1min_25ipm_exponential",
-			invocations:      []int{25},
-			iatDistribution:  common.Exponential,
-			shiftIAT:         false,
-			granularity:      common.MinuteGranularity,
-			expectedPoints: [][]float64{
-				{
-					0,
-					1311929.341329,
-					3685871.430916,
-					1626476.996595,
-					556382.014270,
-					30703.105102,
-					3988584.779392,
-					2092271.836277,
-					1489855.293253,
-					3025094.199801,
-					2366337.4678820,
-					40667.5994150,
-					2778945.4898700,
-					4201722.5747150,
-					5339421.1460450,
-					3362048.1584080,
-					939526.5236740,
-					1113771.3822940,
-					4439636.5676460,
-					4623026.1098310,
-					2082985.6557600,
-					45937.1189860,
-					4542253.8756200,
-					2264414.9939920,
-					3872560.8680640,
-					179575.4708620,
-				},
-			},
-			testDistribution: false,
-		},
-		{
-			testName:         "1min_25ipm_exponential_shift",
-			invocations:      []int{25},
-			iatDistribution:  common.Exponential,
-			shiftIAT:         true,
-			granularity:      common.MinuteGranularity,
-			expectedPoints: [][]float64{
-				{
-					697544.471476,
-					5339421.146045,
-					3362048.158408,
-					939526.523674,
-					1113771.382294,
-					4439636.567646,
-					4623026.109831,
-					2082985.655760,
-					45937.118986,
-					4542253.875620,
-					2264414.993992,
-					3872560.868064,
-					179575.470862,
-					1311929.341329,
-					3685871.430916,
-					1626476.996595,
-					556382.014270,
-					30703.105102,
-					3988584.779392,
-					2092271.836277,
-					1489855.293253,
-					3025094.199801,
-					2366337.4678820,
-					40667.5994150,
-					2778945.4898700,
-					3504178.103239,
-				},
-			},
-			testDistribution: false,
-		},
 		{
 			testName:        "1min_1000000ipm_exponential",
 			invocations:     []int{1000000},
@@ -337,30 +205,21 @@ func TestSerialGenerateIAT(t *testing.T) {
 			iatDistribution: common.Equidistant,
 			shiftIAT:        false,
 			granularity:     common.SecondGranularity,
-			expectedPoints: [][]float64{
-				{
-					// second 1
-					0,
-					200000,
-					200000,
-					200000,
-					200000,
-					200000,
-				},
-				{
-					// second 2
-					0,
-					250000,
-					250000,
-					250000,
-					250000,
-				},
-				{
-					// second 3
-					0,
-					500000,
-					500000,
-				},
+			expectedPoints: []float64{
+				// second 1 - μs below
+				0,
+				200000,
+				200000,
+				200000,
+				200000,
+				// second 2 - μs below
+				250000,
+				250000,
+				250000,
+				250000,
+				// second 3 - μs below
+				500000,
+				500000,
 			},
 			testDistribution: false,
 		},
@@ -371,35 +230,32 @@ func TestSerialGenerateIAT(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.testName, func(t *testing.T) {
-			log.SetLevel(log.TraceLevel)
-
 			sg := NewSpecificationGenerator(seed)
 			testFunction.InvocationStats = &common.FunctionInvocationStats{Invocations: test.invocations}
 
 			spec := sg.GenerateInvocationData(&testFunction, test.iatDistribution, test.shiftIAT, test.granularity)
-			IAT, nonScaledDuration := spec.IAT, spec.RawDuration
+			IAT, perMinuteCount, nonScaledDuration := spec.IAT, spec.PerMinuteCount, spec.RawDuration
 
 			failed := false
 
-			if hasSpillover(IAT, test.granularity) {
+			/*if hasSpillover(IAT, perMinuteCount, test.granularity) {
 				t.Error("Generated IAT does not fit in the within the minute time window.")
-			}
+			}*/
 
 			if test.expectedPoints != nil {
-				for min := 0; min < len(test.expectedPoints); min++ {
-					for i := 0; i < len(test.expectedPoints[min]); i++ {
-						if len(test.expectedPoints[min]) != len(IAT[min]) {
-							log.Debug(fmt.Sprintf("wrong number of IATs in the minute, got: %d, expected: %d\n", len(IAT[min]), len(test.expectedPoints[min])))
-
-							failed = true
-							break
-						}
-						if math.Abs(IAT[min][i]-test.expectedPoints[min][i]) > epsilon {
-							log.Debug(fmt.Sprintf("got: %f, expected: %f\n", IAT[min][i], test.expectedPoints[min][i]))
-
-							failed = true
-							// no break statement for debugging purpose
-						}
+				for i := 0; i < len(test.expectedPoints); i++ {
+					if len(test.expectedPoints) != len(IAT) {
+						log.Debug(fmt.Sprintf("wrong number of IATs in the minute, got: %d, expected: %d\n", len(IAT), len(test.expectedPoints)))
+
+						failed = true
+						break
+					}
+
+					if math.Abs(IAT[i]-test.expectedPoints[i]) > epsilon {
+						log.Debug(fmt.Sprintf("got: %f, expected: %f\n", IAT[i], test.expectedPoints[i]))
+
+						failed = true
+						// no break statement for debugging purpose
 					}
 				}
@@ -409,7 +265,7 @@ func TestSerialGenerateIAT(t *testing.T) {
 			}
 
 			if test.testDistribution && test.iatDistribution != common.Equidistant &&
-				!checkDistribution(IAT, nonScaledDuration, test.iatDistribution) {
+				!checkDistribution(IAT, perMinuteCount, nonScaledDuration, test.iatDistribution) {
 				t.Error("The provided sample does not satisfy the given distribution.")
 			}
 
@@ -417,13 +273,21 @@ func TestSerialGenerateIAT(t *testing.T) {
 	}
 }
 
-func hasSpillover(data [][]float64, granularity common.TraceGranularity) bool {
-	for min := 0; min < len(data); min++ {
+/*func hasSpillover(data []float64, perMinuteCount []int, granularity common.TraceGranularity) bool {
+	beginIndex := 0
+	endIndex := perMinuteCount[0]
+
+	for min := 0; min < len(perMinuteCount); min++ {
 		sum := 0.0
 		epsilon := 1e-3
 
-		for i := 0; i < len(data[min]); i++ {
-			sum += data[min][i]
+		for i := beginIndex; i < endIndex; i++ {
+			sum += data[i]
+		}
+
+		if min+1 < len(perMinuteCount) {
+			beginIndex += perMinuteCount[min]
+			endIndex = beginIndex + perMinuteCount[min+1]
 		}
 
 		log.Debug(fmt.Sprintf("Total execution time: %f μs\n", sum))
@@ -439,9 +303,9 @@ func hasSpillover(data [][]float64, granularity common.TraceGranularity) bool {
 	}
 
 	return false
-}
+}*/
 
-func checkDistribution(data [][]float64, nonScaledDuration []float64, distribution common.IatDistribution) bool {
+func checkDistribution(data []float64, perMinuteCount []int, nonScaledDuration []float64, distribution common.IatDistribution) bool {
 	// PREPARING ARGUMENTS
 	var dist string
 	inputFile := "test_data.txt"
@@ -457,7 +321,10 @@ func checkDistribution(data [][]float64, nonScaledDuration []float64, distributi
 
 	result := false
 
-	for min := 0; min < len(data); min++ {
+	beginIndex := 0
+	endIndex := perMinuteCount[0]
+
+	for min := 0; min < len(perMinuteCount); min++ {
 		// WRITING DISTRIBUTION TO TEST
 		f, err := os.Create(inputFile)
 		if err != nil {
@@ -466,8 +333,13 @@ func checkDistribution(data [][]float64, nonScaledDuration []float64, distributi
 
 		defer f.Close()
 
-		for _, iat := range data[min] {
-			_, _ = f.WriteString(fmt.Sprintf("%f\n", iat))
+		for i := beginIndex; i < endIndex; i++ {
+			_, _ = f.WriteString(fmt.Sprintf("%f\n", data[i]))
+		}
+
+		if min+1 < len(perMinuteCount) {
+			beginIndex += perMinuteCount[min]
+			endIndex = beginIndex + perMinuteCount[min+1]
 		}
 
 		// SETTING UP THE TESTING SCRIPT
@@ -478,7 +350,7 @@
 		// NOTE: the script generates a histogram in PNG format that can be used as a sanity-check
 		if err := statisticalTest.Wait(); err != nil {
 			output, _ := statisticalTest.Output()
-			log.Info(string(output))
+			log.Debug(string(output))
 
 			switch statisticalTest.ProcessState.ExitCode() {
 			case 0:
@@ -577,7 +449,7 @@ func TestGenerateExecutionSpecifications(t *testing.T) {
 			index := i
 
 			go func() {
-				runtime, memory := spec[0][index].Runtime, spec[0][index].Memory
+				runtime, memory := spec[index].Runtime, spec[index].Memory
 
 				mutex.Lock()
 				results[common.RuntimeSpecification{Runtime: runtime, Memory: memory}] = struct{}{}
diff --git a/pkg/generator/startup_busy_loop.go b/pkg/generator/startup_busy_loop.go
new file mode 100644
index 00000000..35163143
--- /dev/null
+++ b/pkg/generator/startup_busy_loop.go
@@ -0,0 +1,12 @@
+package generator
+
+func ComputeBusyLoopPeriod(memory int) int {
+	// data for AWS from STeLLAR - IISWC'21
+	if memory <= 10 {
+		return 300
+	} else if memory <= 60 {
+		return 750
+	} else {
+		return 1250
+	}
+}
diff --git a/pkg/trace/knative_workload_parser.go b/pkg/trace/knative_workload_parser.go
new file mode 100644
index 00000000..cb30b20d
--- /dev/null
+++ b/pkg/trace/knative_workload_parser.go
@@ -0,0 +1,76 @@
+package trace
+
+import (
+	"github.com/sirupsen/logrus"
+	"github.com/vhive-serverless/loader/pkg/common"
+	"gopkg.in/yaml.v3"
+	"os"
+	"strconv"
+)
+
+func readKnativeYaml(path string) map[string]interface{} {
+	cfg := make(map[string]interface{})
+
+	yamlFile, err := os.ReadFile(path)
+	if err != nil {
+		logrus.Fatalf("Error reading Knative YAML - %v", err)
+	}
+
+	err = yaml.Unmarshal(yamlFile, &cfg)
+	if err != nil {
+		logrus.Fatalf("Error unmarshalling Knative YAML - %v", err)
+	}
+
+	return cfg
+}
+
+func getNodeByName(data []interface{}, key string) map[string]interface{} {
+	for i := 0; i < len(data); i++ {
+		d := data[i].(map[string]interface{})
+
+		if d["name"] == key {
+			return d
+		}
+	}
+
+	return nil
+}
+
+func convertKnativeYamlToDirigentMetadata(path string) *common.DirigentMetadata {
+	cfg := readKnativeYaml(path)
+
+	r1 := cfg["spec"].(map[string]interface{})
+	r2 := r1["template"].(map[string]interface{})
+
+	metadata := r2["metadata"].(map[string]interface{})
+	annotations := metadata["annotations"].(map[string]interface{})
+	upperScale := annotations["autoscaling.knative.dev/max-scale"].(string)
+	lowerScale := annotations["autoscaling.knative.dev/min-scale"].(string)
+	upperScaleInt, _ := strconv.Atoi(upperScale)
+	lowerScaleInt, _ := strconv.Atoi(lowerScale)
+
+	spec := r2["spec"].(map[string]interface{})
+	containers := spec["containers"].([]interface{})[0].(map[string]interface{})
+	image := containers["image"].(string)
+
+	ports := containers["ports"].([]interface{})
+	port := getNodeByName(ports, "h2c")
+	portInt := port["containerPort"].(int)
+
+	env := containers["env"].([]interface{})
+	iterationMultiplier := getNodeByName(env, "ITERATIONS_MULTIPLIER")
+	iterationMultiplierInt, _ := strconv.Atoi(iterationMultiplier["value"].(string))
+
+	ioPercentage := getNodeByName(env, "IO_PERCENTAGE")
+	ioPercentageInt, _ := strconv.Atoi(ioPercentage["value"].(string))
+
+	return &common.DirigentMetadata{
+		Image:               image,
+		Port:                portInt,
+		Protocol:            "tcp",
+		ScalingUpperBound:   upperScaleInt,
+		ScalingLowerBound:   lowerScaleInt,
+		IterationMultiplier: iterationMultiplierInt,
+		IOPercentage:        ioPercentageInt,
+	}
+}
diff --git a/pkg/trace/knative_workload_parser_test.go b/pkg/trace/knative_workload_parser_test.go
new file mode 100644
index 00000000..d39c7a8b
--- /dev/null
+++ b/pkg/trace/knative_workload_parser_test.go
@@ -0,0 +1,18 @@
+package trace
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func TestConvertKnativeYAMLToDirigentMetadata(t *testing.T) {
+	cfg := convertKnativeYamlToDirigentMetadata("test_data/service.yaml")
+
+	assert.Equal(t, cfg.Image, "docker.io/cvetkovic/dirigent_trace_function:latest")
+	assert.Equal(t, cfg.Port, 80)
+	assert.Equal(t, cfg.Protocol, "tcp")
+	assert.Equal(t, cfg.ScalingUpperBound, 200)
+	assert.Equal(t, cfg.ScalingLowerBound, 0)
+	assert.Equal(t, cfg.IterationMultiplier, 102)
+	assert.Equal(t, cfg.IOPercentage, 50)
+}
diff --git a/pkg/trace/parser.go b/pkg/trace/parser.go
index d98d955c..3bd6713d 100644
--- a/pkg/trace/parser.go
+++ b/pkg/trace/parser.go
@@ -30,6 +30,7 @@ import (
 	"fmt"
 	"github.com/gocarina/gocsv"
 	"github.com/vhive-serverless/loader/pkg/common"
+	"github.com/vhive-serverless/loader/pkg/generator"
 	"io"
 	"math/rand"
 	"os"
@@ -42,14 +43,16 @@ import (
 
 type AzureTraceParser struct {
 	DirectoryPath string
+	YAMLPath      string
 
 	duration int
 
 	functionNameGenerator *rand.Rand
 }
 
-func NewAzureParser(directoryPath string, totalDuration int) *AzureTraceParser {
+func NewAzureParser(directoryPath string, yamlPath string, totalDuration int) *AzureTraceParser {
 	return &AzureTraceParser{
 		DirectoryPath: directoryPath,
+		YAMLPath:      yamlPath,
 
 		duration: totalDuration,
 
 		functionNameGenerator: rand.New(rand.NewSource(time.Now().UnixNano())),
@@ -90,7 +93,8 @@ func (p *AzureTraceParser) extractFunctions(
 	invocations *[]common.FunctionInvocationStats,
 	runtime *[]common.FunctionRuntimeStats,
 	memory *[]common.FunctionMemoryStats,
-	dirigentMetadata *[]common.DirigentMetadata) []*common.Function {
+	dirigentMetadata *[]common.DirigentMetadata,
+	platform string) []*common.Function {
 
 	var result []*common.Function
 
@@ -102,6 +106,8 @@ func (p *AzureTraceParser) extractFunctions(
 		dirigentMetadataByHashFunction = createDirigentMetadataMap(dirigentMetadata)
 	}
 
+	gen := rand.New(rand.NewSource(time.Now().UnixNano()))
+
 	for i := 0; i < len(*invocations); i++ {
 		invocationStats := (*invocations)[i]
 
@@ -111,10 +117,15 @@ func (p *AzureTraceParser) extractFunctions(
 			InvocationStats: &invocationStats,
 			RuntimeStats:    runtimeByHashFunction[invocationStats.HashFunction],
 			MemoryStats:     memoryByHashFunction[invocationStats.HashFunction],
+
+			ColdStartBusyLoopMs: generator.ComputeBusyLoopPeriod(generator.GenerateMemorySpec(gen, gen.Float64(), memoryByHashFunction[invocationStats.HashFunction])),
 		}
 
 		if dirigentMetadata != nil {
 			function.DirigentMetadata = dirigentMetadataByHashFunction[invocationStats.HashFunction]
+		} else if strings.Contains(strings.ToLower(platform), "knative") {
+			// values are not used for Knative so they are irrelevant
+			function.DirigentMetadata = convertKnativeYamlToDirigentMetadata(p.YAMLPath)
 		}
 
 		result = append(result, function)
@@ -134,7 +145,7 @@ func (p *AzureTraceParser) Parse(platform string) []*common.Function {
 	memoryTrace := parseMemoryTrace(memoryPath)
 	dirigentMetadata := parseDirigentMetadata(dirigentPath, platform)
 
-	return p.extractFunctions(invocationTrace, runtimeTrace, memoryTrace, dirigentMetadata)
+	return p.extractFunctions(invocationTrace, runtimeTrace, memoryTrace, dirigentMetadata, platform)
 }
 
 func parseInvocationTrace(traceFile string, traceDuration int) *[]common.FunctionInvocationStats {
@@ -259,7 +270,7 @@ func parseMemoryTrace(traceFile string) *[]common.FunctionMemoryStats {
 }
 
 func parseDirigentMetadata(traceFile string, platform string) *[]common.DirigentMetadata {
-	if strings.ToLower(platform) != "dirigent" {
+	if !strings.Contains(strings.ToLower(platform), "dirigent") {
 		return nil
 	}
 
diff --git a/pkg/trace/parser_test.go b/pkg/trace/parser_test.go
index d686b700..071e78ed 100644
--- a/pkg/trace/parser_test.go
+++ b/pkg/trace/parser_test.go
@@ -25,6 +25,7 @@
 package trace
 
 import (
+	"github.com/vhive-serverless/loader/pkg/common"
 	"math"
 	"strings"
 	"testing"
@@ -119,13 +120,13 @@ func TestParseMemoryTrace(t *testing.T) {
 }
 
 func TestParserWrapper(t *testing.T) {
-	parser := NewAzureParser("test_data", 10)
+	parser := NewAzureParser("test_data", "test_data/service.yaml", 10)
 	functions := parser.Parse("Knative")
 
 	if len(functions) != 1 {
 		t.Error("Invalid function array length.")
 	}
 
-	if !strings.HasPrefix(functions[0].Name, "trace-func") ||
+	if !strings.HasPrefix(functions[0].Name, common.FunctionNamePrefix) ||
 		functions[0].InvocationStats == nil ||
 		functions[0].RuntimeStats == nil ||
 		functions[0].MemoryStats == nil {
diff --git a/pkg/trace/test_data/service.yaml b/pkg/trace/test_data/service.yaml
new file mode 100644
index 00000000..3a3b6704
--- /dev/null
+++ b/pkg/trace/test_data/service.yaml
@@ -0,0 +1,43 @@
+apiVersion: serving.knative.dev/v1
+kind: Service
+metadata:
+  name: $FUNC_NAME
+  namespace: default
+spec:
+  template:
+    metadata:
+      annotations:
+        autoscaling.knative.dev/initial-scale: "0"  # Should start from 0, otherwise we can't deploy more functions than the node physically permits.
+        autoscaling.knative.dev/min-scale: "0"  # This parameter only has a per-revision key, so it's necessary to have here in case of the warmup messes up.
+        autoscaling.knative.dev/target-burst-capacity: "-1"  # Put activator always in the path explicitly.
+        autoscaling.knative.dev/max-scale: "200"  # Maximum instances limit of Azure.
+
+        autoscaling.knative.dev/panic-window-percentage: $PANIC_WINDOW
+        autoscaling.knative.dev/panic-threshold-percentage: $PANIC_THRESHOLD
+        autoscaling.knative.dev/metric: $AUTOSCALING_METRIC
+        autoscaling.knative.dev/target: $AUTOSCALING_TARGET
+    spec:
+      containerConcurrency: 1
+      nodeSelector:
+        loader-nodetype: worker
+      containers:
+        - image: docker.io/cvetkovic/dirigent_trace_function:latest
+          # imagePullPolicy: Always  # No need if the tag is `latest`.
+          ports:
+            - name: h2c  # For gRPC support
+              containerPort: 80
+          env:
+            - name: ITERATIONS_MULTIPLIER
+              value: "102"
+            - name: ENABLE_TRACING
+              value: "false"
+            - name: COLD_START_BUSY_LOOP_MS
+              value: $COLD_START_BUSY_LOOP_MS
+            - name: IO_PERCENTAGE
+              value: "50"
+          resources:
+            limits:
+              cpu: $CPU_LIMITS
+            requests:
+              cpu: $CPU_REQUEST
+              memory: $MEMORY_REQUESTS
\ No newline at end of file