Skip to content

Commit

Permalink
[PM-8285] Resolve app id race (#9501)
Browse files Browse the repository at this point in the history
* Do not update appId if it is not null

* Prefer linear transformations to side-effect-based changes

This leaves us open to repeat emits due to updates, but distinct until changed stops those.

Tracker improvements are due to passed in observables with replay causing immediate emits when `expectingEmission`s. This converts to a cold observable that only emits when the tracked observable does _after_ subscribing.

* Prefer while

* PR review
  • Loading branch information
MGibson1 authored Jun 4, 2024
1 parent 90ca434 commit 3154d21
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 55 deletions.
31 changes: 15 additions & 16 deletions libs/common/spec/observable-tracker.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Observable, Subscription, firstValueFrom, throwError, timeout } from "rxjs";
import { Observable, Subject, Subscription, firstValueFrom, throwError, timeout } from "rxjs";

/** Test class to enable async awaiting of observable emissions */
export class ObservableTracker<T> {
private subscription: Subscription;
private emissionReceived = new Subject<T>();
emissions: T[] = [];
constructor(private observable: Observable<T>) {
constructor(observable: Observable<T>) {
this.emissions = this.trackEmissions(observable);
}

Expand All @@ -21,7 +22,7 @@ export class ObservableTracker<T> {
*/
async expectEmission(msTimeout = 50): Promise<T> {
return await firstValueFrom(
this.observable.pipe(
this.emissionReceived.pipe(
timeout({
first: msTimeout,
with: () => throwError(() => new Error("Timeout exceeded waiting for another emission.")),
Expand All @@ -34,40 +35,38 @@ export class ObservableTracker<T> {
* @param count The number of emissions to wait for
*/
async pauseUntilReceived(count: number, msTimeout = 50): Promise<T[]> {
for (let i = 0; i < count - this.emissions.length; i++) {
while (this.emissions.length < count) {
await this.expectEmission(msTimeout);
}
return this.emissions;
}

private trackEmissions<T>(observable: Observable<T>): T[] {
private trackEmissions(observable: Observable<T>): T[] {
const emissions: T[] = [];
this.subscription = observable.subscribe((value) => {
switch (value) {
case undefined:
case null:
emissions.push(value);
return;
default:
// process by type
break;
if (value == null) {
this.emissionReceived.next(null);
return;
}

switch (typeof value) {
case "string":
case "number":
case "boolean":
emissions.push(value);
this.emissionReceived.next(value);
break;
case "symbol":
// Cheating types to make symbols work at all
emissions.push(value.toString() as T);
this.emissionReceived.next(value as T);
break;
default: {
emissions.push(clone(value));
this.emissionReceived.next(clone(value));
}
}
});
this.emissionReceived.subscribe((value) => {
emissions.push(value);
});
return emissions;
}
}
Expand Down
82 changes: 50 additions & 32 deletions libs/common/src/platform/services/app-id.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import { FakeGlobalStateProvider } from "../../../spec";
import { FakeGlobalState, FakeGlobalStateProvider, ObservableTracker } from "../../../spec";
import { Utils } from "../misc/utils";

import { ANONYMOUS_APP_ID_KEY, APP_ID_KEY, AppIdService } from "./app-id.service";

describe("AppIdService", () => {
const globalStateProvider = new FakeGlobalStateProvider();
const appIdState = globalStateProvider.getFake(APP_ID_KEY);
const anonymousAppIdState = globalStateProvider.getFake(ANONYMOUS_APP_ID_KEY);
let globalStateProvider: FakeGlobalStateProvider;
let appIdState: FakeGlobalState<string>;
let anonymousAppIdState: FakeGlobalState<string>;
let sut: AppIdService;

beforeEach(() => {
globalStateProvider = new FakeGlobalStateProvider();
appIdState = globalStateProvider.getFake(APP_ID_KEY);
anonymousAppIdState = globalStateProvider.getFake(ANONYMOUS_APP_ID_KEY);
sut = new AppIdService(globalStateProvider);
});

afterEach(() => {
jest.restoreAllMocks();
jest.resetAllMocks();
});

describe("getAppId", () => {
Expand All @@ -26,36 +29,42 @@ describe("AppIdService", () => {
expect(appId).toBe("existingAppId");
});

it.each([null, undefined])(
"uses the util function to create a new id when it AppId does not exist",
async (value) => {
appIdState.stateSubject.next(value);
const spy = jest.spyOn(Utils, "newGuid");
it("creates a new appId only once", async () => {
appIdState.stateSubject.next(null);

await sut.getAppId();
const appIds: string[] = [];
const promises = [async () => appIds.push(await sut.getAppId())];
promises.push(async () => appIds.push(await sut.getAppId()));
await Promise.all(promises);

expect(spy).toHaveBeenCalledTimes(1);
},
);
expect(appIds[0]).toBe(appIds[1]);
});

it.each([null, undefined])("returns a new appId when it does not exist", async (value) => {
it.each([null, undefined])("returns a new appId when %s", async (value) => {
appIdState.stateSubject.next(value);

const appId = await sut.getAppId();

expect(appId).toMatch(Utils.guidRegex);
});

it.each([null, undefined])(
"stores the new guid when it an existing one is not found",
async (value) => {
appIdState.stateSubject.next(value);
it.each([null, undefined])("stores the new guid when %s", async (value) => {
appIdState.stateSubject.next(value);

const appId = await sut.getAppId();
const appId = await sut.getAppId();

expect(appIdState.nextMock).toHaveBeenCalledWith(appId);
},
);
expect(appIdState.nextMock).toHaveBeenCalledWith(appId);
});

it("emits only once when creating a new appId", async () => {
appIdState.stateSubject.next(null);

const tracker = new ObservableTracker(sut.appId$);
const appId = await sut.getAppId();

expect(tracker.emissions).toEqual([appId]);
await expect(tracker.pauseUntilReceived(2, 50)).rejects.toThrow("Timeout exceeded");
});
});

describe("getAnonymousAppId", () => {
Expand All @@ -67,17 +76,16 @@ describe("AppIdService", () => {
expect(appId).toBe("existingAppId");
});

it.each([null, undefined])(
"uses the util function to create a new id when it AppId does not exist",
async (value) => {
anonymousAppIdState.stateSubject.next(value);
const spy = jest.spyOn(Utils, "newGuid");
it("creates a new anonymousAppId only once", async () => {
anonymousAppIdState.stateSubject.next(null);

await sut.getAnonymousAppId();
const appIds: string[] = [];
const promises = [async () => appIds.push(await sut.getAnonymousAppId())];
promises.push(async () => appIds.push(await sut.getAnonymousAppId()));
await Promise.all(promises);

expect(spy).toHaveBeenCalledTimes(1);
},
);
expect(appIds[0]).toBe(appIds[1]);
});

it.each([null, undefined])("returns a new appId when it does not exist", async (value) => {
anonymousAppIdState.stateSubject.next(value);
Expand All @@ -97,5 +105,15 @@ describe("AppIdService", () => {
expect(anonymousAppIdState.nextMock).toHaveBeenCalledWith(appId);
},
);

it("emits only once when creating a new anonymousAppId", async () => {
anonymousAppIdState.stateSubject.next(null);

const tracker = new ObservableTracker(sut.anonymousAppId$);
const appId = await sut.getAnonymousAppId();

expect(tracker.emissions).toEqual([appId]);
await expect(tracker.pauseUntilReceived(2, 50)).rejects.toThrow("Timeout exceeded");
});
});
});
22 changes: 15 additions & 7 deletions libs/common/src/platform/services/app-id.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Observable, filter, firstValueFrom, tap } from "rxjs";
import { Observable, concatMap, distinctUntilChanged, firstValueFrom, share } from "rxjs";

import { AppIdService as AppIdServiceAbstraction } from "../abstractions/app-id.service";
import { Utils } from "../misc/utils";
Expand All @@ -19,20 +19,28 @@ export class AppIdService implements AppIdServiceAbstraction {
const appIdState = globalStateProvider.get(APP_ID_KEY);
const anonymousAppIdState = globalStateProvider.get(ANONYMOUS_APP_ID_KEY);
this.appId$ = appIdState.state$.pipe(
tap(async (appId) => {
concatMap(async (appId) => {
if (!appId) {
await appIdState.update(() => Utils.newGuid());
return await appIdState.update(() => Utils.newGuid(), {
shouldUpdate: (v) => v == null,
});
}
return appId;
}),
filter((appId) => !!appId),
distinctUntilChanged(),
share(),
);
this.anonymousAppId$ = anonymousAppIdState.state$.pipe(
tap(async (appId) => {
concatMap(async (appId) => {
if (!appId) {
await anonymousAppIdState.update(() => Utils.newGuid());
return await anonymousAppIdState.update(() => Utils.newGuid(), {
shouldUpdate: (v) => v == null,
});
}
return appId;
}),
filter((appId) => !!appId),
distinctUntilChanged(),
share(),
);
}

Expand Down

0 comments on commit 3154d21

Please sign in to comment.