From 871e5c99778c1247d65a28404f8c643117d6be19 Mon Sep 17 00:00:00 2001 From: Adir111 Date: Thu, 14 Nov 2024 11:41:34 +0200 Subject: [PATCH 01/13] extracted pipeline to a file --- core/worker/test/mocks/pipeline.json | 75 +++++++++++++++++++++++++++ core/worker/test/streaming.js | 76 +--------------------------- 2 files changed, 76 insertions(+), 75 deletions(-) create mode 100644 core/worker/test/mocks/pipeline.json diff --git a/core/worker/test/mocks/pipeline.json b/core/worker/test/mocks/pipeline.json new file mode 100644 index 000000000..47a5651bd --- /dev/null +++ b/core/worker/test/mocks/pipeline.json @@ -0,0 +1,75 @@ +{ + "name": "stream", + "kind": "stream", + "nodes": [ + { + "nodeName": "A", + "algorithmName": "eval-alg", + "input": [ + "@flowInput.arraySize", + "@flowInput.bufferSize" + ], + "stateType": "stateful" + }, + { + "nodeName": "B", + "algorithmName": "eval-alg", + "input": [ + "@flowInput.arraySize", + "@flowInput.bufferSize" + ], + "stateType": "stateful" + }, + { + "nodeName": "C", + "algorithmName": "eval-alg", + "input": [ + "@flowInput.arraySize", + "@flowInput.bufferSize" + ], + "stateType": "stateful" + }, + { + "nodeName": "D", + "algorithmName": "eval-alg", + "input": [], + "stateType": "stateless" + }, + { + "nodeName": "E", + "algorithmName": "eval-alg", + "input": [], + "stateType": "stateless" + }, + { + "nodeName": "F", + "algorithmName": "eval-alg", + "input": [], + "stateType": "stateless", + "maxStatelessCount": 3, + "minStatelessCount": 1 + } + ], + "edges": [ + { "source": "A", "target": "D" }, + { "source": "B", "target": "D" }, + { "source": "C", "target": "D" }, + { "source": "A", "target": "E" }, + { "source": "B", "target": "E" }, + { "source": "C", "target": "F" } + ], + "flowInputMetadata": { + "metadata": { + "flowInput.arraySize": { + "type": "number" + }, + "flowInput.bufferSize": { + "type": "number" + } + }, + "storageInfo": { + "path": "local-hkube/main:streaming:9dy12jfh/main:streaming:9dy12jfh" + } + } +} + diff --git a/core/worker/test/streaming.js b/core/worker/test/streaming.js index cceafc417..b7a98ff5d 100644 --- a/core/worker/test/streaming.js +++ b/core/worker/test/streaming.js @@ -6,83 +6,9 @@ const streamHandler = require('../lib/streaming/services/stream-handler'); const streamService = require('../lib/streaming/services/stream-service'); const discovery = require('../lib/streaming/services/service-discovery'); const SlaveAdapter = require('../lib/streaming/adapters/slave-adapter'); +const pipeline = require('./mocks/pipeline.json'); const SEC = 1000; -const pipeline = { - name: "stream", - kind: "stream", - nodes: [ - { - nodeName: "A", - algorithmName: "eval-alg", - input: [ - "@flowInput.arraySize", - "@flowInput.bufferSize" - ], - stateType: "stateful" - }, - { - nodeName: "B", - algorithmName: "eval-alg", - input: [ - "@flowInput.arraySize", - "@flowInput.bufferSize" - ], - stateType: "stateful" - }, - { - nodeName: "C", - algorithmName: "eval-alg", - input: [ - "@flowInput.arraySize", - "@flowInput.bufferSize" - ], - stateType: "stateful" - }, - { - nodeName: "D", - algorithmName: "eval-alg", - input: [], - stateType: "stateless" - }, - { - nodeName: "E", - algorithmName: "eval-alg", - input: [], - stateType: "stateless" - }, - { - nodeName: "F", - algorithmName: "eval-alg", - input: [], - stateType: "stateless", - maxStatelessCount: 3, - minStatelessCount: 1 - } - ], - edges: [ - { source: "A", target: "D" }, - { source: "B", target: "D" }, - { source: "C", target: "D" }, - { source: "A", target: "E" }, - { source: 
"B", target: "E" }, - { source: "C", target: "F" } - ], - flowInputMetadata: { - metadata: { - "flowInput.arraySize": { - "type": "number" - }, - "flowInput.bufferSize": { - "type": "number" - } - }, - storageInfo: { - "path": "local-hkube/main:streaming:9dy12jfh/main:streaming:9dy12jfh" - } - } -} - const streamingDiscovery = { host: process.env.POD_IP || '127.0.0.1', port: process.env.STREAMING_DISCOVERY_PORT || 9022 From a32c6856e86411c28271a92ea47b4cd4d1697f8d Mon Sep 17 00:00:00 2001 From: Adir111 Date: Thu, 14 Nov 2024 11:43:42 +0200 Subject: [PATCH 02/13] updated test --- core/worker/test/streaming.js | 85 +++++++++++------------------------ 1 file changed, 26 insertions(+), 59 deletions(-) diff --git a/core/worker/test/streaming.js b/core/worker/test/streaming.js index b7a98ff5d..811287c4e 100644 --- a/core/worker/test/streaming.js +++ b/core/worker/test/streaming.js @@ -78,96 +78,63 @@ describe('Streaming', () => { await stateAdapter._db.jobs.create({ pipeline, jobId }); await streamHandler.start(job); }); + beforeEach(() => { const masters = getMasters(); masters.map(m => m.reset()); }) - describe('scale-up', () => { + + describe.only('scale-up', () => { + const replicasOnFirstScale = require('../config/main/config.base.js').streaming.autoScaler.scaleUp.replicasOnFirstScale; + it('should not scale based on no data', async () => { const scale = async (data) => { - streamService.reportStats(data); + streamService.reportStats([data]); } - const list = [{ + const list = { nodeName: 'D', - }]; + }; await scale(list); - const { required } = autoScale(list[0].nodeName); + const { required } = autoScale(list.nodeName); expect(required).to.equal(0); }); + it('should init scale when there is a queue', async () => { const scale = async (data) => { - streamService.reportStats(data); + streamService.reportStats([data]); } - const list = [{ + const list = { nodeName: 'D', queueSize: 1, netDurations - }]; + }; await scale(list); - const { required } = autoScale(list[0].nodeName); - expect(required).to.gte(1); + const { required } = autoScale(list.nodeName); + expect(required).to.equal(replicasOnFirstScale); }); + it('should init scale when there is request rate', async () => { const scale = async (data) => { - data[0].sent += 10; - streamService.reportStats(data); + data.sent += 10; + streamService.reportStats([data]); await delay(100); } - const list = [{ + const list = { nodeName: 'D', sent: 10, queueSize: 0, netDurations - }]; + }; await scale(list); await scale(list); - const { required } = autoScale(list[0].nodeName); - expect(required).to.equal(1); + const { required } = autoScale(list.nodeName); + expect(required).to.equal(replicasOnFirstScale); + }); + + it('should init scale when there is minimum requirement, and a queue', async () => { + }); - // it.only('should not scale if currentSize is fixed', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const scale = async (data) => { - // streamService.reportStats(data); - // await delay(100); - // } - // const currentSize = async (data) => { - // data[0].currentSize = 5; - // data[0].queueSize += 500; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName: 'D', - // queueSize: 500, - // netDurations - // }]; - // await scale(list); - // const jobs1 = autoScale(list[0].nodeName); - // const jobs2 = autoScale(list[0].nodeName); - // const jobs3 = autoScale(list[0].nodeName); - // await currentSize(list); - // const jobs4 = autoScale(list[0].nodeName); - // const 
jobs5 = autoScale(list[0].nodeName); - // expect(jobs1.required).to.gte(1); - // expect(jobs2.required).to.gte(1); - // expect(jobs3.required).to.gte(1); - // expect(jobs4.required).to.gte(30); - // expect(jobs5.required).to.gte(30); - // }); - // it('should scale based on queueSize only', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const scale = async (data) => { - // streamService.reportStats(data); - // } - // const list = [{ - // nodeName: 'D', - // queueSize: 500, - // responses: 0, - // netDurations - // }]; - // await scale(list); - // const { required } = autoScale(list[0].nodeName); - // expect(required).to.gte(1); - // }); // it.only('should scale based on all params', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP (NOT GIVEN HERE) // const queueSize = 0; // const responses = 0; From 9af03e51fba65a972429a70b654a902d02e489cd Mon Sep 17 00:00:00 2001 From: Adir111 Date: Thu, 14 Nov 2024 13:33:21 +0200 Subject: [PATCH 03/13] added minimum requirement test --- core/worker/test/mocks/pipeline.json | 7 ++++++ core/worker/test/streaming.js | 35 ++++++++++++++++++++++++---- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/core/worker/test/mocks/pipeline.json b/core/worker/test/mocks/pipeline.json index 47a5651bd..8b38b1db6 100644 --- a/core/worker/test/mocks/pipeline.json +++ b/core/worker/test/mocks/pipeline.json @@ -48,6 +48,13 @@ "stateType": "stateless", "maxStatelessCount": 3, "minStatelessCount": 1 + }, + { + "nodeName": "G", + "algorithmName": "eval-alg", + "input": [], + "stateType": "stateless", + "minStatelessCount": 3 } ], "edges": [ diff --git a/core/worker/test/streaming.js b/core/worker/test/streaming.js index 811287c4e..bb37a52fa 100644 --- a/core/worker/test/streaming.js +++ b/core/worker/test/streaming.js @@ -49,7 +49,7 @@ const createJob = (jobId) => { algorithmName: 'my-alg', pipelineName: 'my-pipe', parents: [], - childs: ['D','F'], + childs: ['D','F', 'G'], }; return job; }; @@ -74,6 +74,7 @@ const duration = SEC / msgPerSec; const netDurations = Array.from(Array(10).fill(duration)); describe('Streaming', () => { + before(async () => { await stateAdapter._db.jobs.create({ pipeline, jobId }); await streamHandler.start(job); @@ -91,26 +92,30 @@ describe('Streaming', () => { const scale = async (data) => { streamService.reportStats([data]); } + const list = { nodeName: 'D', }; + await scale(list); const { required } = autoScale(list.nodeName); - expect(required).to.equal(0); + expect(required).to.equal(0, `required=${required}, suppose to be 0`); }); it('should init scale when there is a queue', async () => { const scale = async (data) => { streamService.reportStats([data]); } + const list = { nodeName: 'D', queueSize: 1, netDurations }; + await scale(list); const { required } = autoScale(list.nodeName); - expect(required).to.equal(replicasOnFirstScale); + expect(required).to.equal(replicasOnFirstScale, `required=${required}, suppose to be ${replicasOnFirstScale}`); }); it('should init scale when there is request rate', async () => { @@ -119,19 +124,39 @@ describe('Streaming', () => { streamService.reportStats([data]); await delay(100); } + const list = { nodeName: 'D', sent: 10, queueSize: 0, netDurations }; + + await scale(list); await scale(list); + const { required } = autoScale(list.nodeName); + expect(required).to.equal(replicasOnFirstScale, `required=${required}, suppose to be ${replicasOnFirstScale}`); + }); + + it('should init scale to minimum requirement, when there is a 
queue', async () => { + const scale = async (data) => { + streamService.reportStats([data]); + } + + const nodeName = 'G'; + const list = { + nodeName, + queueSize: 1, + netDurations + }; + await scale(list); const { required } = autoScale(list.nodeName); - expect(required).to.equal(replicasOnFirstScale); + const min = pipeline.nodes.filter((node) => nodeName === node.nodeName)[0].minStatelessCount; + expect(required).to.be.equal(min, `required=${required}, suppose to be ${min}`); }); - it('should init scale when there is minimum requirement, and a queue', async () => { + it.only('should init scale and a queue', async () => { }); From 333a71b55320e90a5caf17e9811cc18351d28b85 Mon Sep 17 00:00:00 2001 From: Adir111 Date: Thu, 14 Nov 2024 13:55:27 +0200 Subject: [PATCH 04/13] added test --- core/worker/test/mocks/pipeline.json | 17 ++++++----------- core/worker/test/streaming.js | 20 +++++++++++++++++--- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/core/worker/test/mocks/pipeline.json b/core/worker/test/mocks/pipeline.json index 8b38b1db6..baf63e920 100644 --- a/core/worker/test/mocks/pipeline.json +++ b/core/worker/test/mocks/pipeline.json @@ -9,7 +9,8 @@ "@flowInput.arraySize", "@flowInput.bufferSize" ], - "stateType": "stateful" + "stateType": "stateful", + "maxStatelessCount": 0 }, { "nodeName": "B", @@ -39,22 +40,16 @@ "nodeName": "E", "algorithmName": "eval-alg", "input": [], - "stateType": "stateless" - }, - { - "nodeName": "F", - "algorithmName": "eval-alg", - "input": [], "stateType": "stateless", - "maxStatelessCount": 3, - "minStatelessCount": 1 + "minStatelessCount": 3 }, { - "nodeName": "G", + "nodeName": "F", "algorithmName": "eval-alg", "input": [], "stateType": "stateless", - "minStatelessCount": 3 + "minStatelessCount": 3, + "maxStatelessCount": 2 } ], "edges": [ diff --git a/core/worker/test/streaming.js b/core/worker/test/streaming.js index bb37a52fa..e0e9bdd58 100644 --- a/core/worker/test/streaming.js +++ b/core/worker/test/streaming.js @@ -49,7 +49,7 @@ const createJob = (jobId) => { algorithmName: 'my-alg', pipelineName: 'my-pipe', parents: [], - childs: ['D','F', 'G'], + childs: ['D', 'E', 'F'], }; return job; }; @@ -143,7 +143,7 @@ describe('Streaming', () => { streamService.reportStats([data]); } - const nodeName = 'G'; + const nodeName = 'E'; const list = { nodeName, queueSize: 1, @@ -156,8 +156,22 @@ describe('Streaming', () => { expect(required).to.be.equal(min, `required=${required}, suppose to be ${min}`); }); - it.only('should init scale and a queue', async () => { + it('should init scale and not pass maximum requirement, when there is a queue', async () => { + const scale = async (data) => { + streamService.reportStats([data]); + } + const nodeName = 'F'; + const list = { + nodeName, + queueSize: 1, + netDurations + }; + + await scale(list); + const { required } = autoScale(list.nodeName); + const max = pipeline.nodes.filter((node) => nodeName === node.nodeName)[0].maxStatelessCount; + expect(required).to.be.equal(max, `required=${required}, suppose to be ${max}`); }); // it.only('should scale based on all params', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP (NOT GIVEN HERE) From 6d4ecd4b592affba51639245256617b353b34688 Mon Sep 17 00:00:00 2001 From: Adir111 Date: Thu, 14 Nov 2024 15:12:56 +0200 Subject: [PATCH 05/13] comment --- core/worker/lib/streaming/core/statistics.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/worker/lib/streaming/core/statistics.js 
b/core/worker/lib/streaming/core/statistics.js index 7078d9906..661fd238b 100644 --- a/core/worker/lib/streaming/core/statistics.js +++ b/core/worker/lib/streaming/core/statistics.js @@ -27,7 +27,7 @@ class Statistics { stats.requests.add(this._createItem(requests)); stats.responses.add(this._createItem(responses)); stats.durations.addRange(netDurations); - stats.grossDurations.addRange(durations); + stats.grossDurations.addRange(durations); // used to calculate round trip stats.queueDurations.addRange(queueDurations); this._data[source] = { From 855a69e121beddd96fd7a6d318c6738e89a90cc0 Mon Sep 17 00:00:00 2001 From: Adir111 Date: Thu, 14 Nov 2024 15:14:40 +0200 Subject: [PATCH 06/13] implement scale general method + added roundTrip related test --- core/worker/test/streaming.js | 258 ++++++++++------------------------ 1 file changed, 71 insertions(+), 187 deletions(-) diff --git a/core/worker/test/streaming.js b/core/worker/test/streaming.js index e0e9bdd58..9420b0d3d 100644 --- a/core/worker/test/streaming.js +++ b/core/worker/test/streaming.js @@ -72,6 +72,7 @@ const checkMetrics = () => { const msgPerSec = 30; const duration = SEC / msgPerSec; const netDurations = Array.from(Array(10).fill(duration)); +const durations = Array.from(Array(10).fill(duration)); describe('Streaming', () => { @@ -85,239 +86,122 @@ describe('Streaming', () => { masters.map(m => m.reset()); }) + /** + * Adjusts the `data` object by adding rate-related statistics from the `reqRateInfo` + * parameter, then reports the updated statistics via `streamService` and introduces a delay. + * The operation is repeated `repeatCount` times. + * + * Note - `reqRate` is calculated as Δ count / Δ time, meaning `(queueSize + sent) / delayTime`. + * When `delayTime` is 1000 (1 second), this gives the exact `reqRate`. + * + * @async + * @param {Object} data - The data object holding current statistics, to be updated. + * @param {Object} [reqRateInfo={}] - Rate-related information to be added to `data`. + * @param {number} [reqRateInfo.queueSize=0] - The number of items in the request queue to be added. + * @param {number} [reqRateInfo.sent=0] - The number of requests sent, to be added to `data.sent`. + * @param {number} [reqRateInfo.delayTime=0] - The delay time (in milliseconds) to wait after each report. + * @param {number} [repeatCount=1] - The number of times to repeat the scaling and reporting operation. + * + * @returns {Promise} - Resolves after completing the specified number of repetitions. 
+ */ + const scale = async (data, reqRateInfo = {}, repeatCount = 1) => { + const { queueSize = 0, sent = 0, delayTime = 0 } = reqRateInfo; + for (let i = 0; i < repeatCount; i++) { + data.queueSize !== undefined && (data.queueSize += queueSize); + data.sent !== undefined && (data.sent += sent); + streamService.reportStats([data]); + + if (delayTime > 0) { + await delay(delayTime); + } + } + }; + describe.only('scale-up', () => { const replicasOnFirstScale = require('../config/main/config.base.js').streaming.autoScaler.scaleUp.replicasOnFirstScale; it('should not scale based on no data', async () => { - const scale = async (data) => { - streamService.reportStats([data]); - } - - const list = { + const data = { nodeName: 'D', }; - await scale(list); - const { required } = autoScale(list.nodeName); + await scale(data); + const { required } = autoScale(data.nodeName); expect(required).to.equal(0, `required=${required}, suppose to be 0`); }); it('should init scale when there is a queue', async () => { - const scale = async (data) => { - streamService.reportStats([data]); - } - - const list = { + const data = { nodeName: 'D', queueSize: 1, netDurations }; - await scale(list); - const { required } = autoScale(list.nodeName); + await scale(data); + const { required } = autoScale(data.nodeName); expect(required).to.equal(replicasOnFirstScale, `required=${required}, suppose to be ${replicasOnFirstScale}`); }); it('should init scale when there is request rate', async () => { - const scale = async (data) => { - data.sent += 10; - streamService.reportStats([data]); - await delay(100); - } - - const list = { + const data = { nodeName: 'D', sent: 10, queueSize: 0, netDurations }; + const reqRateInfo = { + sent: 10, + delayTime: 100 + } - await scale(list); - await scale(list); - const { required } = autoScale(list.nodeName); + await scale(data, reqRateInfo, 2); + const { required } = autoScale(data.nodeName); expect(required).to.equal(replicasOnFirstScale, `required=${required}, suppose to be ${replicasOnFirstScale}`); }); it('should init scale to minimum requirement, when there is a queue', async () => { - const scale = async (data) => { - streamService.reportStats([data]); - } - - const nodeName = 'E'; - const list = { - nodeName, + const data = { + nodeName: 'E', queueSize: 1, netDurations }; - await scale(list); - const { required } = autoScale(list.nodeName); - const min = pipeline.nodes.filter((node) => nodeName === node.nodeName)[0].minStatelessCount; + await scale(data); + const { required } = autoScale(data.nodeName); + const min = pipeline.nodes.filter((node) => data.nodeName === node.nodeName)[0].minStatelessCount; expect(required).to.be.equal(min, `required=${required}, suppose to be ${min}`); }); it('should init scale and not pass maximum requirement, when there is a queue', async () => { - const scale = async (data) => { - streamService.reportStats([data]); - } - - const nodeName = 'F'; - const list = { - nodeName, + const data = { + nodeName: 'F', queueSize: 1, netDurations }; - await scale(list); - const { required } = autoScale(list.nodeName); - const max = pipeline.nodes.filter((node) => nodeName === node.nodeName)[0].maxStatelessCount; + await scale(data); + const { required } = autoScale(data.nodeName); + const max = pipeline.nodes.filter((node) => data.nodeName === node.nodeName)[0].maxStatelessCount; expect(required).to.be.equal(max, `required=${required}, suppose to be ${max}`); }); - // it.only('should scale based on all params', async () => { // COMMENT SINCE SCALING LOGIC 
CHANGED, NOW BASED ON ROUND TRIP (NOT GIVEN HERE) - // const queueSize = 0; - // const responses = 0; - // const currentSize = 0; + it('should scale based on roundTrip, queueSize, currentSize', async () => { + const data = { + nodeName: 'D', + queueSize: 0, + currentSize: 1, + durations + }; + const reqRateInfo = { + queueSize: 200, + delayTime: 500 + } + + await scale(data, reqRateInfo, 4); + const { required } = autoScale(data.nodeName); + expect(required).to.equal(15, `required is ${required}, suppose to be 15`); + }); - // const scale = async (data) => { - // data[0].queueSize += 200; - // data[0].responses += 1; - // streamService.reportStats(data); - // await delay(50); - // } - // const list = [{ - // nodeName: 'D', - // queueSize, - // currentSize, - // netDurations, - // responses - // }]; - // await scale(list); - // await scale(list); - // await scale(list); - // await scale(list); - // const scale1 = autoScale(list[0].nodeName); - // const scale2 = autoScale(list[0].nodeName); - // const scale3 = autoScale(list[0].nodeName); - // expect(scale1.required).to.gte(20); - // expect(scale2.required).to.gte(20); - // expect(scale3.required).to.gte(20); - // }).timeout(1000000000000); - // it('should scale based on queueSize and responses only', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const scale = async (data) => { - // streamService.reportStats(data); - // } - // const list = [{ - // nodeName: 'D', - // queueSize: 500, - // responses: 100, - // netDurations - // }]; - // await scale(list); - // const { required } = autoScale(list[0].nodeName); - // expect(required).to.gte(2); - // }); - // it.only('should scale up based on high req/res rate', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const nodeName = 'D'; - // const requests = async (data) => { - // data[0].currentSize = 0; - // data[0].queueSize += 200; - // data[0].responses = 100; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName, - // sent: 0, - // queueSize: 0, - // responses: 0, - // netDurations - // }]; - // await requests(list); - // await requests(list); - // await requests(list); - // const { required } = autoScale(list[0].nodeName); - // expect(required).to.gte(30); - // }); - // it('should scale based on high durations', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const scale = async (data) => { - // data[0].sent += 400; - // data[0].responses += 30; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName: 'D', - // sent: 0, - // responses: 0, - // netDurations - // }]; - // await scale(list); - // await scale(list); - // await scale(list); - // const { required } = autoScale(list[0].nodeName); - // expect(required).to.gte(10); - // }); - // it('should scale based on low durations', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const scale = async (data) => { - // data[0].sent += 100; - // data[0].responses += 0; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName: 'D', - // sent: 0, - // responses: 0, - // netDurations - // }]; - // await scale(list); - // await scale(list); - // await scale(list); - // const { required } = autoScale(list[0].nodeName); - // expect(required).to.gte(1); - // }); - // it('should scale only up based on req/res rate', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON 
ROUND TRIP - // const scale = async (data) => { - // data[0].sent += 10; - // data[0].responses += 3; - // streamService.reportStats(data); - // await delay(50); - // } - // const increaseSize = async (data) => { - // data[0].responses += 1; - // data[0].currentSize += 2; - // streamService.reportStats(data); - // await delay(50); - // } - // const list = [{ - // nodeName: 'D', - // sent: 10, - // queueSize: 0, - // currentSize: 0, - // netDurations, - // responses: 3 - // }]; - // await scale(list); - // await scale(list); - // const jobs1 = autoScale(list[0].nodeName); - // await increaseSize(list); - // const jobs2 = autoScale(list[0].nodeName); - // await increaseSize(list); - // autoScale(list[0].nodeName); - // const jobs3 = autoScale(list[0].nodeName); - // await scale(list); - // await scale(list); - // const jobs4 = autoScale(list[0].nodeName); - // const jobs5 = autoScale(list[0].nodeName); - // const jobs6 = autoScale(list[0].nodeName); - // expect(jobs1.required).to.gte(4); - // expect(jobs2.required).to.gte(4); - // expect(jobs3.required).to.gte(4); - // expect(jobs4.required).to.gte(4); - // expect(jobs5.required).to.gte(4); - // expect(jobs6.required).to.gte(4); - // }); // it('should scale only up based on req/res rate with a maxStatelessCount limit', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP // const scale = async (data) => { // data[0].sent += 10; From 8453de8168dcacc6236fb53482a7e7694eae8dfc Mon Sep 17 00:00:00 2001 From: Adir111 Date: Thu, 14 Nov 2024 15:27:06 +0200 Subject: [PATCH 07/13] removed netDurations --- core/worker/test/streaming.js | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/core/worker/test/streaming.js b/core/worker/test/streaming.js index 9420b0d3d..91dd12732 100644 --- a/core/worker/test/streaming.js +++ b/core/worker/test/streaming.js @@ -71,10 +71,9 @@ const checkMetrics = () => { const msgPerSec = 30; const duration = SEC / msgPerSec; -const netDurations = Array.from(Array(10).fill(duration)); const durations = Array.from(Array(10).fill(duration)); -describe('Streaming', () => { +describe.only('Streaming', () => { before(async () => { await stateAdapter._db.jobs.create({ pipeline, jobId }); @@ -117,7 +116,7 @@ describe('Streaming', () => { } }; - describe.only('scale-up', () => { + describe('scale-up', () => { const replicasOnFirstScale = require('../config/main/config.base.js').streaming.autoScaler.scaleUp.replicasOnFirstScale; it('should not scale based on no data', async () => { @@ -133,8 +132,7 @@ describe('Streaming', () => { it('should init scale when there is a queue', async () => { const data = { nodeName: 'D', - queueSize: 1, - netDurations + queueSize: 1 }; await scale(data); @@ -146,8 +144,7 @@ describe('Streaming', () => { const data = { nodeName: 'D', sent: 10, - queueSize: 0, - netDurations + queueSize: 0 }; const reqRateInfo = { sent: 10, @@ -162,8 +159,7 @@ describe('Streaming', () => { it('should init scale to minimum requirement, when there is a queue', async () => { const data = { nodeName: 'E', - queueSize: 1, - netDurations + queueSize: 1 }; await scale(data); @@ -175,8 +171,7 @@ describe('Streaming', () => { it('should init scale and not pass maximum requirement, when there is a queue', async () => { const data = { nodeName: 'F', - queueSize: 1, - netDurations + queueSize: 1 }; await scale(data); @@ -459,7 +454,7 @@ describe('Streaming', () => { const data = [{ nodeName, queueSize: 10, - netDurations + durations }]; 
streamService.reportStats(data); streamService.reportStats(data); @@ -472,11 +467,11 @@ describe('Streaming', () => { const statsData = masters[0]._autoScaler._statistics._data; const key = Object.keys(statsData)[0]; const stats = statsData[key]; - const { requests, responses, durations } = stats; + const { requests, responses, grossDurations } = stats; const maxSizeWindow = testParams.config.streaming.autoScaler.statistics.maxSizeWindow; expect(requests.items).to.have.lengthOf(maxSizeWindow); expect(responses.items).to.have.lengthOf(maxSizeWindow); - expect(durations.items).to.have.lengthOf(maxSizeWindow * 10); + expect(grossDurations.items).to.have.lengthOf(maxSizeWindow * 10); }); }); describe('metrics', () => { @@ -569,9 +564,9 @@ describe('Streaming', () => { await delay(20); } const currentSize = 0; - const list = [{ nodeName, queueSize: 150, responses: 30, netDurations, currentSize }]; - const list1 = { nodeName, queueSize: 300, responses: 80, netDurations, currentSize }; - const list2 = { nodeName, queueSize: 450, responses: 140, netDurations, currentSize }; + const list = [{ nodeName, queueSize: 150, responses: 30, currentSize }]; + const list1 = { nodeName, queueSize: 300, responses: 80, currentSize }; + const list2 = { nodeName, queueSize: 450, responses: 140, currentSize }; const slave1 = new SlaveAdapter({ jobId, nodeName, source: 'A' }); const slave2 = new SlaveAdapter({ jobId, nodeName, source: 'A' }); const slave3 = new SlaveAdapter({ jobId, nodeName, source: 'A' }); From fec78ac3bdfeeab1a1595fff5f0679f3aae7e811 Mon Sep 17 00:00:00 2001 From: Adir111 Date: Thu, 14 Nov 2024 15:58:53 +0200 Subject: [PATCH 08/13] completed scale up tests --- core/worker/test/streaming.js | 99 ++++++++++++++++++++++++++++++----- 1 file changed, 85 insertions(+), 14 deletions(-) diff --git a/core/worker/test/streaming.js b/core/worker/test/streaming.js index 91dd12732..0b6879048 100644 --- a/core/worker/test/streaming.js +++ b/core/worker/test/streaming.js @@ -69,7 +69,7 @@ const checkMetrics = () => { return streamService._metrics._checkMetrics() || []; } -const msgPerSec = 30; +const msgPerSec = 50; // Equals pod rate const duration = SEC / msgPerSec; const durations = Array.from(Array(10).fill(duration)); @@ -106,8 +106,8 @@ describe.only('Streaming', () => { const scale = async (data, reqRateInfo = {}, repeatCount = 1) => { const { queueSize = 0, sent = 0, delayTime = 0 } = reqRateInfo; for (let i = 0; i < repeatCount; i++) { - data.queueSize !== undefined && (data.queueSize += queueSize); - data.sent !== undefined && (data.sent += sent); + data.queueSize = (data.queueSize || 0) + queueSize; + data.sent = (data.sent || 0) + sent; streamService.reportStats([data]); if (delayTime > 0) { @@ -116,13 +116,13 @@ describe.only('Streaming', () => { } }; - describe('scale-up', () => { + describe.only('scale-up', () => { const replicasOnFirstScale = require('../config/main/config.base.js').streaming.autoScaler.scaleUp.replicasOnFirstScale; it('should not scale based on no data', async () => { const data = { nodeName: 'D', - }; + } await scale(data); const { required } = autoScale(data.nodeName); @@ -133,7 +133,7 @@ describe.only('Streaming', () => { const data = { nodeName: 'D', queueSize: 1 - }; + } await scale(data); const { required } = autoScale(data.nodeName); @@ -143,9 +143,8 @@ describe.only('Streaming', () => { it('should init scale when there is request rate', async () => { const data = { nodeName: 'D', - sent: 10, - queueSize: 0 - }; + sent: 10 + } const reqRateInfo = { sent: 10, delayTime: 
100 @@ -160,7 +159,7 @@ describe.only('Streaming', () => { const data = { nodeName: 'E', queueSize: 1 - }; + } await scale(data); const { required } = autoScale(data.nodeName); @@ -172,7 +171,7 @@ describe.only('Streaming', () => { const data = { nodeName: 'F', queueSize: 1 - }; + } await scale(data); const { required } = autoScale(data.nodeName); @@ -183,10 +182,9 @@ describe.only('Streaming', () => { it('should scale based on roundTrip, queueSize, currentSize', async () => { const data = { nodeName: 'D', - queueSize: 0, currentSize: 1, durations - }; + } const reqRateInfo = { queueSize: 200, delayTime: 500 @@ -194,7 +192,80 @@ describe.only('Streaming', () => { await scale(data, reqRateInfo, 4); const { required } = autoScale(data.nodeName); - expect(required).to.equal(15, `required is ${required}, suppose to be 15`); + expect(required).to.equal(9, `required is ${required}, suppose to be 9`); + }); + + it('should scale based on all params', async () => { + const data = { + nodeName: 'D', + currentSize: 1, + durations + } + const reqRateInfo = { + queueSize: 150, + sent: 50, + delayTime: 500 + } + + await scale(data, reqRateInfo, 4); + const { required } = autoScale(data.nodeName); + expect(required).to.equal(9, `required is ${required}, suppose to be 9`); + }); + + it('should scale based on all params, when currentSize is 0 and there are responses already', async () => { + const data = { + nodeName: 'D', + currentSize: 0, + durations, + responses: 1 + } + const reqRateInfo = { + queueSize: 150, + sent: 50, + delayTime: 500 + } + + await scale(data, reqRateInfo, 4); + const { required } = autoScale(data.nodeName); + expect(required).to.equal(9, `required is ${required}, suppose to be 9`); + }); + + it('should scale based on all params, and there are responses already, and not pass max stateless', async () => { + const data = { + nodeName: 'F', + currentSize: 1, + durations, + responses: 1 + } + const reqRateInfo = { + queueSize: 150, + sent: 50, + delayTime: 500 + } + + await scale(data, reqRateInfo, 4); + const { required } = autoScale(data.nodeName); + const max = pipeline.nodes.filter((node) => data.nodeName === node.nodeName)[0].maxStatelessCount; + expect(required).to.equal(max, `required is ${required}, suppose to be ${max}`); + }); + + it('should scale based on all params, and there are responses already, and have min stateless', async () => { + const data = { + nodeName: 'E', + currentSize: 1, + durations, + responses: 1 + } + const reqRateInfo = { + queueSize: 1, + sent: 1, + delayTime: 500 + } + + await scale(data, reqRateInfo, 4); + const { required } = autoScale(data.nodeName); + const min = pipeline.nodes.filter((node) => data.nodeName === node.nodeName)[0].minStatelessCount; + expect(required).to.equal(min, `required is ${required}, suppose to be ${min}`); }); // it('should scale only up based on req/res rate with a maxStatelessCount limit', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP From 83ba64c425ec60a160861b03b4ac937f6ccdf92d Mon Sep 17 00:00:00 2001 From: Adir111 Date: Thu, 14 Nov 2024 16:08:26 +0200 Subject: [PATCH 09/13] removed un-necessary logging --- core/worker/lib/streaming/core/metrics.js | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/worker/lib/streaming/core/metrics.js b/core/worker/lib/streaming/core/metrics.js index 706578a27..644c2ac5e 100644 --- a/core/worker/lib/streaming/core/metrics.js +++ b/core/worker/lib/streaming/core/metrics.js @@ -1,16 +1,10 @@ const { mean } = require('@hkube/stats'); -const Logger = 
require('@hkube/logger'); - const _calcRate = (list) => { let first = list[0]; if (list.length === 1) { first = { time: first.time - 2000, count: 0 }; } const last = list[list.length - 1]; - - const log = Logger.GetLogFromContainer(); - log.info(`STATISTICS: first value: ${first.count}, last value: ${last.count}, time diff: ${last.time - first.time} ms`); - const timeDiff = (last.time - first.time) / 1000; const countDiff = last.count - first.count; let rate = 0; From d136bc172fc8a80bd584b9062d742c4dd641dc38 Mon Sep 17 00:00:00 2001 From: Adir111 Date: Sun, 17 Nov 2024 09:17:10 +0200 Subject: [PATCH 10/13] organise file --- core/worker/test/streaming.js | 118 +++++++++++----------------------- 1 file changed, 37 insertions(+), 81 deletions(-) diff --git a/core/worker/test/streaming.js b/core/worker/test/streaming.js index 0b6879048..43c079d28 100644 --- a/core/worker/test/streaming.js +++ b/core/worker/test/streaming.js @@ -69,11 +69,41 @@ const checkMetrics = () => { return streamService._metrics._checkMetrics() || []; } +/** + * Adjusts the `data` object by adding rate-related statistics from the `reqRateInfo` + * parameter, then reports the updated statistics via `streamService` and introduces a delay. + * The operation is repeated `repeatCount` times. + * + * Note - `reqRate` is calculated as Δ count / Δ time, meaning `(queueSize + sent) / delayTime`. + * When `delayTime` is 1000 (1 second), this gives the exact `reqRate`. + * +* @async +* @param {Object} data - The data object holding current statistics, to be updated. +* @param {Object} [reqRateInfo={}] - Rate-related information to be added to `data`. +* @param {number} [reqRateInfo.queueSize=0] - The number of items in the request queue to be added. +* @param {number} [reqRateInfo.sent=0] - The number of requests sent, to be added to `data.sent`. +* @param {number} [reqRateInfo.delayTime=0] - The delay time (in milliseconds) to wait after each report. +* @param {number} [repeatCount=1] - The number of times to repeat the scaling and reporting operation. +* +* @returns {Promise} - Resolves after completing the specified number of repetitions. +*/ +const scale = async (data, reqRateInfo = {}, repeatCount = 1, slave) => { + const { queueSize = 0, sent = 0, delayTime = 0 } = reqRateInfo; + for (let i = 0; i < repeatCount; i++) { + data.queueSize = (data.queueSize || 0) + queueSize; + data.sent = (data.sent || 0) + sent; + slave ? slave.report(data) : streamService.reportStats([data]); + if (delayTime > 0) { + await delay(delayTime); + } + } +}; + const msgPerSec = 50; // Equals pod rate const duration = SEC / msgPerSec; const durations = Array.from(Array(10).fill(duration)); -describe.only('Streaming', () => { +describe('Streaming', () => { before(async () => { await stateAdapter._db.jobs.create({ pipeline, jobId }); @@ -85,38 +115,7 @@ describe.only('Streaming', () => { masters.map(m => m.reset()); }) - /** - * Adjusts the `data` object by adding rate-related statistics from the `reqRateInfo` - * parameter, then reports the updated statistics via `streamService` and introduces a delay. - * The operation is repeated `repeatCount` times. - * - * Note - `reqRate` is calculated as Δ count / Δ time, meaning `(queueSize + sent) / delayTime`. - * When `delayTime` is 1000 (1 second), this gives the exact `reqRate`. - * - * @async - * @param {Object} data - The data object holding current statistics, to be updated. - * @param {Object} [reqRateInfo={}] - Rate-related information to be added to `data`. 
- * @param {number} [reqRateInfo.queueSize=0] - The number of items in the request queue to be added. - * @param {number} [reqRateInfo.sent=0] - The number of requests sent, to be added to `data.sent`. - * @param {number} [reqRateInfo.delayTime=0] - The delay time (in milliseconds) to wait after each report. - * @param {number} [repeatCount=1] - The number of times to repeat the scaling and reporting operation. - * - * @returns {Promise} - Resolves after completing the specified number of repetitions. - */ - const scale = async (data, reqRateInfo = {}, repeatCount = 1) => { - const { queueSize = 0, sent = 0, delayTime = 0 } = reqRateInfo; - for (let i = 0; i < repeatCount; i++) { - data.queueSize = (data.queueSize || 0) + queueSize; - data.sent = (data.sent || 0) + sent; - streamService.reportStats([data]); - - if (delayTime > 0) { - await delay(delayTime); - } - } - }; - - describe.only('scale-up', () => { + describe('scale-up', () => { const replicasOnFirstScale = require('../config/main/config.base.js').streaming.autoScaler.scaleUp.replicasOnFirstScale; it('should not scale based on no data', async () => { @@ -179,7 +178,7 @@ describe.only('Streaming', () => { expect(required).to.be.equal(max, `required=${required}, suppose to be ${max}`); }); - it('should scale based on roundTrip, queueSize, currentSize', async () => { + it('should scale up based on roundTrip, queueSize, currentSize', async () => { const data = { nodeName: 'D', currentSize: 1, @@ -195,7 +194,7 @@ describe.only('Streaming', () => { expect(required).to.equal(9, `required is ${required}, suppose to be 9`); }); - it('should scale based on all params', async () => { + it('should scale up based on all params', async () => { const data = { nodeName: 'D', currentSize: 1, @@ -212,7 +211,7 @@ describe.only('Streaming', () => { expect(required).to.equal(9, `required is ${required}, suppose to be 9`); }); - it('should scale based on all params, when currentSize is 0 and there are responses already', async () => { + it('should scale up based on all params, when currentSize is 0 and there are responses already', async () => { const data = { nodeName: 'D', currentSize: 0, @@ -230,7 +229,7 @@ describe.only('Streaming', () => { expect(required).to.equal(9, `required is ${required}, suppose to be 9`); }); - it('should scale based on all params, and there are responses already, and not pass max stateless', async () => { + it('should scale up based on all params, and there are responses already, and not exceeding max stateless', async () => { const data = { nodeName: 'F', currentSize: 1, @@ -249,7 +248,7 @@ describe.only('Streaming', () => { expect(required).to.equal(max, `required is ${required}, suppose to be ${max}`); }); - it('should scale based on all params, and there are responses already, and have min stateless', async () => { + it('should scale up based on all params, and there are responses already, and have min stateless', async () => { const data = { nodeName: 'E', currentSize: 1, @@ -267,49 +266,6 @@ describe.only('Streaming', () => { const min = pipeline.nodes.filter((node) => data.nodeName === node.nodeName)[0].minStatelessCount; expect(required).to.equal(min, `required is ${required}, suppose to be ${min}`); }); - - // it('should scale only up based on req/res rate with a maxStatelessCount limit', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const scale = async (data) => { - // data[0].sent += 10; - // data[0].responses += 3; - // streamService.reportStats(data); - // await delay(50); 
- // } - // const increaseSize = async (data) => { - // data[0].responses += 1; - // data[0].currentSize += 2; - // streamService.reportStats(data); - // await delay(50); - // } - // const list = [{ - // nodeName: 'F', - // sent: 10, - // queueSize: 0, - // currentSize: 0, - // netDurations, - // responses: 3 - // }]; - // const scaledNode = pipeline.nodes[5] - // await scale(list); - // await scale(list); - // const jobs1 = autoScale(list[0].nodeName); - // await increaseSize(list); - // const jobs2 = autoScale(list[0].nodeName); - // await increaseSize(list); - // autoScale(list[0].nodeName); - // const jobs3 = autoScale(list[0].nodeName); - // await scale(list); - // await scale(list); - // const jobs4 = autoScale(list[0].nodeName); - // const jobs5 = autoScale(list[0].nodeName); - // const jobs6 = autoScale(list[0].nodeName); - // expect(jobs1.required).to.eql(scaledNode.maxStatelessCount); - // expect(jobs2.required).to.eql(scaledNode.maxStatelessCount); - // expect(jobs3.required).to.eql(scaledNode.maxStatelessCount); - // expect(jobs4.required).to.eql(scaledNode.maxStatelessCount); - // expect(jobs5.required).to.eql(scaledNode.maxStatelessCount); - // expect(jobs6.required).to.eql(scaledNode.maxStatelessCount); - // }); }); describe('scale-down', () => { From 64bb2a31712ec915112c21e3c442f6224c6b0b57 Mon Sep 17 00:00:00 2001 From: Adir111 Date: Sun, 17 Nov 2024 09:17:50 +0200 Subject: [PATCH 11/13] scale down tests added --- core/worker/test/streaming.js | 171 ++++++++++------------------------ 1 file changed, 51 insertions(+), 120 deletions(-) diff --git a/core/worker/test/streaming.js b/core/worker/test/streaming.js index 43c079d28..18084090e 100644 --- a/core/worker/test/streaming.js +++ b/core/worker/test/streaming.js @@ -269,126 +269,57 @@ describe('Streaming', () => { }); describe('scale-down', () => { - // it('should scale up and down based on durations', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const nodeName = 'D'; - // const requestsUp = async (data) => { - // data[0].queueSize += 100; - // streamService.reportStats(data); - // await delay(100); - // } - // const responsesUp = async (data) => { - // data[0].responses += 100; - // data[0].sent = 200; - // data[0].queueSize = 0; - // data[0].currentSize += 1; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName, - // currentSize: 0, - // sent: 0, - // queueSize: 0, - // responses: 0, - // netDurations - // }]; - // await requestsUp(list); - // await requestsUp(list); - // const jobs1 = autoScale(list[0].nodeName); - // const jobs2 = autoScale(list[0].nodeName); - // await delay(200) - // await responsesUp(list); - // await responsesUp(list); - // const jobs3 = autoScale(list[0].nodeName); - // const jobs4 = autoScale(list[0].nodeName); - // expect(jobs1.required).to.gte(1); - // expect(jobs2.required).to.gte(1); - // expect(jobs3.required).to.gte(7); - // expect(jobs4.required).to.gte(7); - // }); - // it('should scale up and down based on no requests and no responses', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const nodeName = 'D'; - // const requestsUp = async (data) => { - // data[0].sent = 100; - // data[0].responses = 100; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName, - // currentSize: 0, - // sent: 0, - // responses: 0, - // netDurations - // }]; - // await requestsUp(list); - // await requestsUp(list); - // await requestsUp(list); 
- // await requestsUp(list); - // const scale = autoScale(list[0].nodeName); - // expect(scale.required).to.eql(0); - // }); - // it('should scale down based on zero ratio', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const nodeName = 'D'; - // const requests = async (data) => { - // data[0].queueSize = 100; - // data[0].responses = 100; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName, - // sent: 0, - // currentSize: 5, - // queueSize: 0, - // responses: 0 - // }]; - // await requests(list); - // await requests(list); - // await requests(list); - // await requests(list); - // const scale = autoScale(list[0].nodeName); - // expect(scale.required).to.eql(0); - // }); - // it('should not scale down based on responses', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const nodeName = 'D'; - // const requests = async (data) => { - // data[0].currentSize = 5; - // data[0].responses += 100; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName, - // responses: 0 - // }]; - // await requests(list); - // await requests(list); - // await requests(list); - // const scale = autoScale(list[0].nodeName); - // expect(scale.required).to.eql(0); - // }); - // it('should not scale down based on currentSize', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const nodeName = 'D'; - // const requests = async (data) => { - // data[0].currentSize = 1; - // data[0].queueSize = 0; - // data[0].responses += 100; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName, - // sent: 0, - // queueSize: 0, - // responses: 0 - // }]; - // await requests(list); - // await requests(list); - // await requests(list); - // const scale = autoScale(list[0].nodeName); - // expect(scale.required).to.eql(0); - // }); + it('should scale down based on roundTrip, queueSize, currentSize', async () => { + const data = { + nodeName: 'D', + currentSize: 100, + durations + } + const reqRateInfo = { + queueSize: 200, + delayTime: 500 + } + + await scale(data, reqRateInfo, 4); + const { required } = autoScale(data.nodeName); + expect(required).to.equal(9, `required is ${required}, suppose to be 9`); + }); + + it('should scale down based on all params', async () => { + const data = { + nodeName: 'D', + currentSize: 100, + durations + } + const reqRateInfo = { + queueSize: 150, + sent: 50, + delayTime: 500 + } + + await scale(data, reqRateInfo, 4); + const { required } = autoScale(data.nodeName); + expect(required).to.equal(9, `required is ${required}, suppose to be 9`); + }); + + it('should scale up based on all params, and there are responses already, and have min stateless', async () => { + const data = { + nodeName: 'E', + currentSize: 20, + durations, + responses: 1 + } + const reqRateInfo = { + queueSize: 1, + sent: 1, + delayTime: 500 + } + + await scale(data, reqRateInfo, 4); + const { required } = autoScale(data.nodeName); + const min = pipeline.nodes.filter((node) => data.nodeName === node.nodeName)[0].minStatelessCount; + expect(required).to.equal(min, `required is ${required}, suppose to be ${min}`); + }); }); describe('scale-conflicts', () => { // it('should only scale up based on master', async () => { From 51ae55d117a69cae037765f582818b3f57bee63f Mon Sep 17 00:00:00 2001 From: Adir111 Date: Sun, 17 Nov 2024 11:20:40 +0200 Subject: [PATCH 12/13] fixed scale conflicts tests 
for new logic, decreased tests time --- core/worker/test/mocks/pipeline.json | 2 +- core/worker/test/streaming.js | 248 +++++++++++++++------------ 2 files changed, 142 insertions(+), 108 deletions(-) diff --git a/core/worker/test/mocks/pipeline.json b/core/worker/test/mocks/pipeline.json index baf63e920..a9f46fbb9 100644 --- a/core/worker/test/mocks/pipeline.json +++ b/core/worker/test/mocks/pipeline.json @@ -41,7 +41,7 @@ "algorithmName": "eval-alg", "input": [], "stateType": "stateless", - "minStatelessCount": 3 + "minStatelessCount": 10 }, { "nodeName": "F", diff --git a/core/worker/test/streaming.js b/core/worker/test/streaming.js index 18084090e..2b6c9ece4 100644 --- a/core/worker/test/streaming.js +++ b/core/worker/test/streaming.js @@ -71,23 +71,26 @@ const checkMetrics = () => { /** * Adjusts the `data` object by adding rate-related statistics from the `reqRateInfo` - * parameter, then reports the updated statistics via `streamService` and introduces a delay. - * The operation is repeated `repeatCount` times. + * parameter, then reports the updated statistics via `streamService` or the optional `slave` parameter, + * and introduces a delay. The operation is repeated `repeatCount` times. * * Note - `reqRate` is calculated as Δ count / Δ time, meaning `(queueSize + sent) / delayTime`. * When `delayTime` is 1000 (1 second), this gives the exact `reqRate`. * -* @async -* @param {Object} data - The data object holding current statistics, to be updated. -* @param {Object} [reqRateInfo={}] - Rate-related information to be added to `data`. -* @param {number} [reqRateInfo.queueSize=0] - The number of items in the request queue to be added. -* @param {number} [reqRateInfo.sent=0] - The number of requests sent, to be added to `data.sent`. -* @param {number} [reqRateInfo.delayTime=0] - The delay time (in milliseconds) to wait after each report. -* @param {number} [repeatCount=1] - The number of times to repeat the scaling and reporting operation. -* -* @returns {Promise} - Resolves after completing the specified number of repetitions. -*/ -const scale = async (data, reqRateInfo = {}, repeatCount = 1, slave) => { + * @async + * @param {Object} data - The data object holding current statistics, to be updated. + * @param {Object} [reqRateInfo={}] - Rate-related information to be added to `data`. + * @param {number} [reqRateInfo.queueSize=0] - The number of items in the request queue to be added. + * @param {number} [reqRateInfo.sent=0] - The number of requests sent, to be added to `data.sent`. + * @param {number} [reqRateInfo.delayTime=0] - The delay time (in milliseconds) to wait after each report. + * @param {number} [repeatCount=1] - The number of times to repeat the scaling and reporting operation. + * @param {Object} [slave=undefined] - Optional object with a `report` method, used for reporting statistics. + * If provided, its `report` method is called with `data`. + * If not provided, `streamService.reportStats` is used. + * + * @returns {Promise} - Resolves after completing the specified number of repetitions. 
+ */ +const scale = async (data, reqRateInfo = {}, repeatCount = 1, slave = undefined) => { const { queueSize = 0, sent = 0, delayTime = 0 } = reqRateInfo; for (let i = 0; i < repeatCount; i++) { data.queueSize = (data.queueSize || 0) + queueSize; @@ -145,8 +148,8 @@ describe('Streaming', () => { sent: 10 } const reqRateInfo = { - sent: 10, - delayTime: 100 + sent: 1, + delayTime: 10 } await scale(data, reqRateInfo, 2); @@ -185,13 +188,15 @@ describe('Streaming', () => { durations } const reqRateInfo = { - queueSize: 200, - delayTime: 500 + queueSize: 20, + delayTime: 50 } await scale(data, reqRateInfo, 4); + await delay(500); const { required } = autoScale(data.nodeName); - expect(required).to.equal(9, `required is ${required}, suppose to be 9`); + expect(required).to.be.gte(8, `required is ${required}, suppose to be 8-9`); + expect(required).to.be.lte(9, `required is ${required}, suppose to be 8-9`); }); it('should scale up based on all params', async () => { @@ -201,14 +206,16 @@ describe('Streaming', () => { durations } const reqRateInfo = { - queueSize: 150, - sent: 50, - delayTime: 500 + queueSize: 15, + sent: 5, + delayTime: 50 } await scale(data, reqRateInfo, 4); + await delay(500); const { required } = autoScale(data.nodeName); - expect(required).to.equal(9, `required is ${required}, suppose to be 9`); + expect(required).to.be.gte(8, `required is ${required}, suppose to be 8-9`); + expect(required).to.be.lte(9, `required is ${required}, suppose to be 8-9`); }); it('should scale up based on all params, when currentSize is 0 and there are responses already', async () => { @@ -219,14 +226,16 @@ describe('Streaming', () => { responses: 1 } const reqRateInfo = { - queueSize: 150, - sent: 50, - delayTime: 500 + queueSize: 15, + sent: 5, + delayTime: 50 } await scale(data, reqRateInfo, 4); + await delay(500); const { required } = autoScale(data.nodeName); - expect(required).to.equal(9, `required is ${required}, suppose to be 9`); + expect(required).to.be.gte(8, `required is ${required}, suppose to be 8-9`); + expect(required).to.be.lte(9, `required is ${required}, suppose to be 8-9`); }); it('should scale up based on all params, and there are responses already, and not exceeding max stateless', async () => { @@ -237,9 +246,9 @@ describe('Streaming', () => { responses: 1 } const reqRateInfo = { - queueSize: 150, - sent: 50, - delayTime: 500 + queueSize: 3, + sent: 1, + delayTime: 10 } await scale(data, reqRateInfo, 4); @@ -258,7 +267,7 @@ describe('Streaming', () => { const reqRateInfo = { queueSize: 1, sent: 1, - delayTime: 500 + delayTime: 10 } await scale(data, reqRateInfo, 4); @@ -276,13 +285,15 @@ describe('Streaming', () => { durations } const reqRateInfo = { - queueSize: 200, - delayTime: 500 + queueSize: 20, + delayTime: 50 } await scale(data, reqRateInfo, 4); + await delay(500); const { required } = autoScale(data.nodeName); - expect(required).to.equal(9, `required is ${required}, suppose to be 9`); + expect(required).to.be.gte(8, `required is ${required}, suppose to be 8-9`); + expect(required).to.be.lte(9, `required is ${required}, suppose to be 8-9`); }); it('should scale down based on all params', async () => { @@ -292,14 +303,15 @@ describe('Streaming', () => { durations } const reqRateInfo = { - queueSize: 150, - sent: 50, - delayTime: 500 + queueSize: 15, + sent: 5, + delayTime: 50 } await scale(data, reqRateInfo, 4); const { required } = autoScale(data.nodeName); - expect(required).to.equal(9, `required is ${required}, suppose to be 9`); + expect(required).to.be.gte(8, 
`required is ${required}, supposed to be 8-9`);
+            expect(required).to.be.lte(9, `required is ${required}, supposed to be 8-9`);
         });
 
         it('should scale up based on all params, and there are responses already, and have min stateless', async () => {
@@ -312,7 +324,7 @@ describe('Streaming', () => {
             const reqRateInfo = {
                 queueSize: 1,
                 sent: 1,
-                delayTime: 500
+                delayTime: 10
             }
 
             await scale(data, reqRateInfo, 4);
@@ -321,77 +333,97 @@ describe('Streaming', () => {
             expect(required).to.equal(min, `required is ${required}, supposed to be ${min}`);
         });
     });
+
     describe('scale-conflicts', () => {
-        // it('should only scale up based on master', async () => {
-        //     const nodeName = 'D';
-        //     const requests = async (data) => {
-        //         data[0].queueSize += 100;
-        //         data[0].responses += 50;
-        //         streamService.reportStats(data);
-        //         await delay(50);
-        //     }
-        //     const reportSlave = async (slave, data) => {
-        //         data.queueSize += 100;
-        //         data.responses += 50;
-        //         slave.report(data);
-        //         await delay(50);
-        //     }
-        //     const currentSize = 0;
-        //     const list1 = [{ nodeName, queueSize: 150, responses: 30, netDurations, currentSize }];
-        //     const list2 = { nodeName, queueSize: 450, responses: 150, netDurations, currentSize };
-        //     const slave = new SlaveAdapter({ jobId, nodeName, source: 'B' });
-        //     await requests(list1);
-        //     await requests(list1);
-        //     await requests(list1);
-        //     await requests(list1);
-        //     await reportSlave(slave, list2);
-        //     await reportSlave(slave, list2);
-        //     await reportSlave(slave, list2);
-        //     await reportSlave(slave, list2);
-        //     const scale = autoScale(nodeName);
-        //     expect(scale.required).to.gte(30);
-        // });
-        // it('should not scale up based on avg master and slaves', async () => {
-        //     const nodeName = 'D';
-        //     const reportSlave = async (slave, data) => {
-        //         data.queueSize += 100;
-        //         data.responses += 50;
-        //         slave.report(data);
-        //         await delay(50)
-        //     }
-        //     const currentSize = 0;
-        //     const list1 = { nodeName, queueSize: 300, responses: 40, netDurations, currentSize };
-        //     const list2 = { nodeName, queueSize: 300, responses: 60, netDurations, currentSize };
-        //     const list3 = { nodeName, queueSize: 300, responses: 80, netDurations, currentSize };
-        //     const list4 = { nodeName, queueSize: 300, responses: 100, netDurations, currentSize };
-        //     const slave1 = new SlaveAdapter({ jobId, nodeName, source: 'A' });
-        //     const slave2 = new SlaveAdapter({ jobId, nodeName, source: 'B' });
-        //     const slave3 = new SlaveAdapter({ jobId, nodeName, source: 'C' });
-        //     const slave4 = new SlaveAdapter({ jobId, nodeName, source: 'D' });
-        //     await reportSlave(slave1, list1);
-        //     await reportSlave(slave1, list1);
-        //     await reportSlave(slave1, list1);
-        //     await reportSlave(slave1, list1);
-
-        //     await reportSlave(slave2, list2);
-        //     await reportSlave(slave2, list2);
-        //     await reportSlave(slave2, list2);
-        //     await reportSlave(slave2, list2);
-
-        //     slave3.report(list3);
-        //     slave3.report(list3);
-        //     slave3.report(list3);
-        //     slave3.report(list3);
-
-        //     slave4.report(list4);
-        //     slave4.report(list4);
-        //     slave4.report(list4);
-        //     slave4.report(list4);
-        //     await delay(200);
-        //     const scale = autoScale(nodeName);
-        //     expect(scale.required).to.gte(30);
-        // });
+        it('should only scale up based on master', async () => {
+            const nodeName = 'D';
+            const currentSize = 0;
+            const slave = new SlaveAdapter({ jobId, nodeName, source: 'B' });
+            const data1 = {
+                nodeName,
+                queueSize: 150,
+                responses: 30,
+                sent: 30,
+                durations,
+                currentSize
+            }
+            const data2 = {
+                nodeName,
+                queueSize: 4500,
+                responses: 150,
+                sent: 150,
+                durations,
+                currentSize
+            }
+            const reqRateInfo = {
+                queueSize: 20,
+                sent: 10,
+                delayTime: 50
+            }
+
+            await scale(data1, reqRateInfo, 4);
+            await scale(data2, reqRateInfo, 4, slave);
+            await delay(1000);
+            const { required } = autoScale(nodeName);
+            expect(required).to.be.equal(28, `required is ${required}, supposed to be 28`);
+        });
+
+        it('should not scale up based on avg master and slaves', async () => {
+            const nodeName = 'D';
+            const currentSize = 0;
+            const data1 = {
+                nodeName,
+                queueSize: 300,
+                responses: 40,
+                sent: 40,
+                durations,
+                currentSize
+            }
+            const data2 = {
+                nodeName,
+                queueSize: 300,
+                responses: 60,
+                sent: 60,
+                durations,
+                currentSize
+            }
+            const data3 = {
+                nodeName,
+                queueSize: 300,
+                responses: 80,
+                sent: 80,
+                durations,
+                currentSize
+            }
+            const data4 = {
+                nodeName,
+                queueSize: 300,
+                responses: 100,
+                sent: 100,
+                durations,
+                currentSize
+            }
+            const reqRateInfo = {
+                queueSize: 20,
+                sent: 10,
+                delayTime: 50
+            }
+
+            const slave1 = new SlaveAdapter({ jobId, nodeName, source: 'A' });
+            const slave2 = new SlaveAdapter({ jobId, nodeName, source: 'B' });
+            const slave3 = new SlaveAdapter({ jobId, nodeName, source: 'C' });
+            const slave4 = new SlaveAdapter({ jobId, nodeName, source: 'D' });
+            await scale(data1, reqRateInfo, 4, slave1);
+            await scale(data2, reqRateInfo, 4, slave2);
+            await scale(data3, reqRateInfo, 4, slave3);
+            await scale(data4, reqRateInfo, 4, slave4);
+            await delay(1000);
+            const { required } = autoScale(nodeName);
+            expect(required).to.be.gte(49, `required is ${required}, supposed to be 49-50`);
+            expect(required).to.be.lte(50, `required is ${required}, supposed to be 49-50`);
+        });
     });
+
     describe('no-scale', () => {
         it('should not scale when no relevant data', async () => {
             const reportStats = async (data) => {
@@ -407,6 +439,7 @@ describe('Streaming', () => {
             const scale = autoScale(list[0].nodeName);
             expect(scale.required).to.eql(0);
         });
+
         it('should not over the maxSizeWindow', async () => {
             const nodeName = 'D';
             const data = [{
@@ -432,6 +465,7 @@ describe('Streaming', () => {
             expect(grossDurations.items).to.have.lengthOf(maxSizeWindow * 10);
         });
     });
+
     describe('metrics', () => {
         it('should scale and update metrics', async () => {
             const nodeName = 'D';

From b4a53da1bfde04f37d5509bb9fa59f06d6b8ba21 Mon Sep 17 00:00:00 2001
From: Adir111
Date: Sun, 17 Nov 2024 12:04:48 +0200
Subject: [PATCH 13/13] organise tests

---
 core/worker/test/streaming.js | 68 ++++++++++++++---------------------
 1 file changed, 26 insertions(+), 42 deletions(-)

diff --git a/core/worker/test/streaming.js b/core/worker/test/streaming.js
index 2b6c9ece4..f1fa6def2 100644
--- a/core/worker/test/streaming.js
+++ b/core/worker/test/streaming.js
@@ -426,33 +426,24 @@ describe('Streaming', () => {
 
     describe('no-scale', () => {
         it('should not scale when no relevant data', async () => {
-            const reportStats = async (data) => {
-                streamService.reportStats(data);
-                await delay(100);
-            }
-            const list = [{
+            const data = {
                 nodeName: 'D'
-            }];
-            await reportStats(list);
-            await reportStats(list);
-            await reportStats(list);
-            const scale = autoScale(list[0].nodeName);
-            expect(scale.required).to.eql(0);
+            };
+
+            await scale(data, { delayTime: 100 }, 3);
+            const { required } = autoScale(data.nodeName);
+            expect(required).to.eql(0);
         });
 
         it('should not over the maxSizeWindow', async () => {
             const nodeName = 'D';
-            const data = [{
+            const data = {
                 nodeName,
                 queueSize: 10,
                 durations
-            }];
-            streamService.reportStats(data);
-            streamService.reportStats(data);
-            streamService.reportStats(data);
-            streamService.reportStats(data);
-            streamService.reportStats(data);
-            streamService.reportStats(data);
+            };
+
+            await scale(data, {}, 14);
             let masters = getMasters();
             masters = masters.filter(m => m.nodeName === nodeName);
             const statsData = masters[0]._autoScaler._statistics._data;
@@ -520,6 +511,7 @@ describe('Streaming', () => {
             expect(metricsUid3.totalDropped).to.eql(dropped * 3);
         });
     });
+
     describe('master-slaves', () => {
         it('should get slaves', async () => {
             const nodeName = 'D';
@@ -541,6 +533,7 @@ describe('Streaming', () => {
             const slaves = masters[0].slaves();
             expect(slaves.sort()).to.deep.equal([slave1.source, slave2.source])
         });
+
         it('metrics test', async () => {
             const nodeName = 'D';
             const requests = async (data) => {
@@ -563,29 +556,14 @@ describe('Streaming', () => {
             const slave2 = new SlaveAdapter({ jobId, nodeName, source: 'A' });
             const slave3 = new SlaveAdapter({ jobId, nodeName, source: 'A' });
             const slave4 = new SlaveAdapter({ jobId, nodeName, source: 'B' });
-            await requests(list);
-            await requests(list);
-            await requests(list);
-            await requests(list);
-            await reportSlave(slave1, list1);
-            await reportSlave(slave1, list1);
-            await reportSlave(slave1, list1);
-            await reportSlave(slave1, list1);
-
-            await reportSlave(slave2, list1);
-            await reportSlave(slave2, list1);
-            await reportSlave(slave2, list1);
-            await reportSlave(slave2, list1);
-
-            await reportSlave(slave3, list1);
-            await reportSlave(slave3, list1);
-            await reportSlave(slave3, list1);
-            await reportSlave(slave3, list1);
-
-            await reportSlave(slave4, list2);
-            await reportSlave(slave4, list2);
-            await reportSlave(slave4, list2);
-            await reportSlave(slave4, list2);
+            for (let i = 0; i < 4; i++) { await requests(list); }
+            for (let i = 0; i < 4; i++) { await reportSlave(slave1, list1); }
+
+            for (let i = 0; i < 4; i++) { await reportSlave(slave2, list1); }
+
+            for (let i = 0; i < 4; i++) { await reportSlave(slave3, list1); }
+
+            for (let i = 0; i < 4; i++) { await reportSlave(slave4, list2); }
 
             await delay(200);
             autoScale(nodeName);
@@ -595,6 +573,7 @@ describe('Streaming', () => {
             expect(metrics.map(t => t.source).sort()).to.eql(['A', 'B', 'C']);
             expect(metrics).to.have.lengthOf(3);
         });
+
         it('should start and finish correctly', async () => {
             expect(streamService._jobData).to.be.not.null;
             expect(streamService._election).to.be.not.null;
@@ -611,10 +590,12 @@ describe('Streaming', () => {
             await streamService.start(job);
         });
     });
+
     describe('discovery', () => {
         beforeEach(() => {
            discovery._discoveryMap = Object.create(null);
        });
+
         it('should add discovery and get right changes', async () => {
             const jobId = uid();
             const nodeName = uid();
@@ -626,6 +607,7 @@ describe('Streaming', () => {
             expect(changes2[0].type).to.eql('Add');
             expect(changes2[0].nodeName).to.eql(nodeName);
         });
+
         it('should add discovery multiple times and get right changes', async () => {
             const jobId = uid();
             const nodeName1 = uid();
@@ -647,6 +629,7 @@ describe('Streaming', () => {
             expect(changes2[0].nodeName).to.eql(nodeName2);
             expect(changes2[1].nodeName).to.eql(nodeName1);
         });
+
         it('should add and delete discovery and get right changes', async () => {
             const jobId = uid();
             const nodeName = uid();
@@ -662,6 +645,7 @@ describe('Streaming', () => {
             expect(changes2[0].type).to.eql('Del');
             expect(changes2[1].type).to.eql('Del');
         });
+
         it('should add discovery and get right changes', async () => {
             const jobId = uid();
             const nodeName1 = uid();
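
Note on the shared `scale` helper used throughout the reorganised tests above: it is defined earlier in streaming.js, outside the hunks shown in this series. The sketch below is only an assumption inferred from the call sites `scale(data, reqRateInfo, cycles)` and `scale(data, reqRateInfo, cycles, slave)`; the parameter names, defaults and increment logic are not taken from the actual implementation.

    // Hypothetical sketch, inferred from the call sites; not the code shipped in streaming.js.
    // Each cycle optionally bumps queueSize/sent on the stats object, reports it either through
    // the master stream-service or through a SlaveAdapter, then waits delayTime ms before the
    // next cycle. streamService and delay are the same objects already required by this spec.
    const scale = async (data, { queueSize = 0, sent = 0, delayTime = 0 } = {}, cycles = 1, slave = null) => {
        for (let i = 0; i < cycles; i++) {
            data.queueSize = (data.queueSize || 0) + queueSize;
            data.sent = (data.sent || 0) + sent;
            if (slave) {
                slave.report(data);                  // slave path: report on behalf of another source
            }
            else {
                streamService.reportStats([data]);   // master path: report directly to the stream service
            }
            await delay(delayTime);
        }
    };

Under a helper of this shape, `await scale(data, {}, 14)` in the maxSizeWindow test would simply report the same stats object 14 times with no increments or delay, while `await scale(data1, reqRateInfo, 4, slave1)` in the scale-conflicts tests would report through a slave adapter four times while growing the queue and sent counters between reports.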