OLD | NEW |
(Empty) | |
| 1 'use strict'; |
| 2 (function () { |
| 3 |
| 4 class RandomPushSource { |
| 5 constructor(toPush) { |
| 6 this.pushed = 0; |
| 7 this.toPush = toPush; |
| 8 this.started = false; |
| 9 this.paused = false; |
| 10 this.closed = false; |
| 11 |
| 12 this._intervalHandle = null; |
| 13 } |
| 14 |
| 15 readStart() { |
| 16 if (this.closed) { |
| 17 return; |
| 18 } |
| 19 |
| 20 if (!this.started) { |
| 21 this._intervalHandle = setInterval(writeChunk, 2); |
| 22 this.started = true; |
| 23 } |
| 24 |
| 25 if (this.paused) { |
| 26 this._intervalHandle = setInterval(writeChunk, 2); |
| 27 this.paused = false; |
| 28 } |
| 29 |
| 30 const source = this; |
| 31 function writeChunk() { |
| 32 if (source.paused) { |
| 33 return; |
| 34 } |
| 35 |
| 36 source.pushed++; |
| 37 |
| 38 if (source.toPush > 0 && source.pushed > source.toPush) { |
| 39 if (source._intervalHandle) { |
| 40 clearInterval(source._intervalHandle); |
| 41 source._intervalHandle = undefined; |
| 42 } |
| 43 source.closed = true; |
| 44 source.onend(); |
| 45 } else { |
| 46 source.ondata(randomChunk(128)); |
| 47 } |
| 48 } |
| 49 } |
| 50 |
| 51 readStop() { |
| 52 if (this.paused) { |
| 53 return; |
| 54 } |
| 55 |
| 56 if (this.started) { |
| 57 this.paused = true; |
| 58 clearInterval(this._intervalHandle); |
| 59 this._intervalHandle = undefined; |
| 60 } else { |
| 61 throw new Error('Can\'t pause reading an unstarted source.'); |
| 62 } |
| 63 } |
| 64 } |
| 65 |
| 66 function randomChunk(size) { |
| 67 let chunk = ''; |
| 68 |
| 69 for (let i = 0; i < size; ++i) { |
| 70 // Add a random character from the basic printable ASCII set. |
| 71 chunk += String.fromCharCode(Math.round(Math.random() * 84) + 32); |
| 72 } |
| 73 |
| 74 return chunk; |
| 75 } |
| 76 |
| 77 function readableStreamToArray(readable, reader) { |
| 78 if (reader === undefined) { |
| 79 reader = readable.getReader(); |
| 80 } |
| 81 |
| 82 const chunks = []; |
| 83 |
| 84 return pump(); |
| 85 |
| 86 function pump() { |
| 87 return reader.read().then(result => { |
| 88 if (result.done) { |
| 89 return chunks; |
| 90 } |
| 91 |
| 92 chunks.push(result.value); |
| 93 return pump(); |
| 94 }); |
| 95 } |
| 96 } |
| 97 |
| 98 class SequentialPullSource { |
| 99 constructor(limit, options) { |
| 100 const async = options && options.async; |
| 101 |
| 102 this.current = 0; |
| 103 this.limit = limit; |
| 104 this.opened = false; |
| 105 this.closed = false; |
| 106 |
| 107 this._exec = f => f(); |
| 108 if (async) { |
| 109 this._exec = f => setTimeout(f, 0); |
| 110 } |
| 111 } |
| 112 |
| 113 open(cb) { |
| 114 this._exec(() => { |
| 115 this.opened = true; |
| 116 cb(); |
| 117 }); |
| 118 } |
| 119 |
| 120 read(cb) { |
| 121 this._exec(() => { |
| 122 if (++this.current <= this.limit) { |
| 123 cb(null, false, this.current); |
| 124 } else { |
| 125 cb(null, true, null); |
| 126 } |
| 127 }); |
| 128 } |
| 129 |
| 130 close(cb) { |
| 131 this._exec(() => { |
| 132 this.closed = true; |
| 133 cb(); |
| 134 }); |
| 135 } |
| 136 } |
| 137 |
| 138 function sequentialReadableStream(limit, options) { |
| 139 const sequentialSource = new SequentialPullSource(limit, options); |
| 140 |
| 141 const stream = new ReadableStream({ |
| 142 start() { |
| 143 return new Promise((resolve, reject) => { |
| 144 sequentialSource.open(err => { |
| 145 if (err) { |
| 146 reject(err); |
| 147 } |
| 148 resolve(); |
| 149 }); |
| 150 }); |
| 151 }, |
| 152 |
| 153 pull(c) { |
| 154 return new Promise((resolve, reject) => { |
| 155 sequentialSource.read((err, done, chunk) => { |
| 156 if (err) { |
| 157 reject(err); |
| 158 } else if (done) { |
| 159 sequentialSource.close(err2 => { |
| 160 if (err2) { |
| 161 reject(err2); |
| 162 } |
| 163 c.close(); |
| 164 resolve(); |
| 165 }); |
| 166 } else { |
| 167 c.enqueue(chunk); |
| 168 resolve(); |
| 169 } |
| 170 }); |
| 171 }); |
| 172 } |
| 173 }); |
| 174 |
| 175 stream.source = sequentialSource; |
| 176 |
| 177 return stream; |
| 178 } |
| 179 |
| 180 |
| 181 self.RandomPushSource = RandomPushSource; |
| 182 self.readableStreamToArray = readableStreamToArray; |
| 183 self.sequentialReadableStream = sequentialReadableStream; |
| 184 |
| 185 }()); |
OLD | NEW |