Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve PartialECChangeUnifier to handle very large changeset #7607

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
4 changes: 3 additions & 1 deletion common/api/core-backend.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -4440,7 +4440,9 @@ export class OrthographicViewDefinition extends SpatialViewDefinition {
}

// @beta
export class PartialECChangeUnifier {
export class PartialECChangeUnifier implements Disposable {
[Symbol.dispose](): void;
constructor(_db: AnyDb);
appendFrom(adaptor: ChangesetECAdaptor): void;
get instances(): IterableIterator<ChangedECInstance>;
stripMetaData(): void;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@itwin/core-backend",
"comment": "Improve change unifer to handle very large changeset",
"type": "none"
}
],
"packageName": "@itwin/core-backend"
}
137 changes: 112 additions & 25 deletions core/backend/src/ChangesetECAdaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
/** @packageDocumentation
* @module ECDb
*/
import { DbResult, GuidString, Id64String } from "@itwin/core-bentley";
import { DbResult, Guid, GuidString, Id64String } from "@itwin/core-bentley";
import { AnyDb, SqliteChange, SqliteChangeOp, SqliteChangesetReader, SqliteValueStage } from "./SqliteChangesetReader";
import { Base64EncodedString } from "@itwin/core-common";

interface IClassRef {
classId: Id64String;
Expand Down Expand Up @@ -410,16 +411,29 @@ namespace DateTime {
* span multiple tables.
* @beta
*/
export class PartialECChangeUnifier {
private _cache = new Map<string, ChangedECInstance>();
export class PartialECChangeUnifier implements Disposable {
private readonly _cacheTable = Guid.createValue();
private _readonly = false;
public constructor(private _db: AnyDb) {
this.createTempTable();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about turning using a temp table into an option? Because this method should be quite a bit slower than the old method of just keeping the values in a map in RAM.

Maybe just a boolean parameter, then the question is which should be the default? Only stupidly large iModels/Changesets will have this issue - but those aren't THAT uncommon or unexpected. It'd be nice if we could automatically make an educated guess for which method to use based on the size of the iModel and changes - but is that feasible here?

}

/**
* Dispose the instance.
*/
public [Symbol.dispose](): void {
if (this._db.isOpen) {
this.dropTempTable();
}
}

/**
* Get root class id for a given class
* @param classId given class id
* @param db use to find root class
* @returns return root class id
*/
private static getRootClassId(classId: Id64String, db: AnyDb): Id64String | undefined {
private getRootClassId(classId: Id64String): Id64String | undefined {
const sql = `
WITH
[base_class]([classId], [baseClassId], [Level]) AS(
Expand All @@ -443,7 +457,7 @@ export class PartialECChangeUnifier {
AND [ss].[Name] = 'CoreCustomAttributes'))
ORDER BY [Level] DESC`;

return db.withSqliteStatement(sql, (stmt) => {
return this._db.withSqliteStatement(sql, (stmt) => {
stmt.bindId(1, classId);
if (stmt.step() === DbResult.BE_SQLITE_ROW && !stmt.isValueNull(0)) {
return stmt.getValueString(0);
Expand All @@ -455,12 +469,12 @@ export class PartialECChangeUnifier {
* Combine partial instance with instance with same key if already exists.
* @param rhs partial instance
*/
private combine(rhs: ChangedECInstance, db?: AnyDb): void {
private combine(rhs: ChangedECInstance): void {
if (!rhs.$meta) {
throw new Error("PartialECChange being combine must have '$meta' property");
}
const key = PartialECChangeUnifier.buildKey(rhs, db);
const lhs = this._cache.get(key);
const key = this.buildKey(rhs);
const lhs = this.getInstance(key); // get from temp db instead
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment is probably not needed anymore

if (lhs) {
const { $meta: _, ...restOfRhs } = rhs;
Object.assign(lhs, restOfRhs);
Expand All @@ -469,10 +483,10 @@ export class PartialECChangeUnifier {
lhs.$meta.changeIndexes = [...rhs.$meta?.changeIndexes, ...lhs.$meta?.changeIndexes];

// we preserve child class name & id when merging instance.
if (rhs.$meta.fallbackClassId && lhs.$meta.fallbackClassId && db && rhs.$meta.fallbackClassId !== lhs.$meta.fallbackClassId) {
if (rhs.$meta.fallbackClassId && lhs.$meta.fallbackClassId && rhs.$meta.fallbackClassId !== lhs.$meta.fallbackClassId) {
const lhsClassId = lhs.$meta.fallbackClassId;
const rhsClassId = rhs.$meta.fallbackClassId;
const isRhsIsSubClassOfLhs = db.withPreparedStatement("SELECT ec_instanceof(?,?)", (stmt) => {
const isRhsIsSubClassOfLhs = this._db.withPreparedStatement("SELECT ec_instanceof(?,?)", (stmt) => {
stmt.bindId(1, rhsClassId);
stmt.bindId(2, lhsClassId);
stmt.step();
Expand All @@ -484,20 +498,83 @@ export class PartialECChangeUnifier {
}
}
}
this.setInstance(key, lhs);
} else {
this._cache.set(key, rhs);
this.setInstance(key, rhs);
}
}

private getInstance(key: string): ChangedECInstance | undefined {
return this._db.withPreparedSqliteStatement(`SELECT [value] FROM [temp].[${this._cacheTable}] WHERE [key]=?`, (stmt) => {
stmt.bindString(1, key);
if (stmt.step() === DbResult.BE_SQLITE_ROW) {
const out = JSON.parse(stmt.getValueString(0)) as ChangedECInstance;
PartialECChangeUnifier.replaceBase64WithUint8Array(out);
return out;
}
return undefined;
});
}
private static replaceBase64WithUint8Array(row: any): void {
Comment on lines +517 to +518
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add newline between methods

Suggested change
}
private static replaceBase64WithUint8Array(row: any): void {
}
private static replaceBase64WithUint8Array(row: any): void {

for (const key of Object.keys(row)) {
const val = row[key];
if (typeof val === "string") {
if (Base64EncodedString.hasPrefix(val)) {
row[key] = Base64EncodedString.toUint8Array(val);
}
} else if (typeof val === "object" && val !== null) {
this.replaceBase64WithUint8Array(val);
}
}
}
private static replaceUint8ArrayWithBase64(row: any): void {
Comment on lines 529 to +530
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add newline between methods

Suggested change
}
private static replaceUint8ArrayWithBase64(row: any): void {
}
private static replaceUint8ArrayWithBase64(row: any): void {

for (const key of Object.keys(row)) {
const val = row[key];
if (val instanceof Uint8Array) {
row[key] = Base64EncodedString.fromUint8Array(val);
} else if (typeof val === "object" && val !== null) {
this.replaceUint8ArrayWithBase64(val);
}
}
}
private setInstance(key: string, value: ChangedECInstance): void {
Comment on lines +539 to +540
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add newline between methods

Suggested change
}
private setInstance(key: string, value: ChangedECInstance): void {
}
private setInstance(key: string, value: ChangedECInstance): void {

const shallowCopy = Object.assign({}, value);
PartialECChangeUnifier.replaceUint8ArrayWithBase64(shallowCopy);
this._db.withPreparedSqliteStatement(`INSERT INTO [temp].[${this._cacheTable}] ([key], [value]) VALUES (?, ?) ON CONFLICT ([key]) DO UPDATE SET [value] = [excluded].[value]`, (stmt) => {
stmt.bindString(1, key);
stmt.bindString(2, JSON.stringify(shallowCopy));
stmt.step();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove empty line

});
}

private createTempTable(): void {
// table name should be unique in case two PartialECChangeUnifiers are made
this._db.withPreparedSqliteStatement(`CREATE TABLE [temp].[${this._cacheTable}] ([key] text primary key, [value] text)`, (stmt) => {
if (DbResult.BE_SQLITE_DONE !== stmt.step()) {
// throw new Error(db.nativeDb.getLastError());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove commented out line

throw new Error("unable to create temp table");
}
});
}
private dropTempTable(): void {
Comment on lines +559 to +560
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add newline between methods

Suggested change
}
private dropTempTable(): void {
}
private dropTempTable(): void {

this._db.withPreparedSqliteStatement(`DROP TABLE IF EXISTS [temp].[${this._cacheTable}]`, (stmt) => {
if (DbResult.BE_SQLITE_DONE !== stmt.step()) {
// throw new Error(db.nativeDb.getLastError());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove commented out line

throw new Error("unable to drop temp table");
}
});
}
/**
Comment on lines +567 to 568
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add newline between methods

Suggested change
}
/**
}
/**

* Build key from EC change.
* @param change EC change
* @returns key created from EC change.
*/
private static buildKey(change: ChangedECInstance, db?: AnyDb): string {
private buildKey(change: ChangedECInstance): string {
let classId = change.ECClassId;
if (typeof classId === "undefined") {
if (db && change.$meta?.fallbackClassId) {
classId = this.getRootClassId(change.$meta.fallbackClassId, db);
if (change.$meta?.fallbackClassId) {
classId = this.getRootClassId(change.$meta.fallbackClassId);
}
if (typeof classId === "undefined") {
throw new Error(`unable to resolve ECClassId to root class id.`);
Expand All @@ -520,33 +597,43 @@ export class PartialECChangeUnifier {
throw new Error("this instance is marked as readonly.");
}

// todo: create table here if not exist on adapter.reader.db

if (adaptor.op === "Updated" && adaptor.inserted && adaptor.deleted) {
this.combine(adaptor.inserted, adaptor.reader.db);
this.combine(adaptor.deleted, adaptor.reader.db);
this.combine(adaptor.inserted);
this.combine(adaptor.deleted);
} else if (adaptor.op === "Inserted" && adaptor.inserted) {
this.combine(adaptor.inserted, adaptor.reader.db);
this.combine(adaptor.inserted);
} else if (adaptor.op === "Deleted" && adaptor.deleted) {
this.combine(adaptor.deleted, adaptor.reader.db);
this.combine(adaptor.deleted);
}
}
/**
* Delete $meta from all the instances.
*/
public stripMetaData(): void {
for (const inst of this._cache.values()) {
if ("$meta" in inst) {
delete inst.$meta;
}
}
this._db.withPreparedSqliteStatement(`UPDATE [temp].[${this._cacheTable}] SET [value] = json_remove([value], '$meta')`, (stmt) => {
stmt.step();
});
this._readonly = true;
}
private *getInstances(): IterableIterator<ChangedECInstance> {
Comment on lines 619 to +620
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add newline between methods

Suggested change
}
private *getInstances(): IterableIterator<ChangedECInstance> {
}
private *getInstances(): IterableIterator<ChangedECInstance> {

const stmt = this._db.prepareSqliteStatement(`SELECT [value] FROM [temp].[${this._cacheTable}]`);
while (stmt.step() === DbResult.BE_SQLITE_ROW) {
const value = JSON.parse(stmt.getValueString(0)) as ChangedECInstance;
PartialECChangeUnifier.replaceBase64WithUint8Array(value);
yield value;
}
stmt[Symbol.dispose]();
}
/**
Comment on lines +628 to 629
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add newline between methods

Suggested change
}
/**
}
/**

* Returns complete EC change instances.
* @beta
*/
public get instances(): IterableIterator<ChangedECInstance> { return this._cache.values(); }
public get instances(): IterableIterator<ChangedECInstance> {
return this.getInstances();
}
}

/**
* Transform sqlite change to ec change. EC change is partial change as
* it is per table while a single instance can span multiple table.
Expand Down
14 changes: 6 additions & 8 deletions core/backend/src/test/standalone/ChangesetReader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ describe("Changeset Reader API", async () => {
if (true || "test local changes") {
const reader = SqliteChangesetReader.openLocalChanges({ db: rwIModel, disableSchemaCheck: true });
using adaptor = new ECChangesetAdaptor(reader);
const cci = new PartialECChangeUnifier();
const cci = new PartialECChangeUnifier(reader.db);
while (adaptor.step()) {
cci.appendFrom(adaptor);
}
Expand Down Expand Up @@ -325,7 +325,6 @@ describe("Changeset Reader API", async () => {
1,
],
stage: "New",
fallbackClassId: undefined,
});
}
const targetDir = path.join(KnownTestLocations.outputDir, rwIModelId, "changesets");
Expand All @@ -337,7 +336,7 @@ describe("Changeset Reader API", async () => {
if (true || "updated element") {
const reader = SqliteChangesetReader.openFile({ fileName: changesets[3].pathname, db: rwIModel, disableSchemaCheck: true });
const adaptor = new ECChangesetAdaptor(reader);
const cci = new PartialECChangeUnifier();
const cci = new PartialECChangeUnifier(reader.db);
while (adaptor.step()) {
cci.appendFrom(adaptor);
}
Expand Down Expand Up @@ -365,7 +364,7 @@ describe("Changeset Reader API", async () => {
const otherDb = SnapshotDb.openFile(IModelTestUtils.resolveAssetFile("test.bim"));
const reader = SqliteChangesetReader.openFile({ fileName: changesets[3].pathname, db: otherDb, disableSchemaCheck: true });
const adaptor = new ECChangesetAdaptor(reader);
const cci = new PartialECChangeUnifier();
const cci = new PartialECChangeUnifier(reader.db);
while (adaptor.step()) {
cci.appendFrom(adaptor);
}
Expand Down Expand Up @@ -396,7 +395,7 @@ describe("Changeset Reader API", async () => {
if (true || "test changeset file") {
const reader = SqliteChangesetReader.openFile({ fileName: changesets[2].pathname, db: rwIModel, disableSchemaCheck: true });
using adaptor = new ECChangesetAdaptor(reader);
const cci = new PartialECChangeUnifier();
const cci = new PartialECChangeUnifier(reader.db);
while (adaptor.step()) {
cci.appendFrom(adaptor);
}
Expand Down Expand Up @@ -461,14 +460,13 @@ describe("Changeset Reader API", async () => {
1,
],
stage: "New",
fallbackClassId: undefined,
});
}
if (true || "test ChangesetAdaptor.acceptClass()") {
const reader = SqliteChangesetReader.openFile({ fileName: changesets[2].pathname, db: rwIModel, disableSchemaCheck: true });
using adaptor = new ECChangesetAdaptor(reader);
adaptor.acceptClass("TestDomain.Test2dElement");
const cci = new PartialECChangeUnifier();
const cci = new PartialECChangeUnifier(reader.db);
while (adaptor.step()) {
cci.appendFrom(adaptor);
}
Expand All @@ -480,7 +478,7 @@ describe("Changeset Reader API", async () => {
const reader = SqliteChangesetReader.openFile({ fileName: changesets[2].pathname, db: rwIModel, disableSchemaCheck: true });
using adaptor = new ECChangesetAdaptor(reader);
adaptor.acceptOp("Updated");
const cci = new PartialECChangeUnifier();
const cci = new PartialECChangeUnifier(reader.db);
while (adaptor.step()) {
cci.appendFrom(adaptor);
}
Expand Down
Loading