From 6087fdad614053b3c0ea11a95164984153183e42 Mon Sep 17 00:00:00 2001 From: Christoph Guttandin Date: Thu, 19 Dec 2019 13:18:56 +0100 Subject: [PATCH] feat: request an update from the first connected peer --- src/factories/timing-provider-constructor.ts | 35 ++++++++++++++++++-- src/types/data-channel-event.ts | 3 +- src/types/index.ts | 1 + src/types/request-event.ts | 9 +++++ 4 files changed, 44 insertions(+), 4 deletions(-) create mode 100644 src/types/request-event.ts diff --git a/src/factories/timing-provider-constructor.ts b/src/factories/timing-provider-constructor.ts index 3457f911..0ab2b824 100644 --- a/src/factories/timing-provider-constructor.ts +++ b/src/factories/timing-provider-constructor.ts @@ -1,7 +1,7 @@ import { ConnectableObservable, Subject, Subscription } from 'rxjs'; import { mask, wrap } from 'rxjs-broker'; import { accept } from 'rxjs-connector'; -import { expand, mergeMap, publish, scan, startWith, takeUntil, withLatestFrom } from 'rxjs/operators'; +import { expand, mapTo, mergeMap, publish, scan, startWith, takeUntil, withLatestFrom } from 'rxjs/operators'; import { ITimingProvider, ITimingStateVector, @@ -10,7 +10,7 @@ import { filterTimingStateVectorUpdate, translateTimingStateVector } from 'timing-object'; -import { TDataChannelEvent, TTimingProviderConstructor, TTimingProviderConstructorFactory, TUpdateEvent } from '../types'; +import { TDataChannelEvent, TRequestEvent, TTimingProviderConstructor, TTimingProviderConstructorFactory, TUpdateEvent } from '../types'; const SUENC_URL = 'https://suenc.io/'; @@ -39,6 +39,8 @@ export const createTimingProviderConstructor: TTimingProviderConstructorFactory private _readyState: TConnectionState; + private _remoteRequestsSubscription: null | Subscription; + private _remoteUpdatesSubscription: null | Subscription; private _skew: number; @@ -63,6 +65,7 @@ export const createTimingProviderConstructor: TTimingProviderConstructorFactory this._onreadystatechange = null; this._providerId = providerId; this._readyState = 'connecting'; + this._remoteRequestsSubscription = null; this._remoteUpdatesSubscription = null; this._skew = 0; this._startPosition = Number.NEGATIVE_INFINITY; @@ -109,11 +112,13 @@ export const createTimingProviderConstructor: TTimingProviderConstructorFactory } public destroy (): void { - if (this._remoteUpdatesSubscription === null) { + if (this._remoteRequestsSubscription === null || this._remoteUpdatesSubscription === null) { throw new Error('The timingProvider is already destroyed.'); } this._readyState = 'closed'; + this._remoteRequestsSubscription.unsubscribe(); + this._remoteRequestsSubscription = null; this._remoteUpdatesSubscription.unsubscribe(); this._remoteUpdatesSubscription = null; this._updateRequestsSubject.complete(); @@ -203,6 +208,30 @@ export const createTimingProviderConstructor: TTimingProviderConstructorFactory this._setInternalVector(vector); }); + this._remoteRequestsSubscription = openedDataChannels + .pipe( + mergeMap((dataChannel, index) => { + const dataChannelSubject = wrap(dataChannel); + const requestSubject = mask( + { type: 'request' }, + dataChannelSubject + ); + + if (index === 0) { + requestSubject.send(undefined); + } + + return requestSubject + .pipe( + mapTo(dataChannel), + takeUntil(waitForEvent(dataChannel, 'close')) + ); + }) + ) + .subscribe((dataChannel) => { + dataChannel.send(JSON.stringify({ type: 'update', message: { ...this._vector, timeOrigin: this._timeOrigin } })); + }); + this._remoteUpdatesSubscription = openedDataChannels .pipe( mergeMap((dataChannel) => { diff --git a/src/types/data-channel-event.ts b/src/types/data-channel-event.ts index 76e3cdb6..12b6a111 100644 --- a/src/types/data-channel-event.ts +++ b/src/types/data-channel-event.ts @@ -1,5 +1,6 @@ import { TPingEvent } from './ping-event'; import { TPongEvent } from './pong-event'; +import { TRequestEvent } from './request-event'; import { TUpdateEvent } from './update-event'; -export type TDataChannelEvent = TPingEvent | TPongEvent | TUpdateEvent; +export type TDataChannelEvent = TPingEvent | TPongEvent | TRequestEvent | TUpdateEvent; diff --git a/src/types/index.ts b/src/types/index.ts index 1818ac8f..521cd6f6 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -5,6 +5,7 @@ export * from './event-target-constructor'; export * from './event-target-constructor-factory'; export * from './ping-event'; export * from './pong-event'; +export * from './request-event'; export * from './timing-provider-constructor'; export * from './timing-provider-constructor-factory'; export * from './update-event'; diff --git a/src/types/request-event.ts b/src/types/request-event.ts new file mode 100644 index 00000000..a58bd9f2 --- /dev/null +++ b/src/types/request-event.ts @@ -0,0 +1,9 @@ +import { TStringifyableJsonObject } from 'rxjs-broker'; + +export type TRequestEvent = TStringifyableJsonObject<{ + + message: undefined; + + type: 'request'; + +}>;