Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1177)

Unified Diff: Source/core/streams/ReadableStream.cpp

Issue 837673002: Introduce ExclusiveStreamReader. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « Source/core/streams/ReadableStream.h ('k') | Source/core/streams/ReadableStream.idl » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: Source/core/streams/ReadableStream.cpp
diff --git a/Source/core/streams/ReadableStream.cpp b/Source/core/streams/ReadableStream.cpp
index 0f1a80cf61547873af993ab1d7c6070e855e8280..712a87f425492e3241fa5f75b0a0286147efcad4 100644
--- a/Source/core/streams/ReadableStream.cpp
+++ b/Source/core/streams/ReadableStream.cpp
@@ -12,6 +12,7 @@
#include "core/dom/DOMException.h"
#include "core/dom/ExceptionCode.h"
#include "core/dom/ExecutionContext.h"
+#include "core/streams/ExclusiveStreamReader.h"
#include "core/streams/UnderlyingSource.h"
namespace blink {
@@ -33,6 +34,32 @@ private:
}
};
+class ResolveWithReady : public ScriptFunction {
+public:
+ static v8::Handle<v8::Function> create(ScriptState* scriptState, ReadableStream* stream)
+ {
+ return (new ResolveWithReady(scriptState, stream))->bindToV8Function();
+ }
+
+ void trace(Visitor* visitor)
+ {
+ visitor->trace(m_stream);
+ ScriptFunction::trace(visitor);
+ }
+
+private:
+ ResolveWithReady(ScriptState* scriptState, ReadableStream* stream)
+ : ScriptFunction(scriptState)
+ , m_stream(stream) { }
+
+ ScriptValue call(ScriptValue value) override
+ {
+ return ScriptValue(scriptState(), m_stream->ready(scriptState()).v8Value());
+ }
+
+ Member<ReadableStream> m_stream;
+};
+
} // namespace
ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSource* source)
@@ -54,18 +81,10 @@ ReadableStream::~ReadableStream()
String ReadableStream::stateString() const
{
- switch (m_state) {
- case Readable:
- return "readable";
- case Waiting:
+ if (m_reader)
return "waiting";
- case Closed:
- return "closed";
- case Errored:
- return "errored";
- }
- ASSERT(false);
- return String();
+
+ return stateToString(m_state);
}
bool ReadableStream::enqueuePreliminaryCheck()
@@ -108,13 +127,15 @@ void ReadableStream::close()
if (m_state == Waiting) {
m_ready->resolve(ToV8UndefinedGenerator());
m_closed->resolve(ToV8UndefinedGenerator());
+ if (m_reader)
+ m_reader->releaseLock();
m_state = Closed;
} else if (m_state == Readable) {
m_isDraining = true;
}
}
-void ReadableStream::readPreliminaryCheck(ExceptionState& exceptionState)
+void ReadableStream::readInternalPreliminaryCheck(ExceptionState& exceptionState)
{
if (m_state == Waiting) {
exceptionState.throwTypeError("read is called while state is waiting");
@@ -130,13 +151,15 @@ void ReadableStream::readPreliminaryCheck(ExceptionState& exceptionState)
}
}
-void ReadableStream::readPostAction()
+void ReadableStream::readInternalPostAction()
{
ASSERT(m_state == Readable);
if (isQueueEmpty()) {
if (m_isDraining) {
- m_closed->resolve(ToV8UndefinedGenerator());
m_state = Closed;
+ m_closed->resolve(ToV8UndefinedGenerator());
+ if (m_reader)
+ m_reader->releaseLock();
} else {
m_ready->reset();
m_state = Waiting;
@@ -145,13 +168,36 @@ void ReadableStream::readPostAction()
callPullIfNeeded();
}
+ScriptValue ReadableStream::read(ScriptState* scriptState, ExceptionState& exceptionState)
+{
+ if (m_reader) {
+ exceptionState.throwTypeError("this stream is locked to an ExclusiveStreamReader");
+ return ScriptValue();
+ }
+ return readInternal(scriptState, exceptionState);
+}
+
ScriptPromise ReadableStream::ready(ScriptState* scriptState)
{
+ if (m_reader) {
+ return m_reader->released(scriptState).then(ResolveWithReady::create(scriptState, this));
+ }
+
+ if (m_state == Waiting) {
+ return readyInternal(scriptState).then(ResolveWithReady::create(scriptState, this));
+ }
+ return readyInternal(scriptState);
+}
+
+ScriptPromise ReadableStream::readyInternal(ScriptState* scriptState)
+{
return m_ready->promise(scriptState->world());
}
ScriptPromise ReadableStream::cancel(ScriptState* scriptState, ScriptValue reason)
{
+ if (m_reader)
+ return ScriptPromise::reject(scriptState, V8ThrowException::createTypeError(scriptState->isolate(), "this stream is locked to an ExclusiveStreamReader"));
if (m_state == Closed)
return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isolate()));
if (m_state == Errored)
@@ -179,6 +225,8 @@ void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception)
m_ready->reject(m_exception);
m_closed->reject(m_exception);
m_state = Errored;
+ if (m_reader)
+ m_reader->releaseLock();
break;
case Readable:
clearQueue();
@@ -187,6 +235,8 @@ void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception)
m_ready->reject(m_exception);
m_closed->reject(m_exception);
m_state = Errored;
+ if (m_reader)
+ m_reader->releaseLock();
break;
default:
break;
@@ -199,6 +249,29 @@ void ReadableStream::didSourceStart()
callPullIfNeeded();
}
+ExclusiveStreamReader* ReadableStream::getReader(ExceptionState& exceptionState)
+{
+ if (m_state == Closed) {
+ exceptionState.throwTypeError("this stream is already closed");
+ return nullptr;
+ }
+ if (m_state == Errored) {
+ exceptionState.throwDOMException(m_exception->code(), m_exception->message());
+ return nullptr;
+ }
+ if (m_reader) {
+ exceptionState.throwTypeError("already locked to an ExclusiveStreamReader");
+ return nullptr;
+ }
+ return new ExclusiveStreamReader(this);
+}
+
+void ReadableStream::setReader(ExclusiveStreamReader* reader)
+{
+ ASSERT((reader && !m_reader) || (!reader && m_reader));
+ m_reader = reader;
+}
+
void ReadableStream::callPullIfNeeded()
{
if (m_isPulling || m_isDraining || !m_isStarted || m_state == Closed || m_state == Errored)
@@ -217,13 +290,36 @@ bool ReadableStream::hasPendingActivity() const
return m_state == Waiting || m_state == Readable;
}
+void ReadableStream::stop()
+{
+ error(DOMException::create(AbortError, "execution context is stopped"));
+ ActiveDOMObject::stop();
+}
+
void ReadableStream::trace(Visitor* visitor)
{
visitor->trace(m_source);
visitor->trace(m_ready);
visitor->trace(m_closed);
visitor->trace(m_exception);
+ visitor->trace(m_reader);
ActiveDOMObject::trace(visitor);
}
+String ReadableStream::stateToString(State state)
+{
+ switch (state) {
+ case Readable:
+ return "readable";
+ case Waiting:
+ return "waiting";
+ case Closed:
+ return "closed";
+ case Errored:
+ return "errored";
+ }
+ ASSERT(false);
+ return String();
+}
+
} // namespace blink
« no previous file with comments | « Source/core/streams/ReadableStream.h ('k') | Source/core/streams/ReadableStream.idl » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698