From 76836e18b0972d3d956b7fbca77203484c8920a7 Mon Sep 17 00:00:00 2001 From: VolcanoCookies Date: Tue, 23 May 2023 13:53:19 +0200 Subject: [PATCH] feat: Add new throttle corruption --- README.md | 27 ++- package-lock.json | 66 +++++++ package.json | 2 + src/manifests/handlers/dash/segment.ts | 7 +- src/manifests/handlers/hls/media.ts | 7 +- .../utils/corruptions/throttle.test.ts | 177 ++++++++++++++++++ src/manifests/utils/corruptions/throttle.ts | 142 ++++++++++++++ src/manifests/utils/dashManifestUtils.ts | 2 +- src/routes.ts | 2 + src/segments/constants.ts | 1 + src/segments/handlers/segment.ts | 25 ++- src/segments/routes/throttlingProxy.ts | 33 ++++ src/shared/aws.utils.ts | 1 + 13 files changed, 484 insertions(+), 8 deletions(-) create mode 100644 src/manifests/utils/corruptions/throttle.test.ts create mode 100644 src/manifests/utils/corruptions/throttle.ts create mode 100644 src/segments/routes/throttlingProxy.ts diff --git a/README.md b/README.md index f87ffe0..3b4546c 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ To try it out, go to your favourite HLS/MPEG-DASH video player such as `https:// | `/api/v2/manifests/dash/proxy-master.mpd` | GET | Returns a proxy MPD file, based on query parameters | | `/api/v2/manifests/dash/proxy-segment` | GET | Applies corruption present in query parameter and may return a 302 redirect to the original segment file | | `/api/v2/segments/proxy-segment` | GET | Applies corruption present in query parameter and may return a 302 redirect to the original segment file | +| `/api/v2/throttle` | GET | Proxies a http request, throttling the response to a specified byte rate | | `/` | GET | Server health check | ### Query Parameters @@ -64,6 +65,7 @@ To try it out, go to your favourite HLS/MPEG-DASH video player such as `https:// | `delay` | Delay the response, in milliseconds, for a specific segment request | | `statusCode` | Replace the response for a specific segment request with a specified status code response | | `timeout` | Force a timeout for the response of a specific segment request | +| `throttle` | Send back the segment at a specified speed of bytes per second | ### Load Manifest url params from AWS SSM parameter store instead - Create a .env file at the root the of project @@ -78,7 +80,7 @@ LOAD_PARAMS_FROM_AWS_SSM=true ## Corruptions -Currently, the Chaos Stream Proxy supports 3 types of corruptions for HLS and MPEG-DASH streams. These corruptions may be used in combination with one another. +Currently, the Chaos Stream Proxy supports 4 types of corruptions for HLS and MPEG-DASH streams. These corruptions may be used in combination with one another. ### Specifying Corruption Configurations @@ -100,7 +102,7 @@ Delay Corruption: { i?: number | "*", // index of target segment in playlist. If "*", then target all segments. (Starts on 0 for HLS / 1 for MPEG-DASH) sq?: number | "*", // media sequence number of target segment in playlist. If "*", then target all segments - rsq?: number, // relative sequence number from where a livestream is currently at + rsq?: number, // relative sequence number from where a livestream is currently at ms?: number, // time to delay in milliseconds br?: number | "*", // apply only to specific bitrate } @@ -112,7 +114,7 @@ Status Code Corruption: { i?: number | "*", // index of target segment in playlist. If "*", then target all segments. (Starts on 0 for HLS / 1 for MPEG-DASH) sq?: number | "*", // media sequence number of target segment in playlist. If "*", then target all segments - rsq?: number, // relative sequence number from where a livestream is currently at + rsq?: number, // relative sequence number from where a livestream is currently at code?: number, // code to return in http response status header instead of media file br?: number | "*", // apply only to specific bitrate } @@ -124,11 +126,23 @@ Timeout Corruption: { i?: number | "*", // index of target segment in playlist. If "*", then target all segments. (Starts on 0 for HLS / 1 for MPEG-DASH) sq?: number | "*", // media sequence number of target segment in playlist. If "*", then target all segments - rsq?: number, // relative sequence number from where a livestream is currently at + rsq?: number, // relative sequence number from where a livestream is currently at br?: number | "*", // apply only to specific bitrate } ``` +Throttle Corruption: +```typescript +{ + i?: number | "*", // index of target segment in playlist. If "*", then target all segments. (Starts on 0 for HLS / 1 for MPEG-DASH) + sq?: number | "*", // media sequence number of target segment in playlist. If "*", then target all segments + rsq?: number, // relative sequence number from where a livestream is currently at + br?: number | "*", // apply only to specific bitrate + rate?: number // rate in bytes per second to limit the segment download speed to +} +``` + + One can either target a segment through the index parameter, `i`, or the sequence number parameter, `sq`, relative sequence numbers, `rsq`, are translated to sequence numbers, . In the case where one has entered both, the **index parameter** will take precedence. Relative sequence numbers, `rsq`, are translated to sequence numbers, `sq`, and will thus override any provided `sq`. @@ -212,6 +226,11 @@ https://chaos-proxy.prod.eyevinn.technology/api/v2/manifests/dash/proxy-master.m https://chaos-proxy.prod.eyevinn.technology/api/v2/manifests/dash/proxy-master.mpd?url=https://livesim.dashif.org/livesim/testpic_2s/Manifest.mpd&statusCode=[{sq:841164350, code:404}] ``` +6. LIVE: Example of MPEG-DASH with a segment download speed limited to 10kB/s on all segments +``` +https://chaos-proxy.prod.eyevinn.technology/api/v2/manifests/dash/proxy-master.mpd?url=https://f53accc45b7aded64ed8085068f31881.egress.mediapackage-vod.eu-north-1.amazonaws.com/out/v1/1c63bf88e2664639a6c293b4d055e6bb/64651f16da554640930b7ce2cd9f758b/66d211307b7d43d3bd515a3bfb654e1c/manifest.mpd&throttle=[{i:*,rate:10000}] +``` + ## Development Environment To deploy and update development environment create and push a tag with the suffix `-dev`, for example `my-feat-test-dev`. If you run `npm run deploy:dev` it will automatically create a tag based on git revision with the `-dev` suffix and push it. diff --git a/package-lock.json b/package-lock.json index 03b6dba..2b2faf8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,6 +22,7 @@ "nock": "^13.2.4", "node-cache": "^5.1.2", "node-fetch": "^2.5.7", + "stream-throttle": "^0.1.3", "xml2js": "^0.4.19" }, "devDependencies": { @@ -34,6 +35,7 @@ "@types/jest": "^27.4.0", "@types/node": "^17.0.18", "@types/node-fetch": "^2.5.7", + "@types/stream-throttle": "^0.1.1", "@types/xml2js": "^0.4.11", "@typescript-eslint/eslint-plugin": "^5.13.0", "@typescript-eslint/parser": "^5.13.0", @@ -3355,6 +3357,15 @@ "resolved": "https://registry.npmjs.org/@types/stack-utils/-/stack-utils-2.0.1.tgz", "integrity": "sha512-Hl219/BT5fLAaz6NDkSuhzasy49dwQS/DSdu4MdggFB8zcXv7vflBI3xp7FEmkmdDkBUI2bPUNeMttp2knYdxw==" }, + "node_modules/@types/stream-throttle": { + "version": "0.1.1", + "resolved": "https://registry.npmjs.org/@types/stream-throttle/-/stream-throttle-0.1.1.tgz", + "integrity": "sha512-blC1VrTJBPET4IDEpRO05L0ks7cqyyc4XBRnQVroOLkcE8iaG6MYEQ/RSU7CQ3uIGk4r7wEH0W3ifbQqCVJVAA==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/xml2js": { "version": "0.4.11", "resolved": "https://registry.npmjs.org/@types/xml2js/-/xml2js-0.4.11.tgz", @@ -9806,6 +9817,11 @@ "node": ">=10" } }, + "node_modules/limiter": { + "version": "1.1.5", + "resolved": "https://registry.npmjs.org/limiter/-/limiter-1.1.5.tgz", + "integrity": "sha512-FWWMIEOxz3GwUI4Ts/IvgVy6LPvoMPgjMdQ185nN6psJyBJ4yOpzqm695/h5umdLJg2vW3GR5iG11MAkR2AzJA==" + }, "node_modules/lines-and-columns": { "version": "1.2.4", "resolved": "https://registry.npmjs.org/lines-and-columns/-/lines-and-columns-1.2.4.tgz", @@ -11768,6 +11784,26 @@ "duplexer": "~0.1.1" } }, + "node_modules/stream-throttle": { + "version": "0.1.3", + "resolved": "https://registry.npmjs.org/stream-throttle/-/stream-throttle-0.1.3.tgz", + "integrity": "sha512-889+B9vN9dq7/vLbGyuHeZ6/ctf5sNuGWsDy89uNxkFTAgzy0eK7+w5fL3KLNRTkLle7EgZGvHUphZW0Q26MnQ==", + "dependencies": { + "commander": "^2.2.0", + "limiter": "^1.0.5" + }, + "bin": { + "throttleproxy": "bin/throttleproxy.js" + }, + "engines": { + "node": ">= 0.10.0" + } + }, + "node_modules/stream-throttle/node_modules/commander": { + "version": "2.20.3", + "resolved": "https://registry.npmjs.org/commander/-/commander-2.20.3.tgz", + "integrity": "sha512-GpVkmM8vF2vQUkj2LvZmD35JxeJOLCwJ9cUkugyk2nuhbv3+mJvpLYYt+0+USMxE+oj+ey/lJEnhZw75x/OMcQ==" + }, "node_modules/string_decoder": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", @@ -15609,6 +15645,15 @@ "resolved": "https://registry.npmjs.org/@types/stack-utils/-/stack-utils-2.0.1.tgz", "integrity": "sha512-Hl219/BT5fLAaz6NDkSuhzasy49dwQS/DSdu4MdggFB8zcXv7vflBI3xp7FEmkmdDkBUI2bPUNeMttp2knYdxw==" }, + "@types/stream-throttle": { + "version": "0.1.1", + "resolved": "https://registry.npmjs.org/@types/stream-throttle/-/stream-throttle-0.1.1.tgz", + "integrity": "sha512-blC1VrTJBPET4IDEpRO05L0ks7cqyyc4XBRnQVroOLkcE8iaG6MYEQ/RSU7CQ3uIGk4r7wEH0W3ifbQqCVJVAA==", + "dev": true, + "requires": { + "@types/node": "*" + } + }, "@types/xml2js": { "version": "0.4.11", "resolved": "https://registry.npmjs.org/@types/xml2js/-/xml2js-0.4.11.tgz", @@ -20400,6 +20445,11 @@ "integrity": "sha512-bfTIN7lEsiooCocSISTWXkiWJkRqtL9wYtYy+8EK3Y41qh3mpwPU0ycTOgjdY9ErwXCc8QyrQp82bdL0Xkm9yA==", "dev": true }, + "limiter": { + "version": "1.1.5", + "resolved": "https://registry.npmjs.org/limiter/-/limiter-1.1.5.tgz", + "integrity": "sha512-FWWMIEOxz3GwUI4Ts/IvgVy6LPvoMPgjMdQ185nN6psJyBJ4yOpzqm695/h5umdLJg2vW3GR5iG11MAkR2AzJA==" + }, "lines-and-columns": { "version": "1.2.4", "resolved": "https://registry.npmjs.org/lines-and-columns/-/lines-and-columns-1.2.4.tgz", @@ -21858,6 +21908,22 @@ "duplexer": "~0.1.1" } }, + "stream-throttle": { + "version": "0.1.3", + "resolved": "https://registry.npmjs.org/stream-throttle/-/stream-throttle-0.1.3.tgz", + "integrity": "sha512-889+B9vN9dq7/vLbGyuHeZ6/ctf5sNuGWsDy89uNxkFTAgzy0eK7+w5fL3KLNRTkLle7EgZGvHUphZW0Q26MnQ==", + "requires": { + "commander": "^2.2.0", + "limiter": "^1.0.5" + }, + "dependencies": { + "commander": { + "version": "2.20.3", + "resolved": "https://registry.npmjs.org/commander/-/commander-2.20.3.tgz", + "integrity": "sha512-GpVkmM8vF2vQUkj2LvZmD35JxeJOLCwJ9cUkugyk2nuhbv3+mJvpLYYt+0+USMxE+oj+ey/lJEnhZw75x/OMcQ==" + } + } + }, "string_decoder": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", diff --git a/package.json b/package.json index 3ef6742..219fb64 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "nock": "^13.2.4", "node-cache": "^5.1.2", "node-fetch": "^2.5.7", + "stream-throttle": "^0.1.3", "xml2js": "^0.4.19" }, "devDependencies": { @@ -41,6 +42,7 @@ "@types/jest": "^27.4.0", "@types/node": "^17.0.18", "@types/node-fetch": "^2.5.7", + "@types/stream-throttle": "^0.1.1", "@types/xml2js": "^0.4.11", "@typescript-eslint/eslint-plugin": "^5.13.0", "@typescript-eslint/parser": "^5.13.0", diff --git a/src/manifests/handlers/dash/segment.ts b/src/manifests/handlers/dash/segment.ts index 1a697d7..3b56d94 100644 --- a/src/manifests/handlers/dash/segment.ts +++ b/src/manifests/handlers/dash/segment.ts @@ -9,6 +9,7 @@ import { import delaySCC from '../../utils/corruptions/delay'; import statusCodeSCC from '../../utils/corruptions/statusCode'; import timeoutSCC from '../../utils/corruptions/timeout'; +import throttleSCC from '../../utils/corruptions/throttle'; import path from 'path'; import dashManifestUtils from '../../utils/dashManifestUtils'; import { corruptorConfigUtils } from '../../utils/configs'; @@ -54,7 +55,11 @@ export default async function dashSegmentHandler( // Break down Corruption Objects // Send source URL with a corruption json (if it is appropriate) to segmentHandler... const configUtils = corruptorConfigUtils(urlSearchParams); - configUtils.register(delaySCC).register(statusCodeSCC).register(timeoutSCC); + configUtils + .register(delaySCC) + .register(statusCodeSCC) + .register(timeoutSCC) + .register(throttleSCC); const [error, allMutations] = configUtils.getAllManifestConfigs( reqSegmentIndexInt, true diff --git a/src/manifests/handlers/hls/media.ts b/src/manifests/handlers/hls/media.ts index 4d825cc..43d4bb8 100644 --- a/src/manifests/handlers/hls/media.ts +++ b/src/manifests/handlers/hls/media.ts @@ -9,6 +9,7 @@ import { import delaySCC from '../../utils/corruptions/delay'; import statusCodeSCC from '../../utils/corruptions/statusCode'; import timeoutSCC from '../../utils/corruptions/timeout'; +import throttleSCC from '../../utils/corruptions/throttle'; import path from 'path'; import hlsManifestUtils from '../../utils/hlsManifestUtils'; import { corruptorConfigUtils } from '../../utils/configs'; @@ -46,7 +47,11 @@ export default async function hlsMediaHandler( const manifestUtils = hlsManifestUtils(); const configUtils = corruptorConfigUtils(reqQueryParams); - configUtils.register(delaySCC).register(statusCodeSCC).register(timeoutSCC); + configUtils + .register(delaySCC) + .register(statusCodeSCC) + .register(timeoutSCC) + .register(throttleSCC); const [error, allMutations] = configUtils.getAllManifestConfigs( mediaM3U.get('mediaSequence') diff --git a/src/manifests/utils/corruptions/throttle.test.ts b/src/manifests/utils/corruptions/throttle.test.ts new file mode 100644 index 0000000..2893409 --- /dev/null +++ b/src/manifests/utils/corruptions/throttle.test.ts @@ -0,0 +1,177 @@ +import statusCodeConfig from './statusCode'; +import throttleConfig from './throttle'; + +describe('manifest.utils.corruptions.throttle', () => { + describe('getManifestConfigs', () => { + const { getManifestConfigs, name } = throttleConfig; + it('should have correct name', () => { + // Assert + expect(name).toEqual('throttle'); + }); + it('should handle valid input', () => { + // Arrange + const throttleValue = [ + { i: 0, rate: 1000 }, + { i: 0, rate: 5000 }, + { sq: 0, rate: 15000 } + ]; + + // Act + const actual = getManifestConfigs(throttleValue); + const expected = [ + null, + [ + { i: 0, fields: { rate: 1000 } }, + { sq: 0, fields: { rate: 15000 } } + ] + ]; + + // Assert + expect(actual).toEqual(expected); + }); + + it('should handle all * indexes', () => { + // Arrange + const throttleValue: Record[] = [ + { i: '*', rate: 1500 }, + { i: 0 } + ]; + + // Act + const actual = getManifestConfigs(throttleValue); + const expected = [ + null, + [ + { i: '*', fields: { rate: 1500 } }, + { i: 0, fields: null } + ] + ]; + + // Assert + expect(actual).toEqual(expected); + }); + + it('should handle all * sequences', () => { + // Arrange + const throttleValue: Record[] = [ + { sq: '*', rate: 1500 }, + { sq: 0 } + ]; + + // Act + const actual = getManifestConfigs(throttleValue); + const expected = [ + null, + [ + { sq: '*', fields: { rate: 1500 } }, + { sq: 0, fields: null } + ] + ]; + + // Assert + expect(actual).toEqual(expected); + }); + + it('should handle no index and no sequence correct', () => { + // Arrange + const throttleValue = [{ rate: 1500 }]; + + // Act + const actual = getManifestConfigs(throttleValue); + const expected = [ + { + message: + "Incorrect throttle query format. Either 'i' or 'sq' is required in a single query object.", + status: 400 + }, + null + ]; + + // Assert + expect(actual).toEqual(expected); + }); + + it('should handle both index and sequence in the query object', () => { + // Arrange + const throttleValue = [{ rate: 1500, i: 0, sq: 2 }]; + + // Act + const actual = getManifestConfigs(throttleValue); + const expected = [ + { + message: + "Incorrect throttle query format. 'i' and 'sq' are mutually exclusive in a single query object.", + status: 400 + }, + null + ]; + + // Assert + expect(actual).toEqual(expected); + }); + + it('should handle illegal characters in query object', () => { + // Arrange + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const throttleValue = [{ rate: 'hehe', i: false, sq: { he: 'he' } }] as any; + + // Act + const actual = getManifestConfigs(throttleValue); + const expected = [ + { + message: + 'Incorrect throttle query format. Expected format: [{i?:number, sq?:number, br?:number, rate:number}, ...n] where i and sq are mutually exclusive.', + status: 400 + }, + null + ]; + + // Assert + expect(actual).toEqual(expected); + }); + + it('should handle invalid format', () => { + // Arrange + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const throttleValue = 'Faulty' as any; + + // Act + const actual = getManifestConfigs(throttleValue); + const expected = [ + { + message: + 'Incorrect throttle query format. Expected format: [{i?:number, sq?:number, br?:number, rate:number}, ...n] where i and sq are mutually exclusive.', + status: 400 + }, + null + ]; + + // Assert + expect(actual).toEqual(expected); + }); + + it('should handle multiple defaults *', () => { + // Arrange + const throttleValue: Record[] = [ + { rate: 1000, i: '*' }, + { rate: 5000, sq: '*' }, + { rate: 10000, i: '*' } + ]; + // Act + const actual = getManifestConfigs(throttleValue); + const expected = [ + null, + [ + { + fields: { + rate: 1000 + }, + i: '*' + } + ] + ]; + // Assert + expect(actual).toEqual(expected); + }); + }); +}); diff --git a/src/manifests/utils/corruptions/throttle.ts b/src/manifests/utils/corruptions/throttle.ts new file mode 100644 index 0000000..cbca4f5 --- /dev/null +++ b/src/manifests/utils/corruptions/throttle.ts @@ -0,0 +1,142 @@ +import { unparsableError } from '../../../shared/utils'; +import { ServiceError, TargetIndex } from '../../../shared/types'; +import { CorruptorConfig, SegmentCorruptorQueryConfig } from '../configs'; + +interface ThrottleConfig extends CorruptorConfig { + i?: TargetIndex; + sq?: TargetIndex; + br?: TargetIndex; + rate?: number; +} + +// TODO:Flytta till en i en constants fil, och gruppera med and +const throttleExpectedQueryFormatMsg = + 'Incorrect throttle query format. Expected format: [{i?:number, sq?:number, br?:number, rate:number}, ...n] where i and sq are mutually exclusive.'; + +function getManifestConfigError(value: { [key: string]: unknown }): string { + const o = value as ThrottleConfig; + + if (o.rate && typeof o.rate !== 'number') { + return throttleExpectedQueryFormatMsg; + } + + if (o.i === undefined && o.sq === undefined) { + return "Incorrect throttle query format. Either 'i' or 'sq' is required in a single query object."; + } + + if ( + !(o.i === '*' || typeof o.i === 'number') && + !(o.sq === '*' || typeof o.sq === 'number') + ) { + return throttleExpectedQueryFormatMsg; + } + + if (o.i !== undefined && o.sq !== undefined) { + return "Incorrect throttle query format. 'i' and 'sq' are mutually exclusive in a single query object."; + } + + if (Number(o.sq) < 0) { + return 'Incorrect throttle query format. Field sq must be 0 or positive.'; + } + + if (Number(o.i) < 0) { + return 'Incorrect throttle query format. Field i must be 0 or positive.'; + } + + return ''; +} +function isValidSegmentConfig(value: { [key: string]: unknown }): boolean { + if (value.rate && typeof value.rate !== 'number') { + return false; + } + return true; +} + +const throttleConfig: SegmentCorruptorQueryConfig = { + getManifestConfigs( + configs: Record[] + ): [ServiceError | null, CorruptorConfig[] | null] { + // Verify it's at least an array + if (!Array.isArray(configs)) { + return [ + { + message: throttleExpectedQueryFormatMsg, + status: 400 + }, + null + ]; + } + + const configIndexMap = new Map(); + const configSqMap = new Map(); + + for (const config of configs) { + // Verify integrity of array content + const error = getManifestConfigError(config); + if (error) { + return [{ message: error, status: 400 }, null]; + } + + const { rate, i, sq } = config; + const fields = rate ? { rate } : null; + + // If * is already set, we skip + if (!configIndexMap.has('*') && !configSqMap.has('*')) { + // Index + if (i === '*') { + configIndexMap.set('*', { fields, i }); + } + // Sequence + else if (sq === '*') { + configSqMap.set('*', { fields, sq }); + } + } + + // Index numeric + if (typeof i === 'number' && !configIndexMap.has(i)) { + configIndexMap.set(i, { fields, i }); + } + + // Sequence numeric + if (typeof sq === 'number' && !configSqMap.has(sq)) { + configSqMap.set(sq, { fields, sq }); + } + } + + const corruptorConfigs = [ + ...configIndexMap.values(), + ...configSqMap.values() + ]; + + return [null, corruptorConfigs]; + }, + getSegmentConfigs( + throttleConfigString: string + ): [ServiceError | null, CorruptorConfig | null] { + const config = JSON.parse(throttleConfigString); + if (!isValidSegmentConfig(config)) { + return [ + unparsableError( + 'throttle', + throttleConfigString, + '{i?:number, sq?:number, rate:number}' + ), + null + ]; + } + + return [ + null, + { + i: config.i, + sq: config.sq, + fields: { + rate: config.rate + } + } + ]; + }, + name: 'throttle' +}; + +export default throttleConfig; diff --git a/src/manifests/utils/dashManifestUtils.ts b/src/manifests/utils/dashManifestUtils.ts index 706be9c..b427bb3 100644 --- a/src/manifests/utils/dashManifestUtils.ts +++ b/src/manifests/utils/dashManifestUtils.ts @@ -209,7 +209,7 @@ function convertRelativeToAbsoluteSegmentOffsets( const urlQuery = new URLSearchParams(originalUrlQuery); - const corruptions = ['statusCode', 'delay', 'timeout']; + const corruptions = ['statusCode', 'delay', 'timeout', 'throttle']; let changed = false; diff --git a/src/routes.ts b/src/routes.ts index a05d7aa..e1c9285 100644 --- a/src/routes.ts +++ b/src/routes.ts @@ -5,6 +5,7 @@ import { generateHeartbeatResponse, addCustomVersionHeader } from './shared/utils'; +import throttlingProxyRoutes from './segments/routes/throttlingProxy'; const apiPrefix = (version: number): string => `api/v${version}`; @@ -20,5 +21,6 @@ export function registerRoutes(app: FastifyInstance) { const opts = { prefix: apiPrefix(2) }; app.register(segmentRoutes, opts); app.register(manifestRoutes, opts); + app.register(throttlingProxyRoutes, opts); addCustomVersionHeader(app); } diff --git a/src/segments/constants.ts b/src/segments/constants.ts index 387dd6d..65e7bfd 100644 --- a/src/segments/constants.ts +++ b/src/segments/constants.ts @@ -9,3 +9,4 @@ export const HLS_PROXY_MEDIA = '/manifests/hls/proxy-media.m3u8'; export const SEGMENTS_PROXY_SEGMENT = '/segments/proxy-segment'; export const DASH_PROXY_MASTER = '/manifests/dash/proxy-master.mpd'; export const DASH_PROXY_SEGMENT = '/manifests/dash/proxy-segment'; +export const THROTTLING_PROXY = '/throttle'; diff --git a/src/segments/handlers/segment.ts b/src/segments/handlers/segment.ts index 0c15a37..13cace6 100644 --- a/src/segments/handlers/segment.ts +++ b/src/segments/handlers/segment.ts @@ -3,12 +3,14 @@ import { ServiceError } from '../../shared/types'; import delaySCC from '../../manifests/utils/corruptions/delay'; import statusCodeSCC from '../../manifests/utils/corruptions/statusCode'; import timeoutSCC from '../../manifests/utils/corruptions/timeout'; +import throttleSCC from '../../manifests/utils/corruptions/throttle'; import { corruptorConfigUtils } from '../../manifests/utils/configs'; import { generateErrorResponse, isValidUrl, refineALBEventQuery } from '../../shared/utils'; +import { THROTTLING_PROXY } from '../constants'; const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); @@ -27,7 +29,11 @@ export default async function segmentHandler( } try { const configUtils = corruptorConfigUtils(new URLSearchParams(query)); - configUtils.register(delaySCC).register(statusCodeSCC).register(timeoutSCC); + configUtils + .register(delaySCC) + .register(statusCodeSCC) + .register(timeoutSCC) + .register(throttleSCC); const [error, allSegmentCorr] = configUtils.getAllSegmentConfigs(); if (error) { @@ -63,6 +69,23 @@ export default async function segmentHandler( }) }; } + // apply Throttle + if ( + allSegmentCorr.get('throttle') && + allSegmentCorr.get('throttle').fields.rate !== 'undefined' + ) { + const rate = Number(allSegmentCorr.get('throttle').fields.rate); + return { + statusCode: 302, + headers: { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Headers': 'Content-Type, Origin', + Location: + '/api/v2' + THROTTLING_PROXY + '?url=' + query.url + '&rate=' + rate + }, + body: 'stream corruptor throttling redirect' + }; + } // Redirect to Source File return { statusCode: 302, diff --git a/src/segments/routes/throttlingProxy.ts b/src/segments/routes/throttlingProxy.ts new file mode 100644 index 0000000..99bae6d --- /dev/null +++ b/src/segments/routes/throttlingProxy.ts @@ -0,0 +1,33 @@ +import { FastifyInstance } from 'fastify'; +import { composeALBEvent } from '../../shared/utils'; +import { THROTTLING_PROXY } from '../constants'; +import fetch from 'node-fetch'; +import { Throttle } from 'stream-throttle'; + +export default async function throttlingProxyRoutes(fastify: FastifyInstance) { + fastify.get(THROTTLING_PROXY, async (req, res) => { + const event = await composeALBEvent(req.method, req.url, req.headers); + + const query = event.queryStringParameters; + if (!query) { + res.code(501); + return; + } + const url = query['url']; + const rate = Number(query['rate']); + + const middle = await fetch(url); + if (middle.status != 200) { + res.code(500).send('Invalid return code for segment from remote'); + } + + const headers = {}; + middle.headers.forEach((v, k) => { + headers[k] = v; + }); + + const throttle = new Throttle({ rate }); + + res.code(middle.status).headers(headers).send(middle.body.pipe(throttle)); + }); +} diff --git a/src/shared/aws.utils.ts b/src/shared/aws.utils.ts index b552e6b..a4686b5 100644 --- a/src/shared/aws.utils.ts +++ b/src/shared/aws.utils.ts @@ -40,6 +40,7 @@ export function addSSMUrlParametersToUrl(url: string): Promise { !url.includes('delay') && !url.includes('statusCode') && !url.includes('timeout') && + !url.includes('throttle') && url.includes('proxy-master') ) { const parameterName =