Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPS mode with cold/warm ratio #557

Merged
merged 32 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
32f9e1e
RPS mode - new feature
cvetkovic Nov 15, 2024
4033f8d
YAML fixes
cvetkovic Nov 15, 2024
e831ffc
Read/Write IAT and generate specs knobs
cvetkovic Nov 15, 2024
968f069
Fixing tests
cvetkovic Nov 15, 2024
64e3219
Porting minor changes from aux branch
cvetkovic Nov 15, 2024
0eb270c
Failure rate percentage logging
cvetkovic Nov 15, 2024
a86ddf7
Fixing DAG test
cvetkovic Nov 15, 2024
757ebc5
Fixing tests
cvetkovic Nov 15, 2024
ee2367b
Extracting Dirigent metadata parsing from Azure parser
cvetkovic Nov 25, 2024
05198bc
Fixing typo in trace driver
cvetkovic Nov 25, 2024
1ff197d
Extracting specification generator to a separate method
cvetkovic Nov 25, 2024
f046e99
Bugfix on Knative parallel deploy
cvetkovic Nov 25, 2024
c0d3b02
Moving driver-unrelated code (metrics) to separate package
cvetkovic Nov 25, 2024
4887435
Variable renaming and fixing hanging comment
cvetkovic Nov 25, 2024
6d29c10
Fixing IAT for empty time slot
cvetkovic Nov 25, 2024
d6b0591
Fixing runtime specification test
cvetkovic Nov 25, 2024
050d5ac
Bugfix: some invocations got ommited if at the end of array
cvetkovic Nov 25, 2024
07875f8
RPS mode without RpsCooldownSeconds offset with fixed tests
cvetkovic Nov 26, 2024
805e057
Comment on IATArray purpose
cvetkovic Nov 26, 2024
389e81f
Extracting commons from OpenWhisk and our function
cvetkovic Nov 26, 2024
f8f2f13
Shift IAT test and fix
cvetkovic Nov 26, 2024
dff61bc
Bugfixing IAT generation for empty minutes
cvetkovic Nov 26, 2024
279e28d
Fixing linter
cvetkovic Nov 26, 2024
c15a643
More unit tests for low RPS and huge cooldown
cvetkovic Nov 26, 2024
134b82c
Bugfixing IAT generation issues - Leonid's solution
cvetkovic Nov 27, 2024
0843abe
Fixing testing checks
cvetkovic Nov 28, 2024
2c49b49
Another round of code improvements - Leonid's feedback
cvetkovic Nov 28, 2024
090e6a6
Per-minute count check
cvetkovic Nov 28, 2024
20d1dc6
New individual function driver
cvetkovic Nov 28, 2024
ce7250d
Applying new round of Leonid's feedback
cvetkovic Nov 29, 2024
1b1c96d
RPS with warmup bugfix
cvetkovic Dec 5, 2024
1c32402
Warmup duration fix
cvetkovic Dec 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/configs/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -777,4 +777,5 @@ PrepullMode
async
AsyncMode
AsyncResponseURL
AsyncWaitToCollectMin
AsyncWaitToCollectMin
Knative's
2 changes: 1 addition & 1 deletion cmd/config_dirigent_dandelion_rps.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"BusyLoopOnSandboxStartup": false,

"AsyncMode": false,
"AsyncResponseURL": "10.0.1.3:8082",
"AsyncResponseURL": "10.0.1.253:8082",
"AsyncWaitToCollectMin": 1,

"RpsTarget": 1,
Expand Down
2 changes: 1 addition & 1 deletion cmd/config_dirigent_dandelion_trace.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"BusyLoopOnSandboxStartup": false,

"AsyncMode": false,
"AsyncResponseURL": "10.0.1.3:8082",
"AsyncResponseURL": "10.0.1.253:8082",
"AsyncWaitToCollectMin": 1,

"TracePath": "data/traces/example",
Expand Down
4 changes: 2 additions & 2 deletions cmd/config_dirigent_rps.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
"InvokeProtocol" : "http2",
"EndpointPort": 80,

