Skip to content

Commit

Permalink
...尝试用一个 rx 模块把深模块引用做掉
Browse files Browse the repository at this point in the history
并不再在代码里区分 rxjs, rxjs/operators, rxjs/ajax 这样的子模块引用
(除了在 rx 模块中)。
  • Loading branch information
chuan6 committed Sep 27, 2018
1 parent 0f60cfd commit d433513
Show file tree
Hide file tree
Showing 44 changed files with 167 additions and 143 deletions.
38 changes: 18 additions & 20 deletions src/Net/Http.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { empty, throwError, Observable, Observer, Subject } from 'rxjs'
import { catchError, map, publishReplay, refCount } from 'rxjs/operators'
import { ajax, AjaxError } from 'rxjs/ajax'
import * as rx from '../rx'
import { parseHeaders } from '../utils/index'
import { testable } from '../testable'
import { forEach } from '../utils'
Expand All @@ -24,7 +22,7 @@ type MethodParams = {
url: string,
body?: any,
_opts: any,
errorAdapter$: Subject<HttpErrorMessage>,
errorAdapter$: rx.Subject<HttpErrorMessage>,
includeHeaders: boolean
}

Expand All @@ -49,31 +47,31 @@ const coverRxAjaxHeadersBug = (normHeaders: {}) => {
})
}

export const HttpError$ = new Subject<HttpErrorMessage>() as any as Observable<HttpErrorMessage>
export const HttpError$ = new rx.Subject<HttpErrorMessage>() as any as rx.Observable<HttpErrorMessage>

