Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 (function(global, binding, v8) { | 5 (function(global, binding, v8) { |
| 6 'use strict'; | 6 'use strict'; |
| 7 | 7 |
| 8 const readableStreamReader = v8.createPrivateSymbol('[[reader]]'); | |
| 9 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]'); | |
| 8 const readableStreamController = v8.createPrivateSymbol('[[controller]]'); | 10 const readableStreamController = v8.createPrivateSymbol('[[controller]]'); |
| 9 const readableStreamQueue = v8.createPrivateSymbol('[[queue]]'); | |
| 10 const readableStreamQueueSize = | |
| 11 v8.createPrivateSymbol('[[queue]] total size'); | |
| 12 const readableStreamReader = v8.createPrivateSymbol('[[reader]]'); | |
| 13 const readableStreamState = v8.createPrivateSymbol('[[state]]'); | |
| 14 const readableStreamStoredError = v8.createPrivateSymbol('[[storedError]]'); | |
| 15 const readableStreamStrategySize = v8.createPrivateSymbol('[[strategySize]]'); | |
| 16 const readableStreamStrategyHWM = v8.createPrivateSymbol('[[strategyHWM]]'); | |
| 17 const readableStreamUnderlyingSource = | |
| 18 v8.createPrivateSymbol('[[underlyingSource]]'); | |
| 19 | |
| 20 const readableStreamControllerControlledReadableStream = | |
| 21 v8.createPrivateSymbol('[[controlledReadableStream]]'); | |
| 22 | 11 |
| 23 const readableStreamReaderClosedPromise = | 12 const readableStreamReaderClosedPromise = |
| 24 v8.createPrivateSymbol('[[closedPromise]]'); | 13 v8.createPrivateSymbol('[[closedPromise]]'); |
| 25 const readableStreamReaderOwnerReadableStream = | 14 const readableStreamReaderOwnerReadableStream = |
| 26 v8.createPrivateSymbol('[[ownerReadableStream]]'); | 15 v8.createPrivateSymbol('[[ownerReadableStream]]'); |
| 27 const readableStreamReaderReadRequests = | 16 |
| 17 const readableStreamDefaultReaderReadRequests = | |
| 28 v8.createPrivateSymbol('[[readRequests]]'); | 18 v8.createPrivateSymbol('[[readRequests]]'); |
| 29 | 19 |
| 30 const createWithExternalControllerSentinel = | 20 const createWithExternalControllerSentinel = |
| 31 v8.createPrivateSymbol('flag for UA-created ReadableStream to pass'); | 21 v8.createPrivateSymbol('flag for UA-created ReadableStream to pass'); |
| 32 | 22 |
| 23 const readableStreamBits = v8.createPrivateSymbol('bit field for [[state]] and [[disturbed]]'); | |
| 24 const DISTURBED = 0b1; | |
| 25 // The 2nd and 3rd bit are for [[state]]. | |
| 26 const STATE_MASK = 0b110; | |
| 27 const STATE_BITS_OFFSET = 1; | |
| 33 const STATE_READABLE = 0; | 28 const STATE_READABLE = 0; |
| 34 const STATE_CLOSED = 1; | 29 const STATE_CLOSED = 1; |
| 35 const STATE_ERRORED = 2; | 30 const STATE_ERRORED = 2; |
| 36 | 31 |
| 37 const readableStreamBits = v8.createPrivateSymbol( | 32 const readableStreamDefaultControllerUnderlyingSource = |
| 38 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]] , [[disturbed]]'); | 33 v8.createPrivateSymbol('[[underlyingSource]]'); |
| 34 const readableStreamDefaultControllerControlledReadableStream = | |
| 35 v8.createPrivateSymbol('[[controlledReadableStream]]'); | |
| 36 const readableStreamDefaultControllerQueue = v8.createPrivateSymbol('[[queue]] '); | |
| 37 const readableStreamDefaultControllerQueueSize = | |
| 38 v8.createPrivateSymbol('[[queue]] total size'); | |
| 39 const readableStreamDefaultControllerStrategySize = | |
| 40 v8.createPrivateSymbol('[[strategySize]]'); | |
| 41 const readableStreamDefaultControllerStrategyHWM = | |
| 42 v8.createPrivateSymbol('[[strategyHWM]]'); | |
| 43 | |
| 44 const readableStreamDefaultControllerBits = v8.createPrivateSymbol( | |
| 45 'bit field for [[started]], [[closeRequested]], [[pulling]], [[pullAgain]] '); | |
| 39 const STARTED = 0b1; | 46 const STARTED = 0b1; |
| 40 const CLOSE_REQUESTED = 0b10; | 47 const CLOSE_REQUESTED = 0b10; |
| 41 const PULLING = 0b100; | 48 const PULLING = 0b100; |
| 42 const PULL_AGAIN = 0b1000; | 49 const PULL_AGAIN = 0b1000; |
| 43 const DISTURBED = 0b10000; | 50 const EXTERNALLY_CONTROLLED = 0b10000; |
| 51 | |
| 52 const readableStreamControllerCancel = | |
| 53 v8.createPrivateSymbol('[[InternalCancel]]'); | |
| 54 const readableStreamControllerPull = v8.createPrivateSymbol('[[InternalPull]]' ); | |
| 44 | 55 |
| 45 const undefined = global.undefined; | 56 const undefined = global.undefined; |
| 46 const Infinity = global.Infinity; | 57 const Infinity = global.Infinity; |
| 47 | 58 |
| 48 const defineProperty = global.Object.defineProperty; | 59 const defineProperty = global.Object.defineProperty; |
| 49 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); | 60 const hasOwnProperty = v8.uncurryThis(global.Object.hasOwnProperty); |
| 50 const callFunction = v8.uncurryThis(global.Function.prototype.call); | 61 const callFunction = v8.uncurryThis(global.Function.prototype.call); |
| 51 | 62 |
| 52 const TypeError = global.TypeError; | 63 const TypeError = global.TypeError; |
| 53 const RangeError = global.RangeError; | 64 const RangeError = global.RangeError; |
| 54 | 65 |
| 55 const Number = global.Number; | 66 const Number = global.Number; |
| 56 const Number_isNaN = Number.isNaN; | 67 const Number_isNaN = Number.isNaN; |
| 57 const Number_isFinite = Number.isFinite; | 68 const Number_isFinite = Number.isFinite; |
| 58 | 69 |
| 59 const Promise = global.Promise; | 70 const Promise = global.Promise; |
| 60 const thenPromise = v8.uncurryThis(Promise.prototype.then); | 71 const thenPromise = v8.uncurryThis(Promise.prototype.then); |
| 61 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise); | 72 const Promise_resolve = v8.simpleBind(Promise.resolve, Promise); |
| 62 const Promise_reject = v8.simpleBind(Promise.reject, Promise); | 73 const Promise_reject = v8.simpleBind(Promise.reject, Promise); |
| 63 | 74 |
| 64 const errIllegalInvocation = 'Illegal invocation'; | 75 const errIllegalInvocation = 'Illegal invocation'; |
| 65 const errIllegalConstructor = 'Illegal constructor'; | 76 const errIllegalConstructor = 'Illegal constructor'; |
| 66 const errCancelLockedStream = | 77 const errCancelLockedStream = |
| 67 'Cannot cancel a readable stream that is locked to a reader'; | 78 'Cannot cancel a readable stream that is locked to a reader'; |
| 68 const errEnqueueInCloseRequestedStream = | 79 const errEnqueueCloseRequestedStream = |
| 69 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed'; | 80 'Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed'; |
| 70 const errCancelReleasedReader = | 81 const errCancelReleasedReader = |
| 71 'This readable stream reader has been released and cannot be used to cance l its previous owner stream'; | 82 'This readable stream reader has been released and cannot be used to cance l its previous owner stream'; |
| 72 const errReadReleasedReader = | 83 const errReadReleasedReader = |
| 73 'This readable stream reader has been released and cannot be used to read from its previous owner stream'; | 84 'This readable stream reader has been released and cannot be used to read from its previous owner stream'; |
| 74 const errCloseCloseRequestedStream = | 85 const errCloseCloseRequestedStream = |
| 75 'Cannot close a readable stream that has already been requested to be clos ed'; | 86 'Cannot close a readable stream that has already been requested to be clos ed'; |
| 87 const errEnqueueClosedStream = 'Cannot enqueue a chunk into a closed readable stream'; | |
| 88 const errEnqueueErroredStream = 'Cannot enqueue a chunk into an errored readab le stream'; | |
| 89 const errCloseClosedStream = 'Cannot close a closed readable stream'; | |
| 76 const errCloseErroredStream = 'Cannot close an errored readable stream'; | 90 const errCloseErroredStream = 'Cannot close an errored readable stream'; |
| 77 const errErrorClosedStream = 'Cannot error a close readable stream'; | 91 const errErrorClosedStream = 'Cannot error a close readable stream'; |
| 78 const errErrorErroredStream = | 92 const errErrorErroredStream = |
| 79 'Cannot error a readable stream that is already errored'; | 93 'Cannot error a readable stream that is already errored'; |
| 94 const errGetReaderNotByteStream = 'This readable stream does not support BYOB readers'; | |
| 95 const errGetReaderBadMode = 'Invalid reader mode given: expected undefined or "byob"'; | |
| 80 const errReaderConstructorBadArgument = | 96 const errReaderConstructorBadArgument = |
| 81 'ReadableStreamReader constructor argument is not a readable stream'; | 97 'ReadableStreamReader constructor argument is not a readable stream'; |
| 82 const errReaderConstructorStreamAlreadyLocked = | 98 const errReaderConstructorStreamAlreadyLocked = |
| 83 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader'; | 99 'ReadableStreamReader constructor can only accept readable streams that ar e not yet locked to a reader'; |
| 84 const errReleaseReaderWithPendingRead = | 100 const errReleaseReaderWithPendingRead = |
| 85 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled'; | 101 'Cannot release a readable stream reader when it still has outstanding rea d() calls that have not yet settled'; |
| 86 const errReleasedReaderClosedPromise = | 102 const errReleasedReaderClosedPromise = |
| 87 'This readable stream reader has been released and cannot be used to monit or the stream\'s state'; | 103 'This readable stream reader has been released and cannot be used to monit or the stream\'s state'; |
| 88 const errInvalidSize = | 104 const errInvalidSize = |
| 89 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number'; | 105 'The return value of a queuing strategy\'s size function must be a finite, non-NaN, non-negative number'; |
| 90 const errSizeNotAFunction = | 106 const errSizeNotAFunction = |
| 91 'A queuing strategy\'s size property must be a function'; | 107 'A queuing strategy\'s size property must be a function'; |
| 92 const errInvalidHWM = | 108 const errInvalidHWM = |
| 93 'A queueing strategy\'s highWaterMark property must be a nonnegative, non- NaN number'; | 109 'A queueing strategy\'s highWaterMark property must be a nonnegative, non- NaN number'; |
| 94 const errTmplMustBeFunctionOrUndefined = name => | 110 const errTmplMustBeFunctionOrUndefined = name => |
| 95 `${name} must be a function or undefined`; | 111 `${name} must be a function or undefined`; |
| 96 | 112 |
| 97 class ReadableStream { | 113 class ReadableStream { |
| 98 constructor() { | 114 constructor() { |
| 99 // TODO(domenic): when V8 gets default parameters and destructuring, all | 115 // TODO(domenic): when V8 gets default parameters and destructuring, all |
| 100 // this can be cleaned up. | 116 // this can be cleaned up. |
| 101 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; | 117 const underlyingSource = arguments[0] === undefined ? {} : arguments[0]; |
| 102 const strategy = arguments[1] === undefined ? {} : arguments[1]; | 118 const strategy = arguments[1] === undefined ? {} : arguments[1]; |
| 103 const size = strategy.size; | 119 const size = strategy.size; |
| 104 let highWaterMark = strategy.highWaterMark; | 120 let highWaterMark = strategy.highWaterMark; |
| 105 if (highWaterMark === undefined) { | 121 if (highWaterMark === undefined) { |
| 106 highWaterMark = 1; | 122 highWaterMark = 1; |
| 107 } | 123 } |
| 108 | 124 |
| 109 const normalizedStrategy = | 125 this[readableStreamBits] = 0b0 | STATE_READABLE; |
|
yhirano
2016/05/02 13:17:03
STATE_READBLE << OFFSET?
tyoshino (SeeGerritForStatus)
2016/05/06 06:24:57
Changed to use ReadableStreamSetState()
| |
| 110 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); | |
| 111 | |
| 112 this[readableStreamUnderlyingSource] = underlyingSource; | |
| 113 | |
| 114 this[readableStreamQueue] = new v8.InternalPackedArray(); | |
| 115 this[readableStreamQueueSize] = 0; | |
| 116 | |
| 117 this[readableStreamState] = STATE_READABLE; | |
| 118 this[readableStreamBits] = 0b0; | |
| 119 this[readableStreamReader] = undefined; | 126 this[readableStreamReader] = undefined; |
| 120 this[readableStreamStoredError] = undefined; | 127 this[readableStreamStoredError] = undefined; |
| 121 | 128 |
| 122 this[readableStreamStrategySize] = normalizedStrategy.size; | |
| 123 this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark; | |
| 124 | |
| 125 // Avoid allocating the controller if the stream is going to be controlled | 129 // Avoid allocating the controller if the stream is going to be controlled |
| 126 // externally (i.e. from C++) anyway. All calls to underlyingSource | 130 // externally (i.e. from C++) anyway. All calls to underlyingSource |
| 127 // methods will disregard their controller argument in such situations | 131 // methods will disregard their controller argument in such situations |
| 128 // (but see below). | 132 // (but see below). |
| 129 | 133 |
| 130 const isControlledExternally = | 134 this[readableStreamController] = undefined; |
| 131 arguments[2] === createWithExternalControllerSentinel; | |
| 132 const controller = | |
| 133 isControlledExternally ? null : new ReadableStreamController(this); | |
| 134 this[readableStreamController] = controller; | |
| 135 | 135 |
| 136 // We need to pass ourself to the underlyingSource start method for | 136 const type = underlyingSource.type; |
| 137 // externally-controlled streams. We use the now-useless controller | 137 const typeString = String(type); |
| 138 // argument to do so. | 138 if (typeString === 'bytes') { |
| 139 const argToStart = isControlledExternally ? this : controller; | 139 throw new RangeError('bytes type is not yet implemented'); |
| 140 } else if (type !== undefined) { | |
| 141 throw new RangeError('Invalid type is specified'); | |
| 142 } | |
| 140 | 143 |
| 141 const startResult = CallOrNoop( | 144 this[readableStreamController] = |
| 142 underlyingSource, 'start', argToStart, 'underlyingSource.start'); | 145 new ReadableStreamDefaultController(this, underlyingSource, size, high WaterMark, arguments[2] === createWithExternalControllerSentinel); |
| 143 thenPromise(Promise_resolve(startResult), | |
| 144 () => { | |
| 145 this[readableStreamBits] |= STARTED; | |
| 146 RequestReadableStreamPull(this); | |
| 147 }, | |
| 148 r => { | |
| 149 if (this[readableStreamState] === STATE_READABLE) { | |
| 150 return ErrorReadableStream(this, r); | |
| 151 } | |
| 152 }); | |
| 153 } | 146 } |
| 154 | 147 |
| 155 get locked() { | 148 get locked() { |
| 156 if (IsReadableStream(this) === false) { | 149 if (IsReadableStream(this) === false) { |
| 157 throw new TypeError(errIllegalInvocation); | 150 throw new TypeError(errIllegalInvocation); |
| 158 } | 151 } |
| 159 | 152 |
| 160 return IsReadableStreamLocked(this); | 153 return IsReadableStreamLocked(this); |
| 161 } | 154 } |
| 162 | 155 |
| 163 cancel(reason) { | 156 cancel(reason) { |
| 164 if (IsReadableStream(this) === false) { | 157 if (IsReadableStream(this) === false) { |
| 165 return Promise_reject(new TypeError(errIllegalInvocation)); | 158 return Promise_reject(new TypeError(errIllegalInvocation)); |
| 166 } | 159 } |
| 167 | 160 |
| 168 if (IsReadableStreamLocked(this) === true) { | 161 if (IsReadableStreamLocked(this) === true) { |
| 169 return Promise_reject(new TypeError(errCancelLockedStream)); | 162 return Promise_reject(new TypeError(errCancelLockedStream)); |
| 170 } | 163 } |
| 171 | 164 |
| 172 return CancelReadableStream(this, reason); | 165 return ReadableStreamCancel(this, reason); |
| 173 } | 166 } |
| 174 | 167 |
| 175 getReader() { | 168 getReader({ mode } = {}) { |
| 176 if (IsReadableStream(this) === false) { | 169 if (IsReadableStream(this) === false) { |
| 177 throw new TypeError(errIllegalInvocation); | 170 throw new TypeError(errIllegalInvocation); |
| 178 } | 171 } |
| 179 | 172 |
| 180 return AcquireReadableStreamReader(this); | 173 if (mode === 'byob') { |
| 174 if (IsReadableByteStreamDefaultController(this[readableStreamController] ) === false) { | |
| 175 throw new TypeError(errGetReaderNotByteStream); | |
| 176 } | |
| 177 | |
| 178 return AcquireReadableStreamBYOBReader(this); | |
| 179 } | |
| 180 | |
| 181 if (mode === undefined) { | |
| 182 return AcquireReadableStreamDefaultReader(this); | |
| 183 } | |
| 184 | |
| 185 throw new RangeError(errGetReaderBadMode);; | |
|
yhirano
2016/05/02 13:17:03
;;
tyoshino (SeeGerritForStatus)
2016/05/06 06:24:57
Done.
| |
| 181 } | 186 } |
| 182 | 187 |
| 183 tee() { | 188 tee() { |
| 184 if (IsReadableStream(this) === false) { | 189 if (IsReadableStream(this) === false) { |
| 185 throw new TypeError(errIllegalInvocation); | 190 throw new TypeError(errIllegalInvocation); |
| 186 } | 191 } |
| 187 | 192 |
| 188 return TeeReadableStream(this); | 193 return ReadableStreamTee(this); |
| 189 } | 194 } |
| 190 } | 195 } |
| 191 | 196 |
| 192 class ReadableStreamController { | 197 class ReadableStreamDefaultController { |
| 193 constructor(stream) { | 198 constructor(stream, underlyingSource, size, highWaterMark, isExternallyContr olled) { |
| 194 if (IsReadableStream(stream) === false) { | 199 if (IsReadableStream(stream) === false) { |
| 195 throw new TypeError(errIllegalConstructor); | 200 throw new TypeError(errIllegalConstructor); |
| 196 } | 201 } |
| 197 | 202 |
| 198 if (stream[readableStreamController] !== undefined) { | 203 if (stream[readableStreamController] !== undefined) { |
| 199 throw new TypeError(errIllegalConstructor); | 204 throw new TypeError(errIllegalConstructor); |
| 200 } | 205 } |
| 201 | 206 |
| 202 this[readableStreamControllerControlledReadableStream] = stream; | 207 this[readableStreamDefaultControllerControlledReadableStream] = stream; |
| 208 | |
| 209 this[readableStreamDefaultControllerUnderlyingSource] = underlyingSource; | |
| 210 | |
| 211 this[readableStreamDefaultControllerQueue] = new v8.InternalPackedArray(); | |
| 212 this[readableStreamDefaultControllerQueueSize] = 0; | |
| 213 | |
| 214 this[readableStreamDefaultControllerBits] = 0b0; | |
| 215 if (isExternallyControlled === true) { | |
| 216 this[readableStreamDefaultControllerBits] |= EXTERNALLY_CONTROLLED; | |
| 217 } | |
| 218 | |
| 219 const normalizedStrategy = | |
| 220 ValidateAndNormalizeQueuingStrategy(size, highWaterMark); | |
| 221 this[readableStreamDefaultControllerStrategySize] = normalizedStrategy.siz e; | |
| 222 this[readableStreamDefaultControllerStrategyHWM] = normalizedStrategy.high WaterMark; | |
| 223 | |
| 224 const controller = this; | |
| 225 | |
| 226 const startResult = CallOrNoop( | |
| 227 underlyingSource, 'start', this, 'underlyingSource.start'); | |
| 228 thenPromise(Promise_resolve(startResult), | |
| 229 () => { | |
| 230 controller[readableStreamDefaultControllerBits] |= STARTED; | |
| 231 ReadableStreamDefaultControllerCallPullIfNeeded(controller); | |
| 232 }, | |
| 233 r => { | |
| 234 if ((stream[readableStreamBits] & STATE_MASK) >> 1 === STATE_READABL E) { | |
|
yhirano
2016/05/02 13:17:03
ReadableStreamGetState(this) or s/1/STATE_BITS_OFF
tyoshino (SeeGerritForStatus)
2016/05/06 06:24:56
Replaced with ReadableStreamGetState() call.
| |
| 235 ReadableStreamDefaultControllerError(controller, r); | |
| 236 } | |
| 237 }); | |
| 203 } | 238 } |
| 204 | 239 |
| 205 get desiredSize() { | 240 get desiredSize() { |
| 206 if (IsReadableStreamController(this) === false) { | 241 if (IsReadableStreamDefaultController(this) === false) { |
| 207 throw new TypeError(errIllegalInvocation); | 242 throw new TypeError(errIllegalInvocation); |
| 208 } | 243 } |
| 209 | 244 |
| 210 return GetReadableStreamDesiredSize( | 245 return ReadableStreamDefaultControllerGetDesiredSize(this); |
| 211 this[readableStreamControllerControlledReadableStream]); | |
| 212 } | 246 } |
| 213 | 247 |
| 214 close() { | 248 close() { |
| 215 if (IsReadableStreamController(this) === false) { | 249 if (IsReadableStreamDefaultController(this) === false) { |
| 216 throw new TypeError(errIllegalInvocation); | 250 throw new TypeError(errIllegalInvocation); |
| 217 } | 251 } |
| 218 | 252 |
| 219 const stream = this[readableStreamControllerControlledReadableStream]; | 253 const stream = this[readableStreamDefaultControllerControlledReadableStrea m]; |
| 220 | 254 |
| 221 if (stream[readableStreamBits] & CLOSE_REQUESTED) { | 255 if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) { |
| 222 throw new TypeError(errCloseCloseRequestedStream); | 256 throw new TypeError(errCloseCloseRequestedStream); |
| 223 } | 257 } |
| 224 if (stream[readableStreamState] === STATE_ERRORED) { | 258 |
| 259 const state = (stream[readableStreamBits] & STATE_MASK) >> 1; | |
|
yhirano
2016/05/02 13:17:04
ditto
tyoshino (SeeGerritForStatus)
2016/05/06 06:24:56
Replaced with ReadableStreamGetState() call.
| |
| 260 if (state === STATE_ERRORED) { | |
| 225 throw new TypeError(errCloseErroredStream); | 261 throw new TypeError(errCloseErroredStream); |
| 226 } | 262 } |
| 263 if (state === STATE_CLOSED) { | |
| 264 throw new TypeError(errCloseClosedStream); | |
| 265 } | |
| 227 | 266 |
| 228 return CloseReadableStream(stream); | 267 return ReadableStreamDefaultControllerClose(this); |
| 229 } | 268 } |
| 230 | 269 |
| 231 enqueue(chunk) { | 270 enqueue(chunk) { |
| 232 if (IsReadableStreamController(this) === false) { | 271 if (IsReadableStreamDefaultController(this) === false) { |
| 233 throw new TypeError(errIllegalInvocation); | 272 throw new TypeError(errIllegalInvocation); |
| 234 } | 273 } |
| 235 | 274 |
| 236 const stream = this[readableStreamControllerControlledReadableStream]; | 275 const stream = this[readableStreamDefaultControllerControlledReadableStrea m]; |
| 237 | 276 |
| 238 if (stream[readableStreamState] === STATE_ERRORED) { | 277 if (this[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) { |
| 239 throw stream[readableStreamStoredError]; | 278 throw new TypeError(errEnqueueCloseRequestedStream); |
| 240 } | 279 } |
| 241 | 280 |
| 242 if (stream[readableStreamBits] & CLOSE_REQUESTED) { | 281 const state = (stream[readableStreamBits] & STATE_MASK) >> 1; |
|
yhirano
2016/05/02 13:17:04
ditto
tyoshino (SeeGerritForStatus)
2016/05/06 06:24:57
Replaced with ReadableStreamGetState() call.
| |
| 243 throw new TypeError(errEnqueueInCloseRequestedStream); | 282 if (state === STATE_ERRORED) { |
| 283 throw new TypeError(errEnqueueErroredStream); | |
| 284 } | |
| 285 if (state === STATE_CLOSED) { | |
| 286 throw new TypeError(errEnqueueClosedStream); | |
| 244 } | 287 } |
| 245 | 288 |
| 246 return EnqueueInReadableStream(stream, chunk); | 289 return ReadableStreamDefaultControllerEnqueue(this, chunk); |
| 247 } | 290 } |
| 248 | 291 |
| 249 error(e) { | 292 error(e) { |
| 250 if (IsReadableStreamController(this) === false) { | 293 if (IsReadableStreamDefaultController(this) === false) { |
| 251 throw new TypeError(errIllegalInvocation); | 294 throw new TypeError(errIllegalInvocation); |
| 252 } | 295 } |
| 253 | 296 |
| 254 const stream = this[readableStreamControllerControlledReadableStream]; | 297 const stream = this[readableStreamDefaultControllerControlledReadableStrea m]; |
| 255 | 298 |
| 256 const state = stream[readableStreamState]; | 299 const state = (stream[readableStreamBits] & STATE_MASK) >> 1; |
|
yhirano
2016/05/02 13:17:04
ditto
tyoshino (SeeGerritForStatus)
2016/05/06 06:24:57
Replaced with ReadableStreamGetState() call.
| |
| 257 if (state !== STATE_READABLE) { | 300 if (state === STATE_ERRORED) { |
| 258 if (state === STATE_ERRORED) { | 301 throw new TypeError(errErrorErroredStream); |
| 259 throw new TypeError(errErrorErroredStream); | 302 } |
| 260 } | 303 if (state === STATE_CLOSED) { |
| 261 if (state === STATE_CLOSED) { | 304 throw new TypeError(errErrorClosedStream); |
| 262 throw new TypeError(errErrorClosedStream); | |
| 263 } | |
| 264 } | 305 } |
| 265 | 306 |
| 266 return ErrorReadableStream(stream, e); | 307 return ReadableStreamDefaultControllerError(this, e); |
| 267 } | 308 } |
| 268 } | 309 } |
| 269 | 310 |
| 270 class ReadableStreamReader { | 311 function ReadableStreamDefaultControllerCancel(controller, reason) { |
| 312 controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArra y(); | |
| 313 | |
| 314 const underlyingSource = controller[readableStreamDefaultControllerUnderlyin gSource]; | |
| 315 return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSour ce.cancel'); | |
| 316 } | |
| 317 | |
| 318 function ReadableStreamDefaultControllerPull(controller) { | |
| 319 const stream = controller[readableStreamDefaultControllerControlledReadableS tream]; | |
| 320 | |
| 321 if (controller[readableStreamDefaultControllerQueue].length > 0) { | |
| 322 const chunk = DequeueValue(controller); | |
| 323 | |
| 324 if (controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED && c ontroller[readableStreamDefaultControllerQueue].length === 0) { | |
|
yhirano
2016/05/02 13:17:04
Can you add a pair of parenthesis around (controll
tyoshino (SeeGerritForStatus)
2016/05/06 06:24:57
Done.
| |
| 325 ReadableStreamClose(stream); | |
| 326 } else { | |
| 327 ReadableStreamDefaultControllerCallPullIfNeeded(controller); | |
| 328 } | |
| 329 | |
| 330 return Promise_resolve(CreateIterResultObject(chunk, false)); | |
| 331 } | |
| 332 | |
| 333 const pendingPromise = ReadableStreamAddReadRequest(stream); | |
| 334 ReadableStreamDefaultControllerCallPullIfNeeded(controller); | |
| 335 return pendingPromise; | |
| 336 } | |
| 337 | |
| 338 function ReadableStreamAddReadRequest(stream) { | |
| 339 const promise = v8.createPromise(); | |
| 340 stream[readableStreamReader][readableStreamDefaultReaderReadRequests].push(p romise); | |
| 341 return promise; | |
| 342 } | |
| 343 | |
| 344 class ReadableStreamDefaultReader { | |
| 271 constructor(stream) { | 345 constructor(stream) { |
| 272 if (IsReadableStream(stream) === false) { | 346 if (IsReadableStream(stream) === false) { |
| 273 throw new TypeError(errReaderConstructorBadArgument); | 347 throw new TypeError(errReaderConstructorBadArgument); |
| 274 } | 348 } |
| 275 if (IsReadableStreamLocked(stream) === true) { | 349 if (IsReadableStreamLocked(stream) === true) { |
| 276 throw new TypeError(errReaderConstructorStreamAlreadyLocked); | 350 throw new TypeError(errReaderConstructorStreamAlreadyLocked); |
| 277 } | 351 } |
| 278 | 352 |
| 279 // TODO(yhirano): Remove this when we don't need hasPendingActivity in | 353 ReadableStreamReaderGenericInitialize(this, stream); |
| 280 // blink::UnderlyingSourceBase. | |
| 281 if (stream[readableStreamController] === null) { | |
| 282 // The stream is created with an external controller (i.e. made in | |
| 283 // Blink). | |
| 284 const underlyingSource = stream[readableStreamUnderlyingSource]; | |
| 285 callFunction(underlyingSource.notifyLockAcquired, underlyingSource); | |
| 286 } | |
| 287 | 354 |
| 288 this[readableStreamReaderOwnerReadableStream] = stream; | 355 this[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArray (); |
| 289 stream[readableStreamReader] = this; | |
| 290 | |
| 291 this[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); | |
| 292 | |
| 293 switch (stream[readableStreamState]) { | |
| 294 case STATE_READABLE: | |
| 295 this[readableStreamReaderClosedPromise] = v8.createPromise(); | |
| 296 break; | |
| 297 case STATE_CLOSED: | |
| 298 this[readableStreamReaderClosedPromise] = Promise_resolve(undefined); | |
| 299 break; | |
| 300 case STATE_ERRORED: | |
| 301 this[readableStreamReaderClosedPromise] = | |
| 302 Promise_reject(stream[readableStreamStoredError]); | |
| 303 break; | |
| 304 } | |
| 305 } | 356 } |
| 306 | 357 |
| 307 get closed() { | 358 get closed() { |
| 308 if (IsReadableStreamReader(this) === false) { | 359 if (IsReadableStreamDefaultReader(this) === false) { |
| 309 return Promise_reject(new TypeError(errIllegalInvocation)); | 360 return Promise_reject(new TypeError(errIllegalInvocation)); |
| 310 } | 361 } |
| 311 | 362 |
| 312 return this[readableStreamReaderClosedPromise]; | 363 return this[readableStreamReaderClosedPromise]; |
| 313 } | 364 } |
| 314 | 365 |
| 315 cancel(reason) { | 366 cancel(reason) { |
| 316 if (IsReadableStreamReader(this) === false) { | 367 if (IsReadableStreamDefaultReader(this) === false) { |
| 317 return Promise_reject(new TypeError(errIllegalInvocation)); | 368 return Promise_reject(new TypeError(errIllegalInvocation)); |
| 318 } | 369 } |
| 319 | 370 |
| 320 const stream = this[readableStreamReaderOwnerReadableStream]; | 371 const stream = this[readableStreamReaderOwnerReadableStream]; |
| 321 if (stream === undefined) { | 372 if (stream === undefined) { |
| 322 return Promise_reject(new TypeError(errCancelReleasedReader)); | 373 return Promise_reject(new TypeError(errCancelReleasedReader)); |
| 323 } | 374 } |
| 324 | 375 |
| 325 return CancelReadableStream(stream, reason); | 376 return ReadableStreamReaderGenericCancel(this, reason); |
| 326 } | 377 } |
| 327 | 378 |
| 328 read() { | 379 read() { |
| 329 if (IsReadableStreamReader(this) === false) { | 380 if (IsReadableStreamDefaultReader(this) === false) { |
| 330 return Promise_reject(new TypeError(errIllegalInvocation)); | 381 return Promise_reject(new TypeError(errIllegalInvocation)); |
| 331 } | 382 } |
| 332 | 383 |
| 333 if (this[readableStreamReaderOwnerReadableStream] === undefined) { | 384 if (this[readableStreamReaderOwnerReadableStream] === undefined) { |
| 334 return Promise_reject(new TypeError(errReadReleasedReader)); | 385 return Promise_reject(new TypeError(errReadReleasedReader)); |
| 335 } | 386 } |
| 336 | 387 |
| 337 return ReadFromReadableStreamReader(this); | 388 return ReadableStreamDefaultReaderRead(this); |
| 338 } | 389 } |
| 339 | 390 |
| 340 releaseLock() { | 391 releaseLock() { |
| 341 if (IsReadableStreamReader(this) === false) { | 392 if (IsReadableStreamDefaultReader(this) === false) { |
| 342 throw new TypeError(errIllegalInvocation); | 393 throw new TypeError(errIllegalInvocation); |
| 343 } | 394 } |
| 344 | 395 |
| 345 const stream = this[readableStreamReaderOwnerReadableStream]; | 396 const stream = this[readableStreamReaderOwnerReadableStream]; |
| 346 if (stream === undefined) { | 397 if (stream === undefined) { |
| 347 return undefined; | 398 return undefined; |
| 348 } | 399 } |
| 349 | 400 |
| 350 if (this[readableStreamReaderReadRequests].length > 0) { | 401 if (this[readableStreamDefaultReaderReadRequests].length > 0) { |
| 351 throw new TypeError(errReleaseReaderWithPendingRead); | 402 throw new TypeError(errReleaseReaderWithPendingRead); |
| 352 } | 403 } |
| 353 | 404 |
| 354 // TODO(yhirano): Remove this when we don't need hasPendingActivity in | 405 ReadableStreamReaderGenericRelease(this); |
| 355 // blink::UnderlyingSourceBase. | 406 } |
| 356 if (stream[readableStreamController] === null) { | 407 } |
| 357 // The stream is created with an external controller (i.e. made in | |
| 358 // Blink). | |
| 359 const underlyingSource = stream[readableStreamUnderlyingSource]; | |
| 360 callFunction(underlyingSource.notifyLockReleased, underlyingSource); | |
| 361 } | |
| 362 | 408 |
| 363 if (stream[readableStreamState] === STATE_READABLE) { | 409 function ReadableStreamReaderGenericCancel(reader, reason) { |
| 364 v8.rejectPromise(this[readableStreamReaderClosedPromise], | 410 return ReadableStreamCancel(reader[readableStreamReaderOwnerReadableStream], reason); |
| 365 new TypeError(errReleasedReaderClosedPromise)); | |
| 366 } else { | |
| 367 this[readableStreamReaderClosedPromise] = | |
| 368 Promise_reject(new TypeError(errReleasedReaderClosedPromise)); | |
| 369 } | |
| 370 | |
| 371 this[readableStreamReaderOwnerReadableStream][readableStreamReader] = | |
| 372 undefined; | |
| 373 this[readableStreamReaderOwnerReadableStream] = undefined; | |
| 374 } | |
| 375 } | 411 } |
| 376 | 412 |
| 377 // | 413 // |
| 378 // Readable stream abstract operations | 414 // Readable stream abstract operations |
| 379 // | 415 // |
| 380 | 416 |
| 381 function AcquireReadableStreamReader(stream) { | 417 function AcquireReadableStreamDefaultReader(stream) { |
| 382 return new ReadableStreamReader(stream); | 418 return new ReadableStreamDefaultReader(stream); |
| 383 } | 419 } |
| 384 | 420 |
| 385 function CancelReadableStream(stream, reason) { | 421 function ReadableStreamCancel(stream, reason) { |
| 386 stream[readableStreamBits] |= DISTURBED; | 422 stream[readableStreamBits] |= DISTURBED; |
| 387 | 423 |
| 388 const state = stream[readableStreamState]; | 424 const state = ReadableStreamGetState(stream); |
| 389 if (state === STATE_CLOSED) { | 425 if (state === STATE_CLOSED) { |
| 390 return Promise_resolve(undefined); | 426 return Promise_resolve(undefined); |
| 391 } | 427 } |
| 392 if (state === STATE_ERRORED) { | 428 if (state === STATE_ERRORED) { |
| 393 return Promise_reject(stream[readableStreamStoredError]); | 429 return Promise_reject(stream[readableStreamStoredError]); |
| 394 } | 430 } |
| 395 | 431 |
| 396 stream[readableStreamQueue] = new v8.InternalPackedArray(); | 432 ReadableStreamClose(stream); |
| 397 FinishClosingReadableStream(stream); | |
| 398 | 433 |
| 399 const underlyingSource = stream[readableStreamUnderlyingSource]; | 434 const sourceCancelPromise = ReadableStreamDefaultControllerCancel(stream[rea dableStreamController], reason); |
| 400 const sourceCancelPromise = PromiseCallOrNoop( | |
| 401 underlyingSource, 'cancel', reason, 'underlyingSource.cancel'); | |
| 402 return thenPromise(sourceCancelPromise, () => undefined); | 435 return thenPromise(sourceCancelPromise, () => undefined); |
| 403 } | 436 } |
| 404 | 437 |
| 405 function CloseReadableStream(stream) { | 438 function ReadableStreamDefaultControllerClose(controller) { |
| 406 if (stream[readableStreamState] === STATE_CLOSED) { | 439 const stream = controller[readableStreamDefaultControllerControlledReadableS tream]; |
| 407 return undefined; | |
| 408 } | |
| 409 | 440 |
| 410 stream[readableStreamBits] |= CLOSE_REQUESTED; | 441 controller[readableStreamDefaultControllerBits] |= CLOSE_REQUESTED; |
| 411 | 442 |
| 412 if (stream[readableStreamQueue].length === 0) { | 443 if (controller[readableStreamDefaultControllerQueue].length === 0) { |
| 413 return FinishClosingReadableStream(stream); | 444 ReadableStreamClose(stream); |
| 414 } | 445 } |
| 415 } | 446 } |
| 416 | 447 |
| 417 function EnqueueInReadableStream(stream, chunk) { | 448 function ReadableStreamFulfillReadRequest(stream, chunk, done) { |
| 418 if (stream[readableStreamState] === STATE_CLOSED) { | 449 const reader = stream[readableStreamReader]; |
| 419 return undefined; | |
| 420 } | |
| 421 | 450 |
| 422 if (IsReadableStreamLocked(stream) === true && | 451 const readRequest = |
| 423 stream[readableStreamReader][readableStreamReaderReadRequests].length > | 452 stream[readableStreamReader][readableStreamDefaultReaderReadRequests] |
| 424 0) { | 453 .shift(); |
| 425 const readRequest = | 454 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, done)); |
| 426 stream[readableStreamReader][readableStreamReaderReadRequests] | 455 } |
| 427 .shift(); | 456 |
| 428 v8.resolvePromise(readRequest, CreateIterResultObject(chunk, false)); | 457 function ReadableStreamDefaultControllerEnqueue(controller, chunk) { |
| 458 const stream = controller[readableStreamDefaultControllerControlledReadableS tream]; | |
| 459 | |
| 460 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadReque sts(stream) > 0) { | |
| 461 ReadableStreamFulfillReadRequest(stream, chunk, false); | |
| 429 } else { | 462 } else { |
| 430 let chunkSize = 1; | 463 let chunkSize = 1; |
| 431 | 464 |
| 432 const strategySize = stream[readableStreamStrategySize]; | 465 const strategySize = controller[readableStreamDefaultControllerStrategySiz e]; |
| 433 if (strategySize !== undefined) { | 466 if (strategySize !== undefined) { |
| 434 try { | 467 try { |
| 435 chunkSize = strategySize(chunk); | 468 chunkSize = strategySize(chunk); |
| 436 } catch (chunkSizeE) { | 469 } catch (chunkSizeE) { |
| 437 if (stream[readableStreamState] === STATE_READABLE) { | 470 if (ReadableStreamGetState(stream) === STATE_READABLE) { |
| 438 ErrorReadableStream(stream, chunkSizeE); | 471 ReadableStreamDefaultControllerError(controller, chunkSizeE); |
| 439 } | 472 } |
| 440 throw chunkSizeE; | 473 throw chunkSizeE; |
| 441 } | 474 } |
| 442 } | 475 } |
| 443 | 476 |
| 444 try { | 477 try { |
| 445 EnqueueValueWithSize(stream, chunk, chunkSize); | 478 EnqueueValueWithSize(controller, chunk, chunkSize); |
| 446 } catch (enqueueE) { | 479 } catch (enqueueE) { |
| 447 if (stream[readableStreamState] === STATE_READABLE) { | 480 if (ReadableStreamGetState(stream) === STATE_READABLE) { |
| 448 ErrorReadableStream(stream, enqueueE); | 481 ReadableStreamDefaultControllerError(controller, enqueueE); |
| 449 } | 482 } |
| 450 throw enqueueE; | 483 throw enqueueE; |
| 451 } | 484 } |
| 452 } | 485 } |
| 453 | 486 |
| 454 RequestReadableStreamPull(stream); | 487 ReadableStreamDefaultControllerCallPullIfNeeded(controller); |
| 455 } | 488 } |
| 456 | 489 |
| 457 function ErrorReadableStream(stream, e) { | 490 function ReadableStreamGetState(stream) { |
| 458 stream[readableStreamQueue] = new v8.InternalPackedArray(); | 491 return (stream[readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET; |
| 492 } | |
| 493 | |
| 494 function ReadableStreamSetState(stream, state) { | |
| 495 stream[readableStreamBits] = (stream[readableStreamBits] & ~STATE_MASK) | | |
| 496 (state << STATE_BITS_OFFSET); | |
| 497 } | |
| 498 | |
| 499 function ReadableStreamDefaultControllerError(controller, e) { | |
| 500 controller[readableStreamDefaultControllerQueue] = new v8.InternalPackedArra y(); | |
| 501 const stream = controller[readableStreamDefaultControllerControlledReadableS tream]; | |
| 502 ReadableStreamError(stream, e); | |
| 503 } | |
| 504 | |
| 505 function ReadableStreamError(stream, e) { | |
| 459 stream[readableStreamStoredError] = e; | 506 stream[readableStreamStoredError] = e; |
| 460 stream[readableStreamState] = STATE_ERRORED; | 507 ReadableStreamSetState(stream, STATE_ERRORED); |
| 461 | 508 |
| 462 const reader = stream[readableStreamReader]; | 509 const reader = stream[readableStreamReader]; |
| 463 if (reader === undefined) { | 510 if (reader === undefined) { |
| 464 return undefined; | 511 return undefined; |
| 465 } | 512 } |
| 466 | 513 |
| 467 const readRequests = reader[readableStreamReaderReadRequests]; | 514 if (IsReadableStreamDefaultReader(reader) === true) { |
| 468 for (let i = 0; i < readRequests.length; ++i) { | 515 const readRequests = reader[readableStreamDefaultReaderReadRequests]; |
| 469 v8.rejectPromise(readRequests[i], e); | 516 for (let i = 0; i < readRequests.length; ++i) { |
| 517 v8.rejectPromise(readRequests[i], e); | |
| 518 } | |
| 519 reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArr ay(); | |
| 520 } else { | |
| 521 const readIntoRequests = reader[readableStreamReaderReadIntoRequests]; | |
| 522 for (let i = 0; i < readIntoRequests.length; ++i) { | |
| 523 v8.rejectPromise(readIntoRequests[i], e); | |
| 524 } | |
| 525 reader[readableStreamReaderReadIntoRequests] = new v8.InternalPackedArray( ); | |
| 470 } | 526 } |
| 471 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); | |
| 472 | 527 |
| 473 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e); | 528 v8.rejectPromise(reader[readableStreamReaderClosedPromise], e); |
| 474 } | 529 } |
| 475 | 530 |
| 476 function FinishClosingReadableStream(stream) { | 531 function ReadableStreamClose(stream) { |
| 477 stream[readableStreamState] = STATE_CLOSED; | 532 ReadableStreamSetState(stream, STATE_CLOSED); |
| 478 | 533 |
| 479 const reader = stream[readableStreamReader]; | 534 const reader = stream[readableStreamReader]; |
| 480 if (reader === undefined) { | 535 if (reader === undefined) { |
| 481 return undefined; | 536 return undefined; |
| 482 } | 537 } |
| 483 | 538 |
| 484 | 539 if (IsReadableStreamDefaultReader(reader) === true) { |
| 485 const readRequests = reader[readableStreamReaderReadRequests]; | 540 const readRequests = reader[readableStreamDefaultReaderReadRequests]; |
| 486 for (let i = 0; i < readRequests.length; ++i) { | 541 for (let i = 0; i < readRequests.length; ++i) { |
| 487 v8.resolvePromise( | 542 v8.resolvePromise( |
| 488 readRequests[i], CreateIterResultObject(undefined, true)); | 543 readRequests[i], CreateIterResultObject(undefined, true)); |
| 544 } | |
| 545 reader[readableStreamDefaultReaderReadRequests] = new v8.InternalPackedArr ay(); | |
| 489 } | 546 } |
| 490 reader[readableStreamReaderReadRequests] = new v8.InternalPackedArray(); | |
| 491 | 547 |
| 492 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined); | 548 v8.resolvePromise(reader[readableStreamReaderClosedPromise], undefined); |
| 493 } | 549 } |
| 494 | 550 |
| 495 function GetReadableStreamDesiredSize(stream) { | 551 function ReadableStreamDefaultControllerGetDesiredSize(controller) { |
| 496 const queueSize = GetTotalQueueSize(stream); | 552 const queueSize = GetTotalQueueSize(controller); |
| 497 return stream[readableStreamStrategyHWM] - queueSize; | 553 return controller[readableStreamDefaultControllerStrategyHWM] - queueSize; |
| 498 } | 554 } |
| 499 | 555 |
| 500 function IsReadableStream(x) { | 556 function IsReadableStream(x) { |
| 501 return hasOwnProperty(x, readableStreamUnderlyingSource); | 557 return hasOwnProperty(x, readableStreamController); |
| 502 } | 558 } |
| 503 | 559 |
| 504 function IsReadableStreamDisturbed(stream) { | 560 function IsReadableStreamDisturbed(stream) { |
| 505 return stream[readableStreamBits] & DISTURBED; | 561 return stream[readableStreamBits] & DISTURBED; |
| 506 } | 562 } |
| 507 | 563 |
| 508 function SetReadableStreamDisturbed(stream) { | 564 function SetReadableStreamDisturbed(stream) { |
| 509 return stream[readableStreamBits] |= DISTURBED; | 565 return stream[readableStreamBits] |= DISTURBED; |
| 510 } | 566 } |
| 511 | 567 |
| 512 function IsReadableStreamLocked(stream) { | 568 function IsReadableStreamLocked(stream) { |
| 513 return stream[readableStreamReader] !== undefined; | 569 return stream[readableStreamReader] !== undefined; |
| 514 } | 570 } |
| 515 | 571 |
| 516 function IsReadableStreamController(x) { | 572 function IsReadableStreamDefaultController(x) { |
| 517 return hasOwnProperty(x, readableStreamControllerControlledReadableStream); | 573 return hasOwnProperty(x, readableStreamDefaultControllerControlledReadableSt ream); |
| 574 } | |
| 575 | |
| 576 function IsReadableStreamDefaultReader(x) { | |
| 577 return hasOwnProperty(x, readableStreamDefaultReaderReadRequests); | |
| 518 } | 578 } |
| 519 | 579 |
| 520 function IsReadableStreamReadable(stream) { | 580 function IsReadableStreamReadable(stream) { |
| 521 return stream[readableStreamState] === STATE_READABLE; | 581 return ReadableStreamGetState(stream) === STATE_READABLE; |
| 522 } | 582 } |
| 523 | 583 |
| 524 function IsReadableStreamClosed(stream) { | 584 function IsReadableStreamClosed(stream) { |
| 525 return stream[readableStreamState] === STATE_CLOSED; | 585 return ReadableStreamGetState(stream) === STATE_CLOSED; |
| 526 } | 586 } |
| 527 | 587 |
| 528 function IsReadableStreamErrored(stream) { | 588 function IsReadableStreamErrored(stream) { |
| 529 return stream[readableStreamState] === STATE_ERRORED; | 589 return ReadableStreamGetState(stream) === STATE_ERRORED; |
| 530 } | 590 } |
| 531 | 591 |
| 532 function IsReadableStreamReader(x) { | 592 function ReadableStreamReaderGenericInitialize(reader, stream) { |
| 533 return hasOwnProperty(x, readableStreamReaderOwnerReadableStream); | 593 // TODO(yhirano): Remove this when we don't need hasPendingActivity in |
| 594 // blink::UnderlyingSourceBase. | |
| 595 const controller = stream[readableStreamController]; | |
| 596 if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) { | |
| 597 // The stream is created with an external controller (i.e. made in | |
| 598 // Blink). | |
| 599 const underlyingSource = controller[readableStreamDefaultControllerUnderly ingSource]; | |
| 600 callFunction(underlyingSource.notifyLockAcquired, underlyingSource); | |
| 601 } | |
| 602 | |
| 603 reader[readableStreamReaderOwnerReadableStream] = stream; | |
| 604 stream[readableStreamReader] = reader; | |
| 605 | |
| 606 switch (ReadableStreamGetState(stream)) { | |
| 607 case STATE_READABLE: | |
| 608 reader[readableStreamReaderClosedPromise] = v8.createPromise(); | |
| 609 break; | |
| 610 case STATE_CLOSED: | |
| 611 reader[readableStreamReaderClosedPromise] = Promise_resolve(undefined); | |
| 612 break; | |
| 613 case STATE_ERRORED: | |
| 614 reader[readableStreamReaderClosedPromise] = | |
| 615 Promise_reject(stream[readableStreamStoredError]); | |
| 616 break; | |
| 617 } | |
| 534 } | 618 } |
| 535 | 619 |
| 536 function ReadFromReadableStreamReader(reader) { | 620 function ReadableStreamReaderGenericRelease(reader) { |
| 621 // TODO(yhirano): Remove this when we don't need hasPendingActivity in | |
| 622 // blink::UnderlyingSourceBase. | |
| 623 const controller = reader[readableStreamReaderOwnerReadableStream][readableS treamController]; | |
| 624 if (controller[readableStreamDefaultControllerBits] & EXTERNALLY_CONTROLLED) { | |
| 625 // The stream is created with an external controller (i.e. made in | |
| 626 // Blink). | |
| 627 const underlyingSource = controller[readableStreamDefaultControllerUnderly ingSource]; | |
| 628 callFunction(underlyingSource.notifyLockReleased, underlyingSource); | |
| 629 } | |
| 630 | |
| 631 if (ReadableStreamGetState(reader[readableStreamReaderOwnerReadableStream]) === STATE_READABLE) { | |
| 632 v8.rejectPromise(reader[readableStreamReaderClosedPromise], new TypeError( errReleasedReaderClosedPromise)); | |
| 633 } else { | |
| 634 reader[readableStreamReaderClosedPromise] = Promise_reject(new TypeError(e rrReleasedReaderClosedPromise)); | |
| 635 } | |
| 636 | |
| 637 reader[readableStreamReaderOwnerReadableStream][readableStreamReader] = | |
| 638 undefined; | |
| 639 reader[readableStreamReaderOwnerReadableStream] = undefined; | |
| 640 } | |
| 641 | |
| 642 function ReadableStreamDefaultReaderRead(reader) { | |
| 537 const stream = reader[readableStreamReaderOwnerReadableStream]; | 643 const stream = reader[readableStreamReaderOwnerReadableStream]; |
| 538 stream[readableStreamBits] |= DISTURBED; | 644 stream[readableStreamBits] |= DISTURBED; |
| 539 | 645 |
| 540 if (stream[readableStreamState] === STATE_CLOSED) { | 646 if (ReadableStreamGetState(stream) === STATE_CLOSED) { |
| 541 return Promise_resolve(CreateIterResultObject(undefined, true)); | 647 return Promise_resolve(CreateIterResultObject(undefined, true)); |
| 542 } | 648 } |
| 543 | 649 |
| 544 if (stream[readableStreamState] === STATE_ERRORED) { | 650 if (ReadableStreamGetState(stream) === STATE_ERRORED) { |
| 545 return Promise_reject(stream[readableStreamStoredError]); | 651 return Promise_reject(stream[readableStreamStoredError]); |
| 546 } | 652 } |
| 547 | 653 |
| 548 const queue = stream[readableStreamQueue]; | 654 return ReadableStreamDefaultControllerPull(stream[readableStreamController]) ; |
| 549 if (queue.length > 0) { | |
| 550 const chunk = DequeueValue(stream); | |
| 551 | |
| 552 if (stream[readableStreamBits] & CLOSE_REQUESTED && queue.length === 0) { | |
| 553 FinishClosingReadableStream(stream); | |
| 554 } else { | |
| 555 RequestReadableStreamPull(stream); | |
| 556 } | |
| 557 | |
| 558 return Promise_resolve(CreateIterResultObject(chunk, false)); | |
| 559 } else { | |
| 560 const readRequest = v8.createPromise(); | |
| 561 | |
| 562 reader[readableStreamReaderReadRequests].push(readRequest); | |
| 563 RequestReadableStreamPull(stream); | |
| 564 return readRequest; | |
| 565 } | |
| 566 } | 655 } |
| 567 | 656 |
| 568 function RequestReadableStreamPull(stream) { | 657 function ReadableStreamDefaultControllerCallPullIfNeeded(controller) { |
| 569 const shouldPull = ShouldReadableStreamPull(stream); | 658 const shouldPull = ReadableStreamDefaultControllerShouldPull(controller); |
| 570 if (shouldPull === false) { | 659 if (shouldPull === false) { |
| 571 return undefined; | 660 return undefined; |
| 572 } | 661 } |
| 573 | 662 |
| 574 if (stream[readableStreamBits] & PULLING) { | 663 if (controller[readableStreamDefaultControllerBits] & PULLING) { |
| 575 stream[readableStreamBits] |= PULL_AGAIN; | 664 controller[readableStreamDefaultControllerBits] |= PULL_AGAIN; |
| 576 return undefined; | 665 return undefined; |
| 577 } | 666 } |
| 578 | 667 |
| 579 stream[readableStreamBits] |= PULLING; | 668 controller[readableStreamDefaultControllerBits] |= PULLING; |
| 580 | 669 |
| 581 const underlyingSource = stream[readableStreamUnderlyingSource]; | 670 const underlyingSource = controller[readableStreamDefaultControllerUnderlyin gSource]; |
| 582 const controller = stream[readableStreamController]; | |
| 583 const pullPromise = PromiseCallOrNoop( | 671 const pullPromise = PromiseCallOrNoop( |
| 584 underlyingSource, 'pull', controller, 'underlyingSource.pull'); | 672 underlyingSource, 'pull', controller, 'underlyingSource.pull'); |
| 585 | 673 |
| 586 thenPromise(pullPromise, | 674 thenPromise(pullPromise, |
| 587 () => { | 675 () => { |
| 588 stream[readableStreamBits] &= ~PULLING; | 676 controller[readableStreamDefaultControllerBits] &= ~PULLING; |
| 589 | 677 |
| 590 if (stream[readableStreamBits] & PULL_AGAIN) { | 678 if (controller[readableStreamDefaultControllerBits] & PULL_AGAIN) { |
| 591 stream[readableStreamBits] &= ~PULL_AGAIN; | 679 controller[readableStreamDefaultControllerBits] &= ~PULL_AGAIN; |
| 592 return RequestReadableStreamPull(stream); | 680 return ReadableStreamDefaultControllerCallPullIfNeeded(controller); |
| 593 } | 681 } |
| 594 }, | 682 }, |
| 595 e => { | 683 e => { |
| 596 if (stream[readableStreamState] === STATE_READABLE) { | 684 if (ReadableStreamGetState(controller[readableStreamDefaultControllerC ontrolledReadableStream]) === STATE_READABLE) { |
| 597 return ErrorReadableStream(stream, e); | 685 return ReadableStreamDefaultControllerError(controller, e); |
| 598 } | 686 } |
| 599 }); | 687 }); |
| 600 } | 688 } |
| 601 | 689 |
| 602 function ShouldReadableStreamPull(stream) { | 690 function ReadableStreamDefaultControllerShouldPull(controller) { |
| 603 const state = stream[readableStreamState]; | 691 const stream = controller[readableStreamDefaultControllerControlledReadableS tream]; |
| 692 | |
| 693 const state = ReadableStreamGetState(stream); | |
| 604 if (state === STATE_CLOSED || state === STATE_ERRORED) { | 694 if (state === STATE_CLOSED || state === STATE_ERRORED) { |
| 605 return false; | 695 return false; |
| 606 } | 696 } |
| 607 | 697 |
| 608 if (stream[readableStreamBits] & CLOSE_REQUESTED) { | 698 if (controller[readableStreamDefaultControllerBits] & CLOSE_REQUESTED) { |
| 609 return false; | 699 return false; |
| 610 } | 700 } |
| 611 | 701 |
| 612 if (!(stream[readableStreamBits] & STARTED)) { | 702 if (!(controller[readableStreamDefaultControllerBits] & STARTED)) { |
| 613 return false; | 703 return false; |
| 614 } | 704 } |
| 615 | 705 |
| 616 if (IsReadableStreamLocked(stream) === true) { | 706 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadReque sts(stream) > 0) { |
| 617 const reader = stream[readableStreamReader]; | 707 return true; |
| 618 const readRequests = reader[readableStreamReaderReadRequests]; | |
| 619 if (readRequests.length > 0) { | |
| 620 return true; | |
| 621 } | |
| 622 } | 708 } |
| 623 | 709 |
| 624 const desiredSize = GetReadableStreamDesiredSize(stream); | 710 const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(controller ); |
| 625 if (desiredSize > 0) { | 711 if (desiredSize > 0) { |
| 626 return true; | 712 return true; |
| 627 } | 713 } |
| 628 | 714 |
| 629 return false; | 715 return false; |
| 630 } | 716 } |
| 631 | 717 |
| 718 function ReadableStreamGetNumReadRequests(stream) { | |
| 719 const reader = stream[readableStreamReader]; | |
| 720 const readRequests = reader[readableStreamDefaultReaderReadRequests]; | |
| 721 return readRequests.length; | |
| 722 } | |
| 723 | |
| 632 // Potential future optimization: use class instances for the underlying | 724 // Potential future optimization: use class instances for the underlying |
| 633 // sources, so that we don't re-create | 725 // sources, so that we don't re-create |
| 634 // closures every time. | 726 // closures every time. |
| 635 | 727 |
| 636 // TODO(domenic): shouldClone argument from spec not supported yet | 728 // TODO(domenic): shouldClone argument from spec not supported yet |
| 637 function TeeReadableStream(stream) { | 729 function ReadableStreamTee(stream) { |
| 638 const reader = AcquireReadableStreamReader(stream); | 730 const reader = AcquireReadableStreamDefaultReader(stream); |
| 639 | 731 |
| 640 let closedOrErrored = false; | 732 let closedOrErrored = false; |
| 641 let canceled1 = false; | 733 let canceled1 = false; |
| 642 let canceled2 = false; | 734 let canceled2 = false; |
| 643 let reason1; | 735 let reason1; |
| 644 let reason2; | 736 let reason2; |
| 645 let promise = v8.createPromise(); | 737 let promise = v8.createPromise(); |
| 646 | 738 |
| 647 const branch1 = new ReadableStream({pull, cancel: cancel1}); | 739 const branch1Stream = new ReadableStream({pull, cancel: cancel1}); |
| 648 | 740 |
| 649 const branch2 = new ReadableStream({pull, cancel: cancel2}); | 741 const branch2Stream = new ReadableStream({pull, cancel: cancel2}); |
| 742 | |
| 743 const branch1 = branch1Stream[readableStreamController]; | |
| 744 const branch2 = branch2Stream[readableStreamController]; | |
| 650 | 745 |
| 651 thenPromise( | 746 thenPromise( |
| 652 reader[readableStreamReaderClosedPromise], undefined, function(r) { | 747 reader[readableStreamReaderClosedPromise], undefined, function(r) { |
| 653 if (closedOrErrored === true) { | 748 if (closedOrErrored === true) { |
| 654 return; | 749 return; |
| 655 } | 750 } |
| 656 | 751 |
| 657 ErrorReadableStream(branch1, r); | 752 ReadableStreamDefaultControllerError(branch1, r); |
| 658 ErrorReadableStream(branch2, r); | 753 ReadableStreamDefaultControllerError(branch2, r); |
| 659 closedOrErrored = true; | 754 closedOrErrored = true; |
| 660 }); | 755 }); |
| 661 | 756 |
| 662 return [branch1, branch2]; | 757 return [branch1Stream, branch2Stream]; |
| 663 | |
| 664 | 758 |
| 665 function pull() { | 759 function pull() { |
| 666 return thenPromise( | 760 return thenPromise( |
| 667 ReadFromReadableStreamReader(reader), function(result) { | 761 ReadableStreamDefaultReaderRead(reader), function(result) { |
| 668 const value = result.value; | 762 const value = result.value; |
| 669 const done = result.done; | 763 const done = result.done; |
| 670 | 764 |
| 671 if (done === true && closedOrErrored === false) { | 765 if (done === true && closedOrErrored === false) { |
| 672 CloseReadableStream(branch1); | 766 if (canceled1 === false) { |
| 673 CloseReadableStream(branch2); | 767 ReadableStreamDefaultControllerClose(branch1); |
| 768 } | |
| 769 if (canceled2 === false) { | |
| 770 ReadableStreamDefaultControllerClose(branch2); | |
| 771 } | |
| 674 closedOrErrored = true; | 772 closedOrErrored = true; |
| 675 } | 773 } |
| 676 | 774 |
| 677 if (closedOrErrored === true) { | 775 if (closedOrErrored === true) { |
| 678 return; | 776 return; |
| 679 } | 777 } |
| 680 | 778 |
| 681 if (canceled1 === false) { | 779 if (canceled1 === false) { |
| 682 EnqueueInReadableStream(branch1, value); | 780 ReadableStreamDefaultControllerEnqueue(branch1, value); |
| 683 } | 781 } |
| 684 | 782 |
| 685 if (canceled2 === false) { | 783 if (canceled2 === false) { |
| 686 EnqueueInReadableStream(branch2, value); | 784 ReadableStreamDefaultControllerEnqueue(branch2, value); |
| 687 } | 785 } |
| 688 }); | 786 }); |
| 689 } | 787 } |
| 690 | 788 |
| 691 function cancel1(reason) { | 789 function cancel1(reason) { |
| 692 canceled1 = true; | 790 canceled1 = true; |
| 693 reason1 = reason; | 791 reason1 = reason; |
| 694 | 792 |
| 695 if (canceled2 === true) { | 793 if (canceled2 === true) { |
| 696 const compositeReason = [reason1, reason2]; | 794 const compositeReason = [reason1, reason2]; |
| 697 const cancelResult = CancelReadableStream(stream, compositeReason); | 795 const cancelResult = ReadableStreamCancel(stream, compositeReason); |
| 698 v8.resolvePromise(promise, cancelResult); | 796 v8.resolvePromise(promise, cancelResult); |
| 699 } | 797 } |
| 700 | 798 |
| 701 return promise; | 799 return promise; |
| 702 } | 800 } |
| 703 | 801 |
| 704 function cancel2(reason) { | 802 function cancel2(reason) { |
| 705 canceled2 = true; | 803 canceled2 = true; |
| 706 reason2 = reason; | 804 reason2 = reason; |
| 707 | 805 |
| 708 if (canceled1 === true) { | 806 if (canceled1 === true) { |
| 709 const compositeReason = [reason1, reason2]; | 807 const compositeReason = [reason1, reason2]; |
| 710 const cancelResult = CancelReadableStream(stream, compositeReason); | 808 const cancelResult = ReadableStreamCancel(stream, compositeReason); |
| 711 v8.resolvePromise(promise, cancelResult); | 809 v8.resolvePromise(promise, cancelResult); |
| 712 } | 810 } |
| 713 | 811 |
| 714 return promise; | 812 return promise; |
| 715 } | 813 } |
| 716 } | 814 } |
| 717 | 815 |
| 718 // | 816 // |
| 719 // Queue-with-sizes | 817 // Queue-with-sizes |
| 720 // Modified from taking the queue (as in the spec) to taking the stream, so we | 818 // Modified from taking the queue (as in the spec) to taking the stream, so we |
| 721 // can modify the queue size alongside. | 819 // can modify the queue size alongside. |
| 722 // | 820 // |
| 723 | 821 |
| 724 function DequeueValue(stream) { | 822 function DequeueValue(controller) { |
| 725 const result = stream[readableStreamQueue].shift(); | 823 const result = controller[readableStreamDefaultControllerQueue].shift(); |
| 726 stream[readableStreamQueueSize] -= result.size; | 824 controller[readableStreamDefaultControllerQueueSize] -= result.size; |
| 727 return result.value; | 825 return result.value; |
| 728 } | 826 } |
| 729 | 827 |
| 730 function EnqueueValueWithSize(stream, value, size) { | 828 function EnqueueValueWithSize(controller, value, size) { |
| 731 size = Number(size); | 829 size = Number(size); |
| 732 if (Number_isNaN(size) || size === +Infinity || size < 0) { | 830 if (Number_isNaN(size) || size === +Infinity || size < 0) { |
| 733 throw new RangeError(errInvalidSize); | 831 throw new RangeError(errInvalidSize); |
| 734 } | 832 } |
| 735 | 833 |
| 736 stream[readableStreamQueueSize] += size; | 834 controller[readableStreamDefaultControllerQueueSize] += size; |
| 737 stream[readableStreamQueue].push({value, size}); | 835 controller[readableStreamDefaultControllerQueue].push({value, size}); |
| 738 } | 836 } |
| 739 | 837 |
| 740 function GetTotalQueueSize(stream) { return stream[readableStreamQueueSize]; } | 838 function GetTotalQueueSize(controller) { return controller[readableStreamDefau ltControllerQueueSize]; } |
| 741 | 839 |
| 742 // | 840 // |
| 743 // Other helpers | 841 // Other helpers |
| 744 // | 842 // |
| 745 | 843 |
| 746 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) { | 844 function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) { |
| 747 if (size !== undefined && typeof size !== 'function') { | 845 if (size !== undefined && typeof size !== 'function') { |
| 748 throw new TypeError(errSizeNotAFunction); | 846 throw new TypeError(errSizeNotAFunction); |
| 749 } | 847 } |
| 750 | 848 |
| (...skipping 29 matching lines...) Expand all Loading... | |
| 780 method = O[P]; | 878 method = O[P]; |
| 781 } catch (methodE) { | 879 } catch (methodE) { |
| 782 return Promise_reject(methodE); | 880 return Promise_reject(methodE); |
| 783 } | 881 } |
| 784 | 882 |
| 785 if (method === undefined) { | 883 if (method === undefined) { |
| 786 return Promise_resolve(undefined); | 884 return Promise_resolve(undefined); |
| 787 } | 885 } |
| 788 | 886 |
| 789 if (typeof method !== 'function') { | 887 if (typeof method !== 'function') { |
| 790 return Promise_reject(errTmplMustBeFunctionOrUndefined(nameForError)); | 888 return Promise_reject(new TypeError(errTmplMustBeFunctionOrUndefined(nameF orError))); |
| 791 } | 889 } |
| 792 | 890 |
| 793 try { | 891 try { |
| 794 return Promise_resolve(callFunction(method, O, arg)); | 892 return Promise_resolve(callFunction(method, O, arg)); |
| 795 } catch (e) { | 893 } catch (e) { |
| 796 return Promise_reject(e); | 894 return Promise_reject(e); |
| 797 } | 895 } |
| 798 } | 896 } |
| 799 | 897 |
| 800 function CreateIterResultObject(value, done) { return {value, done}; } | 898 function CreateIterResultObject(value, done) { return {value, done}; } |
| 801 | 899 |
| 802 | 900 |
| 803 // | 901 // |
| 804 // Additions to the global | 902 // Additions to the global |
| 805 // | 903 // |
| 806 | 904 |
| 807 defineProperty(global, 'ReadableStream', { | 905 defineProperty(global, 'ReadableStream', { |
| 808 value: ReadableStream, | 906 value: ReadableStream, |
| 809 enumerable: false, | 907 enumerable: false, |
| 810 configurable: true, | 908 configurable: true, |
| 811 writable: true | 909 writable: true |
| 812 }); | 910 }); |
| 813 | 911 |
| 814 // | 912 // |
| 815 // Exports to Blink | 913 // Exports to Blink |
| 816 // | 914 // |
| 817 | 915 |
| 818 binding.AcquireReadableStreamReader = AcquireReadableStreamReader; | 916 binding.AcquireReadableStreamDefaultReader = AcquireReadableStreamDefaultReade r; |
| 819 binding.IsReadableStream = IsReadableStream; | 917 binding.IsReadableStream = IsReadableStream; |
| 820 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed; | 918 binding.IsReadableStreamDisturbed = IsReadableStreamDisturbed; |
| 821 binding.SetReadableStreamDisturbed = SetReadableStreamDisturbed; | 919 binding.SetReadableStreamDisturbed = SetReadableStreamDisturbed; |
| 822 binding.IsReadableStreamLocked = IsReadableStreamLocked; | 920 binding.IsReadableStreamLocked = IsReadableStreamLocked; |
| 823 binding.IsReadableStreamReadable = IsReadableStreamReadable; | 921 binding.IsReadableStreamReadable = IsReadableStreamReadable; |
| 824 binding.IsReadableStreamClosed = IsReadableStreamClosed; | 922 binding.IsReadableStreamClosed = IsReadableStreamClosed; |
| 825 binding.IsReadableStreamErrored = IsReadableStreamErrored; | 923 binding.IsReadableStreamErrored = IsReadableStreamErrored; |
| 826 binding.IsReadableStreamReader = IsReadableStreamReader; | 924 binding.IsReadableStreamDefaultReader = IsReadableStreamDefaultReader; |
| 827 binding.ReadFromReadableStreamReader = ReadFromReadableStreamReader; | 925 binding.ReadableStreamDefaultReaderRead = ReadableStreamDefaultReaderRead; |
| 828 | 926 |
| 829 binding.CloseReadableStream = CloseReadableStream; | 927 binding.ReadableStreamDefaultControllerClose = ReadableStreamDefaultController Close; |
| 830 binding.GetReadableStreamDesiredSize = GetReadableStreamDesiredSize; | 928 binding.ReadableStreamDefaultControllerGetDesiredSize = ReadableStreamDefaultC ontrollerGetDesiredSize; |
| 831 binding.EnqueueInReadableStream = EnqueueInReadableStream; | 929 binding.ReadableStreamDefaultControllerEnqueue = ReadableStreamDefaultControll erEnqueue; |
| 832 binding.ErrorReadableStream = ErrorReadableStream; | 930 binding.ReadableStreamDefaultControllerError = ReadableStreamDefaultController Error; |
| 833 | 931 |
| 834 binding.createReadableStreamWithExternalController = | 932 binding.createReadableStreamWithExternalController = |
| 835 (underlyingSource, strategy) => { | 933 (underlyingSource, strategy) => { |
| 836 return new ReadableStream( | 934 return new ReadableStream( |
| 837 underlyingSource, strategy, createWithExternalControllerSentinel); | 935 underlyingSource, strategy, createWithExternalControllerSentinel); |
| 838 }; | 936 }; |
| 839 }); | 937 }); |
| OLD | NEW |