Index: Source/core/streams/ReadableStream.cpp |
diff --git a/Source/core/streams/ReadableStream.cpp b/Source/core/streams/ReadableStream.cpp |
index 0f1a80cf61547873af993ab1d7c6070e855e8280..712a87f425492e3241fa5f75b0a0286147efcad4 100644 |
--- a/Source/core/streams/ReadableStream.cpp |
+++ b/Source/core/streams/ReadableStream.cpp |
@@ -12,6 +12,7 @@ |
#include "core/dom/DOMException.h" |
#include "core/dom/ExceptionCode.h" |
#include "core/dom/ExecutionContext.h" |
+#include "core/streams/ExclusiveStreamReader.h" |
#include "core/streams/UnderlyingSource.h" |
namespace blink { |
@@ -33,6 +34,32 @@ private: |
} |
}; |
+class ResolveWithReady : public ScriptFunction { |
+public: |
+ static v8::Handle<v8::Function> create(ScriptState* scriptState, ReadableStream* stream) |
+ { |
+ return (new ResolveWithReady(scriptState, stream))->bindToV8Function(); |
+ } |
+ |
+ void trace(Visitor* visitor) |
+ { |
+ visitor->trace(m_stream); |
+ ScriptFunction::trace(visitor); |
+ } |
+ |
+private: |
+ ResolveWithReady(ScriptState* scriptState, ReadableStream* stream) |
+ : ScriptFunction(scriptState) |
+ , m_stream(stream) { } |
+ |
+ ScriptValue call(ScriptValue value) override |
+ { |
+ return ScriptValue(scriptState(), m_stream->ready(scriptState()).v8Value()); |
+ } |
+ |
+ Member<ReadableStream> m_stream; |
+}; |
+ |
} // namespace |
ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSource* source) |
@@ -54,18 +81,10 @@ ReadableStream::~ReadableStream() |
String ReadableStream::stateString() const |
{ |
- switch (m_state) { |
- case Readable: |
- return "readable"; |
- case Waiting: |
+ if (m_reader) |
return "waiting"; |
- case Closed: |
- return "closed"; |
- case Errored: |
- return "errored"; |
- } |
- ASSERT(false); |
- return String(); |
+ |
+ return stateToString(m_state); |
} |
bool ReadableStream::enqueuePreliminaryCheck() |
@@ -108,13 +127,15 @@ void ReadableStream::close() |
if (m_state == Waiting) { |
m_ready->resolve(ToV8UndefinedGenerator()); |
m_closed->resolve(ToV8UndefinedGenerator()); |
+ if (m_reader) |
+ m_reader->releaseLock(); |
m_state = Closed; |
} else if (m_state == Readable) { |
m_isDraining = true; |
} |
} |
-void ReadableStream::readPreliminaryCheck(ExceptionState& exceptionState) |
+void ReadableStream::readInternalPreliminaryCheck(ExceptionState& exceptionState) |
{ |
if (m_state == Waiting) { |
exceptionState.throwTypeError("read is called while state is waiting"); |
@@ -130,13 +151,15 @@ void ReadableStream::readPreliminaryCheck(ExceptionState& exceptionState) |
} |
} |
-void ReadableStream::readPostAction() |
+void ReadableStream::readInternalPostAction() |
{ |
ASSERT(m_state == Readable); |
if (isQueueEmpty()) { |
if (m_isDraining) { |
- m_closed->resolve(ToV8UndefinedGenerator()); |
m_state = Closed; |
+ m_closed->resolve(ToV8UndefinedGenerator()); |
+ if (m_reader) |
+ m_reader->releaseLock(); |
} else { |
m_ready->reset(); |
m_state = Waiting; |
@@ -145,13 +168,36 @@ void ReadableStream::readPostAction() |
callPullIfNeeded(); |
} |
+ScriptValue ReadableStream::read(ScriptState* scriptState, ExceptionState& exceptionState) |
+{ |
+ if (m_reader) { |
+ exceptionState.throwTypeError("this stream is locked to an ExclusiveStreamReader"); |
+ return ScriptValue(); |
+ } |
+ return readInternal(scriptState, exceptionState); |
+} |
+ |
ScriptPromise ReadableStream::ready(ScriptState* scriptState) |
{ |
+ if (m_reader) { |
+ return m_reader->released(scriptState).then(ResolveWithReady::create(scriptState, this)); |
+ } |
+ |
+ if (m_state == Waiting) { |
+ return readyInternal(scriptState).then(ResolveWithReady::create(scriptState, this)); |
+ } |
+ return readyInternal(scriptState); |
+} |
+ |
+ScriptPromise ReadableStream::readyInternal(ScriptState* scriptState) |
+{ |
return m_ready->promise(scriptState->world()); |
} |
ScriptPromise ReadableStream::cancel(ScriptState* scriptState, ScriptValue reason) |
{ |
+ if (m_reader) |
+ return ScriptPromise::reject(scriptState, V8ThrowException::createTypeError(scriptState->isolate(), "this stream is locked to an ExclusiveStreamReader")); |
if (m_state == Closed) |
return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isolate())); |
if (m_state == Errored) |
@@ -179,6 +225,8 @@ void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception) |
m_ready->reject(m_exception); |
m_closed->reject(m_exception); |
m_state = Errored; |
+ if (m_reader) |
+ m_reader->releaseLock(); |
break; |
case Readable: |
clearQueue(); |
@@ -187,6 +235,8 @@ void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception) |
m_ready->reject(m_exception); |
m_closed->reject(m_exception); |
m_state = Errored; |
+ if (m_reader) |
+ m_reader->releaseLock(); |
break; |
default: |
break; |
@@ -199,6 +249,29 @@ void ReadableStream::didSourceStart() |
callPullIfNeeded(); |
} |
+ExclusiveStreamReader* ReadableStream::getReader(ExceptionState& exceptionState) |
+{ |
+ if (m_state == Closed) { |
+ exceptionState.throwTypeError("this stream is already closed"); |
+ return nullptr; |
+ } |
+ if (m_state == Errored) { |
+ exceptionState.throwDOMException(m_exception->code(), m_exception->message()); |
+ return nullptr; |
+ } |
+ if (m_reader) { |
+ exceptionState.throwTypeError("already locked to an ExclusiveStreamReader"); |
+ return nullptr; |
+ } |
+ return new ExclusiveStreamReader(this); |
+} |
+ |
+void ReadableStream::setReader(ExclusiveStreamReader* reader) |
+{ |
+ ASSERT((reader && !m_reader) || (!reader && m_reader)); |
+ m_reader = reader; |
+} |
+ |
void ReadableStream::callPullIfNeeded() |
{ |
if (m_isPulling || m_isDraining || !m_isStarted || m_state == Closed || m_state == Errored) |
@@ -217,13 +290,36 @@ bool ReadableStream::hasPendingActivity() const |
return m_state == Waiting || m_state == Readable; |
} |
+void ReadableStream::stop() |
+{ |
+ error(DOMException::create(AbortError, "execution context is stopped")); |
+ ActiveDOMObject::stop(); |
+} |
+ |
void ReadableStream::trace(Visitor* visitor) |
{ |
visitor->trace(m_source); |
visitor->trace(m_ready); |
visitor->trace(m_closed); |
visitor->trace(m_exception); |
+ visitor->trace(m_reader); |
ActiveDOMObject::trace(visitor); |
} |
+String ReadableStream::stateToString(State state) |
+{ |
+ switch (state) { |
+ case Readable: |
+ return "readable"; |
+ case Waiting: |
+ return "waiting"; |
+ case Closed: |
+ return "closed"; |
+ case Errored: |
+ return "errored"; |
+ } |
+ ASSERT(false); |
+ return String(); |
+} |
+ |
} // namespace blink |