Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1310)

Unified Diff: Source/platform/streams/ReadableStream.js

Issue 924713002: [WIP] ReadableStream V8 extension (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: More complete implementation Created 5 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « Source/platform/streams/ReadableStream.cpp ('k') | Source/platform/streams/WebStreams.cpp » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: Source/platform/streams/ReadableStream.js
diff --git a/Source/platform/streams/ReadableStream.js b/Source/platform/streams/ReadableStream.js
new file mode 100644
index 0000000000000000000000000000000000000000..015e5ba2732646e6c956a48bb308d2124578e9aa
--- /dev/null
+++ b/Source/platform/streams/ReadableStream.js
@@ -0,0 +1,603 @@
+(function (global) {
+ 'use strict';
+
+ native function SET_PRIVATE();
+ native function GET_PRIVATE();
+ native function HAS_PRIVATE();
+
+ const kQueueSize = Symbol('Queue-with-sizes queue size');
+ const kPromise = Symbol('promise');
+ const kResolve = Symbol('resolve corresponding promise');
+ const kReject = Symbol('reject corresponding promise');
+
+ // TODO(domenic): come up with a better way of getting at intrinsics
+ const Number = global.Number;
+ const TypeError = global.TypeError;
+ const RangeError = global.RangeError;
+ const Promise = global.Promise;
+
+ const Number_isNaN = Number.isNaN;
+ const Promise_resolve = Promise.resolve.bind(Promise);
+ const Promise_reject = Promise.reject.bind(Promise);
+
+ const uncurryThis = Function.prototype.bind.bind(Function.prototype.call);
+ const applyFunction = uncurryThis(Function.prototype.apply);
+ const thenPromise = uncurryThis(Promise.prototype.then);
+ const shiftArray = uncurryThis(Array.prototype.shift);
+ const pushArray = uncurryThis(Array.prototype.push);
+
+ // TODO(domenic): need to censor Function.prototype.toString for these; use V8 API presumably
+ class ReadableStream {
+ constructor(underlyingSource, strategy) {
+ if (underlyingSource === undefined) {
+ underlyingSource = {};
+ }
+ if (strategy === undefined) {
+ strategy = {};
+ }
+ const size = strategy.size;
+ let highWaterMark = strategy.highWaterMark;
+ if (highWaterMark === undefined) {
+ highWaterMark = 1;
+ }
+
+ const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
+
+ SET_PRIVATE(this, 'ReadableStream#underlyingSource', underlyingSource);
+
+ // TODO(domenic) use a real queue data structure
+ const queue = [];
+ queue[kQueueSize] = 0;
+ SET_PRIVATE(this, 'ReadableStream#queue', queue);
+
+ // TODO(domenic) consolidate booleans into a bit field?
+ // TODO(domenic) use integers for state? (or put in bit field?)
+ SET_PRIVATE(this, 'ReadableStream#state', 'readable');
+ SET_PRIVATE(this, 'ReadableStream#started', false);
+ SET_PRIVATE(this, 'ReadableStream#closeRequested', false);
+ SET_PRIVATE(this, 'ReadableStream#pulling', false);
+ SET_PRIVATE(this, 'ReadableStream#pullAgain', false);
+ SET_PRIVATE(this, 'ReadableStream#reader', undefined);
+
+ SET_PRIVATE(this, 'ReadableStream#storedError', undefined);
+ SET_PRIVATE(this, 'ReadableStream#strategySize', normalizedStrategy.size);
+ SET_PRIVATE(this, 'ReadableStream#stratgyHWM', normalizedStrategy.highWaterMark);
+ SET_PRIVATE(this, 'ReadableStream#stratgyHWM', normalizedStrategy.highWaterMark);
+
+ const controller = new ReadableStreamController(this);
+ SET_PRIVATE(this, 'ReadableStream#controller', controller);
+
+ const that = this;
+ const startResult = InvokeOrNoop(underlyingSource, 'start', [controller]);
+ thenPromise(Promise_resolve(startResult),
+ function() {
+ SET_PRIVATE(that, 'ReadableStream#started', true);
+ RequestReadableStreamPull(that);
+ },
+ function (r) {
+ if (GET_PRIVATE(that, 'ReadableStream#state' === 'readable')) {
+ return ErrorReadableStream(that, r);
+ }
+ }
+ );
+ }
+
+ cancel(reason) {
+ if (IsReadableStream(this) === false) {
+ return Promise_reject(new TypeError(
+ 'ReadableStream.prototype.cancel can only be used on a ReadableStream'));
+ }
+
+ if (IsReadableStreamLocked(this) === true) {
+ return Promise_reject(new TypeError(
+ 'Cannot cancel a stream that already has a reader'));
+ }
+
+ return CancelReadableStream(this, reason);
+ }
+
+ getReader() {
+ if (IsReadableStream(this) === false) {
+ throw new TypeError('ReadableStream.prototype.getReader can only be used on a ReadableStream');
+ }
+
+ return AcquireReadableStreamReader(this);
+ }
+ }
+
+ class ReadableStreamController {
+ constructor(stream) {
+ if (IsReadableStream(stream) === false) {
+ throw new TypeError('ReadableStreamController can only be constructed with a ReadableStream instance');
+ }
+
+ if (GET_PRIVATE(stream, 'ReadableStream#controller') !== undefined) {
+ throw new TypeError(
+ 'ReadableStreamController instances can only be created by the ReadableStream constructor');
+ }
+
+ SET_PRIVATE(this, 'ReadableStreamController#controlledReadableStream', stream);
+ }
+
+ get desiredSize() {
+ if (IsReadableStreamController(this) === false) {
+ throw new TypeError(
+ 'ReadableStreamController.prototype.desiredSize can only be used on a ReadableStreamController');
+ }
+
+ return GetReadableStreamDesiredSize(GET_PRIVATE(this, 'ReadableStreamController#controlledReadableStream'));
+ }
+
+ close() {
+ if (IsReadableStreamController(this) === false) {
+ throw new TypeError(
+ 'ReadableStreamController.prototype.close can only be used on a ReadableStreamController');
+ }
+
+ const stream = GET_PRIVATE(this, 'ReadableStreamController#controlledReadableStream');
+
+ if (GET_PRIVATE(stream, 'ReadableStream#closeRequested') === true) {
+ throw new TypeError('The stream has already been closed; do not close it again!');
+ }
+ if (GET_PRIVATE(stream, 'ReadableStream#state') === 'errored') {
+ throw new TypeError('The stream is in an errored state and cannot be closed');
+ }
+
+ return CloseReadableStream(stream);
+ }
+
+ enqueue(chunk) {
+ if (IsReadableStreamController(this) === false) {
+ throw new TypeError(
+ 'ReadableStreamController.prototype.enqueue can only be used on a ReadableStreamController');
+ }
+
+ const stream = GET_PRIVATE(this, 'ReadableStreamController#controlledReadableStream');
+
+ if (GET_PRIVATE(stream, 'ReadableStream#state') === 'errored') {
+ throw GET_PRIVATE(stream, 'ReadableStream#storedError');
+ }
+
+ if (GET_PRIVATE(stream, 'ReadableStream#closeRequested') === true) {
+ throw new TypeError('stream is closed or draining');
+ }
+
+ return EnqueueInReadableStream(stream, chunk);
+ }
+
+ error(e) {
+ if (IsReadableStreamController(this) === false) {
+ throw new TypeError(
+ 'ReadableStreamController.prototype.error can only be used on a ReadableStreamController');
+ }
+
+ const stream = GET_PRIVATE(this, 'ReadableStreamController#controlledReadableStream');
+
+ const state = GET_PRIVATE(stream, 'ReadableStream#state');
+ if (state !== 'readable') {
+ throw new TypeError(`The stream is ${state} and so cannot be errored`);
+ }
+
+ return ErrorReadableStream(stream, e);
+ }
+ }
+
+ class ReadableStreamReader {
+ constructor(stream) {
+ if (IsReadableStream(stream) === false) {
+ throw new TypeError('ReadableStreamReader can only be constructed with a ReadableStream instance');
+ }
+ if (IsReadableStreamLocked(stream) === true) {
+ throw new TypeError('This stream has already been locked for exclusive reading by another reader');
+ }
+
+ SET_PRIVATE(stream, 'ReadableStream#reader', this);
+ SET_PRIVATE(this, 'ReadableStreamReader#ownerReadableStream', stream);
+
+ // TODO(domenic): use integers for state>
+ SET_PRIVATE(this, 'ReadableStreamReader#state', 'readable');
+ SET_PRIVATE(this, 'ReadableStreamReader#storedError', undefined);
+
+ // TODO(domenic): use a real queue data structure
+ SET_PRIVATE(this, 'ReadableStreamReader#readRequests', []);
+
+ // TODO(domenic): use faster means of creating/resolving/rejecting promises
+ const that = this;
+ SET_PRIVATE(this, 'ReadableStreamReader#closedPromise', new Promise(function (resolve, reject) {
+ SET_PRIVATE(that, 'ReadableStreamReader#closedPromise_resolve', resolve);
+ SET_PRIVATE(that, 'ReadableStreamReader#closedPromise_reject', reject);
+ }));
+
+ const streamState = GET_PRIVATE(stream, 'ReadableStream#state');
+ if (streamState === 'closed' || streamState === 'errored') {
+ ReleaseReadableStreamReader(this);
+ }
+ }
+
+ get closed() {
+ if (IsReadableStreamReader(this) === false) {
+ return Promise_reject(
+ new TypeError('ReadableStreamReader.prototype.closed can only be used on a ReadableStreamReader'));
+ }
+
+ return GET_PRIVATE(this, 'ReadableStreamReader#closedPromise');
+ }
+
+ cancel(reason) {
+ if (IsReadableStreamReader(this) === false) {
+ return Promise_reject(
+ new TypeError('ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader'));
+ }
+
+ const state = GET_PRIVATE(this, 'ReadableStreamReader#state');
+ if (state === 'closed') {
+ return Promise_resolve(undefined);
+ }
+
+ if (state === 'errored') {
+ return Promise_reject(GET_PRIVATE(this, 'ReadableStreamReader#storedError'));
+ }
+
+ return CancelReadableStream(GET_PRIVATE(this, 'ReadableStreamReader#ownerReadableStream'), reason);
+ }
+
+ read() {
+ if (IsReadableStreamReader(this) === false) {
+ return Promise_reject(
+ new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader'));
+ }
+
+ return ReadFromReadableStreamReader(this);
+ }
+
+ releaseLock() {
+ if (IsReadableStreamReader(this) === false) {
+ throw new TypeError(
+ 'ReadableStreamReader.prototype.releaseLock can only be used on a ReadableStreamReader');
+ }
+
+ if (GET_PRIVATE(this, 'ReadableStreamReader#ownerReadableStream') === undefined) {
+ return undefined;
+ }
+
+ // TODO(domenic): is getting array lengths safe? I don't think so.
+ // Might become moot if we have a better data structure.
+ if (GET_PRIVATE(this, 'ReadableStreamReader#readRequests').length > 0) {
+ throw new TypeError(
+ 'Tried to release a reader lock when that reader has pending read() calls un-settled');
+ }
+
+ return ReleaseReadableStreamReader(this);
+ }
+ }
+
+ // Readable stream abstract operations
+
+ function AcquireReadableStreamReader(stream) {
+ return new ReadableStreamReader(stream);
+ }
+
+ function CancelReadableStream(stream, reason) {
+ const state = GET_PRIVATE(stream, 'ReadableStream#state');
+ if (state === 'closed') {
+ return Promise_resolve(undefined);
+ }
+ if (state === 'errored') {
+ return Promise_reject(GET_PRIVATE(stream, 'ReadableStream#storedError'));
+ }
+
+ SET_PRIVATE(stream, 'ReadableStream#queue', []);
+ FinishClosingReadableStream(stream);
+
+ const underlyingSource = GET_PRIVATE(stream, 'ReadableStream#underlyingSource');
+ const sourceCancelPromise = PromiseInvokeOrNoop(underlyingSource, 'cancel', [reason]);
+ return thenPromise(sourceCancelPromise, function() { return undefined; });
+ }
+
+ function CloseReadableStream(stream) {
+ if (GET_PRIVATE(stream, 'ReadableStream#state') === 'closed') {
+ return undefined;
+ }
+
+ SET_PRIVATE(stream, 'ReadableStream#closeRequested', true);
+
+ if (GET_PRIVATE(stream, 'ReadableStream#queue').length === 0) {
+ return FinishClosingReadableStream(stream);
+ }
+ }
+
+
+ function EnqueueInReadableStream(stream, chunk) {
+ if (GET_PRIVATE(stream, 'ReadableStream#state') === 'closed') {
+ return undefined;
+ }
+
+ if (IsReadableStreamLocked(stream) === true &&
+ GET_PRIVATE(GET_PRIVATE(stream, 'ReadableStream#reader'), 'ReadableStreamReader#readRequests').length > 0) {
+ const readRequest = shiftArray(readRequests);
+ readRequest[resolve](CreateIterResultObject(chunk, false));
+ } else {
+ let chunkSize = 1;
+
+ const strategySize = GET_PRIVATE(stream, 'ReadableStream#strategySize');
+ if (strategySize !== undefined) {
+ try {
+ chunkSize = strategySize(chunk);
+ } catch (chunkSizeE) {
+ ErrorReadableStream(stream, chunkSizeE);
+ throw chunkSizeE;
+ }
+ }
+
+ try {
+ EnqueueValueWithSize(GET_PRIVATE(stream, 'ReadableStream#queue'), chunk, chunkSize);
+ } catch (enqueueE) {
+ ErrorReadableStream(stream, enqueueE);
+ throw enqueueE;
+ }
+ }
+
+ RequestReadableStreamPull(stream);
+ }
+
+ function ErrorReadableStream(stream, e) {
+ SET_PRIVATE(stream, 'ReadableStream#queue', []);
+ SET_PRIVATE(stream, 'ReadableStream#storedError', e);
+ SET_PRIVATE(stream, 'ReadableStream#state', 'errored');
+
+ if (IsReadableStreamLocked(stream) === true) {
+ return ReleaseReadableStreamReader(GET_PRIVATE(stream, 'ReadableStream#reader'));
+ }
+ }
+
+ function FinishClosingReadableStream(stream) {
+ SET_PRIVATE(stream, 'ReadableStream#state', 'closed');
+
+ if (IsReadableStreamLocked(stream) === true) {
+ return ReleaseReadableStreamReader(GET_PRIVATE(stream, 'ReadableStream#reader'));
+ }
+ }
+
+ function GetReadableStreamDesiredSize(stream) {
+ const queueSize = GetTotalQueueSize(GET_PRIVATE(stream, 'ReadableStream#queue'));
+ return GET_PRIVATE(stream, 'ReadableStream#strategyHWM') - queueSize;
+ }
+
+ function IsReadableStream(x) {
+ // TODO(domenic): is it safe to allow this to be called on non-objects?
+
+ return HAS_PRIVATE(x, 'ReadableStream#underlyingSource');
+ }
+
+ function IsReadableStreamLocked(stream) {
+ return GET_PRIVATE(stream, 'ReadableStream#reader') !== undefined;
+ }
+
+ function IsReadableStreamController(x) {
+ return HAS_PRIVATE(x, 'ReadableStreamController#controlledReadableStream');
+ }
+
+ function IsReadableStreamReader(x) {
+ return HAS_PRIVATE(x, 'ReadableStreamReader#ownerReadableStream');
+ }
+
+ function ReadFromReadableStreamReader(reader) {
+ const state = GET_PRIVATE(reader, 'ReadableStreamReader#state');
+ if (state === 'closed') {
+ return Promise_resolve(CreateIterResultObject(undefined, true));
+ }
+
+ if (state === 'errored') {
+ return Promise_reject(GET_PRIVATE(reader, 'ReadableStreamReader#storedError'));
+ }
+
+ const ownerReadableStream = GET_PRIVATE(reader, 'ReadableStreamReader#ownerReadableStream');
+ const queue = GET_PRIVATE(ownerReadableStream, 'ReadableStream#queue');
+ if (queue.length > 0) {
+ const chunk = DequeueValue(queue);
+
+ if (GET_PRIVATE(ownerReadableStream, 'ReadableStream#closeRequested') === true && queue.length === 0) {
+ FinishClosingReadableStream(ownerReadableStream);
+ } else {
+ RequestReadableStreamPull(ownerReadableStream);
+ }
+
+ return Promise_resolve(CreateIterResultObject(chunk, false));
+ } else {
+ const readRequest = {};
+ readRequest[kPromise] = new Promise(function (resolve, reject) {
+ readRequest[kResolve] = resolve;
+ readRequest[kReject] = reject;
+ });
+
+ pushArray(GET_PRIVATE(reader, 'ReadableStreamReader#readRequests'), readRequest);
+ RequestReadableStreamPull(ownerReadableStream);
+ return readRequest[kPromise];
+ }
+ }
+
+ function ReleaseReadableStreamReader(reader) {
+ const ownerReadableStream = GET_PRIVATE(reader, 'ReadableStreamReader#ownerReadableStream');
+ if (GET_PRIVATE(ownerReadableStream, 'ReadableStream#state') === 'errored') {
+ SET_PRIVATE(reader, 'ReadableStreamReader#state', 'errored');
+
+ const e = GET_PRIVATE(ownerReadableStream, 'ReadableStream#storedError');
+ SET_PRIVATE(reader, 'ReadableStreamReader#storedError', e);
+ GET_PRIVATE(reader, 'ReadableStreamReader#closedPromise_reject')(e);
+
+ for (const readRequest of GET_PRIVATE(reader, 'ReadableStreamReader#readRequests')) {
+ readRequest[kReject](e);
+ }
+ } else {
+ SET_PRIVATE(reader, 'ReadableStreamReader#state', 'closed');
+ GET_PRIVATE(reader, 'ReadableStreamReader#closedPromise_resolve')(undefined);
+
+ for (const readRequest of GET_PRIVATE(reader, 'ReadableStreamReader#readRequests')) {
+ readRequest[kResolve](CreateIterResultObject(undefined, true));
+ }
+ }
+
+ SET_PRIVATE(reader, 'ReadableStreamReader#readRequests', []);
+ SET_PRIVATE(ownerReadableStream, 'ReadableStream#reader', undefined);
+ SET_PRIVATE(reader, 'ReadableStreamReader#ownerReadableStream', undefined);
+ }
+
+ function RequestReadableStreamPull(stream) {
+ const shouldPull = ShouldReadableStreamPull(stream);
+ if (shouldPull === false) {
+ return undefined;
+ }
+
+ if (GET_PRIVATE(stream, 'ReadableStream#pulling') === true) {
+ SET_PRIVATE(stream, 'ReadableStream#pullAgain', true);
+ return undefined;
+ }
+
+ SET_PRIVATE(stream, 'ReadableStream#pulling', true);
+
+ const underlyingSource = GET_PRIVATE(stream, 'ReadableStream#underlyingSource');
+ const controller = GET_PRIVATE(stream, 'ReadableStream#controller');
+ const pullPromise = PromiseInvokeOrNoop(underlyingSource, 'pull', [controller]);
+
+ thenPromise(pullPromise,
+ function () {
+ SET_PRIVATE(stream, 'ReadableStream#pulling', false);
+
+ if (GET_PRIVATE(stream, 'ReadableStream#pullAgain') === true) {
+ SET_PRIVATE(stream, 'ReadableStream#pullAgain', false);
+ return RequestReadableStreamPull(stream);
+ }
+ },
+ function (e) {
+ if (GET_PRIVATE(stream, 'ReadableStream#state') === 'readable') {
+ return ErrorReadableStream(stream, e);
+ }
+ }
+ );
+ }
+
+ function ShouldReadableStreamPull(stream) {
+ const state = GET_PRIVATE(stream, 'ReadableStream#state');
+ if (state === 'closed' || state === 'errored') {
+ return false;
+ }
+
+ if (GET_PRIVATE(stream, 'ReadableStream#closeRequested') === true) {
+ return false;
+ }
+
+ if (GET_PRIVATE(stream, 'ReadableStream#started') === false) {
+ return false;
+ }
+
+ if (IsReadableStreamLocked(stream) === true) {
+ const reader = GET_PRIVATE(stream, 'ReadableStream#reader');
+ const readRequests = GET_PRIVATE(reader, 'ReadableStreamReader#readRequests');
+ if (readRequests.length > 0) {
+ return true;
+ }
+ }
+
+ const desiredSize = GetReadableStreamDesiredSize(stream);
+ if (desiredSize < 0) {
+ return true;
+ }
+
+ return false;
+ }
+
+ // TODO TeeReadableStream
+
+
+
+ //
+ // Queue-with-sizes
+ //
+
+ // TODO(domenic): manipulating arrays seems fraught with peril in general; e.g. if someone defines getters/setters
+ // on the prototype chain, we can no longer shift and push.
+
+ function DequeueValue(queue) {
+ return shiftArray(queue).value;
+ }
+
+ function EnqueueValueWithSize(queue, value, size) {
+ size = Number(size);
+ if (Number_isNaN(size) || size === +Infinity || size === -Infinity) {
+ throw new RangeError('size must be a finite, non-NaN number.');
+ }
+
+ // TODO(domenic): is adding numbers safe? Overridden valueOf could ruin our day.
+ queue[kQueueSize] += size;
+
+ pushArray(queue, { value, size });
+ }
+
+ function GetTotalQueueSize(queue) {
+ return queue[kQueueSize];
+ }
+
+ //
+ // Other helpers
+ //
+
+ function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) {
+ if (size !== undefined && typeof size !== 'function') {
+ throw new TypeError('size property of a queuing strategy must be a function');
+ }
+
+ highWaterMark = Number(highWaterMark);
+ if (Number_isNaN(highWaterMark)) {
+ throw new TypeError('highWaterMark property of a queuing strategy must be convertible to a non-NaN number');
+ }
+ if (highWaterMark < 0) {
+ throw new RangeError('highWaterMark property of a queuing strategy must be nonnegative');
+ }
+
+ return { size, highWaterMark };
+ }
+
+ function InvokeOrNoop(O, P, args) {
+ const method = O[P];
+ if (method === undefined) {
+ return undefined;
+ }
+ return applyFunction(method, O, args);
+ }
+
+
+ function PromiseInvokeOrNoop(O, P, args) {
+ let method;
+ try {
+ method = O[P];
+ } catch (methodE) {
+ return Promise_reject(methodE);
+ }
+
+ if (method === undefined) {
+ return Promise_resolve(undefined);
+ }
+
+ try {
+ return Promise_resolve(applyFunction(method, O, args));
+ } catch (e) {
+ return Promise_reject(e);
+ }
+ }
+
+ function CreateIterResultObject(value, done) {
+ return { value, done };
+ }
+
+
+ //
+ // Exports
+ //
+
+ Object.defineProperty(global, 'ReadableStream', {
+ enumerable: false,
+ writable: true,
+ configurable: true,
+ value: ReadableStream
+ });
+}(this));
« no previous file with comments | « Source/platform/streams/ReadableStream.cpp ('k') | Source/platform/streams/WebStreams.cpp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698