Skip to content

Commit

Permalink
New worker streaming tests (#2032)
Browse files Browse the repository at this point in the history
* extracted pipeline to a file

* updated test

* added minimum requirement test

* added test

* comment

* implement scale general method + added roundTrip related test

* removed netDurations

* completed scale up tests

* removed un-necessary logging

* organise file

* scale down tests added

* fixed scale conflicts tests for new logic, decreased tests time

* organise tests
  • Loading branch information
Adir111 authored Nov 17, 2024
1 parent 9b3ce79 commit 0b085ed
Show file tree
Hide file tree
Showing 4 changed files with 441 additions and 583 deletions.
6 changes: 0 additions & 6 deletions core/worker/lib/streaming/core/metrics.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
const { mean } = require('@hkube/stats');
const Logger = require('@hkube/logger');

const _calcRate = (list) => {
let first = list[0];
if (list.length === 1) {
first = { time: first.time - 2000, count: 0 };
}
const last = list[list.length - 1];

const log = Logger.GetLogFromContainer();
log.info(`STATISTICS: first value: ${first.count}, last value: ${last.count}, time diff: ${last.time - first.time} ms`);

const timeDiff = (last.time - first.time) / 1000;
const countDiff = last.count - first.count;
let rate = 0;
Expand Down
2 changes: 1 addition & 1 deletion core/worker/lib/streaming/core/statistics.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Statistics {
stats.requests.add(this._createItem(requests));
stats.responses.add(this._createItem(responses));
stats.durations.addRange(netDurations);
stats.grossDurations.addRange(durations);
stats.grossDurations.addRange(durations); // used to calculate round trip
stats.queueDurations.addRange(queueDurations);

this._data[source] = {
Expand Down
77 changes: 77 additions & 0 deletions core/worker/test/mocks/pipeline.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"name": "stream",
"kind": "stream",
"nodes": [
{
"nodeName": "A",
"algorithmName": "eval-alg",
"input": [
"@flowInput.arraySize",
"@flowInput.bufferSize"
],
"stateType": "stateful",
"maxStatelessCount": 0
},
{
"nodeName": "B",
"algorithmName": "eval-alg",
"input": [
"@flowInput.arraySize",
"@flowInput.bufferSize"
],
"stateType": "stateful"
},
{
"nodeName": "C",
"algorithmName": "eval-alg",
"input": [
"@flowInput.arraySize",
"@flowInput.bufferSize"
],
"stateType": "stateful"
},
{
"nodeName": "D",
"algorithmName": "eval-alg",
"input": [],
"stateType": "stateless"
},
{
"nodeName": "E",
"algorithmName": "eval-alg",
"input": [],
"stateType": "stateless",
"minStatelessCount": 10
},
{
"nodeName": "F",
"algorithmName": "eval-alg",
"input": [],
"stateType": "stateless",
"minStatelessCount": 3,
"maxStatelessCount": 2
}
],
"edges": [
{ "source": "A", "target": "D" },
{ "source": "B", "target": "D" },
{ "source": "C", "target": "D" },
{ "source": "A", "target": "E" },
{ "source": "B", "target": "E" },
{ "source": "C", "target": "F" }
],
"flowInputMetadata": {
"metadata": {
"flowInput.arraySize": {
"type": "number"
},
"flowInput.bufferSize": {
"type": "number"
}
},
"storageInfo": {
"path": "local-hkube/main:streaming:9dy12jfh/main:streaming:9dy12jfh"
}
}
}

Loading

0 comments on commit 0b085ed

Please sign in to comment.