| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 (function(global, binding, v8) { | 5 (function(global, binding, v8) { |
| 6 'use strict'; | 6 'use strict'; |
| 7 | 7 |
| 8 const _reader = v8.createPrivateSymbol('[[reader]]'); | 8 const _reader = v8.createPrivateSymbol('[[reader]]'); |
| 9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); | 9 const _storedError = v8.createPrivateSymbol('[[storedError]]'); |
| 10 const _controller = v8.createPrivateSymbol('[[controller]]'); | 10 const _controller = v8.createPrivateSymbol('[[controller]]'); |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 42 const PULLING = 0b100; | 42 const PULLING = 0b100; |
| 43 const PULL_AGAIN = 0b1000; | 43 const PULL_AGAIN = 0b1000; |
| 44 const EXTERNALLY_CONTROLLED = 0b10000; | 44 const EXTERNALLY_CONTROLLED = 0b10000; |
| 45 | 45 |
| 46 const undefined = global.undefined; | 46 const undefined = global.undefined; |
| 47 const Infinity = global.Infinity; | 47 const Infinity = global.Infinity; |
| 48 | 48 |
| 49 const defineProperty = global.Object.defineProperty; | 49 const defineProperty = global.Object.defineProperty; |
| 50 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); | 50 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); |
| 51 const callFunction = v8.uncurryThis(global.Function.prototype.call); | 51 const callFunction = v8.uncurryThis(global.Function.prototype.call); |
| 52 const applyFunction = v8.uncurryThis(global.Function.prototype.apply); |
| 52 | 53 |
| 53 const TypeError = global.TypeError; | 54 const TypeError = global.TypeError; |
| 54 const RangeError = global.RangeError; | 55 const RangeError = global.RangeError; |
| 55 | 56 |
| 56 const Number = global.Number; | 57 const Number = global.Number; |
| 57 const Number_isNaN = Number.isNaN; | 58 const Number_isNaN = Number.isNaN; |
| 58 const Number_isFinite = Number.isFinite; | 59 const Number_isFinite = Number.isFinite; |
| 59 | 60 |
| 60 const Promise = global.Promise; | 61 const Promise = global.Promise; |
| 61 const thenPromise = v8.uncurryThis(Promise.prototype.then); | 62 const thenPromise = v8.uncurryThis(Promise.prototype.then); |
| (...skipping 24 matching lines...) Expand all Loading... |
| 86 'ReadableStreamReader constructor argument is not a readable stream'; | 87 'ReadableStreamReader constructor argument is not a readable stream'; |
| 87 const errReaderConstructorStreamAlreadyLocked = | 88 const errReaderConstructorStreamAlreadyLocked = |
| 88 'ReadableStreamReader constructor can only accept readable streams that ar
e not yet locked to a reader'; | 89 'ReadableStreamReader constructor can only accept readable streams that ar
e not yet locked to a reader'; |
| 89 const errReleaseReaderWithPendingRead = | 90 const errReleaseReaderWithPendingRead = |
| 90 'Cannot release a readable stream reader when it still has outstanding rea
d() calls that have not yet settled'; | 91 'Cannot release a readable stream reader when it still has outstanding rea
d() calls that have not yet settled'; |
| 91 const errReleasedReaderClosedPromise = | 92 const errReleasedReaderClosedPromise = |
| 92 'This readable stream reader has been released and cannot be used to monit
or the stream\'s state'; | 93 'This readable stream reader has been released and cannot be used to monit
or the stream\'s state'; |
| 93 | 94 |
| 94 const errTmplMustBeFunctionOrUndefined = name => | 95 const errTmplMustBeFunctionOrUndefined = name => |
| 95 `${name} must be a function or undefined`; | 96 `${name} must be a function or undefined`; |
| 97 const errCannotPipeLockedStream = 'Cannot pipe a locked stream'; |
| 98 const errCannotPipeToALockedStream = 'Cannot pipe to a locked stream'; |
| 99 const errDestinationStreamClosed = 'Destination stream closed'; |
| 96 | 100 |
| 97 class ReadableStream { | 101 class ReadableStream { |
| 98 constructor() { | 102 constructor() { |
| 99 // TODO(domenic): when V8 gets default parameters and destructuring, all | 103 // TODO(domenic): when V8 gets default parameters and destructuring, all |
| 100 // this can be cleaned up. | 104 // this can be cleaned up. |
| 101 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; | 105 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; |
| 102 const strategy = arguments[1] === undefined ? {} : arguments[1]; | 106 const strategy = arguments[1] === undefined ? {} : arguments[1]; |
| 103 const size = strategy.size; | 107 const size = strategy.size; |
| 104 let highWaterMark = strategy.highWaterMark; | 108 let highWaterMark = strategy.highWaterMark; |
| 105 if (highWaterMark === undefined) { | 109 if (highWaterMark === undefined) { |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 174 | 178 |
| 175 tee() { | 179 tee() { |
| 176 if (IsReadableStream(this) === false) { | 180 if (IsReadableStream(this) === false) { |
| 177 throw new TypeError(streamErrors.illegalInvocation); | 181 throw new TypeError(streamErrors.illegalInvocation); |
| 178 } | 182 } |
| 179 | 183 |
| 180 return ReadableStreamTee(this); | 184 return ReadableStreamTee(this); |
| 181 } | 185 } |
| 182 } | 186 } |
| 183 | 187 |
| 188 // TODO(ricea): Move this into the class definition once it ships. |
| 189 function ReadableStream_prototype_pipeThrough({writable, readable}, options) { |
| 190 this.pipeTo(writable, options); |
| 191 return readable; |
| 192 } |
| 193 |
| 194 // TODO(ricea): Move this into the class definition once it ships. |
| 195 function ReadableStream_prototype_pipeTo( |
| 196 dest, {preventClose, preventAbort, preventCancel} = {}) { |
| 197 if (!IsReadableStream(this)) { |
| 198 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); |
| 199 } |
| 200 |
| 201 if (!binding.IsWritableStream(dest)) { |
| 202 // TODO(ricea): Think about having a better error message. |
| 203 return Promise_reject(new TypeError(streamErrors.illegalInvocation)); |
| 204 } |
| 205 |
| 206 preventClose = Boolean(preventClose); |
| 207 preventAbort = Boolean(preventAbort); |
| 208 preventCancel = Boolean(preventCancel); |
| 209 |
| 210 const readable = this; |
| 211 if (IsReadableStreamLocked(readable)) { |
| 212 return Promise_reject(new TypeError(errCannotPipeLockedStream)); |
| 213 } |
| 214 |
| 215 if (binding.IsWritableStreamLocked(dest)) { |
| 216 return Promise_reject(new TypeError(errCannotPipeToALockedStream)); |
| 217 } |
| 218 |
| 219 const reader = AcquireReadableStreamDefaultReader(readable); |
| 220 const writer = binding.AcquireWritableStreamDefaultWriter(dest); |
| 221 let shuttingDown = false; |
| 222 const promise = v8.createPromise(); |
| 223 let reading = false; |
| 224 |
| 225 if (checkInitialState()) { |
| 226 // Need to detect closing and error when we are not reading. |
| 227 thenPromise(reader[_closedPromise], onReaderClosed, readableError); |
| 228 // Need to detect error when we are not writing. |
| 229 thenPromise( |
| 230 binding.getWritableStreamDefaultWriterClosedPromise(writer), |
| 231 undefined, writableError); |
| 232 pump(); |
| 233 } |
| 234 |
| 235 // Checks the state of the streams and executes the shutdown handlers if |
| 236 // necessary. Returns true if piping can continue. |
| 237 function checkInitialState() { |
| 238 const state = ReadableStreamGetState(readable); |
| 239 |
| 240 // Both streams can be errored or closed. To perform the right action the |
| 241 // order of the checks must match the standard. |
| 242 if (state === STATE_ERRORED) { |
| 243 readableError(readable[_storedError]); |
| 244 return false; |
| 245 } |
| 246 |
| 247 if (binding.isWritableStreamErrored(dest)) { |
| 248 writableError(binding.getWritableStreamStoredError(dest)); |
| 249 return false; |
| 250 } |
| 251 |
| 252 if (state === STATE_CLOSED) { |
| 253 readableClosed(); |
| 254 return false; |
| 255 } |
| 256 |
| 257 if (binding.isWritableStreamClosingOrClosed(dest)) { |
| 258 writableStartedClosed(); |
| 259 return false; |
| 260 } |
| 261 |
| 262 return true; |
| 263 } |
| 264 |
| 265 function pump() { |
| 266 if (shuttingDown) { |
| 267 return; |
| 268 } |
| 269 const desiredSize = |
| 270 binding.WritableStreamDefaultWriterGetDesiredSize(writer); |
| 271 if (desiredSize === null) { |
| 272 writableError(binding.getWritableStreamStoredError(dest)); |
| 273 } |
| 274 if (desiredSize <= 0) { |
| 275 thenPromise( |
| 276 binding.getWritableStreamDefaultWriterReadyPromise(writer), pump, |
| 277 writableError); |
| 278 return; |
| 279 } |
| 280 reading = true; |
| 281 // TODO(ricea): Delay reads heuristically when desiredSize is low. |
| 282 thenPromise( |
| 283 ReadableStreamDefaultReaderRead(reader), readFulfilled, readRejected); |
| 284 } |
| 285 |
| 286 function readFulfilled({value, done}) { |
| 287 reading = false; |
| 288 if (shuttingDown) { |
| 289 return; |
| 290 } |
| 291 if (done) { |
| 292 readableClosed(); |
| 293 return; |
| 294 } |
| 295 const write = binding.WritableStreamDefaultWriterWrite(writer, value); |
| 296 thenPromise(write, undefined, writableError); |
| 297 pump(); |
| 298 } |
| 299 |
| 300 function readRejected() { |
| 301 reading = false; |
| 302 readableError(readable[_storedError]); |
| 303 } |
| 304 |
| 305 // If read() is in progress, then wait for it to tell us that the stream is |
| 306 // closed so that we write all the data before shutdown. |
| 307 function onReaderClosed() { |
| 308 if (!reading) { |
| 309 readableClosed(); |
| 310 } |
| 311 } |
| 312 |
| 313 // These steps are from "Errors must be propagated forward" in the |
| 314 // standard. |
| 315 function readableError(error) { |
| 316 if (!preventAbort) { |
| 317 shutdownWithAction( |
| 318 binding.WritableStreamAbort, [dest, error], error, true); |
| 319 } else { |
| 320 shutdown(error, true); |
| 321 } |
| 322 } |
| 323 |
| 324 // These steps are from "Errors must be propagated backward". |
| 325 function writableError(error) { |
| 326 if (!preventCancel) { |
| 327 shutdownWithAction( |
| 328 ReadableStreamCancel, [readable, error], error, true); |
| 329 } else { |
| 330 shutdown(error, true); |
| 331 } |
| 332 } |
| 333 |
| 334 // These steps are from "Closing must be propagated forward". |
| 335 function readableClosed() { |
| 336 if (!preventClose) { |
| 337 shutdownWithAction( |
| 338 binding.WritableStreamDefaultWriterCloseWithErrorPropagation, |
| 339 [writer]); |
| 340 } else { |
| 341 shutdown(); |
| 342 } |
| 343 } |
| 344 |
| 345 // These steps are from "Closing must be propagated backward". |
| 346 function writableStartedClosed() { |
| 347 const destClosed = new TypeError(errDestinationStreamClosed); |
| 348 if (!preventCancel) { |
| 349 shutdownWithAction( |
| 350 ReadableStreamCancel, [readable, destClosed], destClosed, true); |
| 351 } else { |
| 352 shutdown(destClosed, true); |
| 353 } |
| 354 } |
| 355 |
| 356 function shutdownWithAction( |
| 357 action, args, originalError = undefined, errorGiven = false) { |
| 358 if (shuttingDown) { |
| 359 return; |
| 360 } |
| 361 shuttingDown = true; |
| 362 const p = applyFunction(action, undefined, args); |
| 363 thenPromise( |
| 364 p, () => finalize(originalError, errorGiven), |
| 365 newError => finalize(newError, true)); |
| 366 } |
| 367 |
| 368 function shutdown(error = undefined, errorGiven = false) { |
| 369 if (shuttingDown) { |
| 370 return; |
| 371 } |
| 372 shuttingDown = true; |
| 373 finalize(error, errorGiven); |
| 374 } |
| 375 |
| 376 function finalize(error, errorGiven) { |
| 377 binding.WritableStreamDefaultWriterRelease(writer); |
| 378 ReadableStreamReaderGenericRelease(reader); |
| 379 if (errorGiven) { |
| 380 v8.rejectPromise(promise, error); |
| 381 } else { |
| 382 v8.resolvePromise(promise, undefined); |
| 383 } |
| 384 } |
| 385 |
| 386 return promise; |
| 387 } |
| 388 |
| 184 class ReadableStreamDefaultController { | 389 class ReadableStreamDefaultController { |
| 185 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr
olled) { | 390 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr
olled) { |
| 186 if (IsReadableStream(stream) === false) { | 391 if (IsReadableStream(stream) === false) { |
| 187 throw new TypeError(streamErrors.illegalConstructor); | 392 throw new TypeError(streamErrors.illegalConstructor); |
| 188 } | 393 } |
| 189 | 394 |
| 190 if (stream[_controller] !== undefined) { | 395 if (stream[_controller] !== undefined) { |
| 191 throw new TypeError(streamErrors.illegalConstructor); | 396 throw new TypeError(streamErrors.illegalConstructor); |
| 192 } | 397 } |
| 193 | 398 |
| (...skipping 772 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 966 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController
Close; | 1171 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController
Close; |
| 967 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC
ontrollerGetDesiredSize; | 1172 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC
ontrollerGetDesiredSize; |
| 968 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll
erEnqueue; | 1173 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll
erEnqueue; |
| 969 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController
Error; | 1174 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController
Error; |
| 970 | 1175 |
| 971 binding.createReadableStreamWithExternalController = | 1176 binding.createReadableStreamWithExternalController = |
| 972 (underlyingSource, strategy) => { | 1177 (underlyingSource, strategy) => { |
| 973 return new ReadableStream( | 1178 return new ReadableStream( |
| 974 underlyingSource, strategy, createWithExternalControllerSentinel); | 1179 underlyingSource, strategy, createWithExternalControllerSentinel); |
| 975 }; | 1180 }; |
| 1181 |
| 1182 // Temporary exports while pipeTo() and pipeThrough() are behind flags |
| 1183 binding.ReadableStream_prototype_pipeThrough = |
| 1184 ReadableStream_prototype_pipeThrough; |
| 1185 binding.ReadableStream_prototype_pipeTo = ReadableStream_prototype_pipeTo; |
| 976 }); | 1186 }); |
| OLD | NEW |