diff --git a/core/worker/lib/streaming/core/metrics.js b/core/worker/lib/streaming/core/metrics.js index 706578a27..644c2ac5e 100644 --- a/core/worker/lib/streaming/core/metrics.js +++ b/core/worker/lib/streaming/core/metrics.js @@ -1,16 +1,10 @@ const { mean } = require('@hkube/stats'); -const Logger = require('@hkube/logger'); - const _calcRate = (list) => { let first = list[0]; if (list.length === 1) { first = { time: first.time - 2000, count: 0 }; } const last = list[list.length - 1]; - - const log = Logger.GetLogFromContainer(); - log.info(`STATISTICS: first value: ${first.count}, last value: ${last.count}, time diff: ${last.time - first.time} ms`); - const timeDiff = (last.time - first.time) / 1000; const countDiff = last.count - first.count; let rate = 0; diff --git a/core/worker/lib/streaming/core/statistics.js b/core/worker/lib/streaming/core/statistics.js index 7078d9906..661fd238b 100644 --- a/core/worker/lib/streaming/core/statistics.js +++ b/core/worker/lib/streaming/core/statistics.js @@ -27,7 +27,7 @@ class Statistics { stats.requests.add(this._createItem(requests)); stats.responses.add(this._createItem(responses)); stats.durations.addRange(netDurations); - stats.grossDurations.addRange(durations); + stats.grossDurations.addRange(durations); // used to calculate round trip stats.queueDurations.addRange(queueDurations); this._data[source] = { diff --git a/core/worker/test/mocks/pipeline.json b/core/worker/test/mocks/pipeline.json new file mode 100644 index 000000000..a9f46fbb9 --- /dev/null +++ b/core/worker/test/mocks/pipeline.json @@ -0,0 +1,77 @@ +{ + "name": "stream", + "kind": "stream", + "nodes": [ + { + "nodeName": "A", + "algorithmName": "eval-alg", + "input": [ + "@flowInput.arraySize", + "@flowInput.bufferSize" + ], + "stateType": "stateful", + "maxStatelessCount": 0 + }, + { + "nodeName": "B", + "algorithmName": "eval-alg", + "input": [ + "@flowInput.arraySize", + "@flowInput.bufferSize" + ], + "stateType": "stateful" + }, + { + "nodeName": "C", + "algorithmName": "eval-alg", + "input": [ + "@flowInput.arraySize", + "@flowInput.bufferSize" + ], + "stateType": "stateful" + }, + { + "nodeName": "D", + "algorithmName": "eval-alg", + "input": [], + "stateType": "stateless" + }, + { + "nodeName": "E", + "algorithmName": "eval-alg", + "input": [], + "stateType": "stateless", + "minStatelessCount": 10 + }, + { + "nodeName": "F", + "algorithmName": "eval-alg", + "input": [], + "stateType": "stateless", + "minStatelessCount": 3, + "maxStatelessCount": 2 + } + ], + "edges": [ + { "source": "A", "target": "D" }, + { "source": "B", "target": "D" }, + { "source": "C", "target": "D" }, + { "source": "A", "target": "E" }, + { "source": "B", "target": "E" }, + { "source": "C", "target": "F" } + ], + "flowInputMetadata": { + "metadata": { + "flowInput.arraySize": { + "type": "number" + }, + "flowInput.bufferSize": { + "type": "number" + } + }, + "storageInfo": { + "path": "local-hkube/main:streaming:9dy12jfh/main:streaming:9dy12jfh" + } + } +} + diff --git a/core/worker/test/streaming.js b/core/worker/test/streaming.js index cceafc417..f1fa6def2 100644 --- a/core/worker/test/streaming.js +++ b/core/worker/test/streaming.js @@ -6,83 +6,9 @@ const streamHandler = require('../lib/streaming/services/stream-handler'); const streamService = require('../lib/streaming/services/stream-service'); const discovery = require('../lib/streaming/services/service-discovery'); const SlaveAdapter = require('../lib/streaming/adapters/slave-adapter'); +const pipeline = require('./mocks/pipeline.json'); 
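+// Mock stream pipeline (see test/mocks/pipeline.json): stateful nodes A-C feed stateless nodes D-F;
+// E declares minStatelessCount and F declares min/maxStatelessCount, which the scaling-bound tests below assert against.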
const SEC = 1000; -const pipeline = { - name: "stream", - kind: "stream", - nodes: [ - { - nodeName: "A", - algorithmName: "eval-alg", - input: [ - "@flowInput.arraySize", - "@flowInput.bufferSize" - ], - stateType: "stateful" - }, - { - nodeName: "B", - algorithmName: "eval-alg", - input: [ - "@flowInput.arraySize", - "@flowInput.bufferSize" - ], - stateType: "stateful" - }, - { - nodeName: "C", - algorithmName: "eval-alg", - input: [ - "@flowInput.arraySize", - "@flowInput.bufferSize" - ], - stateType: "stateful" - }, - { - nodeName: "D", - algorithmName: "eval-alg", - input: [], - stateType: "stateless" - }, - { - nodeName: "E", - algorithmName: "eval-alg", - input: [], - stateType: "stateless" - }, - { - nodeName: "F", - algorithmName: "eval-alg", - input: [], - stateType: "stateless", - maxStatelessCount: 3, - minStatelessCount: 1 - } - ], - edges: [ - { source: "A", target: "D" }, - { source: "B", target: "D" }, - { source: "C", target: "D" }, - { source: "A", target: "E" }, - { source: "B", target: "E" }, - { source: "C", target: "F" } - ], - flowInputMetadata: { - metadata: { - "flowInput.arraySize": { - "type": "number" - }, - "flowInput.bufferSize": { - "type": "number" - } - }, - storageInfo: { - "path": "local-hkube/main:streaming:9dy12jfh/main:streaming:9dy12jfh" - } - } -} - const streamingDiscovery = { host: process.env.POD_IP || '127.0.0.1', port: process.env.STREAMING_DISCOVERY_PORT || 9022 @@ -123,7 +49,7 @@ const createJob = (jobId) => { algorithmName: 'my-alg', pipelineName: 'my-pipe', parents: [], - childs: ['D','F'], + childs: ['D', 'E', 'F'], }; return job; }; @@ -143,526 +69,394 @@ const checkMetrics = () => { return streamService._metrics._checkMetrics() || []; } -const msgPerSec = 30; +/** + * Adjusts the `data` object by adding rate-related statistics from the `reqRateInfo` + * parameter, then reports the updated statistics via `streamService` or the optional `slave` parameter, + * and introduces a delay. The operation is repeated `repeatCount` times. + * + * Note - `reqRate` is calculated as Δ count / Δ time, meaning `(queueSize + sent) / delayTime`. + * When `delayTime` is 1000 (1 second), this gives the exact `reqRate`. + * + * @async + * @param {Object} data - The data object holding current statistics, to be updated. + * @param {Object} [reqRateInfo={}] - Rate-related information to be added to `data`. + * @param {number} [reqRateInfo.queueSize=0] - The number of items in the request queue to be added. + * @param {number} [reqRateInfo.sent=0] - The number of requests sent, to be added to `data.sent`. + * @param {number} [reqRateInfo.delayTime=0] - The delay time (in milliseconds) to wait after each report. + * @param {number} [repeatCount=1] - The number of times to repeat the scaling and reporting operation. + * @param {Object} [slave=undefined] - Optional object with a `report` method, used for reporting statistics. + * If provided, its `report` method is called with `data`. + * If not provided, `streamService.reportStats` is used. + * + * @returns {Promise} - Resolves after completing the specified number of repetitions. + */ +const scale = async (data, reqRateInfo = {}, repeatCount = 1, slave = undefined) => { + const { queueSize = 0, sent = 0, delayTime = 0 } = reqRateInfo; + for (let i = 0; i < repeatCount; i++) { + data.queueSize = (data.queueSize || 0) + queueSize; + data.sent = (data.sent || 0) + sent; + slave ? 
slave.report(data) : streamService.reportStats([data]); + if (delayTime > 0) { + await delay(delayTime); + } + } +}; + +const msgPerSec = 50; // Equals pod rate const duration = SEC / msgPerSec; -const netDurations = Array.from(Array(10).fill(duration)); +const durations = Array.from(Array(10).fill(duration)); describe('Streaming', () => { + before(async () => { await stateAdapter._db.jobs.create({ pipeline, jobId }); await streamHandler.start(job); }); + beforeEach(() => { const masters = getMasters(); masters.map(m => m.reset()); }) + describe('scale-up', () => { + const replicasOnFirstScale = require('../config/main/config.base.js').streaming.autoScaler.scaleUp.replicasOnFirstScale; + it('should not scale based on no data', async () => { - const scale = async (data) => { - streamService.reportStats(data); - } - const list = [{ + const data = { nodeName: 'D', - }]; - await scale(list); - const { required } = autoScale(list[0].nodeName); - expect(required).to.equal(0); + } + + await scale(data); + const { required } = autoScale(data.nodeName); + expect(required).to.equal(0, `required=${required}, suppose to be 0`); }); + it('should init scale when there is a queue', async () => { - const scale = async (data) => { - streamService.reportStats(data); - } - const list = [{ + const data = { nodeName: 'D', - queueSize: 1, - netDurations - }]; - await scale(list); - const { required } = autoScale(list[0].nodeName); - expect(required).to.gte(1); + queueSize: 1 + } + + await scale(data); + const { required } = autoScale(data.nodeName); + expect(required).to.equal(replicasOnFirstScale, `required=${required}, suppose to be ${replicasOnFirstScale}`); }); + it('should init scale when there is request rate', async () => { - const scale = async (data) => { - data[0].sent += 10; - streamService.reportStats(data); - await delay(100); + const data = { + nodeName: 'D', + sent: 10 } - const list = [{ + const reqRateInfo = { + sent: 1, + delayTime: 10 + } + + await scale(data, reqRateInfo, 2); + const { required } = autoScale(data.nodeName); + expect(required).to.equal(replicasOnFirstScale, `required=${required}, suppose to be ${replicasOnFirstScale}`); + }); + + it('should init scale to minimum requirement, when there is a queue', async () => { + const data = { + nodeName: 'E', + queueSize: 1 + } + + await scale(data); + const { required } = autoScale(data.nodeName); + const min = pipeline.nodes.filter((node) => data.nodeName === node.nodeName)[0].minStatelessCount; + expect(required).to.be.equal(min, `required=${required}, suppose to be ${min}`); + }); + + it('should init scale and not pass maximum requirement, when there is a queue', async () => { + const data = { + nodeName: 'F', + queueSize: 1 + } + + await scale(data); + const { required } = autoScale(data.nodeName); + const max = pipeline.nodes.filter((node) => data.nodeName === node.nodeName)[0].maxStatelessCount; + expect(required).to.be.equal(max, `required=${required}, suppose to be ${max}`); + }); + + it('should scale up based on roundTrip, queueSize, currentSize', async () => { + const data = { nodeName: 'D', - sent: 10, - queueSize: 0, - netDurations - }]; - await scale(list); - await scale(list); - const { required } = autoScale(list[0].nodeName); - expect(required).to.equal(1); + currentSize: 1, + durations + } + const reqRateInfo = { + queueSize: 20, + delayTime: 50 + } + + await scale(data, reqRateInfo, 4); + await delay(500); + const { required } = autoScale(data.nodeName); + expect(required).to.be.gte(8, `required is ${required}, suppose 
to be 8-9`); + expect(required).to.be.lte(9, `required is ${required}, suppose to be 8-9`); + }); + + it('should scale up based on all params', async () => { + const data = { + nodeName: 'D', + currentSize: 1, + durations + } + const reqRateInfo = { + queueSize: 15, + sent: 5, + delayTime: 50 + } + + await scale(data, reqRateInfo, 4); + await delay(500); + const { required } = autoScale(data.nodeName); + expect(required).to.be.gte(8, `required is ${required}, suppose to be 8-9`); + expect(required).to.be.lte(9, `required is ${required}, suppose to be 8-9`); + }); + + it('should scale up based on all params, when currentSize is 0 and there are responses already', async () => { + const data = { + nodeName: 'D', + currentSize: 0, + durations, + responses: 1 + } + const reqRateInfo = { + queueSize: 15, + sent: 5, + delayTime: 50 + } + + await scale(data, reqRateInfo, 4); + await delay(500); + const { required } = autoScale(data.nodeName); + expect(required).to.be.gte(8, `required is ${required}, suppose to be 8-9`); + expect(required).to.be.lte(9, `required is ${required}, suppose to be 8-9`); + }); + + it('should scale up based on all params, and there are responses already, and not exceeding max stateless', async () => { + const data = { + nodeName: 'F', + currentSize: 1, + durations, + responses: 1 + } + const reqRateInfo = { + queueSize: 3, + sent: 1, + delayTime: 10 + } + + await scale(data, reqRateInfo, 4); + const { required } = autoScale(data.nodeName); + const max = pipeline.nodes.filter((node) => data.nodeName === node.nodeName)[0].maxStatelessCount; + expect(required).to.equal(max, `required is ${required}, suppose to be ${max}`); + }); + + it('should scale up based on all params, and there are responses already, and have min stateless', async () => { + const data = { + nodeName: 'E', + currentSize: 1, + durations, + responses: 1 + } + const reqRateInfo = { + queueSize: 1, + sent: 1, + delayTime: 10 + } + + await scale(data, reqRateInfo, 4); + const { required } = autoScale(data.nodeName); + const min = pipeline.nodes.filter((node) => data.nodeName === node.nodeName)[0].minStatelessCount; + expect(required).to.equal(min, `required is ${required}, suppose to be ${min}`); }); - // it.only('should not scale if currentSize is fixed', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const scale = async (data) => { - // streamService.reportStats(data); - // await delay(100); - // } - // const currentSize = async (data) => { - // data[0].currentSize = 5; - // data[0].queueSize += 500; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName: 'D', - // queueSize: 500, - // netDurations - // }]; - - // await scale(list); - // const jobs1 = autoScale(list[0].nodeName); - // const jobs2 = autoScale(list[0].nodeName); - // const jobs3 = autoScale(list[0].nodeName); - // await currentSize(list); - // const jobs4 = autoScale(list[0].nodeName); - // const jobs5 = autoScale(list[0].nodeName); - // expect(jobs1.required).to.gte(1); - // expect(jobs2.required).to.gte(1); - // expect(jobs3.required).to.gte(1); - // expect(jobs4.required).to.gte(30); - // expect(jobs5.required).to.gte(30); - // }); - // it('should scale based on queueSize only', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const scale = async (data) => { - // streamService.reportStats(data); - // } - // const list = [{ - // nodeName: 'D', - // queueSize: 500, - // responses: 0, - // netDurations - // }]; - // await 
scale(list); - // const { required } = autoScale(list[0].nodeName); - // expect(required).to.gte(1); - // }); - // it.only('should scale based on all params', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP (NOT GIVEN HERE) - // const queueSize = 0; - // const responses = 0; - // const currentSize = 0; - - // const scale = async (data) => { - // data[0].queueSize += 200; - // data[0].responses += 1; - // streamService.reportStats(data); - // await delay(50); - // } - // const list = [{ - // nodeName: 'D', - // queueSize, - // currentSize, - // netDurations, - // responses - // }]; - // await scale(list); - // await scale(list); - // await scale(list); - // await scale(list); - // const scale1 = autoScale(list[0].nodeName); - // const scale2 = autoScale(list[0].nodeName); - // const scale3 = autoScale(list[0].nodeName); - // expect(scale1.required).to.gte(20); - // expect(scale2.required).to.gte(20); - // expect(scale3.required).to.gte(20); - // }).timeout(1000000000000); - // it('should scale based on queueSize and responses only', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const scale = async (data) => { - // streamService.reportStats(data); - // } - // const list = [{ - // nodeName: 'D', - // queueSize: 500, - // responses: 100, - // netDurations - // }]; - // await scale(list); - // const { required } = autoScale(list[0].nodeName); - // expect(required).to.gte(2); - // }); - // it.only('should scale up based on high req/res rate', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const nodeName = 'D'; - // const requests = async (data) => { - // data[0].currentSize = 0; - // data[0].queueSize += 200; - // data[0].responses = 100; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName, - // sent: 0, - // queueSize: 0, - // responses: 0, - // netDurations - // }]; - // await requests(list); - // await requests(list); - // await requests(list); - // const { required } = autoScale(list[0].nodeName); - // expect(required).to.gte(30); - // }); - // it('should scale based on high durations', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const scale = async (data) => { - // data[0].sent += 400; - // data[0].responses += 30; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName: 'D', - // sent: 0, - // responses: 0, - // netDurations - // }]; - // await scale(list); - // await scale(list); - // await scale(list); - // const { required } = autoScale(list[0].nodeName); - // expect(required).to.gte(10); - // }); - // it('should scale based on low durations', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const scale = async (data) => { - // data[0].sent += 100; - // data[0].responses += 0; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName: 'D', - // sent: 0, - // responses: 0, - // netDurations - // }]; - // await scale(list); - // await scale(list); - // await scale(list); - // const { required } = autoScale(list[0].nodeName); - // expect(required).to.gte(1); - // }); - // it('should scale only up based on req/res rate', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const scale = async (data) => { - // data[0].sent += 10; - // data[0].responses += 3; - // streamService.reportStats(data); - // await delay(50); - // } - // const increaseSize = 
async (data) => { - // data[0].responses += 1; - // data[0].currentSize += 2; - // streamService.reportStats(data); - // await delay(50); - // } - // const list = [{ - // nodeName: 'D', - // sent: 10, - // queueSize: 0, - // currentSize: 0, - // netDurations, - // responses: 3 - // }]; - // await scale(list); - // await scale(list); - // const jobs1 = autoScale(list[0].nodeName); - // await increaseSize(list); - // const jobs2 = autoScale(list[0].nodeName); - // await increaseSize(list); - // autoScale(list[0].nodeName); - // const jobs3 = autoScale(list[0].nodeName); - // await scale(list); - // await scale(list); - // const jobs4 = autoScale(list[0].nodeName); - // const jobs5 = autoScale(list[0].nodeName); - // const jobs6 = autoScale(list[0].nodeName); - // expect(jobs1.required).to.gte(4); - // expect(jobs2.required).to.gte(4); - // expect(jobs3.required).to.gte(4); - // expect(jobs4.required).to.gte(4); - // expect(jobs5.required).to.gte(4); - // expect(jobs6.required).to.gte(4); - // }); - // it('should scale only up based on req/res rate with a maxStatelessCount limit', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const scale = async (data) => { - // data[0].sent += 10; - // data[0].responses += 3; - // streamService.reportStats(data); - // await delay(50); - // } - // const increaseSize = async (data) => { - // data[0].responses += 1; - // data[0].currentSize += 2; - // streamService.reportStats(data); - // await delay(50); - // } - // const list = [{ - // nodeName: 'F', - // sent: 10, - // queueSize: 0, - // currentSize: 0, - // netDurations, - // responses: 3 - // }]; - // const scaledNode = pipeline.nodes[5] - // await scale(list); - // await scale(list); - // const jobs1 = autoScale(list[0].nodeName); - // await increaseSize(list); - // const jobs2 = autoScale(list[0].nodeName); - // await increaseSize(list); - // autoScale(list[0].nodeName); - // const jobs3 = autoScale(list[0].nodeName); - // await scale(list); - // await scale(list); - // const jobs4 = autoScale(list[0].nodeName); - // const jobs5 = autoScale(list[0].nodeName); - // const jobs6 = autoScale(list[0].nodeName); - // expect(jobs1.required).to.eql(scaledNode.maxStatelessCount); - // expect(jobs2.required).to.eql(scaledNode.maxStatelessCount); - // expect(jobs3.required).to.eql(scaledNode.maxStatelessCount); - // expect(jobs4.required).to.eql(scaledNode.maxStatelessCount); - // expect(jobs5.required).to.eql(scaledNode.maxStatelessCount); - // expect(jobs6.required).to.eql(scaledNode.maxStatelessCount); - // }); }); describe('scale-down', () => { - // it('should scale up and down based on durations', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const nodeName = 'D'; - // const requestsUp = async (data) => { - // data[0].queueSize += 100; - // streamService.reportStats(data); - // await delay(100); - // } - // const responsesUp = async (data) => { - // data[0].responses += 100; - // data[0].sent = 200; - // data[0].queueSize = 0; - // data[0].currentSize += 1; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName, - // currentSize: 0, - // sent: 0, - // queueSize: 0, - // responses: 0, - // netDurations - // }]; - // await requestsUp(list); - // await requestsUp(list); - // const jobs1 = autoScale(list[0].nodeName); - // const jobs2 = autoScale(list[0].nodeName); - // await delay(200) - // await responsesUp(list); - // await responsesUp(list); - // const jobs3 = autoScale(list[0].nodeName); - 
// const jobs4 = autoScale(list[0].nodeName); - // expect(jobs1.required).to.gte(1); - // expect(jobs2.required).to.gte(1); - // expect(jobs3.required).to.gte(7); - // expect(jobs4.required).to.gte(7); - // }); - // it('should scale up and down based on no requests and no responses', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const nodeName = 'D'; - // const requestsUp = async (data) => { - // data[0].sent = 100; - // data[0].responses = 100; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName, - // currentSize: 0, - // sent: 0, - // responses: 0, - // netDurations - // }]; - // await requestsUp(list); - // await requestsUp(list); - // await requestsUp(list); - // await requestsUp(list); - // const scale = autoScale(list[0].nodeName); - // expect(scale.required).to.eql(0); - // }); - // it('should scale down based on zero ratio', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const nodeName = 'D'; - // const requests = async (data) => { - // data[0].queueSize = 100; - // data[0].responses = 100; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName, - // sent: 0, - // currentSize: 5, - // queueSize: 0, - // responses: 0 - // }]; - // await requests(list); - // await requests(list); - // await requests(list); - // await requests(list); - // const scale = autoScale(list[0].nodeName); - // expect(scale.required).to.eql(0); - // }); - // it('should not scale down based on responses', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const nodeName = 'D'; - // const requests = async (data) => { - // data[0].currentSize = 5; - // data[0].responses += 100; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName, - // responses: 0 - // }]; - // await requests(list); - // await requests(list); - // await requests(list); - // const scale = autoScale(list[0].nodeName); - // expect(scale.required).to.eql(0); - // }); - // it('should not scale down based on currentSize', async () => { // COMMENT SINCE SCALING LOGIC CHANGED, NOW BASED ON ROUND TRIP - // const nodeName = 'D'; - // const requests = async (data) => { - // data[0].currentSize = 1; - // data[0].queueSize = 0; - // data[0].responses += 100; - // streamService.reportStats(data); - // await delay(100); - // } - // const list = [{ - // nodeName, - // sent: 0, - // queueSize: 0, - // responses: 0 - // }]; - // await requests(list); - // await requests(list); - // await requests(list); - // const scale = autoScale(list[0].nodeName); - // expect(scale.required).to.eql(0); - // }); + it('should scale down based on roundTrip, queueSize, currentSize', async () => { + const data = { + nodeName: 'D', + currentSize: 100, + durations + } + const reqRateInfo = { + queueSize: 20, + delayTime: 50 + } + + await scale(data, reqRateInfo, 4); + await delay(500); + const { required } = autoScale(data.nodeName); + expect(required).to.be.gte(8, `required is ${required}, suppose to be 8-9`); + expect(required).to.be.lte(9, `required is ${required}, suppose to be 8-9`); + }); + + it('should scale down based on all params', async () => { + const data = { + nodeName: 'D', + currentSize: 100, + durations + } + const reqRateInfo = { + queueSize: 15, + sent: 5, + delayTime: 50 + } + + await scale(data, reqRateInfo, 4); + const { required } = autoScale(data.nodeName); + expect(required).to.be.gte(8, `required is 
${required}, suppose to be 8-9`); + expect(required).to.be.lte(9, `required is ${required}, suppose to be 8-9`); + }); + + it('should scale up based on all params, and there are responses already, and have min stateless', async () => { + const data = { + nodeName: 'E', + currentSize: 20, + durations, + responses: 1 + } + const reqRateInfo = { + queueSize: 1, + sent: 1, + delayTime: 10 + } + + await scale(data, reqRateInfo, 4); + const { required } = autoScale(data.nodeName); + const min = pipeline.nodes.filter((node) => data.nodeName === node.nodeName)[0].minStatelessCount; + expect(required).to.equal(min, `required is ${required}, suppose to be ${min}`); + }); }); + describe('scale-conflicts', () => { - // it('should only scale up based on master', async () => { - // const nodeName = 'D'; - // const requests = async (data) => { - // data[0].queueSize += 100; - // data[0].responses += 50; - // streamService.reportStats(data); - // await delay(50); - // } - // const reportSlave = async (slave, data) => { - // data.queueSize += 100; - // data.responses += 50; - // slave.report(data); - // await delay(50); - // } - // const currentSize = 0; - // const list1 = [{ nodeName, queueSize: 150, responses: 30, netDurations, currentSize }]; - // const list2 = { nodeName, queueSize: 450, responses: 150, netDurations, currentSize }; - // const slave = new SlaveAdapter({ jobId, nodeName, source: 'B' }); - // await requests(list1); - // await requests(list1); - // await requests(list1); - // await requests(list1); - // await reportSlave(slave, list2); - // await reportSlave(slave, list2); - // await reportSlave(slave, list2); - // await reportSlave(slave, list2); - // const scale = autoScale(nodeName); - // expect(scale.required).to.gte(30); - // }); - // it('should not scale up based on avg master and slaves', async () => { - // const nodeName = 'D'; - // const reportSlave = async (slave, data) => { - // data.queueSize += 100; - // data.responses += 50; - // slave.report(data); - // await delay(50) - // } - // const currentSize = 0; - // const list1 = { nodeName, queueSize: 300, responses: 40, netDurations, currentSize }; - // const list2 = { nodeName, queueSize: 300, responses: 60, netDurations, currentSize }; - // const list3 = { nodeName, queueSize: 300, responses: 80, netDurations, currentSize }; - // const list4 = { nodeName, queueSize: 300, responses: 100, netDurations, currentSize }; - // const slave1 = new SlaveAdapter({ jobId, nodeName, source: 'A' }); - // const slave2 = new SlaveAdapter({ jobId, nodeName, source: 'B' }); - // const slave3 = new SlaveAdapter({ jobId, nodeName, source: 'C' }); - // const slave4 = new SlaveAdapter({ jobId, nodeName, source: 'D' }); - // await reportSlave(slave1, list1); - // await reportSlave(slave1, list1); - // await reportSlave(slave1, list1); - // await reportSlave(slave1, list1); - - // await reportSlave(slave2, list2); - // await reportSlave(slave2, list2); - // await reportSlave(slave2, list2); - // await reportSlave(slave2, list2); - - // slave3.report(list3); - // slave3.report(list3); - // slave3.report(list3); - // slave3.report(list3); - - // slave4.report(list4); - // slave4.report(list4); - // slave4.report(list4); - // slave4.report(list4); - // await delay(200); - // const scale = autoScale(nodeName); - // expect(scale.required).to.gte(30); - // }); + it('should only scale up based on master', async () => { + const nodeName = 'D'; + const currentSize = 0; + const slave = new SlaveAdapter({ jobId, nodeName, source: 'B' }); + const data1 = { + 
nodeName, + queueSize: 150, + responses: 30, + sent: 30, + durations, + currentSize + } + const data2 = { + nodeName, + queueSize: 4500, + responses: 150, + sent: 150, + durations, + currentSize + } + const reqRateInfo = { + queueSize: 20, + sent: 10, + delayTime: 50 + } + + await scale(data1, reqRateInfo, 4); + await scale(data2, reqRateInfo, 4, slave); + await delay(1000); + const { required } = autoScale(nodeName); + expect(required).to.be.equal(28, `required is ${required}, suppose to be 28`); + }); + + it('should not scale up based on avg master and slaves', async () => { + const nodeName = 'D'; + const currentSize = 0; + const data1 = { + nodeName, + queueSize: 300, + responses: 40, + sent: 40, + durations, + currentSize + } + const data2 = { + nodeName, + queueSize: 300, + responses: 60, + sent: 60, + durations, + currentSize + } + const data3 = { + nodeName, + queueSize: 300, + responses: 80, + sent: 80, + durations, + currentSize + } + const data4 = { + nodeName, + queueSize: 300, + responses: 100, + sent: 100, + durations, + currentSize + } + const reqRateInfo = { + queueSize: 20, + sent: 10, + delayTime: 50 + } + + const slave1 = new SlaveAdapter({ jobId, nodeName, source: 'A' }); + const slave2 = new SlaveAdapter({ jobId, nodeName, source: 'B' }); + const slave3 = new SlaveAdapter({ jobId, nodeName, source: 'C' }); + const slave4 = new SlaveAdapter({ jobId, nodeName, source: 'D' }); + await scale(data1, reqRateInfo, 4, slave1); + await scale(data2, reqRateInfo, 4, slave2); + await scale(data3, reqRateInfo, 4, slave3); + await scale(data4, reqRateInfo, 4, slave4); + await delay(1000); + const { required } = autoScale(nodeName); + expect(required).to.be.gte(49, `required is ${required}, suppose to be 49-50`); + expect(required).to.be.lte(50, `required is ${required}, suppose to be 49-50`); + }); }); + describe('no-scale', () => { it('should not scale when no relevant data', async () => { - const reportStats = async (data) => { - streamService.reportStats(data); - await delay(100); - } - const list = [{ + const data = { nodeName: 'D' - }]; - await reportStats(list); - await reportStats(list); - await reportStats(list); - const scale = autoScale(list[0].nodeName); - expect(scale.required).to.eql(0); + }; + + await scale(data, { delayTime: 100 }, 3); + const { required } = autoScale(data.nodeName); + expect(required).to.eql(0); }); + it('should not over the maxSizeWindow', async () => { const nodeName = 'D'; - const data = [{ + const data = { nodeName, queueSize: 10, - netDurations - }]; - streamService.reportStats(data); - streamService.reportStats(data); - streamService.reportStats(data); - streamService.reportStats(data); - streamService.reportStats(data); - streamService.reportStats(data); + durations + }; + + await scale(data, {}, 14); let masters = getMasters(); masters = masters.filter(m => m.nodeName === nodeName); const statsData = masters[0]._autoScaler._statistics._data; const key = Object.keys(statsData)[0]; const stats = statsData[key]; - const { requests, responses, durations } = stats; + const { requests, responses, grossDurations } = stats; const maxSizeWindow = testParams.config.streaming.autoScaler.statistics.maxSizeWindow; expect(requests.items).to.have.lengthOf(maxSizeWindow); expect(responses.items).to.have.lengthOf(maxSizeWindow); - expect(durations.items).to.have.lengthOf(maxSizeWindow * 10); + expect(grossDurations.items).to.have.lengthOf(maxSizeWindow * 10); }); }); + describe('metrics', () => { it('should scale and update metrics', async () => { const 
nodeName = 'D';
@@ -717,6 +511,7 @@ describe('Streaming', () => {
            expect(metricsUid3.totalDropped).to.eql(dropped * 3);
        });
    });
+
    describe('master-slaves', () => {
        it('should get slaves', async () => {
            const nodeName = 'D';
@@ -738,6 +533,7 @@ describe('Streaming', () => {
            const slaves = masters[0].slaves();
            expect(slaves.sort()).to.deep.equal([slave1.source, slave2.source])
        });
+
        it('metrics test', async () => {
            const nodeName = 'D';
            const requests = async (data) => {
@@ -753,36 +549,21 @@ describe('Streaming', () => {
                await delay(20);
            }
            const currentSize = 0;
-            const list = [{ nodeName, queueSize: 150, responses: 30, netDurations, currentSize }];
-            const list1 = { nodeName, queueSize: 300, responses: 80, netDurations, currentSize };
-            const list2 = { nodeName, queueSize: 450, responses: 140, netDurations, currentSize };
+            const list = [{ nodeName, queueSize: 150, responses: 30, currentSize }];
+            const list1 = { nodeName, queueSize: 300, responses: 80, currentSize };
+            const list2 = { nodeName, queueSize: 450, responses: 140, currentSize };
            const slave1 = new SlaveAdapter({ jobId, nodeName, source: 'A' });
            const slave2 = new SlaveAdapter({ jobId, nodeName, source: 'A' });
            const slave3 = new SlaveAdapter({ jobId, nodeName, source: 'A' });
            const slave4 = new SlaveAdapter({ jobId, nodeName, source: 'B' });
-            await requests(list);
-            await requests(list);
-            await requests(list);
-            await requests(list);
-            await reportSlave(slave1, list1);
-            await reportSlave(slave1, list1);
-            await reportSlave(slave1, list1);
-            await reportSlave(slave1, list1);
-
-            await reportSlave(slave2, list1);
-            await reportSlave(slave2, list1);
-            await reportSlave(slave2, list1);
-            await reportSlave(slave2, list1);
-
-            await reportSlave(slave3, list1);
-            await reportSlave(slave3, list1);
-            await reportSlave(slave3, list1);
-            await reportSlave(slave3, list1);
-
-            await reportSlave(slave4, list2);
-            await reportSlave(slave4, list2);
-            await reportSlave(slave4, list2);
-            await reportSlave(slave4, list2);
+            for (let i = 0; i < 4; i++) { await requests(list); }
+            for (let i = 0; i < 4; i++) { await reportSlave(slave1, list1); }
+
+            for (let i = 0; i < 4; i++) { await reportSlave(slave2, list1); }
+
+            for (let i = 0; i < 4; i++) { await reportSlave(slave3, list1); }
+
+            for (let i = 0; i < 4; i++) { await reportSlave(slave4, list2); }
            await delay(200);
            autoScale(nodeName);
@@ -792,6 +573,7 @@ describe('Streaming', () => {
            expect(metrics.map(t => t.source).sort()).to.eql(['A', 'B', 'C']);
            expect(metrics).to.have.lengthOf(3);
        });
+
        it('should start and finish correctly', async () => {
            expect(streamService._jobData).to.be.not.null;
            expect(streamService._election).to.be.not.null;
@@ -808,10 +590,12 @@ describe('Streaming', () => {
            await streamService.start(job);
        });
    });
+
    describe('discovery', () => {
        beforeEach(() => {
            discovery._discoveryMap = Object.create(null);
        });
+
        it('should add discovery and get right changes', async () => {
            const jobId = uid();
            const nodeName = uid();
@@ -823,6 +607,7 @@ describe('Streaming', () => {
            expect(changes2[0].type).to.eql('Add');
            expect(changes2[0].nodeName).to.eql(nodeName);
        });
+
        it('should add discovery multiple times and get right changes', async () => {
            const jobId = uid();
            const nodeName1 = uid();
@@ -844,6 +629,7 @@ describe('Streaming', () => {
            expect(changes2[0].nodeName).to.eql(nodeName2);
            expect(changes2[1].nodeName).to.eql(nodeName1);
        });
+
        it('should add and delete discovery and get right changes', async () => {
            const jobId = uid();
            const nodeName = uid();
@@ -859,6 +645,7 @@ describe('Streaming', 
() => { expect(changes2[0].type).to.eql('Del'); expect(changes2[1].type).to.eql('Del'); }); + it('should add discovery and get right changes', async () => { const jobId = uid(); const nodeName1 = uid();