-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
92 lines (74 loc) · 2.33 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
var duplexify = require('duplexify')
var pumpify = require('pumpify')
var through = require('through2')
var select = require('html-select')
var tokenize = require('html-tokenize')
var jsonComposeStream = require('json-compose-stream')
var concat = require('concat-stream')
var streamsEnd = require('./streams-end')
module.exports = function (schema) {
var writeStream = createSelectStream(schema)
var jsonStream = jsonComposeStream({end: false})
var ender = streamsEnd()
var cache = {}
var dup = duplexify.obj(writeStream, jsonStream)
return dup
function createSelectStream (schema) {
var s = select()
var stream = pumpify.obj(tokenize(), s, through.obj(skipThrough, onFlush))
Object.keys(schema).forEach(function (key) {
var keySchema = schema[key]
if (typeof keySchema === 'string') keySchema = {selector: keySchema}
if (Array.isArray(keySchema)) {
keySchema = {
selector: keySchema[0].selector || keySchema[0],
attribute: keySchema[0].attribute,
isArray: true
}
}
if (!keySchema.selector) throw new Error('Selector must be provided')
s.select(keySchema.selector, function (el) {
if (keySchema.attribute) {
var value = el.getAttribute(keySchema.attribute)
if (keySchema.isArray) {
return addToCache(key, value)
} else {
return jsonStream.set(key, value)
}
}
var tr = through.obj(function (row, enc, callback) {
if (row[0] === 'text') this.push(row[1].toString())
callback()
})
var textStream = pumpify(el.createReadStream(), tr)
if (keySchema.isArray) {
textStream.pipe(concat(function (data) {
addToCache(key, data.toString())
}))
} else {
textStream.pipe(jsonStream.createSetStream(key))
}
ender.push(textStream)
})
})
return stream
}
function onFlush (flush) {
if (Object.keys(cache).length) jsonStream.set(cache)
if (ender.ended()) {
jsonStream.end()
flush()
} else {
ender.setCallback(function () {
jsonStream.end()
flush()
})
}
}
function addToCache (key, value) {
(cache[key] = cache[key] || []).push(value)
}
}
/**
 * Transform function that swallows its input: nothing is pushed downstream,
 * the chunk is simply acknowledged.
 *
 * @param {*} chunk - incoming row (ignored)
 * @param {string} enc - encoding (ignored)
 * @param {Function} callback - invoked with no arguments to accept the chunk
 */
function skipThrough (chunk, enc, callback) {
  // Acknowledge without pushing, so the chunk is dropped.
  callback()
}