-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
143 lines (110 loc) · 2.52 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
const {PassThrough, Writable} = require('stream')
/**
 * Identity predicate meant to be passed to array search methods together with
 * a `thisArg`, e.g. `list.findIndex(findItem, wanted)`.
 *
 * @this {*} the value being searched for (supplied as `thisArg`)
 * @param {*} item - current array element
 * @returns {boolean} `true` when `item` is strictly equal to `this`
 */
function findItem(item)
{
  const isWanted = this === item

  return isWanted
}
module.exports = class Dispatcher extends Writable
{
constructor({inputOptions, writers = [], ...options} = {})
{
super({...options, objectMode: true})
const inFlight = new Set
const _writers = []
const uncork = process.nextTick.bind(process, this.uncork.bind(this))
const input = new PassThrough({...inputOptions, objectMode: true})
.on('data', (data) =>
{
// Remove writer from list of writers accepting more data
const writer = _writers.shift()
inFlight.add(writer)
// After writting data, the writer can accept more data, so we push it
// back at the end of the list of writers
if(writer.write(data))
_writers.push(writer)
// Writer don't accept more data, push it back when it emits the `drain`
// event
else
{
writer.once('drain', this._add)
this._writerRemoved()
}
})
.on('end', function()
{
for(const writer of _writers) writer.end()
})
.on('drain', uncork)
.pause()
function onFinish()
{
if(!inFlight.size) input.end()
}
this
.on('finish', onFinish)
.cork()
this._input = input
this._writers = _writers
/**
* @this writer
*/
this._add = function()
{
_writers.push(this)
input.resume()
uncork()
}
const {_writableState} = this
/**
* @this writer
*/
this._allLanded = function()
{
inFlight.delete(this)
if(_writableState.finished) onFinish()
}
for(const writer of writers) this.pipe(writer)
}
pipe(writer)
{
this._add.call(writer)
writer
.on('allLanded', this._allLanded)
.emit('pipe', this)
return writer
}
unpipe(writer)
{
const {_writers} = this
const index = _writers.findIndex(findItem, writer)
if(index != null)
_writers.splice(index, 1)
else
writer.removeListener('drain', this._add)
this._writerRemoved()
writer
.removeListener('allLanded', this._allLanded)
.emit('unpipe', this)
return this
}
unshift(chunk)
{
this._input.unshift(chunk)
}
_write(chunk, _, callback)
{
if(!this._input.write(chunk)) this.cork()
callback()
}
/**
* There are no writers accepting more data, pause the input buffer
*/
_writerRemoved()
{
const {_input, _writers} = this
if(!_writers.length)
{
_input.pause()
this.cork()
}
}
}