Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: measures should not be historized on asset #404

Open
wants to merge 1 commit into
base: 2-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions doc/2/concepts/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ The following global options are available:

| Name | Type | Default | Description |
| --------------------- | ------- | ---------------- | -------------------------------------------------------------------------------------------------- |
| `ignoreStartupErrors` | boolean | `false` | If `true`, the plugin will not throw an error if the engine is not reachable at startup. |
| `engine.autoUpdate` | boolean | `true` | If `true`, the plugin will automatically update the engine collections when the plugin is started. |
| `ignoreStartupErrors` | boolean | `false` | If `true`, the plugin will not throw an error if the engine is not reachable at startup. |
| `assetsHistorizesMeasures` | boolean | `false` | If `true`, the plugin creates an asset history document for every asset state update from measures. If set to false, only asset metadata updates will be registered. |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Badget since

| `engine.autoUpdate` | boolean | `true` | If `true`, the plugin will automatically update the engine collections when the plugin is started. |
| `adminIndex` | string | `device-manager` | The index name where the plugin stores its configuration and devices. |

## Collections
Expand Down
149 changes: 98 additions & 51 deletions lib/modules/measure/MeasureService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ export class MeasureService extends BaseService {
promises.push(this.mutexUpdateAsset(indexId, assetId, asset));

promises.push(
historizeAssetStates(
this.historizeAssetStates(
assetStates,
indexId,
JSON.parse(JSON.stringify(asset.metadata)),
Expand Down Expand Up @@ -409,9 +409,8 @@ export class MeasureService extends BaseService {
promises.push(
this.mutexUpdateAsset(engineId, asset._id, asset._source),
);

promises.push(
historizeAssetStates(
this.historizeAssetStates(
assetStates,
engineId,
originalAssetMetadata,
Expand Down Expand Up @@ -797,62 +796,110 @@ export class MeasureService extends BaseService {
}),
);
}
}

/**
* Create a new document in the collection asset-history, for each asset states
*/
async function historizeAssetStates(
assetStates: Map<number, KDocument<AssetContent<any, any>>>,
engineId: string,
originalAssetMetadata: Metadata,
assetMetadata: Metadata,
): Promise<void> {
const metadataChanges = objectDiff(originalAssetMetadata, assetMetadata);
const lastTimestampRecorded = Array.from(assetStates.keys()).pop();

const histories: AssetHistoryContent[] = [];
for (const [measuredAt, assetState] of assetStates) {
const measureNames = [];

for (const measure of Object.values(assetState._source.measures)) {
if (measure?.measuredAt === measuredAt) {
measureNames.push(measure.name);
/**
* Create a new document in the collection asset-history, for each asset state
*/
private async historizeAssetStates(
assetStates: Map<number, KDocument<AssetContent<any, any>>>,
engineId: string,
originalAssetMetadata: Metadata,
assetMetadata: Metadata,
): Promise<void> {
const metadataChanges = objectDiff(originalAssetMetadata, assetMetadata);

if (!this.config.assetsHistorizesMeasures) {
// If there are no metadata changes, nothing to do.
if (metadataChanges.length === 0) {
return;
}
const timestamps = Array.from(assetStates.keys());
const lastTimestampRecorded = timestamps[timestamps.length - 1];
const lastAssetState = assetStates.get(lastTimestampRecorded);
if (!lastAssetState) {
return;
}
}

const event: AssetHistoryEventMeasure = {
measure: {
names: measureNames,
},
name: "measure",
};
// Update the asset's metadata to the new one.
lastAssetState._source.metadata = assetMetadata;

assetState._source.metadata = originalAssetMetadata;
// Create a metadata event.
const event: AssetHistoryEventMetadata = {
metadata: {
names: metadataChanges,
},
name: "metadata",
};

if (metadataChanges.length !== 0 && measuredAt === lastTimestampRecorded) {
(event as unknown as AssetHistoryEventMetadata).metadata = {
names: metadataChanges,
const history: AssetHistoryContent = {
asset: lastAssetState._source,
event,
id: lastAssetState._id,
timestamp: lastTimestampRecorded,
};

assetState._source.metadata = assetMetadata;
return ask<AskAssetHistoryAdd<AssetHistoryEventMetadata>>(
"ask:device-manager:asset:history:add",
{
engineId,
histories: [history],
},
);
}

histories.push({
asset: assetState._source,
event,
id: assetState._id,
timestamp: measuredAt,
});
}
// Otherwise, we record measure events (and metadata on the last state if needed)
const timestamps = Array.from(assetStates.keys());
const lastTimestampRecorded = timestamps[timestamps.length - 1];

const histories: AssetHistoryContent[] = [];
for (const [measuredAt, assetState] of assetStates) {
// Gather measure names corresponding to the current timestamp.
const measureNames: string[] = [];
for (const measure of Object.values(assetState._source.measures)) {
if (measure?.measuredAt === measuredAt) {
measureNames.push(measure.name);
}
}

// Build the measure event.
const event: AssetHistoryEventMeasure = {
measure: {
names: measureNames,
},
name: "measure",
};

// Initialize with the original metadata.
assetState._source.metadata = originalAssetMetadata;

// If we are at the last recorded state and there are metadata changes,
// update both the event and asset metadata.
if (
metadataChanges.length !== 0 &&
measuredAt === lastTimestampRecorded
) {
(event as unknown as AssetHistoryEventMetadata).metadata = {
names: metadataChanges,
};
assetState._source.metadata = assetMetadata;
}

return ask<AskAssetHistoryAdd<AssetHistoryEventMeasure>>(
"ask:device-manager:asset:history:add",
{
engineId,
// Reverse order because for now, measuredAt are sorted in ascending order
// While in mCreate the last item will be the first document to be created
histories: histories.reverse(),
},
);
histories.push({
asset: assetState._source,
event,
id: assetState._id,
timestamp: measuredAt,
});
}

return ask<AskAssetHistoryAdd<AssetHistoryEventMeasure>>(
"ask:device-manager:asset:history:add",
{
engineId,
// Reverse order because measuredAt is sorted in ascending order,
// while in mCreate the last item will be the first document to be created
histories: histories.reverse(),
},
);
}
}
1 change: 1 addition & 0 deletions lib/modules/plugin/DeviceManagerPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ export class DeviceManagerPlugin extends Plugin {

this.config = {
ignoreStartupErrors: false,
assetsHistorizesMeasures: false,
engine: {
autoUpdate: true,
},
Expand Down
7 changes: 6 additions & 1 deletion lib/modules/plugin/types/DeviceManagerConfiguration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ export type DeviceManagerConfiguration = {
* Useful to start the plugin even if the mappings are not up to date.
*/
ignoreStartupErrors: boolean;

/**
* Should be set to true to historize measures in asset-history collection (deprecated)
*
* Should be enabled only for backward compatibility because it leads to important performance costs.
*/
assetsHistorizesMeasures: boolean;
engine: {
/**
* Auto update collection mappings with models
Expand Down
1 change: 0 additions & 1 deletion tests/application/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ registerModels(deviceManager);
registerTestPipes(app);

app.plugin.use(deviceManager);

app.controller.use(new TestsController(app));

app.hook.register("request:onError", async (request: KuzzleRequest) => {
Expand Down
8 changes: 8 additions & 0 deletions tests/application/tests/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@ export class TestsController extends Controller {
createDigitalTwinFromBackend: {
handler: this.createDigitalTwinFromBackend,
},
setAssetsHistorizesMeasuresConfig: {
handler: this.setAssetsHistorizesMeasuresConfig,
},
},
};
}

async setAssetsHistorizesMeasuresConfig(request: KuzzleRequest) {
this.app.plugin.get("device-manager").config.assetsHistorizesMeasures =
request.getBoolean("assetsHistorizesMeasures");
}

async createDigitalTwinFromBackend(request: KuzzleRequest) {
const engineId = request.getString("engineId");
const reference = request.getBodyString("reference");
Expand Down
18 changes: 15 additions & 3 deletions tests/scenario/migrated/asset-history.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,17 @@ describe("features/Asset/History", () => {
});
});

it("Historize asset after receiving a new measure", async () => {
it("Historize asset after receiving a new measure and assetsHistorizesMeasures is true", async () => {

let response;
let promise;

await sdk.query({
controller: "tests",
action: "setAssetsHistorizesMeasuresConfig",
assetsHistorizesMeasures: true,
});

response = await sendPayloads(sdk, "dummy-temp", [
{ deviceEUI: "linked1", temperature: 42.2 },
]);
Expand Down Expand Up @@ -164,6 +171,12 @@ describe("features/Asset/History", () => {
length: 1,
},
});

await sdk.query({
controller: "tests",
action: "setAssetsHistorizesMeasuresConfig",
assetsHistorizesMeasures: false,
});
});

it("Historize asset when metadata have been updated when receiving a measure", async () => {
Expand All @@ -187,8 +200,7 @@ describe("features/Asset/History", () => {
expect(result.hits[0]._source).toMatchObject({
id: "Container-linked1",
event: {
name: "measure",
measure: { names: ["temperatureExt"] },
name: "metadata",
metadata: { names: ["weight", "trailer.capacity"] },
},
asset: {
Expand Down
12 changes: 12 additions & 0 deletions tests/scenario/migrated/decoder-payload-controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ describe("features/Decoder/PayloadController", () => {
});

it("Receive a payload with 3 measures but only 2 are propagated to the asset", async () => {
await sdk.query({
controller: "tests",
action: "setAssetsHistorizesMeasuresConfig",
assetsHistorizesMeasures: true,
});

await sendPayloads(sdk, "dummy-temp-position", [
{
deviceEUI: "linked2",
Expand Down Expand Up @@ -217,6 +223,12 @@ describe("features/Decoder/PayloadController", () => {
});
expect(hit.valid).toBeTruthy();
expect(hit.state).toBe("VALID");

await sdk.query({
controller: "tests",
action: "setAssetsHistorizesMeasuresConfig",
assetsHistorizesMeasures: false,
});
});

it("Historize the measures with device and asset context", async () => {
Expand Down
Loading