diff --git a/doc/2/concepts/configuration/index.md b/doc/2/concepts/configuration/index.md index 0992f35c..e0272f18 100644 --- a/doc/2/concepts/configuration/index.md +++ b/doc/2/concepts/configuration/index.md @@ -28,8 +28,9 @@ The following global options are available: | Name | Type | Default | Description | | --------------------- | ------- | ---------------- | -------------------------------------------------------------------------------------------------- | -| `ignoreStartupErrors` | boolean | `false` | If `true`, the plugin will not throw an error if the engine is not reachable at startup. | -| `engine.autoUpdate` | boolean | `true` | If `true`, the plugin will automatically update the engine collections when the plugin is started. | +| `ignoreStartupErrors` | boolean | `false` | If `true`, the plugin will not throw an error if the engine is not reachable at startup. | +| `assetsHistorizesMeasures` | boolean | `false` | If `true`, the plugin creates an asset history document for every asset state update from measures. If set to false, only asset metadata updates will be registered. | +| `engine.autoUpdate` | boolean | `true` | If `true`, the plugin will automatically update the engine collections when the plugin is started. | | `adminIndex` | string | `device-manager` | The index name where the plugin stores its configuration and devices. | ## Collections diff --git a/lib/modules/measure/MeasureService.ts b/lib/modules/measure/MeasureService.ts index b42c626e..36e318d8 100644 --- a/lib/modules/measure/MeasureService.ts +++ b/lib/modules/measure/MeasureService.ts @@ -176,7 +176,7 @@ export class MeasureService extends BaseService { promises.push(this.mutexUpdateAsset(indexId, assetId, asset)); promises.push( - historizeAssetStates( + this.historizeAssetStates( assetStates, indexId, JSON.parse(JSON.stringify(asset.metadata)), @@ -409,9 +409,8 @@ export class MeasureService extends BaseService { promises.push( this.mutexUpdateAsset(engineId, asset._id, asset._source), ); - promises.push( - historizeAssetStates( + this.historizeAssetStates( assetStates, engineId, originalAssetMetadata, @@ -797,62 +796,110 @@ export class MeasureService extends BaseService { }), ); } -} -/** - * Create a new document in the collection asset-history, for each asset states - */ -async function historizeAssetStates( - assetStates: Map>>, - engineId: string, - originalAssetMetadata: Metadata, - assetMetadata: Metadata, -): Promise { - const metadataChanges = objectDiff(originalAssetMetadata, assetMetadata); - const lastTimestampRecorded = Array.from(assetStates.keys()).pop(); - - const histories: AssetHistoryContent[] = []; - for (const [measuredAt, assetState] of assetStates) { - const measureNames = []; - - for (const measure of Object.values(assetState._source.measures)) { - if (measure?.measuredAt === measuredAt) { - measureNames.push(measure.name); + /** + * Create a new document in the collection asset-history, for each asset state + */ + private async historizeAssetStates( + assetStates: Map>>, + engineId: string, + originalAssetMetadata: Metadata, + assetMetadata: Metadata, + ): Promise { + const metadataChanges = objectDiff(originalAssetMetadata, assetMetadata); + + if (!this.config.assetsHistorizesMeasures) { + // If there are no metadata changes, nothing to do. + if (metadataChanges.length === 0) { + return; + } + const timestamps = Array.from(assetStates.keys()); + const lastTimestampRecorded = timestamps[timestamps.length - 1]; + const lastAssetState = assetStates.get(lastTimestampRecorded); + if (!lastAssetState) { + return; } - } - const event: AssetHistoryEventMeasure = { - measure: { - names: measureNames, - }, - name: "measure", - }; + // Update the asset's metadata to the new one. + lastAssetState._source.metadata = assetMetadata; - assetState._source.metadata = originalAssetMetadata; + // Create a metadata event. + const event: AssetHistoryEventMetadata = { + metadata: { + names: metadataChanges, + }, + name: "metadata", + }; - if (metadataChanges.length !== 0 && measuredAt === lastTimestampRecorded) { - (event as unknown as AssetHistoryEventMetadata).metadata = { - names: metadataChanges, + const history: AssetHistoryContent = { + asset: lastAssetState._source, + event, + id: lastAssetState._id, + timestamp: lastTimestampRecorded, }; - assetState._source.metadata = assetMetadata; + return ask>( + "ask:device-manager:asset:history:add", + { + engineId, + histories: [history], + }, + ); } - histories.push({ - asset: assetState._source, - event, - id: assetState._id, - timestamp: measuredAt, - }); - } + // Otherwise, we record measure events (and metadata on the last state if needed) + const timestamps = Array.from(assetStates.keys()); + const lastTimestampRecorded = timestamps[timestamps.length - 1]; + + const histories: AssetHistoryContent[] = []; + for (const [measuredAt, assetState] of assetStates) { + // Gather measure names corresponding to the current timestamp. + const measureNames: string[] = []; + for (const measure of Object.values(assetState._source.measures)) { + if (measure?.measuredAt === measuredAt) { + measureNames.push(measure.name); + } + } + + // Build the measure event. + const event: AssetHistoryEventMeasure = { + measure: { + names: measureNames, + }, + name: "measure", + }; + + // Initialize with the original metadata. + assetState._source.metadata = originalAssetMetadata; + + // If we are at the last recorded state and there are metadata changes, + // update both the event and asset metadata. + if ( + metadataChanges.length !== 0 && + measuredAt === lastTimestampRecorded + ) { + (event as unknown as AssetHistoryEventMetadata).metadata = { + names: metadataChanges, + }; + assetState._source.metadata = assetMetadata; + } - return ask>( - "ask:device-manager:asset:history:add", - { - engineId, - // Reverse order because for now, measuredAt are sorted in ascending order - // While in mCreate the last item will be the first document to be created - histories: histories.reverse(), - }, - ); + histories.push({ + asset: assetState._source, + event, + id: assetState._id, + timestamp: measuredAt, + }); + } + + return ask>( + "ask:device-manager:asset:history:add", + { + engineId, + // Reverse order because measuredAt is sorted in ascending order, + // while in mCreate the last item will be the first document to be created + histories: histories.reverse(), + }, + ); + } } diff --git a/lib/modules/plugin/DeviceManagerPlugin.ts b/lib/modules/plugin/DeviceManagerPlugin.ts index c413a930..69b7f57e 100644 --- a/lib/modules/plugin/DeviceManagerPlugin.ts +++ b/lib/modules/plugin/DeviceManagerPlugin.ts @@ -363,6 +363,7 @@ export class DeviceManagerPlugin extends Plugin { this.config = { ignoreStartupErrors: false, + assetsHistorizesMeasures: false, engine: { autoUpdate: true, }, diff --git a/lib/modules/plugin/types/DeviceManagerConfiguration.ts b/lib/modules/plugin/types/DeviceManagerConfiguration.ts index 8591f8c0..a0543608 100644 --- a/lib/modules/plugin/types/DeviceManagerConfiguration.ts +++ b/lib/modules/plugin/types/DeviceManagerConfiguration.ts @@ -7,7 +7,12 @@ export type DeviceManagerConfiguration = { * Useful to start the plugin even if the mappings are not up to date. */ ignoreStartupErrors: boolean; - + /** + * Should be set to true to historize measures in asset-history collection (deprecated) + * + * Should be enabled only for backward compatibility because it leads to important performance costs. + */ + assetsHistorizesMeasures: boolean; engine: { /** * Auto update collection mappings with models diff --git a/tests/application/app.ts b/tests/application/app.ts index d47c4ff4..c31f9e0c 100644 --- a/tests/application/app.ts +++ b/tests/application/app.ts @@ -31,7 +31,6 @@ registerModels(deviceManager); registerTestPipes(app); app.plugin.use(deviceManager); - app.controller.use(new TestsController(app)); app.hook.register("request:onError", async (request: KuzzleRequest) => { diff --git a/tests/application/tests/controller.ts b/tests/application/tests/controller.ts index 67d6e468..b57a9a5a 100644 --- a/tests/application/tests/controller.ts +++ b/tests/application/tests/controller.ts @@ -16,10 +16,18 @@ export class TestsController extends Controller { createDigitalTwinFromBackend: { handler: this.createDigitalTwinFromBackend, }, + setAssetsHistorizesMeasuresConfig: { + handler: this.setAssetsHistorizesMeasuresConfig, + }, }, }; } + async setAssetsHistorizesMeasuresConfig(request: KuzzleRequest) { + this.app.plugin.get("device-manager").config.assetsHistorizesMeasures = + request.getBoolean("assetsHistorizesMeasures"); + } + async createDigitalTwinFromBackend(request: KuzzleRequest) { const engineId = request.getString("engineId"); const reference = request.getBodyString("reference"); diff --git a/tests/scenario/migrated/asset-history.test.ts b/tests/scenario/migrated/asset-history.test.ts index fb8f0b6b..25696bcb 100644 --- a/tests/scenario/migrated/asset-history.test.ts +++ b/tests/scenario/migrated/asset-history.test.ts @@ -132,10 +132,17 @@ describe("features/Asset/History", () => { }); }); - it("Historize asset after receiving a new measure", async () => { + it("Historize asset after receiving a new measure and assetsHistorizesMeasures is true", async () => { + let response; let promise; + await sdk.query({ + controller: "tests", + action: "setAssetsHistorizesMeasuresConfig", + assetsHistorizesMeasures: true, + }); + response = await sendPayloads(sdk, "dummy-temp", [ { deviceEUI: "linked1", temperature: 42.2 }, ]); @@ -164,6 +171,12 @@ describe("features/Asset/History", () => { length: 1, }, }); + + await sdk.query({ + controller: "tests", + action: "setAssetsHistorizesMeasuresConfig", + assetsHistorizesMeasures: false, + }); }); it("Historize asset when metadata have been updated when receiving a measure", async () => { @@ -187,8 +200,7 @@ describe("features/Asset/History", () => { expect(result.hits[0]._source).toMatchObject({ id: "Container-linked1", event: { - name: "measure", - measure: { names: ["temperatureExt"] }, + name: "metadata", metadata: { names: ["weight", "trailer.capacity"] }, }, asset: { diff --git a/tests/scenario/migrated/decoder-payload-controller.test.ts b/tests/scenario/migrated/decoder-payload-controller.test.ts index 70bea4a9..a86a3b70 100644 --- a/tests/scenario/migrated/decoder-payload-controller.test.ts +++ b/tests/scenario/migrated/decoder-payload-controller.test.ts @@ -141,6 +141,12 @@ describe("features/Decoder/PayloadController", () => { }); it("Receive a payload with 3 measures but only 2 are propagated to the asset", async () => { + await sdk.query({ + controller: "tests", + action: "setAssetsHistorizesMeasuresConfig", + assetsHistorizesMeasures: true, + }); + await sendPayloads(sdk, "dummy-temp-position", [ { deviceEUI: "linked2", @@ -217,6 +223,12 @@ describe("features/Decoder/PayloadController", () => { }); expect(hit.valid).toBeTruthy(); expect(hit.state).toBe("VALID"); + + await sdk.query({ + controller: "tests", + action: "setAssetsHistorizesMeasuresConfig", + assetsHistorizesMeasures: false, + }); }); it("Historize the measures with device and asset context", async () => { diff --git a/tests/scenario/modules/assets/asset-history.test.ts b/tests/scenario/modules/assets/asset-history.test.ts index 005f274a..842c8097 100644 --- a/tests/scenario/modules/assets/asset-history.test.ts +++ b/tests/scenario/modules/assets/asset-history.test.ts @@ -6,8 +6,13 @@ jest.setTimeout(10000); describe("DeviceController: receiveMeasure", () => { const sdk = setupHooks(); + it("should save asset history when measure is received and assetsHistorizesMeasures is true", async () => { + await sdk.query({ + controller: "tests", + action: "setAssetsHistorizesMeasuresConfig", + assetsHistorizesMeasures: true, + }); - it("should save asset history when measure is received", async () => { await sendDummyTempPayloads(sdk, [ { deviceEUI: "linked1", @@ -33,9 +38,20 @@ describe("DeviceController: receiveMeasure", () => { }, }); expect(result.hits[0]._source.event.metadata).toBeUndefined(); + + await sdk.query({ + controller: "tests", + action: "setAssetsHistorizesMeasuresConfig", + assetsHistorizesMeasures: false, + }); }); - it("should historize asset for each measurements of the same measure received in non-chronological order", async () => { + it("should historize asset for each measurements of the same measure received in non-chronological order and assetsHistorizesMeasures is true", async () => { + await sdk.query({ + controller: "tests", + action: "setAssetsHistorizesMeasuresConfig", + assetsHistorizesMeasures: true, + }); await sendDummyTempPayloads(sdk, [ { measurements: [ @@ -101,6 +117,11 @@ describe("DeviceController: receiveMeasure", () => { measures: { temperatureExt: { values: { temperature: 13.25 } } }, }, }); + await sdk.query({ + controller: "tests", + action: "setAssetsHistorizesMeasuresConfig", + assetsHistorizesMeasures: false, + }); }); it("should add a metadata event to the history entry", async () => { @@ -128,10 +149,7 @@ describe("DeviceController: receiveMeasure", () => { expect(result.hits[0]._source).toMatchObject({ id: "Container-linked1", event: { - name: "measure", - measure: { - names: ["temperatureExt"], - }, + name: "metadata", metadata: { names: ["weight", "trailer.capacity"], }, @@ -297,14 +315,11 @@ describe("DeviceController: receiveMeasure", () => { "assets-history", ); - expect(result.hits).toHaveLength(2); + expect(result.hits).toHaveLength(1); expect(result.hits[0]._source).toMatchObject({ id: "Container-linked1", event: { - name: "measure", - measure: { - names: ["temperatureExt"], - }, + name: "metadata", metadata: { names: ["weight", "trailer.capacity"], }, @@ -314,18 +329,5 @@ describe("DeviceController: receiveMeasure", () => { metadata: { weight: 42042, trailer: { capacity: 2048 } }, }, }); - expect(result.hits[1]._source).toMatchObject({ - id: "Container-linked1", - event: { - name: "measure", - measure: { - names: ["temperatureExt"], - }, - }, - asset: { - measures: { temperatureExt: { values: { temperature: 13.26 } } }, - metadata: { weight: 10, trailer: { capacity: 1024 } }, - }, - }); }); });