diff --git a/src/Net/Net.ts b/src/Net/Net.ts index bf62c0929..fe2fb1939 100644 --- a/src/Net/Net.ts +++ b/src/Net/Net.ts @@ -1,10 +1,10 @@ -import { Observable, BehaviorSubject, of, from } from 'rxjs' -import { concatAll, concatMap, filter, mapTo, mergeMap, switchMap, tap } from 'rxjs/operators' +import { Observable, BehaviorSubject, of, from, empty } from 'rxjs' +import { concatAll, concatMap, filter, mapTo, mergeMap, reduce, switchMap, tap } from 'rxjs/operators' import { QueryToken, SelectorMeta, ProxySelector } from 'reactivedb/proxy' import { JoinMode } from 'reactivedb/interface' import { Database, Query, Predicate, ExecutorResult } from 'reactivedb' -import { forEach, ParsedWSMsg, WSMsgToDBHandler, GeneralSchemaDef } from '../utils' +import { forEach, identity, ParsedWSMsg, WSMsgToDBHandler, GeneralSchemaDef } from '../utils' import { SDKLogger } from '../utils/Logger' /** @@ -140,31 +140,27 @@ export class Net { public persistedDataBuffer: BufferObject[] = [] private msgToDB: WSMsgToDBHandler | undefined - private validate = (result: ApiResult) => { - const { tableName, required, padding } = result - - const hasRequiredFields = Array.isArray(required) - const hasPaddingFunction = typeof padding === 'function' + private validate = ({ tableName, required, padding }: ApiResult) => { const pk = this.primaryKeys.get(tableName) - - const fn = switchMap(data => !data.length - ? of(data) - : from(data).pipe( - mergeMap(datum => { - if (!hasRequiredFields || !hasPaddingFunction || !pk || - required!.every(k => typeof datum[k] !== 'undefined') - ) { - return of(datum) - } - const patch = padding!(datum[pk]) - return patch.pipe( - filter((r): r is T => r != null), - concatMap(r => this.database!.upsert(tableName, r).pipe(mapTo(r))), - tap(r => Object.assign(datum, r)) - ) - }) - ).pipe(mapTo(data)) - ) + const noRequiredPadding = !Array.isArray(required) || typeof padding !== 'function' || !pk + + const fn = switchMap((results) => { + return !results.length + ? of([]) + : from(results).pipe( + mergeMap((result) => { + return noRequiredPadding || required!.every(k => typeof result[k] !== 'undefined') + ? empty() + : padding!(result[pk!]).pipe( + filter((r): r is T => r != null), + concatMap((r) => this.database!.upsert(tableName, r).pipe( + tap(() => Object.assign(result, r)) + )) + ) + }), + reduce(identity, results) + ) + }) fn.toString = () => 'SDK_VALIDATE' return fn } diff --git a/test/net/net.ts b/test/net/net.ts index 029792ff7..bce8449a1 100644 --- a/test/net/net.ts +++ b/test/net/net.ts @@ -1,5 +1,5 @@ -import { defer, of, Subscription, asapScheduler } from 'rxjs' -import { share, take, tap, subscribeOn } from 'rxjs/operators' +import { defer, of, asapScheduler } from 'rxjs' +import { take, tap, subscribeOn } from 'rxjs/operators' import { describe, beforeEach, afterEach, it } from 'tman' import { Database, DataStoreType } from 'reactivedb' import { expect, use } from 'chai' @@ -22,7 +22,6 @@ describe('Net test', () => { let httpBackend: Backend let database: Database let version = 1 - let subscription: Subscription | undefined let spyFetch: sinon.SinonSpy const sdkFetch = new SDKFetch() @@ -46,9 +45,6 @@ describe('Net test', () => { afterEach(function* () { httpBackend.restore() spyFetch && spyFetch.restore() - if (subscription instanceof Subscription) { - subscription.unsubscribe() - } yield database.dispose() }) @@ -114,8 +110,6 @@ describe('Net test', () => { }) .changes() - subscription = stream$.subscribe() - yield stream$.pipe(take(1)) const newLocation = 'test_new_location' @@ -201,8 +195,6 @@ describe('Net test', () => { } as ApiResult) .changes() - subscription = stream$.subscribe() - yield stream$.pipe(take(1)) const newLocation = 'new_event_location' @@ -248,8 +240,6 @@ describe('Net test', () => { }) .changes() - subscription = stream$.subscribe() - yield stream$.pipe(take(1)) httpBackend.whenGET(`${apiHost}/api/events/${partialEvent._id}`) @@ -292,9 +282,7 @@ describe('Net test', () => { required: ['startDate'], padding: (id: string) => sdkFetch.get(`api/events/${id}`) }) - .changes().pipe(share()) - - subscription = stream$.subscribe() + .changes() yield stream$.pipe(take(1)) @@ -409,8 +397,6 @@ describe('Net test', () => { const stream$ = getToken() .changes() - subscription = stream$.subscribe() - yield stream$.pipe(take(1)) const newLocation = 'new_event_location' @@ -464,10 +450,7 @@ describe('Net test', () => { location: newLocation } - const stream$ = getToken() - .changes() - - subscription = stream$.subscribe() + const stream$ = getToken().changes() yield stream$.pipe(take(1))