OLD | NEW |
| (Empty) |
1 'use strict'; | |
2 (function () { | |
3 | |
4 class RandomPushSource { | |
5 constructor(toPush) { | |
6 this.pushed = 0; | |
7 this.toPush = toPush; | |
8 this.started = false; | |
9 this.paused = false; | |
10 this.closed = false; | |
11 | |
12 this._intervalHandle = null; | |
13 } | |
14 | |
15 readStart() { | |
16 if (this.closed) { | |
17 return; | |
18 } | |
19 | |
20 if (!this.started) { | |
21 this._intervalHandle = setInterval(writeChunk, 2); | |
22 this.started = true; | |
23 } | |
24 | |
25 if (this.paused) { | |
26 this._intervalHandle = setInterval(writeChunk, 2); | |
27 this.paused = false; | |
28 } | |
29 | |
30 const source = this; | |
31 function writeChunk() { | |
32 if (source.paused) { | |
33 return; | |
34 } | |
35 | |
36 source.pushed++; | |
37 | |
38 if (source.toPush > 0 && source.pushed > source.toPush) { | |
39 if (source._intervalHandle) { | |
40 clearInterval(source._intervalHandle); | |
41 source._intervalHandle = undefined; | |
42 } | |
43 source.closed = true; | |
44 source.onend(); | |
45 } else { | |
46 source.ondata(randomChunk(128)); | |
47 } | |
48 } | |
49 } | |
50 | |
51 readStop() { | |
52 if (this.paused) { | |
53 return; | |
54 } | |
55 | |
56 if (this.started) { | |
57 this.paused = true; | |
58 clearInterval(this._intervalHandle); | |
59 this._intervalHandle = undefined; | |
60 } else { | |
61 throw new Error('Can\'t pause reading an unstarted source.'); | |
62 } | |
63 } | |
64 } | |
65 | |
66 function randomChunk(size) { | |
67 let chunk = ''; | |
68 | |
69 for (let i = 0; i < size; ++i) { | |
70 // Add a random character from the basic printable ASCII set. | |
71 chunk += String.fromCharCode(Math.round(Math.random() * 84) + 32); | |
72 } | |
73 | |
74 return chunk; | |
75 } | |
76 | |
77 function readableStreamToArray(readable, reader) { | |
78 if (reader === undefined) { | |
79 reader = readable.getReader(); | |
80 } | |
81 | |
82 const chunks = []; | |
83 | |
84 return pump(); | |
85 | |
86 function pump() { | |
87 return reader.read().then(result => { | |
88 if (result.done) { | |
89 return chunks; | |
90 } | |
91 | |
92 chunks.push(result.value); | |
93 return pump(); | |
94 }); | |
95 } | |
96 } | |
97 | |
98 class SequentialPullSource { | |
99 constructor(limit, options) { | |
100 const async = options && options.async; | |
101 | |
102 this.current = 0; | |
103 this.limit = limit; | |
104 this.opened = false; | |
105 this.closed = false; | |
106 | |
107 this._exec = f => f(); | |
108 if (async) { | |
109 this._exec = f => setTimeout(f, 0); | |
110 } | |
111 } | |
112 | |
113 open(cb) { | |
114 this._exec(() => { | |
115 this.opened = true; | |
116 cb(); | |
117 }); | |
118 } | |
119 | |
120 read(cb) { | |
121 this._exec(() => { | |
122 if (++this.current <= this.limit) { | |
123 cb(null, false, this.current); | |
124 } else { | |
125 cb(null, true, null); | |
126 } | |
127 }); | |
128 } | |
129 | |
130 close(cb) { | |
131 this._exec(() => { | |
132 this.closed = true; | |
133 cb(); | |
134 }); | |
135 } | |
136 } | |
137 | |
138 function sequentialReadableStream(limit, options) { | |
139 const sequentialSource = new SequentialPullSource(limit, options); | |
140 | |
141 const stream = new ReadableStream({ | |
142 start() { | |
143 return new Promise((resolve, reject) => { | |
144 sequentialSource.open(err => { | |
145 if (err) { | |
146 reject(err); | |
147 } | |
148 resolve(); | |
149 }); | |
150 }); | |
151 }, | |
152 | |
153 pull(c) { | |
154 return new Promise((resolve, reject) => { | |
155 sequentialSource.read((err, done, chunk) => { | |
156 if (err) { | |
157 reject(err); | |
158 } else if (done) { | |
159 sequentialSource.close(err2 => { | |
160 if (err2) { | |
161 reject(err2); | |
162 } | |
163 c.close(); | |
164 resolve(); | |
165 }); | |
166 } else { | |
167 c.enqueue(chunk); | |
168 resolve(); | |
169 } | |
170 }); | |
171 }); | |
172 } | |
173 }); | |
174 | |
175 stream.source = sequentialSource; | |
176 | |
177 return stream; | |
178 } | |
179 | |
180 | |
181 self.RandomPushSource = RandomPushSource; | |
182 self.readableStreamToArray = readableStreamToArray; | |
183 self.sequentialReadableStream = sequentialReadableStream; | |
184 | |
185 }()); | |
OLD | NEW |