Skip to content

Commit

Permalink
RPS mode - new feature
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Cvetković <[email protected]>
  • Loading branch information
cvetkovic committed Nov 15, 2024
1 parent 25e9a99 commit 78f02ab
Show file tree
Hide file tree
Showing 16 changed files with 1,084 additions and 480 deletions.
2 changes: 1 addition & 1 deletion cmd/config_dirigent_rps.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"InvokeProtocol" : "http2",
"EndpointPort": 80,

"DirigentControlPlaneIP": "10.0.1.253:9092",
"DirigentControlPlaneIP": "localhost:9092",
"BusyLoopOnSandboxStartup": false,

"RpsTarget": 1,
Expand Down
29 changes: 24 additions & 5 deletions cmd/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package main
import (
"flag"
"fmt"
"github.com/vhive-serverless/loader/pkg/generator"
"os"
"strings"
"time"
Expand All @@ -49,7 +50,7 @@ var (
configPath = flag.String("config", "cmd/config_knative_trace.json", "Path to loader configuration file")
failurePath = flag.String("failureConfig", "cmd/failure.json", "Path to the failure configuration file")
verbosity = flag.String("verbosity", "info", "Logging verbosity - choose from [info, debug, trace]")
iatGeneration = flag.Bool("iatGeneration", false, "Generate iats only or run invocations as well")
iatGeneration = flag.Bool("iatGeneration", false, "Generate IATs only or run invocations as well")
iatFromFile = flag.Bool("generated", false, "True if iats were already generated")
)

Expand Down Expand Up @@ -107,7 +108,7 @@ func main() {
if !strings.HasSuffix(cfg.Platform, "-RPS") {
runTraceMode(&cfg, *iatFromFile, *iatGeneration)
} else {
runRPSMode(&cfg, *iatGeneration)
runRPSMode(&cfg, *iatFromFile, *iatGeneration)
}
}

Expand Down Expand Up @@ -175,7 +176,7 @@ func runTraceMode(cfg *config.LoaderConfiguration, readIATFromFile bool, justGen
durationToParse := determineDurationToParse(cfg.ExperimentDuration, cfg.WarmupDuration)
yamlPath := parseYAMLSpecification(cfg)

traceParser := trace.NewAzureParser(cfg.TracePath, durationToParse)
traceParser := trace.NewAzureParser(cfg.TracePath, yamlPath, durationToParse)
functions := traceParser.Parse(cfg.Platform)

log.Infof("Traces contain the following %d functions:\n", len(functions))
Expand Down Expand Up @@ -205,6 +206,24 @@ func runTraceMode(cfg *config.LoaderConfiguration, readIATFromFile bool, justGen
experimentDriver.RunExperiment(justGenerateIAT, readIATFromFile)
}

func runRPSMode(cfg *config.LoaderConfiguration, justGenerateIAT bool) {
panic("Not yet implemented")
// runRPSMode drives the loader in requests-per-second mode: a configured
// percentage of the target RPS is issued as cold starts (spread across many
// short-lived functions), and the remainder as warm starts against a single
// pre-warmed function. IATs can be pre-generated or read from file, mirroring
// the trace-mode entry point.
func runRPSMode(cfg *config.LoaderConfiguration, readIATFromFile bool, justGenerateIAT bool) {
	rpsTarget := cfg.RpsTarget
	coldStartPercentage := cfg.RpsColdStartRatioPercentage

	// Compute the cold-start share first and assign the remainder to warm
	// starts, so that integer truncation cannot make the two shares sum to
	// less than the requested target RPS (e.g. target=10, 33% cold would
	// otherwise yield 6 warm + 3 cold = 9 RPS).
	coldStartRPS := rpsTarget * coldStartPercentage / 100
	warmStartRPS := rpsTarget - coldStartRPS

	warmFunction, warmStartCount := generator.GenerateWarmStartFunction(cfg.ExperimentDuration, warmStartRPS)
	coldFunctions, coldStartCount := generator.GenerateColdStartFunctions(cfg.ExperimentDuration, coldStartRPS, cfg.RpsCooldownSeconds)

	experimentDriver := driver.NewDriver(&config.Configuration{
		LoaderConfiguration: cfg,
		TraceDuration:       determineDurationToParse(cfg.ExperimentDuration, cfg.WarmupDuration),

		YAMLPath: parseYAMLSpecification(cfg),

		Functions: generator.CreateRPSFunctions(cfg, warmFunction, warmStartCount, coldFunctions, coldStartCount),
	})

	experimentDriver.RunExperiment(justGenerateIAT, readIATFromFile)
}
11 changes: 6 additions & 5 deletions pkg/common/specification_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
package common

// IATMatrix - columns are minutes, rows are IATs
type IATMatrix [][]float64
type IATArray []float64

// ProbabilisticDuration used for testing the exponential distribution
type ProbabilisticDuration []float64
Expand All @@ -35,10 +35,11 @@ type RuntimeSpecification struct {
Memory int
}

type RuntimeSpecificationMatrix [][]RuntimeSpecification
type RuntimeSpecificationArray []RuntimeSpecification

type FunctionSpecification struct {
IAT IATMatrix `json:"IAT"`
RawDuration ProbabilisticDuration `json:"RawDuration"`
RuntimeSpecification RuntimeSpecificationMatrix `json:"RuntimeSpecification"`
IAT IATArray `json:"IAT"`
PerMinuteCount []int `json:"PerMinuteCount"`
RawDuration ProbabilisticDuration `json:"RawDuration"`
RuntimeSpecification RuntimeSpecificationArray `json:"RuntimeSpecification"`
}
110 changes: 110 additions & 0 deletions pkg/driver/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package driver

import (
"encoding/json"
"github.com/vhive-serverless/loader/pkg/common"
mc "github.com/vhive-serverless/loader/pkg/metric"
"math"
"os"
"sync"
"time"
)

// CreateMetricsScrapper returns a closure that periodically scrapes cluster
// usage, deployment scales, and Knative statistics, writing each stream to its
// own output file. The closure signals readiness via signalReady, runs until a
// value arrives on finishCh, and then signals allRecordsWritten once both CSV
// writer goroutines have drained their channels.
func (d *Driver) CreateMetricsScrapper(interval time.Duration,
	signalReady *sync.WaitGroup, finishCh chan int, allRecordsWritten *sync.WaitGroup) func() {
	timer := time.NewTicker(interval)

	return func() {
		// Stop the ticker when the scraper exits so its resources are
		// released; the original code leaked it.
		defer timer.Stop()

		signalReady.Done()
		knStatRecords := make(chan interface{}, 100)
		scaleRecords := make(chan interface{}, 100)
		writerDone := sync.WaitGroup{}

		clusterUsageFile, err := os.Create(d.outputFilename("cluster_usage"))
		common.Check(err)
		defer clusterUsageFile.Close()

		// One writer goroutine per record stream; each Done()s writerDone
		// after its channel is closed and fully flushed.
		writerDone.Add(1)
		go d.runCSVWriter(knStatRecords, d.outputFilename("kn_stats"), &writerDone)

		writerDone.Add(1)
		go d.runCSVWriter(scaleRecords, d.outputFilename("deployment_scale"), &writerDone)

		for {
			select {
			case <-timer.C:
				// Cluster usage is written directly as JSON lines rather
				// than through a CSV writer.
				recCluster := mc.ScrapeClusterUsage()
				recCluster.Timestamp = time.Now().UnixMicro()

				byteArr, err := json.Marshal(recCluster)
				common.Check(err)

				_, err = clusterUsageFile.Write(byteArr)
				common.Check(err)

				_, err = clusterUsageFile.WriteString("\n")
				common.Check(err)

				// Stamp all scale records from one scrape with the same
				// timestamp so they can be correlated downstream.
				recScale := mc.ScrapeDeploymentScales()
				timestamp := time.Now().UnixMicro()
				for _, rec := range recScale {
					rec.Timestamp = timestamp
					scaleRecords <- rec
				}

				recKnative := mc.ScrapeKnStats()
				recKnative.Timestamp = time.Now().UnixMicro()
				knStatRecords <- recKnative
			case <-finishCh:
				// Closing the channels lets the CSV writers finish; wait
				// for them before reporting completion.
				close(knStatRecords)
				close(scaleRecords)

				writerDone.Wait()
				allRecordsWritten.Done()

				return
			}
		}
	}
}

// createGlobalMetricsCollector forwards execution records from collector to a
// CSV writer until exactly totalNumberOfInvocations records have been written,
// then signals signalEverythingWritten. The total is unknown at start-up and
// is delivered later on totalIssuedChannel, once every per-function driver has
// finished issuing invocations and all invocations have returned.
func (d *Driver) createGlobalMetricsCollector(filename string, collector chan *mc.ExecutionRecord,
	signalReady *sync.WaitGroup, signalEverythingWritten *sync.WaitGroup, totalIssuedChannel chan int64) {

	// NOTE: totalNumberOfInvocations is initialized to MaxInt64 not to allow collector to complete before
	// the end signal is received on totalIssuedChannel, which deliver the total number of issued invocations.
	// This number is known once all the individual function drivers finish issuing invocations and
	// when all the invocations return
	var totalNumberOfInvocations int64 = math.MaxInt64
	var currentlyWritten int64

	// NOTE(review): this handle is never written to here, and runCSVWriter
	// below is given the same filename — verify that opening the file twice
	// (create/truncate here, write there) is intentional.
	file, err := os.Create(filename)
	common.Check(err)
	defer file.Close()

	signalReady.Done()

	records := make(chan interface{}, 100)
	writerDone := sync.WaitGroup{}
	writerDone.Add(1)
	go d.runCSVWriter(records, filename, &writerDone)

	for {
		select {
		case record := <-collector:
			records <- record

			currentlyWritten++
		case record := <-totalIssuedChannel:
			totalNumberOfInvocations = record
		}

		// Once the real total is known and reached, flush the writer and
		// report completion.
		if currentlyWritten == totalNumberOfInvocations {
			close(records)
			writerDone.Wait()
			signalEverythingWritten.Done()

			return
		}
	}
}
Loading

0 comments on commit 78f02ab

Please sign in to comment.