Skip to content

Commit

Permalink
change scrapeStep to use a class
Browse files Browse the repository at this point in the history
  • Loading branch information
andykais committed Jan 19, 2019
1 parent f0784a3 commit 6a180ed
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 106 deletions.
6 changes: 3 additions & 3 deletions src/scraper/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { initTools } from '../tools'
import { scraperStep } from './scrape-step'
import { ScrapeStep } from './scrape-step'
import { mkdirp, rmrf } from '../util/fs'
import { normalizeConfig } from '../settings/config'
import { normalizeOptions } from '../settings/options'
Expand Down Expand Up @@ -27,8 +27,8 @@ export const scrape = async (
await initFolders(config, optionsInit, flatOptions)
const tools = initTools(config, optionsInit, flatOptions)
// create the observable
const scrapingScheme = scraperStep(config.scrape)(flatOptions, tools)
const scrapingObservable = scrapingScheme([{ parsedValue: '' }])
const scrapingScheme = new ScrapeStep(config.scrape, flatOptions, tools)
const scrapingObservable = scrapingScheme.run([{ parsedValue: '' }])
// start running the observable
const { emitter, queue, logger, store } = tools
const subscription = scrapingObservable.subscribe({
Expand Down
2 changes: 2 additions & 0 deletions src/scraper/scrape-step/downloader/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ export const downloaderClassFactory = (
if (config.download) return new HttpDownloader(config, options, tools)
else return new IdentityDownloader(config, options, tools)
}

export type DownloaderClass = IdentityDownloader | HttpDownloader
52 changes: 25 additions & 27 deletions src/scraper/scrape-step/incrementer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as ops from 'rxjs/operators'
import { ScrapeConfig } from '../../../settings/config/types'
import { ParsedValue } from '../'
import { whileLoopObservable } from '../../../util/rxjs/observables/while-loop'
import { ScrapeStep, IdentityScrapeStep } from '../'

type OkToIncrementWhileLoop = (
parsedValues: ParsedValue[],
Expand Down Expand Up @@ -37,7 +38,11 @@ type StatefulVars = {
nextPromises: Promise<{}>[]
}

const incrementer = ({ incrementUntil }: ScrapeConfig) => {
const incrementer = (
{ incrementUntil }: ScrapeConfig,
asyncFunction: DownloadParseFunction,
scrapeNextChild: ScrapeStep | IdentityScrapeStep
) => {
const okToIncrementWhileLoop: OkToIncrementWhileLoop =
incrementUntil === 'empty-parse'
? incrementUntilEmptyParse
Expand All @@ -48,36 +53,29 @@ const incrementer = ({ incrementUntil }: ScrapeConfig) => {
const ignoreFetchError =
incrementUntil === 'failed-download' ? catchFetchError : throwAnyError

return (
asyncFunction: DownloadParseFunction,
scrapeNextChild: (
parsedValues: ParsedValue[]
) => Rx.Observable<ParsedValue[]>
) => {
return (parsedValueWithId: ParsedValue): Rx.Observable<ParsedValue[]> =>
whileLoopObservable(
asyncFunction,
okToIncrementWhileLoop,
parsedValueWithId
).pipe(
ops.catchError(ignoreFetchError),
ops.flatMap((parsedValues, incrementIndex) =>
Rx.of(parsedValues).pipe(
ops.expand(parsedValues =>
scrapeNextChild(parsedValues).pipe(
ops.flatMap(parsedValues =>
parsedValues.map(parsedValue =>
asyncFunction(parsedValue, incrementIndex)
)
),
ops.mergeAll(),
ops.takeWhile(incrementUntilEmptyParse)
)
return (parsedValueWithId: ParsedValue): Rx.Observable<ParsedValue[]> =>
whileLoopObservable(
asyncFunction,
okToIncrementWhileLoop,
parsedValueWithId
).pipe(
ops.catchError(ignoreFetchError),
ops.flatMap((parsedValues, incrementIndex) =>
Rx.of(parsedValues).pipe(
ops.expand(parsedValues =>
scrapeNextChild.run(parsedValues).pipe(
ops.flatMap(parsedValues =>
parsedValues.map(parsedValue =>
asyncFunction(parsedValue, incrementIndex)
)
),
ops.mergeAll(),
ops.takeWhile(incrementUntilEmptyParse)
)
)
)
)
}
)
}

export { incrementer }
181 changes: 105 additions & 76 deletions src/scraper/scrape-step/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,102 +4,131 @@ import VError from 'verror'
import { downloaderClassFactory } from './downloader'
import { parserClassFactory } from './parser'
import { incrementer } from './incrementer'
import { wrapError } from '../../util/error'
// type imports
import { ScrapeConfig } from '../../settings/config/types'
import { ScraperName, ScrapeConfig } from '../../settings/config/types'
import { FlatOptions } from '../../settings/options/types'
import { Tools } from '../../tools'
import { SelectedRow as ParsedValueWithId } from '../../tools/store/queries/select-parsed-values'
import { DownloadParseFunction } from './incrementer'
import { DownloaderClass } from './downloader'
import { ParserClass } from './parser'

type InputValue = {
parsedValue: string
id?: number // nonexistent
}
export type ParsedValue = InputValue | ParsedValueWithId

// init setup
const scraperStep = (config: ScrapeConfig) => {
const getIncrementObservable = incrementer(config)
const childrenSetup = config.scrapeEach.map(scrapeConfig =>
scraperStep(scrapeConfig)
)
abstract class AbstractScrapeStep {
public abstract run: (
parsedValues: ParsedValue[]
) => Rx.Observable<ParsedValue[]>
}
class IdentityScrapeStep extends AbstractScrapeStep {
public run: typeof AbstractScrapeStep.prototype.run = () => Rx.empty()
}
class ScrapeStep extends AbstractScrapeStep {
private scraperName: ScraperName
private config: ScrapeConfig
private flatOptions: FlatOptions
private tools: Tools
private scraperLogger: ReturnType<Tools['logger']['scraper']>

// run setup
return (flatOptions: FlatOptions, tools: Tools) => {
const options = flatOptions.get(config.name)!
const downloader = downloaderClassFactory(config, options, tools)
const parser = parserClassFactory(config, options, tools)
private downloader: DownloaderClass
private parser: ParserClass
private incrementObservableFunction: ReturnType<typeof incrementer>
private children: ScrapeStep[]

const { store, emitter, logger } = tools
const scraperLogger = logger.scraper(config.name)!
const children = childrenSetup.map(child => child(flatOptions, tools))
const scrapeNextChild = config.scrapeNext
? scraperStep(config.scrapeNext)(flatOptions, tools)
: () => Rx.empty()
public constructor(
config: ScrapeConfig,
flatOptions: FlatOptions,
tools: Tools
) {
super()
this.scraperName = config.name
this.config = config
this.flatOptions = flatOptions
this.tools = tools

// const getIncrementObservable = incrementer(config)
this.children = config.scrapeEach.map(
scrapeConfig => new ScrapeStep(scrapeConfig, flatOptions, tools)
)
const scraperOptions = flatOptions.get(this.scraperName)!
this.downloader = downloaderClassFactory(config, scraperOptions, tools)
this.parser = parserClassFactory(config, scraperOptions, tools)

const downloadParseFunction: DownloadParseFunction = async (
{ parsedValue: value, id: parentId },
incrementIndex
) => {
const { id: downloadId } = store.qs.selectCompletedDownload({
incrementIndex,
parentId,
scraper: config.name
})
if (downloadId) {
const parsedValuesWithId = store.qs.selectParsedValues(downloadId)
scraperLogger.info(
{ parsedValuesWithId, downloadId },
'loaded cached values'
)
return parsedValuesWithId
} else {
const { downloadValue, downloadId, filename } = await downloader.run({
this.scraperLogger = tools.logger.scraper(this.scraperName)!
const scrapeNextChild = config.scrapeNext
? new ScrapeStep(config.scrapeNext, flatOptions, tools)
: new IdentityScrapeStep()
this.incrementObservableFunction = incrementer(
config,
this.downloadParseFunction,
scrapeNextChild
)
}
private downloadParseFunction: DownloadParseFunction = async (
{ parsedValue: value, id: parentId },
incrementIndex
) => {
const { store, emitter } = this.tools
const { id: downloadId } = store.qs.selectCompletedDownload({
incrementIndex,
parentId,
scraper: this.scraperName
})
if (downloadId) {
const parsedValuesWithId = store.qs.selectParsedValues(downloadId)
this.scraperLogger.info(
{ parsedValuesWithId, downloadId },
'loaded cached values'
)
return parsedValuesWithId
} else {
const { downloadValue, downloadId, filename } = await this.downloader.run(
{
incrementIndex,
parentId,
value
})
const parsedValues = parser.run(downloadValue)

store.transaction(() => {
store.qs.updateDownloadToComplete({ downloadId, filename })
store.qs.insertBatchParsedValues({
name: config.name,
parentId,
downloadId,
parsedValues
})
emitter.scraper(config.name).emit.completed(downloadId)
})()
const parsedValuesWithId = store.qs.selectParsedValues(downloadId)
}
)
const parsedValues = this.parser.run(downloadValue)

scraperLogger.info(
{ parsedValuesWithId, downloadId },
'inserted new values'
)
return parsedValuesWithId
}
}
store.transaction(() => {
store.qs.updateDownloadToComplete({ downloadId, filename })
store.qs.insertBatchParsedValues({
name: this.scraperName,
parentId,
downloadId,
parsedValues
})
emitter.scraper(this.scraperName).emit.completed(downloadId)
})()
const parsedValuesWithId = store.qs.selectParsedValues(downloadId)

// called per each value
return (parentValues: ParsedValue[]): Rx.Observable<ParsedValue[]> =>
Rx.from(parentValues).pipe(
ops.flatMap(
getIncrementObservable(downloadParseFunction, scrapeNextChild)
),
ops.catchError(e =>
Rx.throwError(
new VError({ name: e.name, cause: e }, `scraper '${config.name}'`)
)
),
ops.flatMap(
parsedValues =>
children.length
? children.map(child => child(parsedValues))
: [Rx.of(parsedValues)]
),
ops.mergeAll()
this.scraperLogger.info(
{ parsedValuesWithId, downloadId },
'inserted new values'
)
return parsedValuesWithId
}
}
public run: typeof AbstractScrapeStep.prototype.run = (
parentValues: ParsedValue[]
): Rx.Observable<ParsedValue[]> =>
Rx.from(parentValues).pipe(
ops.flatMap(this.incrementObservableFunction),
ops.catchError(wrapError(`scraper '${this.scraperName}'`)),
ops.flatMap(
parsedValues =>
this.children.length
? this.children.map(child => child.run(parsedValues))
: [Rx.of(parsedValues)]
),
ops.mergeAll()
)
}
export { scraperStep }

export { ScrapeStep, IdentityScrapeStep }
1 change: 1 addition & 0 deletions src/scraper/scrape-step/parser/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ export const parserClassFactory = (
return new IdentityParser(config, options, tools)
}
}
export type ParserClass = HtmlParser | JsonParser | IdentityParser
5 changes: 5 additions & 0 deletions src/util/error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import * as Rx from 'rxjs'
import VError from 'verror'

export const wrapError = (message: any) => (e: Error) =>
Rx.throwError(new VError({ name: e.name, cause: e }, message))

0 comments on commit 6a180ed

Please sign in to comment.