Skip to content

Commit

Permalink
Applying new round of Leonid's feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Cvetković <[email protected]>
  • Loading branch information
cvetkovic committed Nov 29, 2024
1 parent 20d1dc6 commit ce7250d
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 84 deletions.
37 changes: 9 additions & 28 deletions pkg/driver/trace_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,28 +157,11 @@ func (d *Driver) invokeFunction(metadata *InvocationMetadata) {
}
}

func getTotalInvocationCount(perMinuteCount []int, traceDuration int, granularity common.TraceGranularity) int {
if len(perMinuteCount) == 0 {
return 0
}

if granularity == common.SecondGranularity {
traceDuration *= 60
}

sum := 0
for i := 0; i < traceDuration; i++ {
sum += perMinuteCount[i]
}

return sum
}

func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.WaitGroup, addInvocationsToGroup *sync.WaitGroup, totalSuccessful *int64, totalFailed *int64, totalIssued *int64, recordOutputChannel chan *mc.ExecutionRecord) {
defer announceFunctionDone.Done()

function := list.Front().Value.(*common.Function)
invocationCount := getTotalInvocationCount(function.Specification.PerMinuteCount, d.Configuration.TraceDuration, d.Configuration.TraceGranularity)
invocationCount := len(function.Specification.IAT)
addInvocationsToGroup.Add(invocationCount)

if invocationCount == 0 {
Expand All @@ -196,7 +179,6 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai

var successfulInvocations int64
var failedInvocations int64
var numberOfIssuedInvocations int64
var currentPhase = common.ExecutionPhase

waitForInvocations := sync.WaitGroup{}
Expand All @@ -206,8 +188,8 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai
log.Infof("Warmup phase has started.")
}

experimentStart := time.Now()
startOfIteration := time.Now()
startOfExperiment := time.Now()
var previousIATSum int64

for {
if iatIndex >= len(IAT) || iatIndex >= terminationIAT {
Expand All @@ -216,11 +198,11 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai

iat := time.Duration(IAT[iatIndex]) * time.Microsecond

schedulingDelay := time.Since(startOfIteration)
sleepFor := iat - schedulingDelay
time.Sleep(sleepFor)
schedulingDelay := time.Since(startOfExperiment).Microseconds() - previousIATSum
sleepFor := iat.Microseconds() - schedulingDelay
time.Sleep(time.Duration(sleepFor) * time.Microsecond)

startOfIteration = time.Now()
previousIATSum += iat.Microseconds()

if !d.Configuration.TestMode {
waitForInvocations.Add(1)
Expand Down Expand Up @@ -252,10 +234,9 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai
successfulInvocations++
}

numberOfIssuedInvocations++
iatIndex++

d.announceWarmupEnd(experimentStart, &currentPhase)
d.announceWarmupEnd(startOfExperiment, &currentPhase)

// counter updates
invocationSinceTheBeginningOfMinute++
Expand All @@ -273,7 +254,7 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai

atomic.AddInt64(totalSuccessful, successfulInvocations)
atomic.AddInt64(totalFailed, failedInvocations)
atomic.AddInt64(totalIssued, numberOfIssuedInvocations)
atomic.AddInt64(totalIssued, int64(iatIndex))
}

func (d *Driver) announceWarmupEnd(start time.Time, currentPhase *common.ExperimentPhase) {
Expand Down
19 changes: 6 additions & 13 deletions pkg/driver/trace_driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,6 @@ func createFakeLoaderConfiguration() *config.LoaderConfiguration {
func createTestDriver(invocationStats []int) *Driver {
cfg := createFakeLoaderConfiguration()

if invocationStats == nil {
invocationStats = []int{
5, 5, 5, 5, 5,
5, 5, 5, 5, 5,
5, 5, 5, 5, 5,
5, 5, 5, 5, 5,
}
}

driver := NewDriver(&config.Configuration{
LoaderConfiguration: cfg,
IATDistribution: common.Equidistant,
Expand Down Expand Up @@ -137,7 +128,7 @@ func TestInvokeFunctionFromDriver(t *testing.T) {
invocationRecordOutputChannel := make(chan *metric.ExecutionRecord, 1)
announceDone := &sync.WaitGroup{}

testDriver := createTestDriver(nil)
testDriver := createTestDriver([]int{1})

if !test.forceFail {
address, port := "localhost", test.port
Expand Down Expand Up @@ -200,7 +191,7 @@ func TestDAGInvocation(t *testing.T) {
invocationRecordOutputChannel := make(chan *metric.ExecutionRecord, functionsToInvoke)
announceDone := &sync.WaitGroup{}

testDriver := createTestDriver(nil)
testDriver := createTestDriver([]int{4})
list := list.New()
address, port := "localhost", 8085
function := testDriver.Configuration.Functions[0]
Expand Down Expand Up @@ -245,7 +236,7 @@ func TestDAGInvocation(t *testing.T) {
}

func TestGlobalMetricsCollector(t *testing.T) {
driver := createTestDriver(nil)
driver := createTestDriver([]int{5})

inputChannel := make(chan *metric.ExecutionRecord)
totalIssuedChannel := make(chan int64)
Expand Down Expand Up @@ -321,7 +312,7 @@ func TestDriverBackgroundProcesses(t *testing.T) {
t.Skip("Not yet implemented")
}

driver := createTestDriver(nil)
driver := createTestDriver([]int{5})
globalCollectorAnnounceDone := &sync.WaitGroup{}

completed, _, _, _ := driver.startBackgroundProcesses(globalCollectorAnnounceDone)
Expand Down Expand Up @@ -350,12 +341,14 @@ func TestDriverCompletely(t *testing.T) {
{
testName: "without_warmup",
experimentDurationMin: 1,
invocationStats: []int{5},
traceGranularity: common.MinuteGranularity,
expectedInvocations: 5,
},
{
testName: "with_warmup",
experimentDurationMin: 2, // 1 withWarmup + 1 execution
invocationStats: []int{5, 5},
traceGranularity: common.MinuteGranularity,
withWarmup: true,
expectedInvocations: 10,
Expand Down
5 changes: 0 additions & 5 deletions pkg/generator/rps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ func TestColdStartMatrix(t *testing.T) {
}

for fIndex := 0; fIndex < len(matrix); fIndex++ {
sum := 0.0
currentMinute := 0

if len(matrix[fIndex]) != len(test.expectedIAT[fIndex]) {
Expand All @@ -400,10 +399,6 @@ func TestColdStartMatrix(t *testing.T) {
if currentMinute > len(test.expectedCount[fIndex]) {
t.Errorf("Invalid expected count array size for function with index %d", fIndex)
}

if matrix[fIndex][i] >= 0 {
sum += matrix[fIndex][i]
}
}

for i := 0; i < len(test.expectedCount[fIndex]); i++ {
Expand Down
48 changes: 10 additions & 38 deletions pkg/generator/specification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,18 @@ func TestGenerateDistribution(t *testing.T) {
}

failed := false
if test.expectedPoints != nil {
for i := 0; i < len(test.expectedPoints); i++ {
if math.Abs(data[i]-test.expectedPoints[i]) > epsilon {
log.Debug(fmt.Sprintf("got: %f, expected: %f\n", data[i], test.expectedPoints[i]))

failed = true
// no break statement for debugging purpose
}
}
for i := 0; i < len(test.expectedPoints); i++ {
if math.Abs(data[i]-test.expectedPoints[i]) > epsilon {
log.Debug(fmt.Sprintf("got: %f, expected: %f\n", data[i], test.expectedPoints[i]))

if failed {
t.Error("Test " + testName + " has failed due to incorrectly generated IAT.")
failed = true
// no break statement for debugging purpose
}
}

if failed {
t.Error("Test " + testName + " has failed due to incorrectly generated IAT.")
}
})
}
}
Expand Down Expand Up @@ -234,33 +232,7 @@ func TestSerialGenerateIAT(t *testing.T) {
testDistribution: false,
},
{
testName: "1inv_1min_exponential",
invocations: []int{1},
iatDistribution: common.Exponential,
shiftIAT: false,
granularity: common.MinuteGranularity,
expectedPoints: []float64{
0,
},
testDistribution: false,
},
{
testName: "5inv_1min_equidistant",
invocations: []int{5},
iatDistribution: common.Equidistant,
shiftIAT: false,
granularity: common.MinuteGranularity,
expectedPoints: []float64{
0,
12000000,
12000000,
12000000,
12000000,
},
testDistribution: false,
},
{
testName: "30inv_5min_equidistant",
testName: "25inv_5min_equidistant",
invocations: []int{5, 5, 5, 5, 5},
iatDistribution: common.Equidistant,
shiftIAT: false,
Expand Down

0 comments on commit ce7250d

Please sign in to comment.