Skip to content

Commit

Permalink
feat: request an update from the first connected peer
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisguttandin committed Dec 19, 2019
1 parent 68a7e69 commit 6087fda
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 4 deletions.
35 changes: 32 additions & 3 deletions src/factories/timing-provider-constructor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ConnectableObservable, Subject, Subscription } from 'rxjs';
import { mask, wrap } from 'rxjs-broker';
import { accept } from 'rxjs-connector';
import { expand, mergeMap, publish, scan, startWith, takeUntil, withLatestFrom } from 'rxjs/operators';
import { expand, mapTo, mergeMap, publish, scan, startWith, takeUntil, withLatestFrom } from 'rxjs/operators';
import {
ITimingProvider,
ITimingStateVector,
Expand All @@ -10,7 +10,7 @@ import {
filterTimingStateVectorUpdate,
translateTimingStateVector
} from 'timing-object';
import { TDataChannelEvent, TTimingProviderConstructor, TTimingProviderConstructorFactory, TUpdateEvent } from '../types';
import { TDataChannelEvent, TRequestEvent, TTimingProviderConstructor, TTimingProviderConstructorFactory, TUpdateEvent } from '../types';

const SUENC_URL = 'https://suenc.io/';

Expand Down Expand Up @@ -39,6 +39,8 @@ export const createTimingProviderConstructor: TTimingProviderConstructorFactory

private _readyState: TConnectionState;

private _remoteRequestsSubscription: null | Subscription;

private _remoteUpdatesSubscription: null | Subscription;

private _skew: number;
Expand All @@ -63,6 +65,7 @@ export const createTimingProviderConstructor: TTimingProviderConstructorFactory
this._onreadystatechange = null;
this._providerId = providerId;
this._readyState = 'connecting';
this._remoteRequestsSubscription = null;
this._remoteUpdatesSubscription = null;
this._skew = 0;
this._startPosition = Number.NEGATIVE_INFINITY;
Expand Down Expand Up @@ -109,11 +112,13 @@ export const createTimingProviderConstructor: TTimingProviderConstructorFactory
}

public destroy (): void {
if (this._remoteUpdatesSubscription === null) {
if (this._remoteRequestsSubscription === null || this._remoteUpdatesSubscription === null) {
throw new Error('The timingProvider is already destroyed.');
}

this._readyState = 'closed';
this._remoteRequestsSubscription.unsubscribe();
this._remoteRequestsSubscription = null;
this._remoteUpdatesSubscription.unsubscribe();
this._remoteUpdatesSubscription = null;
this._updateRequestsSubject.complete();
Expand Down Expand Up @@ -203,6 +208,30 @@ export const createTimingProviderConstructor: TTimingProviderConstructorFactory
this._setInternalVector(vector);
});

this._remoteRequestsSubscription = openedDataChannels
.pipe(
mergeMap((dataChannel, index) => {
const dataChannelSubject = wrap<TDataChannelEvent>(dataChannel);
const requestSubject = mask<TRequestEvent['message'], TRequestEvent, TDataChannelEvent>(
{ type: 'request' },
dataChannelSubject
);

if (index === 0) {
requestSubject.send(undefined);
}

return requestSubject
.pipe(
mapTo(dataChannel),
takeUntil(waitForEvent(dataChannel, 'close'))
);
})
)
.subscribe((dataChannel) => {
dataChannel.send(JSON.stringify({ type: 'update', message: { ...this._vector, timeOrigin: this._timeOrigin } }));
});

this._remoteUpdatesSubscription = openedDataChannels
.pipe(
mergeMap((dataChannel) => {
Expand Down
3 changes: 2 additions & 1 deletion src/types/data-channel-event.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { TPingEvent } from './ping-event';
import { TPongEvent } from './pong-event';
import { TRequestEvent } from './request-event';
import { TUpdateEvent } from './update-event';

export type TDataChannelEvent = TPingEvent | TPongEvent | TUpdateEvent;
export type TDataChannelEvent = TPingEvent | TPongEvent | TRequestEvent | TUpdateEvent;
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export * from './event-target-constructor';
export * from './event-target-constructor-factory';
export * from './ping-event';
export * from './pong-event';
export * from './request-event';
export * from './timing-provider-constructor';
export * from './timing-provider-constructor-factory';
export * from './update-event';
Expand Down
9 changes: 9 additions & 0 deletions src/types/request-event.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { TStringifyableJsonObject } from 'rxjs-broker';

export type TRequestEvent = TStringifyableJsonObject<{

message: undefined;

type: 'request';

}>;

0 comments on commit 6087fda

Please sign in to comment.