| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 (function(global, binding, v8) { | 5 (function(global, binding, v8) { |
| 6 'use strict'; | 6 'use strict'; |
| 7 | 7 |
| 8 const readableStreamReader = v8.createPrivateSymbol('[[reader]]'); |
| 9 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]'); |
| 8 const readableStreamController = v8.createPrivateSymbol('[[controller]]'); | 10 const readableStreamController = v8.createPrivateSymbol('[[controller]]'); |
| 9 const readableStreamQueue = v8.createPrivateSymbol('[[queue]]'); | |
| 10 const readableStreamQueueSize = | |
| 11 v8.createPrivateSymbol('[[queue]] total size'); | |
| 12 const readableStreamReader = v8.createPrivateSymbol('[[reader]]'); | |
| 13 const readableStreamState = v8.createPrivateSymbol('[[state]]'); | |
| 14 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]'); | |
| 15 const readableStreamStrategySize = v8.createPrivateSymbol('[[strategySize]]'); | |
| 16 const readableStreamStrategyHWM = v8.createPrivateSymbol('[[strategyHWM]]'); | |
| 17 const readableStreamUnderlyingSource = | |
| 18 v8.createPrivateSymbol('[[underlyingSource]]'); | |
| 19 | |
| 20 const readableStreamControllerControlledReadableStream = | |
| 21 v8.createPrivateSymbol('[[controlledReadableStream]]'); | |
| 22 | 11 |
| 23 const readableStreamReaderClosedPromise = | 12 const readableStreamReaderClosedPromise = |
| 24 v8.createPrivateSymbol('[[closedPromise]]'); | 13 v8.createPrivateSymbol('[[closedPromise]]'); |
| 25 const readableStreamReaderOwnerReadableStream = | 14 const readableStreamReaderOwnerReadableStream = |
| 26 v8.createPrivateSymbol('[[ownerReadableStream]]'); | 15 v8.createPrivateSymbol('[[ownerReadableStream]]'); |
| 27 const readableStreamReaderReadRequests = | 16 |
| 17 const readableStreamDefaultReaderReadRequests = |
| 28 v8.createPrivateSymbol('[[readRequests]]'); | 18 v8.createPrivateSymbol('[[readRequests]]'); |
| 29 | 19 |
| 30 const createWithExternalControllerSentinel = | 20 const createWithExternalControllerSentinel = |
| 31 v8.createPrivateSymbol('flag for UA-created ReadableStream to pass'); | 21 v8.createPrivateSymbol('flag for UA-created ReadableStream to pass'); |
| 32 | 22 |
| 23 const readableStreamBits = v8.createPrivateSymbol('bit field for [[state]] and
[[disturbed]]'); |
| 24 const DISTURBED = 0b1; |
| 25 // The 2nd and 3rd bit are for [[state]]. |
| 26 const STATE_MASK = 0b110; |
| 27 const STATE_BITS_OFFSET = 1; |
| 33 const STATE_READABLE = 0; | 28 const STATE_READABLE = 0; |
| 34 const STATE_CLOSED = 1; | 29 const STATE_CLOSED = 1; |
| 35 const STATE_ERRORED = 2; | 30 const STATE_ERRORED = 2; |
| 36 | 31 |
| 37 const readableStreamBits = v8.createPrivateSymbol( | 32 const readableStreamDefaultControllerUnderlyingSource = |
| 38 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]]
, [[disturbed]]'); | 33 v8.createPrivateSymbol('[[underlyingSource]]'); |
| 34 const readableStreamDefaultControllerControlledReadableStream = |
| 35 v8.createPrivateSymbol('[[controlledReadableStream]]'); |
| 36 const readableStreamDefaultControllerQueue = v8.createPrivateSymbol('[[queue]]
'); |
| 37 const readableStreamDefaultControllerQueueSize = |
| 38 v8.createPrivateSymbol('[[queue]] total size'); |
| 39 const readableStreamDefaultControllerStrategySize = |
| 40 v8.createPrivateSymbol('[[strategySize]]'); |
| 41 const readableStreamDefaultControllerStrategyHWM = |
| 42 v8.createPrivateSymbol('[[strategyHWM]]'); |
| 43 |
| 44 const readableStreamDefaultControllerBits = v8.createPrivateSymbol( |
| 45 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]]
'); |
| 39 const STARTED = 0b1; | 46 const STARTED = 0b1; |
| 40 const CLOSE_REQUESTED = 0b10; | 47 const CLOSE_REQUESTED = 0b10; |
| 41 const PULLING = 0b100; | 48 const PULLING = 0b100; |
| 42 const PULL_AGAIN = 0b1000; | 49 const PULL_AGAIN = 0b1000; |
| 43 const DISTURBED = 0b10000; | 50 const EXTERNALLY_CONTROLLED = 0b10000; |
| 51 |
| 52 const readableStreamControllerCancel = |
| 53 v8.createPrivateSymbol('[[InternalCancel]]'); |
| 54 const readableStreamControllerPull = v8.createPrivateSymbol('[[InternalPull]]'
); |
| 44 | 55 |
| 45 const undefined = global.undefined; | 56 const undefined = global.undefined; |
| 46 const Infinity = global.Infinity; | 57 const Infinity = global.Infinity; |
| 47 | 58 |
| 48 const defineProperty = global.Object.defineProperty; | 59 const defineProperty = global.Object.defineProperty; |
| 49 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); | 60 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); |
| 50 const callFunction = v8.uncurryThis(global.Function.prototype.call); | 61 const callFunction = v8.uncurryThis(global.Function.prototype.call); |
| 51 | 62 |
| 52 const TypeError = global.TypeError; | 63 const TypeError = global.TypeError; |
| 53 const RangeError = global.RangeError; | 64 const RangeError = global.RangeError; |
| 54 | 65 |
| 55 const Number = global.Number; | 66 const Number = global.Number; |
| 56 const Number_isNaN = Number.isNaN; | 67 const Number_isNaN = Number.isNaN; |
| 57 const Number_isFinite = Number.isFinite; | 68 const Number_isFinite = Number.isFinite; |
| 58 | 69 |
| 59 const Promise = global.Promise; | 70 const Promise = global.Promise; |
| 60 const thenPromise = v8.uncurryThis(Promise.prototype.then); | 71 const thenPromise = v8.uncurryThis(Promise.prototype.then); |
| 61 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise); | 72 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise); |
| 62 const Promise_reject = v8.simpleBind(Promise.reject, Promise); | 73 const Promise_reject = v8.simpleBind(Promise.reject, Promise); |
| 63 | 74 |
| 64 const errIllegalInvocation = 'Illegal invocation'; | 75 const errIllegalInvocation = 'Illegal invocation'; |
| 65 const errIllegalConstructor = 'Illegal constructor'; | 76 const errIllegalConstructor = 'Illegal constructor'; |
| 66 const errCancelLockedStream = | 77 const errCancelLockedStream = |
| 67 'Cannot cancel a readable stream that is locked to a reader'; | 78 'Cannot cancel a readable stream that is locked to a reader'; |
| 68 const errEnqueueInCloseRequestedStream = | 79 const errEnqueueCloseRequestedStream = |
| 69 'Cannot enqueue a chunk into a readable stream that is closed or has been
requested to be closed'; | 80 'Cannot enqueue a chunk into a readable stream that is closed or has been
requested to be closed'; |
| 70 const errCancelReleasedReader = | 81 const errCancelReleasedReader = |
| 71 'This readable stream reader has been released and cannot be used to cance
l its previous owner stream'; | 82 'This readable stream reader has been released and cannot be used to cance
l its previous owner stream'; |
| 72 const errReadReleasedReader = | 83 const errReadReleasedReader = |
| 73 'This readable stream reader has been released and cannot be used to read
from its previous owner stream'; | 84 'This readable stream reader has been released and cannot be used to read
from its previous owner stream'; |
| 74 const errCloseCloseRequestedStream = | 85 const errCloseCloseRequestedStream = |
| 75 'Cannot close a readable stream that has already been requested to be clos
ed'; | 86 'Cannot close a readable stream that has already been requested to be clos
ed'; |
| 87 const errEnqueueClosedStream = 'Cannot enqueue a chunk into a closed readable
stream'; |
| 88 const errEnqueueErroredStream = 'Cannot enqueue a chunk into an errored readab
le stream'; |
| 89 const errCloseClosedStream = 'Cannot close a closed readable stream'; |
| 76 const errCloseErroredStream = 'Cannot close an errored readable stream'; | 90 const errCloseErroredStream = 'Cannot close an errored readable stream'; |
| 77 const errErrorClosedStream = 'Cannot error a close readable stream'; | 91 const errErrorClosedStream = 'Cannot error a close readable stream'; |
| 78 const errErrorErroredStream = | 92 const errErrorErroredStream = |
| 79 'Cannot error a readable stream that is already errored'; | 93 'Cannot error a readable stream that is already errored'; |
| 94 const errGetReaderNotByteStream = 'This readable stream does not support BYOB
readers'; |
| 95 const errGetReaderBadMode = 'Invalid reader mode given: expected undefined or
"byob"'; |
| 80 const errReaderConstructorBadArgument = | 96 const errReaderConstructorBadArgument = |
| 81 'ReadableStreamReader constructor argument is not a readable stream'; | 97 'ReadableStreamReader constructor argument is not a readable stream'; |
| 82 const errReaderConstructorStreamAlreadyLocked = | 98 const errReaderConstructorStreamAlreadyLocked = |
| 83 'ReadableStreamReader constructor can only accept readable streams that ar
e not yet locked to a reader'; | 99 'ReadableStreamReader constructor can only accept readable streams that ar
e not yet locked to a reader'; |
| 84 const errReleaseReaderWithPendingRead = | 100 const errReleaseReaderWithPendingRead = |
| 85 'Cannot release a readable stream reader when it still has outstanding rea
d() calls that have not yet settled'; | 101 'Cannot release a readable stream reader when it still has outstanding rea
d() calls that have not yet settled'; |
| 86 const errReleasedReaderClosedPromise = | 102 const errReleasedReaderClosedPromise = |
| 87 'This readable stream reader has been released and cannot be used to monit
or the stream\'s state'; | 103 'This readable stream reader has been released and cannot be used to monit
or the stream\'s state'; |
| 88 const errInvalidSize = | 104 const errInvalidSize = |
| 89 'The return value of a queuing strategy\'s size function must be a finite,
non-NaN, non-negative number'; | 105 'The return value of a queuing strategy\'s size function must be a finite,
non-NaN, non-negative number'; |
| 90 const errSizeNotAFunction = | 106 const errSizeNotAFunction = |
| 91 'A queuing strategy\'s size property must be a function'; | 107 'A queuing strategy\'s size property must be a function'; |
| 92 const errInvalidHWM = | 108 const errInvalidHWM = |
| 93 'A queueing strategy\'s highWaterMark property must be a nonnegative, non-
NaN number'; | 109 'A queueing strategy\'s highWaterMark property must be a nonnegative, non-
NaN number'; |
| 94 const errTmplMustBeFunctionOrUndefined = name => | 110 const errTmplMustBeFunctionOrUndefined = name => |
| 95 `${name} must be a function or undefined`; | 111 `${name} must be a function or undefined`; |
| 96 | 112 |
| 97 class ReadableStream { | 113 class ReadableStream { |
| 98 constructor() { | 114 constructor() { |
| 99 // TODO(domenic): when V8 gets default parameters and destructuring, all | 115 // TODO(domenic): when V8 gets default parameters and destructuring, all |
| 100 // this can be cleaned up. | 116 // this can be cleaned up. |
| 101 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; | 117 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; |
| 102 const strategy = arguments[1] === undefined ? {} : arguments[1]; | 118 const strategy = arguments[1] === undefined ? {} : arguments[1]; |
| 103 const size = strategy.size; | 119 const size = strategy.size; |
| 104 let highWaterMark = strategy.highWaterMark; | 120 let highWaterMark = strategy.highWaterMark; |
| 105 if (highWaterMark === undefined) { | 121 if (highWaterMark === undefined) { |
| 106 highWaterMark = 1; | 122 highWaterMark = 1; |
| 107 } | 123 } |
| 108 | 124 |
| 109 const normalizedStrategy = | |
| 110 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); | |
| 111 | |
| 112 this[readableStreamUnderlyingSource] = underlyingSource; | |
| 113 | |
| 114 this[readableStreamQueue] = new v8.InternalPackedArray(); | |
| 115 this[readableStreamQueueSize] = 0; | |
| 116 | |
| 117 this[readableStreamState] = STATE_READABLE; | |
| 118 this[readableStreamBits] = 0b0; | 125 this[readableStreamBits] = 0b0; |
| 126 ReadableStreamSetState(this, STATE_READABLE); |
| 119 this[readableStreamReader] = undefined; | 127 this[readableStreamReader] = undefined; |
| 120 this[readableStreamStoredError] = undefined; | 128 this[readableStreamStoredError] = undefined; |
| 121 | 129 |
| 122 this[readableStreamStrategySize] = normalizedStrategy.size; | |
| 123 this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark; | |
| 124 | |
| 125 // Avoid allocating the controller if the stream is going to be controlled | 130 // Avoid allocating the controller if the stream is going to be controlled |
| 126 // externally (i.e. from C++) anyway. All calls to underlyingSource | 131 // externally (i.e. from C++) anyway. All calls to underlyingSource |
| 127 // methods will disregard their controller argument in such situations | 132 // methods will disregard their controller argument in such situations |
| 128 // (but see below). | 133 // (but see below). |
| 129 | 134 |
| 130 const isControlledExternally = | 135 this[readableStreamController] = undefined; |
| 131 arguments[2] === createWithExternalControllerSentinel; | |
| 132 const controller = | |
| 133 isControlledExternally ? null : new ReadableStreamController(this); | |
| 134 this[readableStreamController] = controller; | |
| 135 | 136 |
| 136 // We need to pass ourself to the underlyingSource start method for | 137 const type = underlyingSource.type; |
| 137 // externally-controlled streams. We use the now-useless controller | 138 const typeString = String(type); |
| 138 // argument to do so. | 139 if (typeString === 'bytes') { |
| 139 const argToStart = isControlledExternally ? this : controller; | 140 throw new RangeError('bytes type is not yet implemented'); |
| 141 } else if (type !== undefined) { |
| 142 throw new RangeError('Invalid type is specified'); |
| 143 } |
| 140 | 144 |
| 141 const startResult = CallOrNoop( | 145 this[readableStreamController] = |
| 142 underlyingSource, 'start', argToStart, 'underlyingSource.start'); | 146 new ReadableStreamDefaultController(this, underlyingSource, size, high
WaterMark, arguments[2] === createWithExternalControllerSentinel); |
| 143 thenPromise(Promise_resolve(startResult), | |
| 144 () => { | |
| 145 this[readableStreamBits] |= STARTED; | |
| 146 RequestReadableStreamPull(this); | |
| 147 }, | |
| 148 r => { | |
| 149 if (this[readableStreamState] === STATE_READABLE) { | |
| 150 return ErrorReadableStream(this, r); | |
| 151 } | |
| 152 }); | |
| 153 } | 147 } |
| 154 | 148 |
| 155 get locked() { | 149 get locked() { |
| 156 if (IsReadableStream(this) === false) { | 150 if (IsReadableStream(this) === false) { |
| 157 throw new TypeError(errIllegalInvocation); | 151 throw new TypeError(errIllegalInvocation); |
| 158 } | 152 } |
| 159 | 153 |
| 160 return IsReadableStreamLocked(this); | 154 return IsReadableStreamLocked(this); |
| 161 } | 155 } |
| 162 | 156 |
| 163 cancel(reason) { | 157 cancel(reason) { |
| 164 if (IsReadableStream(this) === false) { | 158 if (IsReadableStream(this) === false) { |
| 165 return Promise_reject(new TypeError(errIllegalInvocation)); | 159 return Promise_reject(new TypeError(errIllegalInvocation)); |
| 166 } | 160 } |
| 167 | 161 |
| 168 if (IsReadableStreamLocked(this) === true) { | 162 if (IsReadableStreamLocked(this) === true) { |
| 169 return Promise_reject(new TypeError(errCancelLockedStream)); | 163 return Promise_reject(new TypeError(errCancelLockedStream)); |
| 170 } | 164 } |
| 171 | 165 |
| 172 return CancelReadableStream(this, reason); | 166 return ReadableStreamCancel(this, reason); |
| 173 } | 167 } |
| 174 | 168 |
| 175 getReader() { | 169 getReader({ mode } = {}) { |
| 176 if (IsReadableStream(this) === false) { | 170 if (IsReadableStream(this) === false) { |
| 177 throw new TypeError(errIllegalInvocation); | 171 throw new TypeError(errIllegalInvocation); |
| 178 } | 172 } |
| 179 | 173 |
| 180 return AcquireReadableStreamReader(this); | 174 if (mode === 'byob') { |
| 175 if (IsReadableByteStreamDefaultController(this[readableStreamController]
) === false) { |
| 176 throw new TypeError(errGetReaderNotByteStream); |
| 177 } |
| 178 |
| 179 return AcquireReadableStreamBYOBReader(this); |
| 180 } |
| 181 |
| 182 if (mode === undefined) { |
| 183 return AcquireReadableStreamDefaultReader(this); |
| 184 } |
| 185 |
| 186 throw new RangeError(errGetReaderBadMode); |
| 181 } | 187 } |
| 182 | 188 |
| 183 tee() { | 189 tee() { |
| 184 if (IsReadableStream(this) === false) { | 190 if (IsReadableStream(this) === false) { |
| 185 throw new TypeError(errIllegalInvocation); | 191 throw new TypeError(errIllegalInvocation); |
| 186 } | 192 } |
| 187 | 193 |
| 188 return TeeReadableStream(this); | 194 return ReadableStreamTee(this); |
| 189 } | 195 } |
| 190 } | 196 } |
| 191 | 197 |
| 192 class ReadableStreamController { | 198 class ReadableStreamDefaultController { |
| 193 constructor(stream) { | 199 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr
olled) { |
| 194 if (IsReadableStream(stream) === false) { | 200 if (IsReadableStream(stream) === false) { |
| 195 throw new TypeError(errIllegalConstructor); | 201 throw new TypeError(errIllegalConstructor); |
| 196 } | 202 } |
| 197 | 203 |
| 198 if (stream[readableStreamController] !== undefined) { | 204 if (stream[readableStreamController] !== undefined) { |
| 199 throw new TypeError(errIllegalConstructor); | 205 throw new TypeError(errIllegalConstructor); |
| 200 } | 206 } |
| 201 | 207 |
| 202 this[readableStreamControllerControlledReadableStream] = stream; | 208 this[readableStreamDefaultControllerControlledReadableStream] = stream; |
| 209 |
| 210 this[readableStreamDefaultControllerUnderlyingSource] = underlyingSource; |
| 211 |
| 212 this[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray(); |
| 213 this[readableStreamDefaultControllerQueueSize] = 0; |
| 214 |
| 215 this[readableStreamDefaultControllerBits] = 0b0; |
| 216 if (isExternallyControlled === true) { |
| 217 this[readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; |
| 218 } |
| 219 |
| 220 const normalizedStrategy = |
| 221 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); |
| 222 this[readableStreamDefaultControllerStrategySize] = normalizedStrategy.siz
e; |
| 223 this[readableStreamDefaultControllerStrategyHWM] = normalizedStrategy.high
WaterMark; |
| 224 |
| 225 const controller = this; |
| 226 |
| 227 const startResult = CallOrNoop( |
| 228 underlyingSource, 'start', this, 'underlyingSource.start'); |
| 229 thenPromise(Promise_resolve(startResult), |
| 230 () => { |
| 231 controller[readableStreamDefaultControllerBits] |= STARTED; |
| 232 ReadableStreamDefaultControllerCallPullIfNeeded(controller); |
| 233 }, |
| 234 r => { |
| 235 if (ReadableStreamGetState(stream) === STATE_READABLE) { |
| 236 ReadableStreamDefaultControllerError(controller, r); |
| 237 } |
| 238 }); |
| 203 } | 239 } |
| 204 | 240 |
| 205 get desiredSize() { | 241 get desiredSize() { |
| 206 if (IsReadableStreamController(this) === false) { | 242 if (IsReadableStreamDefaultController(this) === false) { |
| 207 throw new TypeError(errIllegalInvocation); | 243 throw new TypeError(errIllegalInvocation); |
| 208 } | 244 } |
| 209 | 245 |
| 210 return GetReadableStreamDesiredSize( | 246 return ReadableStreamDefaultControllerGetDesiredSize(this); |
| 211 this[readableStreamControllerControlledReadableStream]); | |
| 212 } | 247 } |
| 213 | 248 |
| 214 close() { | 249 close() { |
| 215 if (IsReadableStreamController(this) === false) { | 250 if (IsReadableStreamDefaultController(this) === false) { |
| 216 throw new TypeError(errIllegalInvocation); | 251 throw new TypeError(errIllegalInvocation); |
| 217 } | 252 } |
| 218 | 253 |
| 219 const stream = this[readableStreamControllerControlledReadableStream]; | 254 const stream = this[readableStreamDefaultControllerControlledReadableStrea
m]; |
| 220 | 255 |
| 221 if (stream[readableStreamBits] & CLOSE_REQUESTED) { | 256 if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) { |
| 222 throw new TypeError(errCloseCloseRequestedStream); | 257 throw new TypeError(errCloseCloseRequestedStream); |
| 223 } | 258 } |
| 224 if (stream[readableStreamState] === STATE_ERRORED) { | 259 |
| 260 const state = ReadableStreamGetState(stream); |
| 261 if (state === STATE_ERRORED) { |
| 225 throw new TypeError(errCloseErroredStream); | 262 throw new TypeError(errCloseErroredStream); |
| 226 } | 263 } |
| 264 if (state === STATE_CLOSED) { |
| 265 throw new TypeError(errCloseClosedStream); |
| 266 } |
| 227 | 267 |
| 228 return CloseReadableStream(stream); | 268 return ReadableStreamDefaultControllerClose(this); |
| 229 } | 269 } |
| 230 | 270 |
| 231 enqueue(chunk) { | 271 enqueue(chunk) { |
| 232 if (IsReadableStreamController(this) === false) { | 272 if (IsReadableStreamDefaultController(this) === false) { |
| 233 throw new TypeError(errIllegalInvocation); | 273 throw new TypeError(errIllegalInvocation); |
| 234 } | 274 } |
| 235 | 275 |
| 236 const stream = this[readableStreamControllerControlledReadableStream]; | 276 const stream = this[readableStreamDefaultControllerControlledReadableStrea
m]; |
| 237 | 277 |
| 238 if (stream[readableStreamState] === STATE_ERRORED) { | 278 if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) { |
| 239 throw stream[readableStreamStoredError]; | 279 throw new TypeError(errEnqueueCloseRequestedStream); |
| 240 } | 280 } |
| 241 | 281 |
| 242 if (stream[readableStreamBits] & CLOSE_REQUESTED) { | 282 const state = ReadableStreamGetState(stream); |
| 243 throw new TypeError(errEnqueueInCloseRequestedStream); | 283 if (state === STATE_ERRORED) { |
| 284 throw new TypeError(errEnqueueErroredStream); |
| 285 } |
| 286 if (state === STATE_CLOSED) { |
| 287 throw new TypeError(errEnqueueClosedStream); |
| 244 } | 288 } |
| 245 | 289 |
| 246 return EnqueueInReadableStream(stream, chunk); | 290 return ReadableStreamDefaultControllerEnqueue(this, chunk); |
| 247 } | 291 } |
| 248 | 292 |
| 249 error(e) { | 293 error(e) { |
| 250 if (IsReadableStreamController(this) === false) { | 294 if (IsReadableStreamDefaultController(this) === false) { |
| 251 throw new TypeError(errIllegalInvocation); | 295 throw new TypeError(errIllegalInvocation); |
| 252 } | 296 } |
| 253 | 297 |
| 254 const stream = this[readableStreamControllerControlledReadableStream]; | 298 const stream = this[readableStreamDefaultControllerControlledReadableStrea
m]; |
| 255 | 299 |
| 256 const state = stream[readableStreamState]; | 300 const state = ReadableStreamGetState(stream); |
| 257 if (state !== STATE_READABLE) { | 301 if (state === STATE_ERRORED) { |
| 258 if (state === STATE_ERRORED) { | 302 throw new TypeError(errErrorErroredStream); |
| 259 throw new TypeError(errErrorErroredStream); | 303 } |
| 260 } | 304 if (state === STATE_CLOSED) { |
| 261 if (state === STATE_CLOSED) { | 305 throw new TypeError(errErrorClosedStream); |
| 262 throw new TypeError(errErrorClosedStream); | |
| 263 } | |
| 264 } | 306 } |
| 265 | 307 |
| 266 return ErrorReadableStream(stream, e); | 308 return ReadableStreamDefaultControllerError(this, e); |
| 267 } | 309 } |
| 268 } | 310 } |
| 269 | 311 |
| 270 class ReadableStreamReader { | 312 function ReadableStreamDefaultControllerCancel(controller, reason) { |
| 313 controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArra
y(); |
| 314 |
| 315 const underlyingSource = controller[readableStreamDefaultControllerUnderlyin
gSource]; |
| 316 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour
ce.cancel'); |
| 317 } |
| 318 |
| 319 function ReadableStreamDefaultControllerPull(controller) { |
| 320 const stream = controller[readableStreamDefaultControllerControlledReadableS
tream]; |
| 321 |
| 322 if (controller[readableStreamDefaultControllerQueue].length > 0) { |
| 323 const chunk = DequeueValue(controller); |
| 324 |
| 325 if ((controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) && |
| 326 controller[readableStreamDefaultControllerQueue].length === 0) { |
| 327 ReadableStreamClose(stream); |
| 328 } else { |
| 329 ReadableStreamDefaultControllerCallPullIfNeeded(controller); |
| 330 } |
| 331 |
| 332 return Promise_resolve(CreateIterResultObject(chunk, false)); |
| 333 } |
| 334 |
| 335 const pendingPromise = ReadableStreamAddReadRequest(stream); |
| 336 ReadableStreamDefaultControllerCallPullIfNeeded(controller); |
| 337 return pendingPromise; |
| 338 } |
| 339 |
| 340 function ReadableStreamAddReadRequest(stream) { |
| 341 const promise = v8.createPromise(); |
| 342 stream[readableStreamReader][readableStreamDefaultReaderReadRequests].push(p
romise); |
| 343 return promise; |
| 344 } |
| 345 |
| 346 class ReadableStreamDefaultReader { |
| 271 constructor(stream) { | 347 constructor(stream) { |
| 272 if (IsReadableStream(stream) === false) { | 348 if (IsReadableStream(stream) === false) { |
| 273 throw new TypeError(errReaderConstructorBadArgument); | 349 throw new TypeError(errReaderConstructorBadArgument); |
| 274 } | 350 } |
| 275 if (IsReadableStreamLocked(stream) === true) { | 351 if (IsReadableStreamLocked(stream) === true) { |
| 276 throw new TypeError(errReaderConstructorStreamAlreadyLocked); | 352 throw new TypeError(errReaderConstructorStreamAlreadyLocked); |
| 277 } | 353 } |
| 278 | 354 |
| 279 // TODO(yhirano): Remove this when we don't need hasPendingActivity in | 355 ReadableStreamReaderGenericInitialize(this, stream); |
| 280 // blink::UnderlyingSourceBase. | |
| 281 if (stream[readableStreamController] === null) { | |
| 282 // The stream is created with an external controller (i.e. made in | |
| 283 // Blink). | |
| 284 const underlyingSource = stream[readableStreamUnderlyingSource]; | |
| 285 callFunction(underlyingSource.notifyLockAcquired, underlyingSource); | |
| 286 } | |
| 287 | 356 |
| 288 this[readableStreamReaderOwnerReadableStream] = stream; | 357 this[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray
(); |
| 289 stream[readableStreamReader] = this; | |
| 290 | |
| 291 this[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); | |
| 292 | |
| 293 switch (stream[readableStreamState]) { | |
| 294 case STATE_READABLE: | |
| 295 this[readableStreamReaderClosedPromise] = v8.createPromise(); | |
| 296 break; | |
| 297 case STATE_CLOSED: | |
| 298 this[readableStreamReaderClosedPromise] = Promise_resolve(undefined); | |
| 299 break; | |
| 300 case STATE_ERRORED: | |
| 301 this[readableStreamReaderClosedPromise] = | |
| 302 Promise_reject(stream[readableStreamStoredError]); | |
| 303 break; | |
| 304 } | |
| 305 } | 358 } |
| 306 | 359 |
| 307 get closed() { | 360 get closed() { |
| 308 if (IsReadableStreamReader(this) === false) { | 361 if (IsReadableStreamDefaultReader(this) === false) { |
| 309 return Promise_reject(new TypeError(errIllegalInvocation)); | 362 return Promise_reject(new TypeError(errIllegalInvocation)); |
| 310 } | 363 } |
| 311 | 364 |
| 312 return this[readableStreamReaderClosedPromise]; | 365 return this[readableStreamReaderClosedPromise]; |
| 313 } | 366 } |
| 314 | 367 |
| 315 cancel(reason) { | 368 cancel(reason) { |
| 316 if (IsReadableStreamReader(this) === false) { | 369 if (IsReadableStreamDefaultReader(this) === false) { |
| 317 return Promise_reject(new TypeError(errIllegalInvocation)); | 370 return Promise_reject(new TypeError(errIllegalInvocation)); |
| 318 } | 371 } |
| 319 | 372 |
| 320 const stream = this[readableStreamReaderOwnerReadableStream]; | 373 const stream = this[readableStreamReaderOwnerReadableStream]; |
| 321 if (stream === undefined) { | 374 if (stream === undefined) { |
| 322 return Promise_reject(new TypeError(errCancelReleasedReader)); | 375 return Promise_reject(new TypeError(errCancelReleasedReader)); |
| 323 } | 376 } |
| 324 | 377 |
| 325 return CancelReadableStream(stream, reason); | 378 return ReadableStreamReaderGenericCancel(this, reason); |
| 326 } | 379 } |
| 327 | 380 |
| 328 read() { | 381 read() { |
| 329 if (IsReadableStreamReader(this) === false) { | 382 if (IsReadableStreamDefaultReader(this) === false) { |
| 330 return Promise_reject(new TypeError(errIllegalInvocation)); | 383 return Promise_reject(new TypeError(errIllegalInvocation)); |
| 331 } | 384 } |
| 332 | 385 |
| 333 if (this[readableStreamReaderOwnerReadableStream] === undefined) { | 386 if (this[readableStreamReaderOwnerReadableStream] === undefined) { |
| 334 return Promise_reject(new TypeError(errReadReleasedReader)); | 387 return Promise_reject(new TypeError(errReadReleasedReader)); |
| 335 } | 388 } |
| 336 | 389 |
| 337 return ReadFromReadableStreamReader(this); | 390 return ReadableStreamDefaultReaderRead(this); |
| 338 } | 391 } |
| 339 | 392 |
| 340 releaseLock() { | 393 releaseLock() { |
| 341 if (IsReadableStreamReader(this) === false) { | 394 if (IsReadableStreamDefaultReader(this) === false) { |
| 342 throw new TypeError(errIllegalInvocation); | 395 throw new TypeError(errIllegalInvocation); |
| 343 } | 396 } |
| 344 | 397 |
| 345 const stream = this[readableStreamReaderOwnerReadableStream]; | 398 const stream = this[readableStreamReaderOwnerReadableStream]; |
| 346 if (stream === undefined) { | 399 if (stream === undefined) { |
| 347 return undefined; | 400 return undefined; |
| 348 } | 401 } |
| 349 | 402 |
| 350 if (this[readableStreamReaderReadRequests].length > 0) { | 403 if (this[readableStreamDefaultReaderReadRequests].length > 0) { |
| 351 throw new TypeError(errReleaseReaderWithPendingRead); | 404 throw new TypeError(errReleaseReaderWithPendingRead); |
| 352 } | 405 } |
| 353 | 406 |
| 354 // TODO(yhirano): Remove this when we don't need hasPendingActivity in | 407 ReadableStreamReaderGenericRelease(this); |
| 355 // blink::UnderlyingSourceBase. | 408 } |
| 356 if (stream[readableStreamController] === null) { | 409 } |
| 357 // The stream is created with an external controller (i.e. made in | |
| 358 // Blink). | |
| 359 const underlyingSource = stream[readableStreamUnderlyingSource]; | |
| 360 callFunction(underlyingSource.notifyLockReleased, underlyingSource); | |
| 361 } | |
| 362 | 410 |
| 363 if (stream[readableStreamState] === STATE_READABLE) { | 411 function ReadableStreamReaderGenericCancel(reader, reason) { |
| 364 v8.rejectPromise(this[readableStreamReaderClosedPromise], | 412 return ReadableStreamCancel(reader[readableStreamReaderOwnerReadableStream],
reason); |
| 365 new TypeError(errReleasedReaderClosedPromise)); | |
| 366 } else { | |
| 367 this[readableStreamReaderClosedPromise] = | |
| 368 Promise_reject(new TypeError(errReleasedReaderClosedPromise)); | |
| 369 } | |
| 370 | |
| 371 this[readableStreamReaderOwnerReadableStream][readableStreamReader] = | |
| 372 undefined; | |
| 373 this[readableStreamReaderOwnerReadableStream] = undefined; | |
| 374 } | |
| 375 } | 413 } |
| 376 | 414 |
| 377 // | 415 // |
| 378 // Readable stream abstract operations | 416 // Readable stream abstract operations |
| 379 // | 417 // |
| 380 | 418 |
| 381 function AcquireReadableStreamReader(stream) { | 419 function AcquireReadableStreamDefaultReader(stream) { |
| 382 return new ReadableStreamReader(stream); | 420 return new ReadableStreamDefaultReader(stream); |
| 383 } | 421 } |
| 384 | 422 |
| 385 function CancelReadableStream(stream, reason) { | 423 function ReadableStreamCancel(stream, reason) { |
| 386 stream[readableStreamBits] |= DISTURBED; | 424 stream[readableStreamBits] |= DISTURBED; |
| 387 | 425 |
| 388 const state = stream[readableStreamState]; | 426 const state = ReadableStreamGetState(stream); |
| 389 if (state === STATE_CLOSED) { | 427 if (state === STATE_CLOSED) { |
| 390 return Promise_resolve(undefined); | 428 return Promise_resolve(undefined); |
| 391 } | 429 } |
| 392 if (state === STATE_ERRORED) { | 430 if (state === STATE_ERRORED) { |
| 393 return Promise_reject(stream[readableStreamStoredError]); | 431 return Promise_reject(stream[readableStreamStoredError]); |
| 394 } | 432 } |
| 395 | 433 |
| 396 stream[readableStreamQueue] = new v8.InternalPackedArray(); | 434 ReadableStreamClose(stream); |
| 397 FinishClosingReadableStream(stream); | |
| 398 | 435 |
| 399 const underlyingSource = stream[readableStreamUnderlyingSource]; | 436 const sourceCancelPromise = ReadableStreamDefaultControllerCancel(stream[rea
dableStreamController], reason); |
| 400 const sourceCancelPromise = PromiseCallOrNoop( | |
| 401 underlyingSource, 'cancel', reason, 'underlyingSource.cancel'); | |
| 402 return thenPromise(sourceCancelPromise, () => undefined); | 437 return thenPromise(sourceCancelPromise, () => undefined); |
| 403 } | 438 } |
| 404 | 439 |
| 405 function CloseReadableStream(stream) { | 440 function ReadableStreamDefaultControllerClose(controller) { |
| 406 if (stream[readableStreamState] === STATE_CLOSED) { | 441 const stream = controller[readableStreamDefaultControllerControlledReadableS
tream]; |
| 407 return undefined; | |
| 408 } | |
| 409 | 442 |
| 410 stream[readableStreamBits] |= CLOSE_REQUESTED; | 443 controller[readableStreamDefaultControllerBits] |= CLOSE_REQUESTED; |
| 411 | 444 |
| 412 if (stream[readableStreamQueue].length === 0) { | 445 if (controller[readableStreamDefaultControllerQueue].length === 0) { |
| 413 return FinishClosingReadableStream(stream); | 446 ReadableStreamClose(stream); |
| 414 } | 447 } |
| 415 } | 448 } |
| 416 | 449 |
| 417 function EnqueueInReadableStream(stream, chunk) { | 450 function ReadableStreamFulfillReadRequest(stream, chunk, done) { |
| 418 if (stream[readableStreamState] === STATE_CLOSED) { | 451 const reader = stream[readableStreamReader]; |
| 419 return undefined; | |
| 420 } | |
| 421 | 452 |
| 422 if (IsReadableStreamLocked(stream) === true && | 453 const readRequest = |
| 423 stream[readableStreamReader][readableStreamReaderReadRequests].length > | 454 stream[readableStreamReader][readableStreamDefaultReaderReadRequests] |
| 424 0) { | 455 .shift(); |
| 425 const readRequest = | 456 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, done)); |
| 426 stream[readableStreamReader][readableStreamReaderReadRequests] | 457 } |
| 427 .shift(); | 458 |
| 428 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, false)); | 459 function ReadableStreamDefaultControllerEnqueue(controller, chunk) { |
| 460 const stream = controller[readableStreamDefaultControllerControlledReadableS
tream]; |
| 461 |
| 462 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadReque
sts(stream) > 0) { |
| 463 ReadableStreamFulfillReadRequest(stream, chunk, false); |
| 429 } else { | 464 } else { |
| 430 let chunkSize = 1; | 465 let chunkSize = 1; |
| 431 | 466 |
| 432 const strategySize = stream[readableStreamStrategySize]; | 467 const strategySize = controller[readableStreamDefaultControllerStrategySiz
e]; |
| 433 if (strategySize !== undefined) { | 468 if (strategySize !== undefined) { |
| 434 try { | 469 try { |
| 435 chunkSize = strategySize(chunk); | 470 chunkSize = strategySize(chunk); |
| 436 } catch (chunkSizeE) { | 471 } catch (chunkSizeE) { |
| 437 if (stream[readableStreamState] === STATE_READABLE) { | 472 if (ReadableStreamGetState(stream) === STATE_READABLE) { |
| 438 ErrorReadableStream(stream, chunkSizeE); | 473 ReadableStreamDefaultControllerError(controller, chunkSizeE); |
| 439 } | 474 } |
| 440 throw chunkSizeE; | 475 throw chunkSizeE; |
| 441 } | 476 } |
| 442 } | 477 } |
| 443 | 478 |
| 444 try { | 479 try { |
| 445 EnqueueValueWithSize(stream, chunk, chunkSize); | 480 EnqueueValueWithSize(controller, chunk, chunkSize); |
| 446 } catch (enqueueE) { | 481 } catch (enqueueE) { |
| 447 if (stream[readableStreamState] === STATE_READABLE) { | 482 if (ReadableStreamGetState(stream) === STATE_READABLE) { |
| 448 ErrorReadableStream(stream, enqueueE); | 483 ReadableStreamDefaultControllerError(controller, enqueueE); |
| 449 } | 484 } |
| 450 throw enqueueE; | 485 throw enqueueE; |
| 451 } | 486 } |
| 452 } | 487 } |
| 453 | 488 |
| 454 RequestReadableStreamPull(stream); | 489 ReadableStreamDefaultControllerCallPullIfNeeded(controller); |
| 455 } | 490 } |
| 456 | 491 |
| 457 function ErrorReadableStream(stream, e) { | 492 function ReadableStreamGetState(stream) { |
| 458 stream[readableStreamQueue] = new v8.InternalPackedArray(); | 493 return (stream[readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET; |
| 494 } |
| 495 |
| 496 function ReadableStreamSetState(stream, state) { |
| 497 stream[readableStreamBits] = (stream[readableStreamBits] & ~STATE_MASK) | |
| 498 (state << STATE_BITS_OFFSET); |
| 499 } |
| 500 |
| 501 function ReadableStreamDefaultControllerError(controller, e) { |
| 502 controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArra
y(); |
| 503 const stream = controller[readableStreamDefaultControllerControlledReadableS
tream]; |
| 504 ReadableStreamError(stream, e); |
| 505 } |
| 506 |
| 507 function ReadableStreamError(stream, e) { |
| 459 stream[readableStreamStoredError] = e; | 508 stream[readableStreamStoredError] = e; |
| 460 stream[readableStreamState] = STATE_ERRORED; | 509 ReadableStreamSetState(stream, STATE_ERRORED); |
| 461 | 510 |
| 462 const reader = stream[readableStreamReader]; | 511 const reader = stream[readableStreamReader]; |
| 463 if (reader === undefined) { | 512 if (reader === undefined) { |
| 464 return undefined; | 513 return undefined; |
| 465 } | 514 } |
| 466 | 515 |
| 467 const readRequests = reader[readableStreamReaderReadRequests]; | 516 if (IsReadableStreamDefaultReader(reader) === true) { |
| 468 for (let i = 0; i < readRequests.length; ++i) { | 517 const readRequests = reader[readableStreamDefaultReaderReadRequests]; |
| 469 v8.rejectPromise(readRequests[i], e); | 518 for (let i = 0; i < readRequests.length; i++) { |
| 519 v8.rejectPromise(readRequests[i], e); |
| 520 } |
| 521 reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArr
ay(); |
| 470 } | 522 } |
| 471 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); | |
| 472 | 523 |
| 473 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e); | 524 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e); |
| 474 } | 525 } |
| 475 | 526 |
| 476 function FinishClosingReadableStream(stream) { | 527 function ReadableStreamClose(stream) { |
| 477 stream[readableStreamState] = STATE_CLOSED; | 528 ReadableStreamSetState(stream, STATE_CLOSED); |
| 478 | 529 |
| 479 const reader = stream[readableStreamReader]; | 530 const reader = stream[readableStreamReader]; |
| 480 if (reader === undefined) { | 531 if (reader === undefined) { |
| 481 return undefined; | 532 return undefined; |
| 482 } | 533 } |
| 483 | 534 |
| 484 | 535 if (IsReadableStreamDefaultReader(reader) === true) { |
| 485 const readRequests = reader[readableStreamReaderReadRequests]; | 536 const readRequests = reader[readableStreamDefaultReaderReadRequests]; |
| 486 for (let i = 0; i < readRequests.length; ++i) { | 537 for (let i = 0; i < readRequests.length; i++) { |
| 487 v8.resolvePromise( | 538 v8.resolvePromise( |
| 488 readRequests[i], CreateIterResultObject(undefined, true)); | 539 readRequests[i], CreateIterResultObject(undefined, true)); |
| 540 } |
| 541 reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArr
ay(); |
| 489 } | 542 } |
| 490 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); | |
| 491 | 543 |
| 492 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined); | 544 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined); |
| 493 } | 545 } |
| 494 | 546 |
| 495 function GetReadableStreamDesiredSize(stream) { | 547 function ReadableStreamDefaultControllerGetDesiredSize(controller) { |
| 496 const queueSize = GetTotalQueueSize(stream); | 548 const queueSize = GetTotalQueueSize(controller); |
| 497 return stream[readableStreamStrategyHWM] - queueSize; | 549 return controller[readableStreamDefaultControllerStrategyHWM] - queueSize; |
| 498 } | 550 } |
| 499 | 551 |
| 500 function IsReadableStream(x) { | 552 function IsReadableStream(x) { |
| 501 return hasOwnProperty(x, readableStreamUnderlyingSource); | 553 return hasOwnProperty(x, readableStreamController); |
| 502 } | 554 } |
| 503 | 555 |
| 504 function IsReadableStreamDisturbed(stream) { | 556 function IsReadableStreamDisturbed(stream) { |
| 505 return stream[readableStreamBits] & DISTURBED; | 557 return stream[readableStreamBits] & DISTURBED; |
| 506 } | 558 } |
| 507 | 559 |
| 508 function IsReadableStreamLocked(stream) { | 560 function IsReadableStreamLocked(stream) { |
| 509 return stream[readableStreamReader] !== undefined; | 561 return stream[readableStreamReader] !== undefined; |
| 510 } | 562 } |
| 511 | 563 |
| 512 function IsReadableStreamController(x) { | 564 function IsReadableStreamDefaultController(x) { |
| 513 return hasOwnProperty(x, readableStreamControllerControlledReadableStream); | 565 return hasOwnProperty(x, readableStreamDefaultControllerControlledReadableSt
ream); |
| 566 } |
| 567 |
| 568 function IsReadableStreamDefaultReader(x) { |
| 569 return hasOwnProperty(x, readableStreamDefaultReaderReadRequests); |
| 514 } | 570 } |
| 515 | 571 |
| 516 function IsReadableStreamReadable(stream) { | 572 function IsReadableStreamReadable(stream) { |
| 517 return stream[readableStreamState] === STATE_READABLE; | 573 return ReadableStreamGetState(stream) === STATE_READABLE; |
| 518 } | 574 } |
| 519 | 575 |
| 520 function IsReadableStreamClosed(stream) { | 576 function IsReadableStreamClosed(stream) { |
| 521 return stream[readableStreamState] === STATE_CLOSED; | 577 return ReadableStreamGetState(stream) === STATE_CLOSED; |
| 522 } | 578 } |
| 523 | 579 |
| 524 function IsReadableStreamErrored(stream) { | 580 function IsReadableStreamErrored(stream) { |
| 525 return stream[readableStreamState] === STATE_ERRORED; | 581 return ReadableStreamGetState(stream) === STATE_ERRORED; |
| 526 } | 582 } |
| 527 | 583 |
| 528 function IsReadableStreamReader(x) { | 584 function ReadableStreamReaderGenericInitialize(reader, stream) { |
| 529 return hasOwnProperty(x, readableStreamReaderOwnerReadableStream); | 585 // TODO(yhirano): Remove this when we don't need hasPendingActivity in |
| 586 // blink::UnderlyingSourceBase. |
| 587 const controller = stream[readableStreamController]; |
| 588 if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED)
{ |
| 589 // The stream is created with an external controller (i.e. made in |
| 590 // Blink). |
| 591 const underlyingSource = controller[readableStreamDefaultControllerUnderly
ingSource]; |
| 592 callFunction(underlyingSource.notifyLockAcquired, underlyingSource); |
| 593 } |
| 594 |
| 595 reader[readableStreamReaderOwnerReadableStream] = stream; |
| 596 stream[readableStreamReader] = reader; |
| 597 |
| 598 switch (ReadableStreamGetState(stream)) { |
| 599 case STATE_READABLE: |
| 600 reader[readableStreamReaderClosedPromise] = v8.createPromise(); |
| 601 break; |
| 602 case STATE_CLOSED: |
| 603 reader[readableStreamReaderClosedPromise] = Promise_resolve(undefined); |
| 604 break; |
| 605 case STATE_ERRORED: |
| 606 reader[readableStreamReaderClosedPromise] = |
| 607 Promise_reject(stream[readableStreamStoredError]); |
| 608 break; |
| 609 } |
| 530 } | 610 } |
| 531 | 611 |
| 532 function ReadFromReadableStreamReader(reader) { | 612 function ReadableStreamReaderGenericRelease(reader) { |
| 613 // TODO(yhirano): Remove this when we don't need hasPendingActivity in |
| 614 // blink::UnderlyingSourceBase. |
| 615 const controller = reader[readableStreamReaderOwnerReadableStream][readableS
treamController]; |
| 616 if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED)
{ |
| 617 // The stream is created with an external controller (i.e. made in |
| 618 // Blink). |
| 619 const underlyingSource = controller[readableStreamDefaultControllerUnderly
ingSource]; |
| 620 callFunction(underlyingSource.notifyLockReleased, underlyingSource); |
| 621 } |
| 622 |
| 623 if (ReadableStreamGetState(reader[readableStreamReaderOwnerReadableStream])
=== STATE_READABLE) { |
| 624 v8.rejectPromise(reader[readableStreamReaderClosedPromise], new TypeError(
errReleasedReaderClosedPromise)); |
| 625 } else { |
| 626 reader[readableStreamReaderClosedPromise] = Promise_reject(new TypeError(e
rrReleasedReaderClosedPromise)); |
| 627 } |
| 628 |
| 629 reader[readableStreamReaderOwnerReadableStream][readableStreamReader] = |
| 630 undefined; |
| 631 reader[readableStreamReaderOwnerReadableStream] = undefined; |
| 632 } |
| 633 |
| 634 function ReadableStreamDefaultReaderRead(reader) { |
| 533 const stream = reader[readableStreamReaderOwnerReadableStream]; | 635 const stream = reader[readableStreamReaderOwnerReadableStream]; |
| 534 stream[readableStreamBits] |= DISTURBED; | 636 stream[readableStreamBits] |= DISTURBED; |
| 535 | 637 |
| 536 if (stream[readableStreamState] === STATE_CLOSED) { | 638 if (ReadableStreamGetState(stream) === STATE_CLOSED) { |
| 537 return Promise_resolve(CreateIterResultObject(undefined, true)); | 639 return Promise_resolve(CreateIterResultObject(undefined, true)); |
| 538 } | 640 } |
| 539 | 641 |
| 540 if (stream[readableStreamState] === STATE_ERRORED) { | 642 if (ReadableStreamGetState(stream) === STATE_ERRORED) { |
| 541 return Promise_reject(stream[readableStreamStoredError]); | 643 return Promise_reject(stream[readableStreamStoredError]); |
| 542 } | 644 } |
| 543 | 645 |
| 544 const queue = stream[readableStreamQueue]; | 646 return ReadableStreamDefaultControllerPull(stream[readableStreamController])
; |
| 545 if (queue.length > 0) { | |
| 546 const chunk = DequeueValue(stream); | |
| 547 | |
| 548 if (stream[readableStreamBits] & CLOSE_REQUESTED && queue.length === 0) { | |
| 549 FinishClosingReadableStream(stream); | |
| 550 } else { | |
| 551 RequestReadableStreamPull(stream); | |
| 552 } | |
| 553 | |
| 554 return Promise_resolve(CreateIterResultObject(chunk, false)); | |
| 555 } else { | |
| 556 const readRequest = v8.createPromise(); | |
| 557 | |
| 558 reader[readableStreamReaderReadRequests].push(readRequest); | |
| 559 RequestReadableStreamPull(stream); | |
| 560 return readRequest; | |
| 561 } | |
| 562 } | 647 } |
| 563 | 648 |
| 564 function RequestReadableStreamPull(stream) { | 649 function ReadableStreamDefaultControllerCallPullIfNeeded(controller) { |
| 565 const shouldPull = ShouldReadableStreamPull(stream); | 650 const shouldPull = ReadableStreamDefaultControllerShouldCallPull(controller)
; |
| 566 if (shouldPull === false) { | 651 if (shouldPull === false) { |
| 567 return undefined; | 652 return undefined; |
| 568 } | 653 } |
| 569 | 654 |
| 570 if (stream[readableStreamBits] & PULLING) { | 655 if (controller[readableStreamDefaultControllerBits] & PULLING) { |
| 571 stream[readableStreamBits] |= PULL_AGAIN; | 656 controller[readableStreamDefaultControllerBits] |= PULL_AGAIN; |
| 572 return undefined; | 657 return undefined; |
| 573 } | 658 } |
| 574 | 659 |
| 575 stream[readableStreamBits] |= PULLING; | 660 controller[readableStreamDefaultControllerBits] |= PULLING; |
| 576 | 661 |
| 577 const underlyingSource = stream[readableStreamUnderlyingSource]; | 662 const underlyingSource = controller[readableStreamDefaultControllerUnderlyin
gSource]; |
| 578 const controller = stream[readableStreamController]; | |
| 579 const pullPromise = PromiseCallOrNoop( | 663 const pullPromise = PromiseCallOrNoop( |
| 580 underlyingSource, 'pull', controller, 'underlyingSource.pull'); | 664 underlyingSource, 'pull', controller, 'underlyingSource.pull'); |
| 581 | 665 |
| 582 thenPromise(pullPromise, | 666 thenPromise(pullPromise, |
| 583 () => { | 667 () => { |
| 584 stream[readableStreamBits] &= ~PULLING; | 668 controller[readableStreamDefaultControllerBits] &= ~PULLING; |
| 585 | 669 |
| 586 if (stream[readableStreamBits] & PULL_AGAIN) { | 670 if (controller[readableStreamDefaultControllerBits] & PULL_AGAIN) { |
| 587 stream[readableStreamBits] &= ~PULL_AGAIN; | 671 controller[readableStreamDefaultControllerBits] &= ~PULL_AGAIN; |
| 588 return RequestReadableStreamPull(stream); | 672 ReadableStreamDefaultControllerCallPullIfNeeded(controller); |
| 589 } | 673 } |
| 590 }, | 674 }, |
| 591 e => { | 675 e => { |
| 592 if (stream[readableStreamState] === STATE_READABLE) { | 676 if (ReadableStreamGetState(controller[readableStreamDefaultControllerC
ontrolledReadableStream]) === STATE_READABLE) { |
| 593 return ErrorReadableStream(stream, e); | 677 ReadableStreamDefaultControllerError(controller, e); |
| 594 } | 678 } |
| 595 }); | 679 }); |
| 596 } | 680 } |
| 597 | 681 |
| 598 function ShouldReadableStreamPull(stream) { | 682 function ReadableStreamDefaultControllerShouldCallPull(controller) { |
| 599 const state = stream[readableStreamState]; | 683 const stream = controller[readableStreamDefaultControllerControlledReadableS
tream]; |
| 684 |
| 685 const state = ReadableStreamGetState(stream); |
| 600 if (state === STATE_CLOSED || state === STATE_ERRORED) { | 686 if (state === STATE_CLOSED || state === STATE_ERRORED) { |
| 601 return false; | 687 return false; |
| 602 } | 688 } |
| 603 | 689 |
| 604 if (stream[readableStreamBits] & CLOSE_REQUESTED) { | 690 if (controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) { |
| 605 return false; | 691 return false; |
| 606 } | 692 } |
| 607 | 693 |
| 608 if (!(stream[readableStreamBits] & STARTED)) { | 694 if (!(controller[readableStreamDefaultControllerBits] & STARTED)) { |
| 609 return false; | 695 return false; |
| 610 } | 696 } |
| 611 | 697 |
| 612 if (IsReadableStreamLocked(stream) === true) { | 698 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadReque
sts(stream) > 0) { |
| 613 const reader = stream[readableStreamReader]; | 699 return true; |
| 614 const readRequests = reader[readableStreamReaderReadRequests]; | |
| 615 if (readRequests.length > 0) { | |
| 616 return true; | |
| 617 } | |
| 618 } | 700 } |
| 619 | 701 |
| 620 const desiredSize = GetReadableStreamDesiredSize(stream); | 702 const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(controller
); |
| 621 if (desiredSize > 0) { | 703 if (desiredSize > 0) { |
| 622 return true; | 704 return true; |
| 623 } | 705 } |
| 624 | 706 |
| 625 return false; | 707 return false; |
| 626 } | 708 } |
| 627 | 709 |
| 710 function ReadableStreamGetNumReadRequests(stream) { |
| 711 const reader = stream[readableStreamReader]; |
| 712 const readRequests = reader[readableStreamDefaultReaderReadRequests]; |
| 713 return readRequests.length; |
| 714 } |
| 715 |
| 628 // Potential future optimization: use class instances for the underlying | 716 // Potential future optimization: use class instances for the underlying |
| 629 // sources, so that we don't re-create | 717 // sources, so that we don't re-create |
| 630 // closures every time. | 718 // closures every time. |
| 631 | 719 |
| 632 // TODO(domenic): shouldClone argument from spec not supported yet | 720 // TODO(domenic): shouldClone argument from spec not supported yet |
| 633 function TeeReadableStream(stream) { | 721 function ReadableStreamTee(stream) { |
| 634 const reader = AcquireReadableStreamReader(stream); | 722 const reader = AcquireReadableStreamDefaultReader(stream); |
| 635 | 723 |
| 636 let closedOrErrored = false; | 724 let closedOrErrored = false; |
| 637 let canceled1 = false; | 725 let canceled1 = false; |
| 638 let canceled2 = false; | 726 let canceled2 = false; |
| 639 let reason1; | 727 let reason1; |
| 640 let reason2; | 728 let reason2; |
| 641 let promise = v8.createPromise(); | 729 let promise = v8.createPromise(); |
| 642 | 730 |
| 643 const branch1 = new ReadableStream({pull, cancel: cancel1}); | 731 const branch1Stream = new ReadableStream({pull, cancel: cancel1}); |
| 644 | 732 |
| 645 const branch2 = new ReadableStream({pull, cancel: cancel2}); | 733 const branch2Stream = new ReadableStream({pull, cancel: cancel2}); |
| 734 |
| 735 const branch1 = branch1Stream[readableStreamController]; |
| 736 const branch2 = branch2Stream[readableStreamController]; |
| 646 | 737 |
| 647 thenPromise( | 738 thenPromise( |
| 648 reader[readableStreamReaderClosedPromise], undefined, function(r) { | 739 reader[readableStreamReaderClosedPromise], undefined, function(r) { |
| 649 if (closedOrErrored === true) { | 740 if (closedOrErrored === true) { |
| 650 return; | 741 return; |
| 651 } | 742 } |
| 652 | 743 |
| 653 ErrorReadableStream(branch1, r); | 744 ReadableStreamDefaultControllerError(branch1, r); |
| 654 ErrorReadableStream(branch2, r); | 745 ReadableStreamDefaultControllerError(branch2, r); |
| 655 closedOrErrored = true; | 746 closedOrErrored = true; |
| 656 }); | 747 }); |
| 657 | 748 |
| 658 return [branch1, branch2]; | 749 return [branch1Stream, branch2Stream]; |
| 659 | |
| 660 | 750 |
| 661 function pull() { | 751 function pull() { |
| 662 return thenPromise( | 752 return thenPromise( |
| 663 ReadFromReadableStreamReader(reader), function(result) { | 753 ReadableStreamDefaultReaderRead(reader), function(result) { |
| 664 const value = result.value; | 754 const value = result.value; |
| 665 const done = result.done; | 755 const done = result.done; |
| 666 | 756 |
| 667 if (done === true && closedOrErrored === false) { | 757 if (done === true && closedOrErrored === false) { |
| 668 CloseReadableStream(branch1); | 758 if (canceled1 === false) { |
| 669 CloseReadableStream(branch2); | 759 ReadableStreamDefaultControllerClose(branch1); |
| 760 } |
| 761 if (canceled2 === false) { |
| 762 ReadableStreamDefaultControllerClose(branch2); |
| 763 } |
| 670 closedOrErrored = true; | 764 closedOrErrored = true; |
| 671 } | 765 } |
| 672 | 766 |
| 673 if (closedOrErrored === true) { | 767 if (closedOrErrored === true) { |
| 674 return; | 768 return; |
| 675 } | 769 } |
| 676 | 770 |
| 677 if (canceled1 === false) { | 771 if (canceled1 === false) { |
| 678 EnqueueInReadableStream(branch1, value); | 772 ReadableStreamDefaultControllerEnqueue(branch1, value); |
| 679 } | 773 } |
| 680 | 774 |
| 681 if (canceled2 === false) { | 775 if (canceled2 === false) { |
| 682 EnqueueInReadableStream(branch2, value); | 776 ReadableStreamDefaultControllerEnqueue(branch2, value); |
| 683 } | 777 } |
| 684 }); | 778 }); |
| 685 } | 779 } |
| 686 | 780 |
| 687 function cancel1(reason) { | 781 function cancel1(reason) { |
| 688 canceled1 = true; | 782 canceled1 = true; |
| 689 reason1 = reason; | 783 reason1 = reason; |
| 690 | 784 |
| 691 if (canceled2 === true) { | 785 if (canceled2 === true) { |
| 692 const compositeReason = [reason1, reason2]; | 786 const compositeReason = [reason1, reason2]; |
| 693 const cancelResult = CancelReadableStream(stream, compositeReason); | 787 const cancelResult = ReadableStreamCancel(stream, compositeReason); |
| 694 v8.resolvePromise(promise, cancelResult); | 788 v8.resolvePromise(promise, cancelResult); |
| 695 } | 789 } |
| 696 | 790 |
| 697 return promise; | 791 return promise; |
| 698 } | 792 } |
| 699 | 793 |
| 700 function cancel2(reason) { | 794 function cancel2(reason) { |
| 701 canceled2 = true; | 795 canceled2 = true; |
| 702 reason2 = reason; | 796 reason2 = reason; |
| 703 | 797 |
| 704 if (canceled1 === true) { | 798 if (canceled1 === true) { |
| 705 const compositeReason = [reason1, reason2]; | 799 const compositeReason = [reason1, reason2]; |
| 706 const cancelResult = CancelReadableStream(stream, compositeReason); | 800 const cancelResult = ReadableStreamCancel(stream, compositeReason); |
| 707 v8.resolvePromise(promise, cancelResult); | 801 v8.resolvePromise(promise, cancelResult); |
| 708 } | 802 } |
| 709 | 803 |
| 710 return promise; | 804 return promise; |
| 711 } | 805 } |
| 712 } | 806 } |
| 713 | 807 |
| 714 // | 808 // |
| 715 // Queue-with-sizes | 809 // Queue-with-sizes |
| 716 // Modified from taking the queue (as in the spec) to taking the stream, so we | 810 // Modified from taking the queue (as in the spec) to taking the stream, so we |
| 717 // can modify the queue size alongside. | 811 // can modify the queue size alongside. |
| 718 // | 812 // |
| 719 | 813 |
| 720 function DequeueValue(stream) { | 814 function DequeueValue(controller) { |
| 721 const result = stream[readableStreamQueue].shift(); | 815 const result = controller[readableStreamDefaultControllerQueue].shift(); |
| 722 stream[readableStreamQueueSize] -= result.size; | 816 controller[readableStreamDefaultControllerQueueSize] -= result.size; |
| 723 return result.value; | 817 return result.value; |
| 724 } | 818 } |
| 725 | 819 |
| 726 function EnqueueValueWithSize(stream, value, size) { | 820 function EnqueueValueWithSize(controller, value, size) { |
| 727 size = Number(size); | 821 size = Number(size); |
| 728 if (Number_isNaN(size) || size === +Infinity || size < 0) { | 822 if (Number_isNaN(size) || size === +Infinity || size < 0) { |
| 729 throw new RangeError(errInvalidSize); | 823 throw new RangeError(errInvalidSize); |
| 730 } | 824 } |
| 731 | 825 |
| 732 stream[readableStreamQueueSize] += size; | 826 controller[readableStreamDefaultControllerQueueSize] += size; |
| 733 stream[readableStreamQueue].push({value, size}); | 827 controller[readableStreamDefaultControllerQueue].push({value, size}); |
| 734 } | 828 } |
| 735 | 829 |
| 736 function GetTotalQueueSize(stream) { return stream[readableStreamQueueSize]; } | 830 function GetTotalQueueSize(controller) { return controller[readableStreamDefau
ltControllerQueueSize]; } |
| 737 | 831 |
| 738 // | 832 // |
| 739 // Other helpers | 833 // Other helpers |
| 740 // | 834 // |
| 741 | 835 |
| 742 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) { | 836 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) { |
| 743 if (size !== undefined && typeof size !== 'function') { | 837 if (size !== undefined && typeof size !== 'function') { |
| 744 throw new TypeError(errSizeNotAFunction); | 838 throw new TypeError(errSizeNotAFunction); |
| 745 } | 839 } |
| 746 | 840 |
| (...skipping 29 matching lines...) Expand all Loading... |
| 776 method = O[P]; | 870 method = O[P]; |
| 777 } catch (methodE) { | 871 } catch (methodE) { |
| 778 return Promise_reject(methodE); | 872 return Promise_reject(methodE); |
| 779 } | 873 } |
| 780 | 874 |
| 781 if (method === undefined) { | 875 if (method === undefined) { |
| 782 return Promise_resolve(undefined); | 876 return Promise_resolve(undefined); |
| 783 } | 877 } |
| 784 | 878 |
| 785 if (typeof method !== 'function') { | 879 if (typeof method !== 'function') { |
| 786 return Promise_reject(errTmplMustBeFunctionOrUndefined(nameForError)); | 880 return Promise_reject(new TypeError(errTmplMustBeFunctionOrUndefined(nameF
orError))); |
| 787 } | 881 } |
| 788 | 882 |
| 789 try { | 883 try { |
| 790 return Promise_resolve(callFunction(method, O, arg)); | 884 return Promise_resolve(callFunction(method, O, arg)); |
| 791 } catch (e) { | 885 } catch (e) { |
| 792 return Promise_reject(e); | 886 return Promise_reject(e); |
| 793 } | 887 } |
| 794 } | 888 } |
| 795 | 889 |
| 796 function CreateIterResultObject(value, done) { return {value, done}; } | 890 function CreateIterResultObject(value, done) { return {value, done}; } |
| 797 | 891 |
| 798 | 892 |
| 799 // | 893 // |
| 800 // Additions to the global | 894 // Additions to the global |
| 801 // | 895 // |
| 802 | 896 |
| 803 defineProperty(global, 'ReadableStream', { | 897 defineProperty(global, 'ReadableStream', { |
| 804 value: ReadableStream, | 898 value: ReadableStream, |
| 805 enumerable: false, | 899 enumerable: false, |
| 806 configurable: true, | 900 configurable: true, |
| 807 writable: true | 901 writable: true |
| 808 }); | 902 }); |
| 809 | 903 |
| 810 // | 904 // |
| 811 // Exports to Blink | 905 // Exports to Blink |
| 812 // | 906 // |
| 813 | 907 |
| 814 binding.AcquireReadableStreamReader = AcquireReadableStreamReader; | 908 binding.AcquireReadableStreamDefaultReader = AcquireReadableStreamDefaultReade
r; |
| 815 binding.IsReadableStream = IsReadableStream; | 909 binding.IsReadableStream = IsReadableStream; |
| 816 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed; | 910 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed; |
| 817 binding.IsReadableStreamLocked = IsReadableStreamLocked; | 911 binding.IsReadableStreamLocked = IsReadableStreamLocked; |
| 818 binding.IsReadableStreamReadable = IsReadableStreamReadable; | 912 binding.IsReadableStreamReadable = IsReadableStreamReadable; |
| 819 binding.IsReadableStreamClosed = IsReadableStreamClosed; | 913 binding.IsReadableStreamClosed = IsReadableStreamClosed; |
| 820 binding.IsReadableStreamErrored = IsReadableStreamErrored; | 914 binding.IsReadableStreamErrored = IsReadableStreamErrored; |
| 821 binding.IsReadableStreamReader = IsReadableStreamReader; | 915 binding.IsReadableStreamDefaultReader = IsReadableStreamDefaultReader; |
| 822 binding.ReadFromReadableStreamReader = ReadFromReadableStreamReader; | 916 binding.ReadableStreamDefaultReaderRead = ReadableStreamDefaultReaderRead; |
| 823 | 917 |
| 824 binding.CloseReadableStream = CloseReadableStream; | 918 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController
Close; |
| 825 binding.GetReadableStreamDesiredSize = GetReadableStreamDesiredSize; | 919 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC
ontrollerGetDesiredSize; |
| 826 binding.EnqueueInReadableStream = EnqueueInReadableStream; | 920 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll
erEnqueue; |
| 827 binding.ErrorReadableStream = ErrorReadableStream; | 921 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController
Error; |
| 828 | 922 |
| 829 binding.createReadableStreamWithExternalController = | 923 binding.createReadableStreamWithExternalController = |
| 830 (underlyingSource, strategy) => { | 924 (underlyingSource, strategy) => { |
| 831 return new ReadableStream( | 925 return new ReadableStream( |
| 832 underlyingSource, strategy, createWithExternalControllerSentinel); | 926 underlyingSource, strategy, createWithExternalControllerSentinel); |
| 833 }; | 927 }; |
| 834 }); | 928 }); |
| OLD | NEW |