Index: Source/core/streams/ReadableStream.cpp |
diff --git a/Source/core/streams/ReadableStream.cpp b/Source/core/streams/ReadableStream.cpp |
index a6e1f10fa9767eb2a3ad00b9ebe22a74e8a4bc63..26f8f71748648d5f6f2348eae0ab573fde007ed0 100644 |
--- a/Source/core/streams/ReadableStream.cpp |
+++ b/Source/core/streams/ReadableStream.cpp |
@@ -16,12 +16,30 @@ |
namespace blink { |
+namespace { |
+ |
+class ConstUndefined : public ScriptFunction { |
+public: |
+ static v8::Handle<v8::Function> create(ScriptState* scriptState) |
+ { |
+ return (new ConstUndefined(scriptState))->bindToV8Function(); |
+ } |
+ |
+private: |
+ explicit ConstUndefined(ScriptState* scriptState) : ScriptFunction(scriptState) { } |
+ ScriptValue call(ScriptValue value) override |
+ { |
+ return ScriptValue(scriptState(), v8::Undefined(scriptState()->isolate())); |
+ } |
+}; |
+ |
+} // namespace |
+ |
ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSource* source) |
: m_source(source) |
, m_isStarted(false) |
, m_isDraining(false) |
, m_isPulling(false) |
- , m_isSchedulingPull(false) |
, m_state(Waiting) |
, m_wait(new WaitPromise(executionContext, this, WaitPromise::Ready)) |
, m_closed(new ClosedPromise(executionContext, this, ClosedPromise::Closed)) |
@@ -50,10 +68,12 @@ String ReadableStream::stateString() const |
bool ReadableStream::enqueuePreliminaryCheck(size_t chunkSize) |
{ |
+ // This is a bit different from what spec says: it says we should throw |
+ // an exception here. But sometimes a caller is not in any JavaScript |
+ // context, and we don't want to throw an exception in such a case. |
if (m_state == Errored || m_state == Closed || m_isDraining) |
return false; |
- // FIXME: Query strategy. |
return true; |
} |
@@ -61,15 +81,15 @@ bool ReadableStream::enqueuePostAction(size_t totalQueueSize) |
{ |
m_isPulling = false; |
- // FIXME: Set needsMore correctly. |
- bool needsMore = true; |
+ // FIXME: Set |shouldApplyBackpressure| correctly. |
+ bool shouldApplyBackpressure = false; |
if (m_state == Waiting) { |
m_state = Readable; |
m_wait->resolve(V8UndefinedType()); |
} |
- return needsMore; |
+ return !shouldApplyBackpressure; |
} |
void ReadableStream::close() |
@@ -105,47 +125,34 @@ void ReadableStream::readPostAction() |
if (isQueueEmpty()) { |
if (m_isDraining) { |
m_state = Closed; |
- m_wait->reset(); |
- m_wait->resolve(V8UndefinedType()); |
m_closed->resolve(V8UndefinedType()); |
} else { |
m_state = Waiting; |
m_wait->reset(); |
- callOrSchedulePull(); |
} |
} |
+ callPullIfNeeded(); |
} |
ScriptPromise ReadableStream::wait(ScriptState* scriptState) |
{ |
- if (m_state == Waiting) |
- callOrSchedulePull(); |
return m_wait->promise(scriptState->world()); |
} |
ScriptPromise ReadableStream::cancel(ScriptState* scriptState, ScriptValue reason) |
{ |
- if (m_state == Errored) { |
- RefPtr<ScriptPromiseResolver> resolver = ScriptPromiseResolver::create(scriptState); |
- ScriptPromise promise = resolver->promise(); |
- resolver->reject(m_exception); |
- return promise; |
- } |
if (m_state == Closed) |
return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isolate())); |
+ if (m_state == Errored) |
+ return ScriptPromise::rejectWithDOMException(scriptState, m_exception); |
- if (m_state == Waiting) { |
- m_wait->resolve(V8UndefinedType()); |
- } else { |
- ASSERT(m_state == Readable); |
- m_wait->reset(); |
+ ASSERT(m_state == Readable || m_state == Waiting); |
+ if (m_state == Waiting) |
m_wait->resolve(V8UndefinedType()); |
- } |
- |
clearQueue(); |
m_state = Closed; |
m_closed->resolve(V8UndefinedType()); |
- return m_source->cancelSource(scriptState, reason); |
+ return m_source->cancelSource(scriptState, reason).then(ConstUndefined::create(scriptState)); |
} |
ScriptPromise ReadableStream::closed(ScriptState* scriptState) |
@@ -155,36 +162,42 @@ ScriptPromise ReadableStream::closed(ScriptState* scriptState) |
void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception) |
{ |
- if (m_state == Readable) { |
+ switch (m_state) { |
+ case Waiting: |
+ m_state = Errored; |
+ m_exception = exception; |
+ m_wait->reject(m_exception); |
+ m_closed->reject(m_exception); |
+ break; |
+ case Readable: |
clearQueue(); |
- m_wait->reset(); |
- } |
- |
- if (m_state == Waiting || m_state == Readable) { |
m_state = Errored; |
m_exception = exception; |
- if (m_wait->state() == m_wait->Pending) |
- m_wait->reject(m_exception); |
+ m_wait->reset(); |
+ m_wait->reject(m_exception); |
m_closed->reject(m_exception); |
+ break; |
+ default: |
+ break; |
} |
} |
void ReadableStream::didSourceStart() |
{ |
m_isStarted = true; |
- if (m_isSchedulingPull) |
- m_source->pullSource(); |
+ callPullIfNeeded(); |
} |
-void ReadableStream::callOrSchedulePull() |
+void ReadableStream::callPullIfNeeded() |
{ |
- if (m_isPulling) |
+ if (m_isPulling || m_isDraining || !m_isStarted || m_state == Closed || m_state == Errored) |
+ return; |
+ // FIXME: Set shouldApplyBackpressure correctly. |
+ bool shouldApplyBackpressure = false; |
+ if (shouldApplyBackpressure) |
return; |
m_isPulling = true; |
- if (m_isStarted) |
- m_source->pullSource(); |
- else |
- m_isSchedulingPull = true; |
+ m_source->pullSource(); |
} |
void ReadableStream::trace(Visitor* visitor) |