"DirigentControlPlaneIP": "10.0.1.253:9092",
"DirigentControlPlaneIP": "10.0.1.253:9091",
"BusyLoopOnSandboxStartup": false,

"AsyncMode": false,
"AsyncResponseURL": "10.0.1.3:8082",
"AsyncResponseURL": "10.0.1.253:8082",
"AsyncWaitToCollectMin": 1,

"RpsTarget": 1,
Expand Down
2 changes: 1 addition & 1 deletion cmd/config_dirigent_trace.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"InvokeProtocol" : "http2",
"EndpointPort": 80,

"DirigentControlPlaneIP": "10.0.1.253:9092",
"DirigentControlPlaneIP": "10.0.1.253:9091",
"BusyLoopOnSandboxStartup": false,

"AsyncMode": false,
Expand Down
44 changes: 36 additions & 8 deletions cmd/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package main
import (
"flag"
"fmt"
"github.com/vhive-serverless/loader/pkg/generator"
"os"
"strings"
"time"
Expand All @@ -49,7 +50,7 @@ var (
configPath = flag.String("config", "cmd/config_knative_trace.json", "Path to loader configuration file")
failurePath = flag.String("failureConfig", "cmd/failure.json", "Path to the failure configuration file")
verbosity = flag.String("verbosity", "info", "Logging verbosity - choose from [info, debug, trace]")
iatGeneration = flag.Bool("iatGeneration", false, "Generate iats only or run invocations as well")
iatGeneration = flag.Bool("iatGeneration", false, "Generate IATs only or run invocations as well")
iatFromFile = flag.Bool("generated", false, "True if iats were already generated")
)

Expand Down Expand Up @@ -107,15 +108,14 @@ func main() {
if !strings.HasSuffix(cfg.Platform, "-RPS") {
runTraceMode(&cfg, *iatFromFile, *iatGeneration)
} else {
runRPSMode(&cfg, *iatGeneration)
runRPSMode(&cfg, *iatFromFile, *iatGeneration)
}
}

func determineDurationToParse(runtimeDuration int, warmupDuration int) int {
result := 0

if warmupDuration > 0 {
result += 1 // profiling
result += warmupDuration // warmup
}

Expand Down Expand Up @@ -171,12 +171,17 @@ func parseTraceGranularity(cfg *config.LoaderConfiguration) common.TraceGranular
return common.MinuteGranularity
}

func runTraceMode(cfg *config.LoaderConfiguration, readIATFromFile bool, justGenerateIAT bool) {
func runTraceMode(cfg *config.LoaderConfiguration, readIATFromFile bool, writeIATsToFile bool) {
durationToParse := determineDurationToParse(cfg.ExperimentDuration, cfg.WarmupDuration)
yamlPath := parseYAMLSpecification(cfg)

// Azure trace parsing
traceParser := trace.NewAzureParser(cfg.TracePath, durationToParse)
functions := traceParser.Parse(cfg.Platform)
functions := traceParser.Parse()

// Dirigent metadata parsing
dirigentMetadataParser := trace.NewDirigentMetadataParser(cfg.TracePath, functions, yamlPath, cfg.Platform)
dirigentMetadataParser.Parse()

log.Infof("Traces contain the following %d functions:\n", len(functions))
for _, function := range functions {
Expand All @@ -202,9 +207,32 @@ func runTraceMode(cfg *config.LoaderConfiguration, readIATFromFile bool, justGen

log.Infof("Using %s as a service YAML specification file.\n", experimentDriver.Configuration.YAMLPath)

experimentDriver.RunExperiment(justGenerateIAT, readIATFromFile)
experimentDriver.GenerateSpecification()
experimentDriver.ReadOrWriteFileSpecification(writeIATsToFile, readIATFromFile)
experimentDriver.RunExperiment()
}

func runRPSMode(cfg *config.LoaderConfiguration, justGenerateIAT bool) {
panic("Not yet implemented")
func runRPSMode(cfg *config.LoaderConfiguration, readIATFromFile bool, writeIATsToFile bool) {
experimentDuration := determineDurationToParse(cfg.ExperimentDuration, cfg.WarmupDuration)

rpsTarget := cfg.RpsTarget
coldStartPercentage := cfg.RpsColdStartRatioPercentage

warmStartRPS := rpsTarget * (100 - coldStartPercentage) / 100
coldStartRPS := rpsTarget * coldStartPercentage / 100

warmFunction, warmStartCount := generator.GenerateWarmStartFunction(experimentDuration, warmStartRPS)
coldFunctions, coldStartCount := generator.GenerateColdStartFunctions(experimentDuration, coldStartRPS, cfg.RpsCooldownSeconds)

experimentDriver := driver.NewDriver(&config.Configuration{
LoaderConfiguration: cfg,
TraceDuration: experimentDuration,

YAMLPath: parseYAMLSpecification(cfg),

Functions: generator.CreateRPSFunctions(cfg, warmFunction, warmStartCount, coldFunctions, coldStartCount),
})

experimentDriver.ReadOrWriteFileSpecification(writeIATsToFile, readIATFromFile)
experimentDriver.RunExperiment()
}
4 changes: 3 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
| AsyncWaitToCollectMin [^6] | int | >= 0 | 0 | Time after experiment ends after which to collect invocation results |
| RpsTarget | int | >= 0 | 0 | Number of requests per second to issue |
| RpsColdStartRatioPercentage | int | >= 0 && <= 100 | 0 | Percentage of cold starts out of specified RPS |
| RpsCooldownSeconds | int | > 0 | 0 | The time it takes for the autoscaler to downscale function (higher for higher RPS) |
| RpsCooldownSeconds [^7] | int | > 0 | 0 | The time it takes for the autoscaler to downscale function (higher for higher RPS) |
| RpsImage | string | N/A | N/A | Function image to use for RPS experiments |
| RpsRuntimeMs | int | >=0 | 0 | Requested execution time |
| RpsMemoryMB | int | >=0 | 0 | Requested memory |
Expand Down Expand Up @@ -55,6 +55,8 @@ Lambda; https://aws.amazon.com/about-aws/whats-new/2018/10/aws-lambda-supports-f

[^6]: Dirigent specific

[^7] It is recommended that the first 10% of cold starts are discarded from the experiment results for low cold start RPS.

---

InVitro can cause failure on cluster manager components. To do so, please configure the `cmd/failure.json`. Make sure
Expand Down
58 changes: 58 additions & 0 deletions pkg/common/interval_search.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package common

type Interval[T comparable] struct {
Start T
End T
Value T
}

type IntervalSearch struct {
intervals []Interval[int]
}

func NewIntervalSearch(data []int) *IntervalSearch {
return &IntervalSearch{
intervals: constructIntervals(data),
}
}

func constructIntervals(pmc []int) []Interval[int] {
var result []Interval[int]

sum := 0
for i := 0; i < len(pmc); i++ {
if pmc[i] == 0 {
continue
}
startIndex := sum
endIndex := sum + pmc[i]

if startIndex != endIndex {
result = append(result, Interval[int]{Start: startIndex, End: endIndex - 1, Value: i})
}
//logrus.Debugf("[%d, %d] = %d", startIndex, endIndex-1, i)

sum = endIndex
}

return result
}

func (si *IntervalSearch) SearchInterval(val int) *Interval[int] {
low := 0
high := len(si.intervals) - 1

for low <= high {
mid := (low + high) / 2

if val >= si.intervals[mid].Start && val <= si.intervals[mid].End {
return &si.intervals[mid]
} else if val < si.intervals[mid].Start {
high = mid - 1
} else {
low = mid + 1
}
}

return nil
}
39 changes: 39 additions & 0 deletions pkg/common/interval_search_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package common

import "testing"

func TestIntervalTree(t *testing.T) {
pmc := []int{0, 10, 10, 10, 10, 10, 0, 0, 2}
intervals := NewIntervalSearch(pmc)

pairs := map[int]int{
// minute 1
0: 1,
5: 1,
9: 1,
// minute 2
10: 2,
17: 2,
19: 2,
// minute 5
40: 5,
45: 5,
49: 5,
// minute 8
50: 8,
51: 8,
52: -1, // non-existing
}

for iatIndex, minute := range pairs {
got := intervals.SearchInterval(iatIndex)

if got == nil && minute != -1 {
t.Errorf("Value %d not found ", iatIndex)
} else if got != nil {
if minute != got.Value {
t.Errorf("Unexpected value received - iatIndex: %d; expected: %d; got: %d", iatIndex, minute, got.Value)
}
}
}
}
15 changes: 9 additions & 6 deletions pkg/common/specification_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

package common

// IATMatrix - columns are minutes, rows are IATs
type IATMatrix [][]float64
// IATArray Hold the IATs of invocations for a particular function. Values in this array tells individual function driver
// how much time to sleep before firing an invocation. First invocations should be fired right away after the start of
// experiment, i.e., should typically have a IAT of 0.
type IATArray []float64
cvetkovic marked this conversation as resolved.
Show resolved Hide resolved

// ProbabilisticDuration used for testing the exponential distribution
type ProbabilisticDuration []float64
Expand All @@ -35,10 +37,11 @@ type RuntimeSpecification struct {
Memory int
}

type RuntimeSpecificationMatrix [][]RuntimeSpecification
type RuntimeSpecificationArray []RuntimeSpecification

type FunctionSpecification struct {
IAT IATMatrix `json:"IAT"`
RawDuration ProbabilisticDuration `json:"RawDuration"`
RuntimeSpecification RuntimeSpecificationMatrix `json:"RuntimeSpecification"`
IAT IATArray `json:"IAT"`
PerMinuteCount []int `json:"PerMinuteCount"`
RawDuration ProbabilisticDuration `json:"RawDuration"`
RuntimeSpecification RuntimeSpecificationArray `json:"RuntimeSpecification"`
}
47 changes: 47 additions & 0 deletions pkg/common/workload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package common

// static double SQRTSD (double x) {
// double r;
// __asm__ ("sqrtsd %1, %0" : "=x" (r) : "x" (x));
// return r;
// }
import "C"
import (
"time"
)

const (
// ContainerImageSizeMB was chosen as a median of the container physical memory usage.
// Allocate this much less memory inside the actual function.
ContainerImageSizeMB = 15

ExecUnit int = 1e2
)

func takeSqrts() C.double {
var tmp C.double // Circumvent compiler optimizations
for i := 0; i < ExecUnit; i++ {
tmp = C.SQRTSD(C.double(10))
}
return tmp
}

func busySpin(multiplier, runtimeMilli uint32) {
totalIterations := int(multiplier * runtimeMilli)

for i := 0; i < totalIterations; i++ {
takeSqrts()
}
}

func TraceFunctionExecution(start time.Time, IterationsMultiplier uint32, timeLeftMilliseconds uint32) (msg string) {
timeConsumedMilliseconds := uint32(time.Since(start).Milliseconds())
if timeConsumedMilliseconds < timeLeftMilliseconds {
timeLeftMilliseconds -= timeConsumedMilliseconds
if timeLeftMilliseconds > 0 {
busySpin(uint32(IterationsMultiplier), timeLeftMilliseconds)
}
}

return msg
}
3 changes: 2 additions & 1 deletion pkg/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ type Configuration struct {
IATDistribution common.IatDistribution
ShiftIAT bool // shift the invocations inside minute
TraceGranularity common.TraceGranularity
TraceDuration int // in minutes
// TraceDuration In minutes.
TraceDuration int

YAMLPath string
TestMode bool
Expand Down
Loading
Loading