Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 (function(global, binding, v8) { | 5 (function(global, binding, v8) { |
| 6 'use strict'; | 6 'use strict'; |
| 7 | 7 |
| 8 const _reader = v8.createPrivateSymbol('[[reader]]'); | 8 const _reader = v8.createPrivateSymbol('[[reader]]'); |
| 9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); | 9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); |
| 10 const _controller = v8.createPrivateSymbol('[[controller]]'); | 10 const _controller = v8.createPrivateSymbol('[[controller]]'); |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 42 const PULLING = 0b100; | 42 const PULLING = 0b100; |
| 43 const PULL_AGAIN = 0b1000; | 43 const PULL_AGAIN = 0b1000; |
| 44 const EXTERNALLY_CONTROLLED = 0b10000; | 44 const EXTERNALLY_CONTROLLED = 0b10000; |
| 45 | 45 |
| 46 const undefined = global.undefined; | 46 const undefined = global.undefined; |
| 47 const Infinity = global.Infinity; | 47 const Infinity = global.Infinity; |
| 48 | 48 |
| 49 const defineProperty = global.Object.defineProperty; | 49 const defineProperty = global.Object.defineProperty; |
| 50 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); | 50 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); |
| 51 const callFunction = v8.uncurryThis(global.Function.prototype.call); | 51 const callFunction = v8.uncurryThis(global.Function.prototype.call); |
| 52 const applyFunction = v8.uncurryThis(global.Function.prototype.apply); | |
| 52 | 53 |
| 53 const TypeError = global.TypeError; | 54 const TypeError = global.TypeError; |
| 54 const RangeError = global.RangeError; | 55 const RangeError = global.RangeError; |
| 55 | 56 |
| 56 const Number = global.Number; | 57 const Number = global.Number; |
| 57 const Number_isNaN = Number.isNaN; | 58 const Number_isNaN = Number.isNaN; |
| 58 const Number_isFinite = Number.isFinite; | 59 const Number_isFinite = Number.isFinite; |
| 59 | 60 |
| 60 const Promise = global.Promise; | 61 const Promise = global.Promise; |
| 61 const thenPromise = v8.uncurryThis(Promise.prototype.then); | 62 const thenPromise = v8.uncurryThis(Promise.prototype.then); |
| (...skipping 24 matching lines...) Expand all Loading... | |
| 86 'ReadableStreamReader constructor argument is not a readable stream'; | 87 'ReadableStreamReader constructor argument is not a readable stream'; |
| 87 const errReaderConstructorStreamAlreadyLocked = | 88 const errReaderConstructorStreamAlreadyLocked = |
| 88 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader'; | 89 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader'; |
| 89 const errReleaseReaderWithPendingRead = | 90 const errReleaseReaderWithPendingRead = |
| 90 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled'; | 91 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled'; |
| 91 const errReleasedReaderClosedPromise = | 92 const errReleasedReaderClosedPromise = |
| 92 'This readable stream reader has been released and cannot be used to monit or the stream\'s state'; | 93 'This readable stream reader has been released and cannot be used to monit or the stream\'s state'; |
| 93 | 94 |
| 94 const errTmplMustBeFunctionOrUndefined = name => | 95 const errTmplMustBeFunctionOrUndefined = name => |
| 95 `${name} must be a function or undefined`; | 96 `${name} must be a function or undefined`; |
| 97 const errCannotPipeLockedStream = 'Cannot pipe a locked stream'; | |
| 98 const errCannotPipeToALockedStream = 'Cannot pipe to a locked stream'; | |
| 99 const errDestinationStreamClosed = 'Destination stream closed'; | |
| 96 | 100 |
| 97 class ReadableStream { | 101 class ReadableStream { |
| 98 constructor() { | 102 constructor() { |
| 99 // TODO(domenic): when V8 gets default parameters and destructuring, all | 103 // TODO(domenic): when V8 gets default parameters and destructuring, all |
| 100 // this can be cleaned up. | 104 // this can be cleaned up. |
| 101 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; | 105 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; |
| 102 const strategy = arguments[1] === undefined ? {} : arguments[1]; | 106 const strategy = arguments[1] === undefined ? {} : arguments[1]; |
| 103 const size = strategy.size; | 107 const size = strategy.size; |
| 104 let highWaterMark = strategy.highWaterMark; | 108 let highWaterMark = strategy.highWaterMark; |
| 105 if (highWaterMark === undefined) { | 109 if (highWaterMark === undefined) { |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 174 | 178 |
| 175 tee() { | 179 tee() { |
| 176 if (IsReadableStream(this) === false) { | 180 if (IsReadableStream(this) === false) { |
| 177 throw new TypeError(streamErrors.illegalInvocation); | 181 throw new TypeError(streamErrors.illegalInvocation); |
| 178 } | 182 } |
| 179 | 183 |
| 180 return ReadableStreamTee(this); | 184 return ReadableStreamTee(this); |
| 181 } | 185 } |
| 182 } | 186 } |
| 183 | 187 |
| 188 // TODO(ricea): Move this into the class definition once it ships. | |
| 189 function ReadableStream_prototype_pipeThrough({writable, readable}, options) { | |
| 190 this.pipeTo(writable, options); | |
| 191 return readable; | |
| 192 } | |
| 193 | |
| 194 // TODO(ricea): Move this into the class definition once it ships. | |
| 195 function ReadableStream_prototype_pipeTo( | |
| 196 dest, { preventClose, preventAbort, preventCancel } = {}) { | |
| 197 if (!IsReadableStream(this)) { | |
| 198 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); | |
| 199 } | |
| 200 | |
| 201 if (!binding.IsWritableStream(dest)) { | |
| 202 // TODO(ricea): Think about having a better error message. | |
| 203 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); | |
| 204 } | |
| 205 | |
| 206 preventClose = Boolean(preventClose); | |
| 207 preventAbort = Boolean(preventAbort); | |
| 208 preventCancel = Boolean(preventCancel); | |
| 209 | |
| 210 const readable = this; | |
| 211 if (IsReadableStreamLocked(readable)) { | |
| 212 return Promise_reject(new TypeError(errCannotPipeLockedStream)); | |
| 213 } | |
| 214 | |
| 215 if (binding.IsWritableStreamLocked(dest)) { | |
| 216 return Promise_reject(new TypeError(errCannotPipeToALockedStream)); | |
| 217 } | |
| 218 | |
| 219 const reader = AcquireReadableStreamDefaultReader(readable); | |
| 220 const writer = binding.AcquireWritableStreamDefaultWriter(dest); | |
| 221 let shuttingDown = false; | |
| 222 const promise = v8.createPromise(); | |
| 223 let reading = false; | |
| 224 | |
| 225 if (initialStateOk()) { | |
| 226 // Need to detect closing and error when we are not reading. | |
| 227 thenPromise(reader[_closedPromise], readableClosed, readableError); | |
| 228 // Need to detect error when we are not writing. | |
| 229 thenPromise(binding.getWritableStreamDefaultWriterClosedPromise(writer), | |
| 230 undefined, writableError); | |
| 231 pump(); | |
| 232 } | |
| 233 | |
| 234 function initialStateOk() { | |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
This function name sounds like checking states and
Adam Rice
2017/01/23 13:24:30
Done.
| |
| 235 const state = ReadableStreamGetState(readable); | |
| 236 if (state === STATE_ERRORED) { | |
| 237 readableError(); | |
| 238 return false; | |
| 239 } | |
| 240 | |
| 241 if (binding.isWritableStreamErrored(dest)) { | |
| 242 writableError(); | |
| 243 return false; | |
| 244 } | |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
are these error check blocks for optimization?
Adam Rice
2017/01/23 13:24:30
They are to perform the checks in the correct orde
tyoshino (SeeGerritForStatus)
2017/01/24 09:52:33
Oh, thanks for the pointer. Sorry for being unawar
| |
| 245 | |
| 246 if (state === STATE_CLOSED) { | |
| 247 readableClosed(); | |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
how about calling factoring out the latter half of
Adam Rice
2017/01/23 13:24:30
The first check in readableClosed() didn't belong
| |
| 248 return false; | |
| 249 } | |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
Is this an optimization or to have pipeTo finish s
Adam Rice
2017/01/23 13:24:30
It's purely to perform the checks in the correct o
| |
| 250 | |
| 251 if (binding.isWritableStreamClosingOrClosed(dest)) { | |
| 252 writableStartedClosed(); | |
| 253 return false; | |
| 254 } | |
| 255 | |
| 256 return true; | |
| 257 } | |
| 258 | |
| 259 function pump() { | |
| 260 if (shuttingDown) { | |
| 261 return; | |
| 262 } | |
| 263 const desiredSize = | |
| 264 binding.WritableStreamDefaultWriterGetDesiredSize(writer); | |
| 265 if (desiredSize === null) { | |
| 266 writableError(); | |
| 267 } | |
| 268 if (desiredSize <= 0) { | |
| 269 thenPromise(binding.getWritableStreamDefaultWriterReadyPromise(writer), | |
| 270 pump, writableError); | |
| 271 return; | |
| 272 } | |
| 273 reading = true; | |
| 274 // TODO(ricea): Delay reads heuristically when desiredSize is low. | |
| 275 thenPromise(ReadableStreamDefaultReaderRead(reader), readFulfilled, | |
| 276 readRejected); | |
| 277 } | |
| 278 | |
| 279 function readFulfilled({value, done}) { | |
| 280 reading = false; | |
| 281 if (shuttingDown) { | |
| 282 return; | |
| 283 } | |
| 284 if (done) { | |
| 285 readableClosed(); | |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
how about calling factoring out the latter half of
Adam Rice
2017/01/23 13:24:30
I removed the first half instead, to make the oper
| |
| 286 return; | |
| 287 } | |
| 288 const write = binding.WritableStreamDefaultWriterWrite(writer, value); | |
| 289 thenPromise(write, undefined, writableError); | |
| 290 pump(); | |
| 291 } | |
| 292 | |
| 293 function readRejected() { | |
| 294 reading = false; | |
| 295 readableError(); | |
| 296 } | |
| 297 | |
| 298 function readableError() { | |
| 299 if (!preventAbort) { | |
| 300 shutdownWithAction(binding.WritableStreamAbort, | |
| 301 [dest, readable[_storedError]], | |
| 302 readable[_storedError], true); | |
| 303 } else { | |
| 304 shutdown(readable[_storedError], true); | |
| 305 } | |
| 306 } | |
| 307 | |
| 308 function writableError() { | |
| 309 const storedError = binding.getWritableStreamStoredError(dest); | |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
can't we use the rejection value? or intentionally
Adam Rice
2017/01/23 13:24:30
I'm not sure about this, because now we're looking
| |
| 310 if (!preventCancel) { | |
| 311 shutdownWithAction(ReadableStreamCancel, [readable, storedError], | |
| 312 storedError, true); | |
| 313 } else { | |
| 314 shutdown(storedError, true); | |
| 315 } | |
| 316 } | |
| 317 | |
| 318 function readableClosed() { | |
| 319 if (reading) { | |
| 320 // Handle the close status from the read() method rather than the | |
| 321 // [[closedPromise]]. | |
| 322 return; | |
| 323 } | |
| 324 if (!preventClose) { | |
| 325 shutdownWithAction( | |
| 326 binding.WritableStreamDefaultWriterCloseWithErrorPropagation, | |
| 327 [writer]); | |
| 328 } else { | |
| 329 shutdown(); | |
| 330 } | |
| 331 } | |
| 332 | |
| 333 function writableStartedClosed() { | |
| 334 const destClosed = new TypeError(errDestinationStreamClosed); | |
| 335 if (!preventCancel) { | |
| 336 shutdownWithAction(ReadableStreamCancel, [readable, destClosed], | |
| 337 destClosed, true); | |
| 338 } else { | |
| 339 shutdown(destClosed, true); | |
| 340 } | |
| 341 } | |
| 342 | |
| 343 function shutdownWithAction(action, args, originalError = undefined, | |
| 344 errorGiven = false) { | |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
default argument is used for better correspondence
Adam Rice
2017/01/23 13:24:30
Exactly. This worked better before Domenic pointed
| |
| 345 if (shuttingDown) { | |
| 346 return; | |
| 347 } | |
| 348 shuttingDown = true; | |
| 349 const p = applyFunction(action, undefined, args); | |
| 350 thenPromise(p, | |
| 351 () => finalize(originalError, errorGiven), | |
| 352 newError => finalize(newError, true)); | |
| 353 } | |
|
tyoshino (SeeGerritForStatus)
2017/01/23 10:42:57
ah, we need to update the reference implementation
Adam Rice
2017/01/23 13:24:30
Yes. I discussed this with Domenic but it looks li
| |
| 354 | |
| 355 function shutdown(error = undefined, errorGiven = false) { | |
| 356 if (shuttingDown) { | |
| 357 return; | |
| 358 } | |
| 359 shuttingDown = true; | |
| 360 finalize(error, errorGiven); | |
| 361 } | |
| 362 | |
| 363 function finalize(error, errorGiven) { | |
| 364 binding.WritableStreamDefaultWriterRelease(writer); | |
| 365 ReadableStreamReaderGenericRelease(reader); | |
| 366 if (errorGiven) { | |
| 367 v8.rejectPromise(promise, error); | |
| 368 } else { | |
| 369 v8.resolvePromise(promise, undefined); | |
| 370 } | |
| 371 } | |
| 372 | |
| 373 return promise; | |
| 374 } | |
| 375 | |
| 184 class ReadableStreamDefaultController { | 376 class ReadableStreamDefaultController { |
| 185 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) { | 377 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) { |
| 186 if (IsReadableStream(stream) === false) { | 378 if (IsReadableStream(stream) === false) { |
| 187 throw new TypeError(streamErrors.illegalConstructor); | 379 throw new TypeError(streamErrors.illegalConstructor); |
| 188 } | 380 } |
| 189 | 381 |
| 190 if (stream[_controller] !== undefined) { | 382 if (stream[_controller] !== undefined) { |
| 191 throw new TypeError(streamErrors.illegalConstructor); | 383 throw new TypeError(streamErrors.illegalConstructor); |
| 192 } | 384 } |
| 193 | 385 |
| (...skipping 710 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 904 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close; | 1096 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close; |
| 905 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize; | 1097 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize; |
| 906 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue; | 1098 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue; |
| 907 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error; | 1099 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error; |
| 908 | 1100 |
| 909 binding.createReadableStreamWithExternalController = | 1101 binding.createReadableStreamWithExternalController = |
| 910 (underlyingSource, strategy) => { | 1102 (underlyingSource, strategy) => { |
| 911 return new ReadableStream( | 1103 return new ReadableStream( |
| 912 underlyingSource, strategy, createWithExternalControllerSentinel); | 1104 underlyingSource, strategy, createWithExternalControllerSentinel); |
| 913 }; | 1105 }; |
| 1106 | |
| 1107 // Temporary exports while pipeTo() and pipeThrough() are behind flags | |
| 1108 binding.ReadableStream_prototype_pipeThrough = | |
| 1109 ReadableStream_prototype_pipeThrough; | |
| 1110 binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo; | |
| 914 }); | 1111 }); |
| OLD | NEW |