export const createMethod = (method: AllowedHttpMethod) => (params: MethodParams): Observable<any> => {
export const createMethod = (method: AllowedHttpMethod) => (params: MethodParams): rx.Observable<any> => {
const { url, body, _opts, errorAdapter$, includeHeaders } = params

/* istanbul ignore if */
if (testable.UseXMLHTTPRequest && typeof window !== 'undefined') {
coverRxAjaxHeadersBug(_opts.headers)
return ajax({
return rx.ajax({
url, body, method,
headers: _opts.headers,
withCredentials: _opts.credentials === 'include',
responseType: _opts.responseType || 'json',
crossDomain: typeof _opts.crossDomain !== 'undefined' ? !!_opts.crossDomain : true
})
.pipe(
map(value => {
rx.map(value => {
const respBody = value.response
if (!includeHeaders) {
return respBody
}
const respHeaders = parseHeaders(value.xhr.getAllResponseHeaders())
return { headers: respHeaders, body: respBody }
}),
catchError((e: AjaxError) => {
rx.catch((e: rx.AjaxError) => {
const headers = e.xhr.getAllResponseHeaders()
const errorResponse = new Response(new Blob([JSON.stringify(e.xhr.response)]), {
status: e.xhr.status,
Expand All @@ -86,11 +84,11 @@ export const createMethod = (method: AllowedHttpMethod) => (params: MethodParams
setTimeout(() => {
errorAdapter$.next({ ...requestInfo, error: errorResponseClone })
}, 10)
return throwError({ ...requestInfo, error: errorResponse })
return rx.throw({ ...requestInfo, error: errorResponse })
})
)
} else { // 测试用分支
return Observable.create((observer: Observer<any>) => {
return new rx.Observable((observer) => {
const _options = {
... _opts,
method: method
Expand Down Expand Up @@ -134,16 +132,16 @@ export const createMethod = (method: AllowedHttpMethod) => (params: MethodParams

export const getHttpWithResponseHeaders = <T>(
url?: string,
errorAdapter$?: Subject<HttpErrorMessage>
errorAdapter$?: rx.Subject<HttpErrorMessage>
): Http<HttpResponseWithHeaders<T>> => {
return new Http<HttpResponseWithHeaders<T>>(url, errorAdapter$, true)
}

export class Http<T> {
private errorAdapter$: Subject<HttpErrorMessage>
private errorAdapter$: rx.Subject<HttpErrorMessage>
private cloned = false
private request: Observable<T> | undefined
public mapFn: (v$: Observable<T>) => Observable<any> = (dist$ => dist$)
private request: rx.Observable<T> | undefined
public mapFn: (v$: rx.Observable<T>) => rx.Observable<any> = (dist$ => dist$)

private static get = createMethod('get')
private static put = createMethod('put')
Expand All @@ -157,13 +155,13 @@ export class Http<T> {

constructor(
private url: string = '',
errorAdapter$?: Subject<HttpErrorMessage>,
errorAdapter$?: rx.Subject<HttpErrorMessage>,
private readonly includeHeaders: boolean = false
) {
if (errorAdapter$) {
this.errorAdapter$ = errorAdapter$
} else {
this.errorAdapter$ = HttpError$ as Subject<HttpErrorMessage>
this.errorAdapter$ = HttpError$ as rx.Subject<HttpErrorMessage>
}
}

Expand Down Expand Up @@ -221,14 +219,14 @@ export class Http<T> {
return this
}

send(): Observable<T> {
return this.request ? this.mapFn(this.request) : empty()
send(): rx.Observable<T> {
return this.request ? this.mapFn(this.request) : rx.empty()
}

clone() {
const result = new Http<T>(this.url, this.errorAdapter$)
if (!this.cloned && this.request) {
this.request = this.request.pipe(publishReplay(1), refCount())
this.request = this.request.pipe(rx.publishReplay(1), rx.refCount())
this.cloned = true
result.cloned = true
}
Expand Down
71 changes: 35 additions & 36 deletions src/Net/Net.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { empty, from, of, BehaviorSubject, Observable } from 'rxjs'
import { concatAll, concatMap, filter, mapTo, mergeMap, reduce, switchMap, tap } from 'rxjs/operators'
import * as rx from '../rx'
import { QueryToken, SelectorMeta, ProxySelector } from 'reactivedb/proxy'
import { JoinMode } from 'reactivedb/interface'
import { Database, Query, Predicate, ExecutorResult } from 'reactivedb'
Expand Down Expand Up @@ -53,7 +52,7 @@ export enum CacheStrategy {
}

export interface ApiResult<T, U extends CacheStrategy> {
request: Observable<T> | Observable<T[]>
request: rx.Observable<T> | rx.Observable<T[]>
/**
* 使用 fields 指定需要查询的字段,where 指定查询条件,
* orderBy 指定结果排序规则。更多支持的选项请见具体类型定义。
Expand All @@ -78,19 +77,19 @@ export interface ApiResult<T, U extends CacheStrategy> {
* 或为空([])。
*/
excludeFields?: string[]
padding?: (missedId: string) => Observable<T | null>
padding?: (missedId: string) => rx.Observable<T | null>
}

export type AssocField<T> = { [P in keyof T]?: AssocField<T[P]> | string[] }

export interface CApiResult<T> {
request: Observable<T>
request: rx.Observable<T>
tableName: string
method: 'create'
}

export interface UDResult<T> {
request: Observable<T>
request: rx.Observable<T>
tableName: string
method: 'update' | 'delete'
clause: Predicate<T>
Expand All @@ -113,7 +112,7 @@ export type SocketCUDBufferObject = {
export type SelectorBufferObject = {
kind: 'Selector'
realSelectorInfo: ApiResult<any, CacheStrategy>
proxySelector: BehaviorSubject<SelectorMeta<any>>
proxySelector: rx.BehaviorSubject<SelectorMeta<any>>
}

export type BufferObject = CUDBufferObject | SocketCUDBufferObject | SelectorBufferObject
Expand Down Expand Up @@ -144,21 +143,21 @@ export class Net {
const pk = this.primaryKeys.get(tableName)
const noRequiredPadding = !Array.isArray(required) || typeof padding !== 'function' || !pk

const fn = switchMap<T[], T[]>((results) => {
const fn = rx.switchMap<T[], T[]>((results) => {
return !results.length
? of([])
: from(results).pipe(
mergeMap((result) => {
? rx.of([])
: rx.from(results).pipe(
rx.mergeMap((result) => {
return noRequiredPadding || required!.every(k => typeof result[k] !== 'undefined')
? empty()
? rx.empty()
: padding!(result[pk!]).pipe(
filter((r): r is T => r != null),
concatMap((r) => this.database!.upsert(tableName, r).pipe(
tap(() => Object.assign(result, r))
rx.filter((r): r is T => r != null),
rx.concatMap((r) => this.database!.upsert(tableName, r).pipe(
rx.tap(() => Object.assign(result, r))
))
)
}),
reduce<any, T[]>(identity, results)
rx.reduce<any, T[]>(identity, results)
)
})
fn.toString = () => 'SDK_VALIDATE'
Expand Down Expand Up @@ -191,7 +190,7 @@ export class Net {

lift<T>(result: ApiResult<T, CacheStrategy>): QueryToken<T>

lift<T>(result: CUDApiResult<T>): Observable<T>
lift<T>(result: CUDApiResult<T>): rx.Observable<T>

lift<T>(result: ApiResult<T, CacheStrategy> | CUDApiResult<T>) {
if ((result as ApiResult<T, CacheStrategy>).cacheValidate) {
Expand Down Expand Up @@ -220,8 +219,8 @@ export class Net {
const database = this.database!

const { request, method, tableName } = result
let destination: Observable<ExecutorResult>
return request.pipe(concatMap(v => {
let destination: rx.Observable<ExecutorResult>
return request.pipe(rx.concatMap(v => {
switch (method) {
case 'create':
destination = database.upsert<T>(tableName, v)
Expand All @@ -235,7 +234,7 @@ export class Net {
default:
throw new Error()
}
return destination.pipe(mapTo(v))
return destination.pipe(rx.mapTo(v))
}))
}

Expand All @@ -244,10 +243,10 @@ export class Net {
this.database = database
}

const asyncQueue: Observable<any>[] = []
const asyncQueue: rx.Observable<any>[] = []

forEach(this.persistedDataBuffer, (v: BufferObject) => {
let p: Observable<any> | null = null
let p: rx.Observable<any> | null = null

switch (v.kind) {
case 'CUD':
Expand All @@ -263,7 +262,7 @@ export class Net {
const token = this.handleRequestCache(v.realSelectorInfo)
const selector$ = token.selector$

p = selector$.pipe(tap({
p = selector$.pipe(rx.tap({
next(selector) {
cacheControl$.next(selector)
}
Expand All @@ -280,10 +279,10 @@ export class Net {

this.persistedDataBuffer.length = 0

return from(asyncQueue).pipe(
concatAll(),
tap({
error: async (err: Observable<Error>) => {
return rx.from(asyncQueue).pipe(
rx.concatAll(),
rx.tap({
error: async (err: rx.Observable<Error>) => {
const errObj = await err.toPromise()
SDKLogger.error(errObj.message)
}
Expand All @@ -299,7 +298,7 @@ export class Net {
q,
tableName
)
const cacheControl$ = new BehaviorSubject<SelectorMeta<T>>(proxySelector)
const cacheControl$ = new rx.BehaviorSubject<SelectorMeta<T>>(proxySelector)
this.persistedDataBuffer.push({
kind: 'Selector',
realSelectorInfo: result,
Expand All @@ -311,7 +310,7 @@ export class Net {

bufferCUDResponse<T>(result: CUDApiResult<T>) {
const { request, method, tableName } = result as CUDApiResult<T>
return request.pipe(tap((v: T | T[]) => {
return request.pipe(rx.tap((v: T | T[]) => {
this.persistedDataBuffer.push({
kind: 'CUD',
tableName,
Expand All @@ -326,7 +325,7 @@ export class Net {
kind: 'SocketCUD',
socketMessage
})
return of(null)
return rx.of(null)
}

private genCacheKey<T>(tableName: string, q: Readonly<Query<T>>) {
Expand All @@ -344,7 +343,7 @@ export class Net {
} = this.getInfoFromResult(result)

// 将类型 Observalbe<T> | Observable<T[]> 弱化为 Observable<T | T[]>
const response$: Observable<T | T[]> = request
const response$: rx.Observable<T | T[]> = request
const cacheKey = this.genCacheKey(tableName, q)
const requestCache = this.requestMap.get(cacheKey)

Expand All @@ -354,9 +353,9 @@ export class Net {
if (!requestCache) {
/*tslint:disable no-shadowed-variable*/
const selector$ = response$.pipe(
concatMap(v => database.upsert(tableName, v)),
tap(() => this.requestMap.set(cacheKey, true)),
concatMap(() => dbGetWithSelfJoinEnabled<T>(database, tableName, q).selector$)
rx.concatMap(v => database.upsert(tableName, v)),
rx.tap(() => this.requestMap.set(cacheKey, true)),
rx.concatMap(() => dbGetWithSelfJoinEnabled<T>(database, tableName, q).selector$)
)
token = new QueryToken(selector$)
} else {
Expand All @@ -366,8 +365,8 @@ export class Net {
break
case CacheStrategy.Cache:
const selector$ = response$.pipe(
concatMap(v => database.upsert(tableName, v)),
concatMap(() => dbGetWithSelfJoinEnabled<T>(database, tableName, q).selector$)
rx.concatMap(v => database.upsert(tableName, v)),
rx.concatMap(() => dbGetWithSelfJoinEnabled<T>(database, tableName, q).selector$)
)
return new QueryToken(selector$)
default:
Expand Down
29 changes: 14 additions & 15 deletions src/Net/Pagination.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { empty, throwError, Observable, Observer, OperatorFunction } from 'rxjs'
import { catchError, finalize, map, mergeAll, startWith, tap } from 'rxjs/operators'
import * as rx from '../rx'

export type PageToken = string & { kind: 'PageToken' }

Expand Down Expand Up @@ -66,29 +65,29 @@ export const accumulateResultByConcat = <T>(state: State<T>, resp: OriginalRespo
}

export const loadAndExpand = <T>(
step: (curr: State<T>) => Observable<OriginalResponse<T>>,
step: (curr: State<T>) => rx.Observable<OriginalResponse<T>>,
initState: State<T>,
loadMore$: Observable<{}> = empty()
): Observable<State<T>> => {
loadMore$: rx.Observable<{}> = rx.empty()
): rx.Observable<State<T>> => {
return loadMore$
.pipe(
startWith({}),
rx.startWith({}),
expand(step, accumulateResultByConcat, initState),
mergeAll()
rx.mergeAll()
)
}

export const expand = <T>(
step: (curr: State<T>) => Observable<OriginalResponse<T>>,
step: (curr: State<T>) => rx.Observable<OriginalResponse<T>>,
accumulator: (state: State<T>, resp: OriginalResponse<T>) => State<T>,
initState: State<T>
): OperatorFunction<{}, Observable<State<T>>> => (
): rx.OperatorFunction<{}, rx.Observable<State<T>>> => (
source$
) => {
const state = { ...initState }
let isLoading = false

return Observable.create((observer: Observer<Observable<State<T>>>) => {
return new rx.Observable((observer) => {
const subs = source$.subscribe({
next: (_) => {
if (!state.hasMore) {
Expand All @@ -99,10 +98,10 @@ export const expand = <T>(
isLoading = true
observer.next(step(state)
.pipe(
map((stepResult) => accumulator(state, stepResult)),
tap((expanded) => Object.assign(state, expanded)),
catchError((err) => throwError(err)),
finalize(() => { isLoading = false })
rx.map((stepResult) => accumulator(state, stepResult)),
rx.tap((expanded) => Object.assign(state, expanded)),
rx.catch((err) => rx.throw(err)),
rx.finalize(() => { isLoading = false })
)
)
}
Expand All @@ -119,5 +118,5 @@ export const expand = <T>(
return () => {
subs.unsubscribe()
}
}) as Observable<Observable<State<T>>>
})
}
Loading

0 comments on commit d433513

Please sign in to comment.