| Index: third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp
|
| diff --git a/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp b/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp
|
| index ae6657134064e61f76e26acb2dc8949658ff20af..a02c20617351eedde1d179d073390be56084294b 100644
|
| --- a/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp
|
| +++ b/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp
|
| @@ -14,7 +14,6 @@
|
| #include "modules/fetch/Body.h"
|
| #include "modules/fetch/BytesConsumerForDataConsumerHandle.h"
|
| #include "modules/fetch/DataConsumerHandleUtil.h"
|
| -#include "modules/fetch/DataConsumerTee.h"
|
| #include "modules/fetch/ReadableStreamDataConsumerHandle.h"
|
| #include "platform/blob/BlobData.h"
|
| #include "platform/network/EncodedFormData.h"
|
| @@ -83,10 +82,14 @@ private:
|
| };
|
|
|
| BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, std::unique_ptr<FetchDataConsumerHandle> handle)
|
| + : BodyStreamBuffer(scriptState, new BytesConsumerForDataConsumerHandle(scriptState->getExecutionContext(), std::move(handle)))
|
| +{
|
| +}
|
| +
|
| +BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, BytesConsumer* consumer)
|
| : UnderlyingSourceBase(scriptState)
|
| , m_scriptState(scriptState)
|
| - , m_handle(std::move(handle))
|
| - , m_reader(m_handle->obtainFetchDataReader(this))
|
| + , m_consumer(consumer)
|
| , m_madeFromReadableStream(false)
|
| {
|
| v8::Local<v8::Value> bodyValue = toV8(this, scriptState);
|
| @@ -98,6 +101,7 @@ BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, std::unique_ptr<Fet
|
| scriptState, this, ReadableStreamOperations::createCountQueuingStrategy(scriptState, 0));
|
| DCHECK(!readableStream.isEmpty());
|
| V8HiddenValue::setHiddenValue(scriptState, body, V8HiddenValue::internalBodyStream(scriptState->isolate()), readableStream.v8Value());
|
| + m_consumer->setClient(this);
|
| }
|
|
|
| BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, ScriptValue stream)
|
| @@ -124,7 +128,7 @@ ScriptValue BodyStreamBuffer::stream()
|
| return ScriptValue(m_scriptState.get(), V8HiddenValue::getHiddenValue(m_scriptState.get(), body, V8HiddenValue::internalBodyStream(m_scriptState->isolate())));
|
| }
|
|
|
| -PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(FetchDataConsumerHandle::Reader::BlobSizePolicy policy)
|
| +PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(BytesConsumer::BlobSizePolicy policy)
|
| {
|
| ASSERT(!isStreamLocked());
|
| ASSERT(!isStreamDisturbed());
|
| @@ -134,7 +138,7 @@ PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(FetchDataCons
|
| if (m_madeFromReadableStream)
|
| return nullptr;
|
|
|
| - RefPtr<BlobDataHandle> blobDataHandle = m_reader->drainAsBlobDataHandle(policy);
|
| + RefPtr<BlobDataHandle> blobDataHandle = m_consumer->drainAsBlobDataHandle(policy);
|
| if (blobDataHandle) {
|
| closeAndLockAndDisturb();
|
| return blobDataHandle.release();
|
| @@ -152,7 +156,7 @@ PassRefPtr<EncodedFormData> BodyStreamBuffer::drainAsFormData()
|
| if (m_madeFromReadableStream)
|
| return nullptr;
|
|
|
| - RefPtr<EncodedFormData> formData = m_reader->drainAsFormData();
|
| + RefPtr<EncodedFormData> formData = m_consumer->drainAsFormData();
|
| if (formData) {
|
| closeAndLockAndDisturb();
|
| return formData.release();
|
| @@ -164,9 +168,8 @@ void BodyStreamBuffer::startLoading(FetchDataLoader* loader, FetchDataLoader::Cl
|
| {
|
| ASSERT(!m_loader);
|
| ASSERT(m_scriptState->contextIsValid());
|
| - std::unique_ptr<FetchDataConsumerHandle> handle = releaseHandle();
|
| m_loader = loader;
|
| - loader->start(new BytesConsumerForDataConsumerHandle(getExecutionContext(), std::move(handle)), new LoaderClient(m_scriptState->getExecutionContext(), this, client));
|
| + loader->start(releaseHandle(), new LoaderClient(m_scriptState->getExecutionContext(), this, client));
|
| }
|
|
|
| void BodyStreamBuffer::tee(BodyStreamBuffer** branch1, BodyStreamBuffer** branch2)
|
| @@ -183,11 +186,11 @@ void BodyStreamBuffer::tee(BodyStreamBuffer** branch1, BodyStreamBuffer** branch
|
| *branch2 = new BodyStreamBuffer(m_scriptState.get(), stream2);
|
| return;
|
| }
|
| - std::unique_ptr<FetchDataConsumerHandle> handle = releaseHandle();
|
| - std::unique_ptr<FetchDataConsumerHandle> handle1, handle2;
|
| - DataConsumerTee::create(m_scriptState->getExecutionContext(), std::move(handle), &handle1, &handle2);
|
| - *branch1 = new BodyStreamBuffer(m_scriptState.get(), std::move(handle1));
|
| - *branch2 = new BodyStreamBuffer(m_scriptState.get(), std::move(handle2));
|
| + BytesConsumer* dest1 = nullptr;
|
| + BytesConsumer* dest2 = nullptr;
|
| + BytesConsumer::tee(m_scriptState->getExecutionContext(), releaseHandle(), &dest1, &dest2);
|
| + *branch1 = new BodyStreamBuffer(m_scriptState.get(), dest1);
|
| + *branch2 = new BodyStreamBuffer(m_scriptState.get(), dest2);
|
| }
|
|
|
| ScriptPromise BodyStreamBuffer::pull(ScriptState* scriptState)
|
| @@ -207,28 +210,19 @@ ScriptPromise BodyStreamBuffer::cancel(ScriptState* scriptState, ScriptValue rea
|
| return ScriptPromise::castUndefined(scriptState);
|
| }
|
|
|
| -void BodyStreamBuffer::didGetReadable()
|
| +void BodyStreamBuffer::onStateChange()
|
| {
|
| - if (!m_reader || !getExecutionContext() || getExecutionContext()->activeDOMObjectsAreStopped())
|
| + if (!m_consumer || !getExecutionContext() || getExecutionContext()->activeDOMObjectsAreStopped())
|
| return;
|
|
|
| - if (!m_streamNeedsMore) {
|
| - // Perform zero-length read to call close()/error() early.
|
| - size_t readSize;
|
| - WebDataConsumerHandle::Result result = m_reader->read(nullptr, 0, WebDataConsumerHandle::FlagNone, &readSize);
|
| - switch (result) {
|
| - case WebDataConsumerHandle::Ok:
|
| - case WebDataConsumerHandle::ShouldWait:
|
| - return;
|
| - case WebDataConsumerHandle::Done:
|
| - close();
|
| - return;
|
| - case WebDataConsumerHandle::Busy:
|
| - case WebDataConsumerHandle::ResourceExhausted:
|
| - case WebDataConsumerHandle::UnexpectedError:
|
| - error();
|
| - return;
|
| - }
|
| + switch (m_consumer->getPublicState()) {
|
| + case BytesConsumer::PublicState::ReadableOrWaiting:
|
| + break;
|
| + case BytesConsumer::PublicState::Closed:
|
| + close();
|
| + return;
|
| + case BytesConsumer::PublicState::Errored:
|
| + error();
|
| return;
|
| }
|
| processData();
|
| @@ -243,8 +237,7 @@ bool BodyStreamBuffer::hasPendingActivity() const
|
|
|
| void BodyStreamBuffer::stop()
|
| {
|
| - m_reader = nullptr;
|
| - m_handle = nullptr;
|
| + cancelConsumer();
|
| UnderlyingSourceBase::stop();
|
| }
|
|
|
| @@ -295,28 +288,37 @@ void BodyStreamBuffer::closeAndLockAndDisturb()
|
| void BodyStreamBuffer::close()
|
| {
|
| controller()->close();
|
| - m_reader = nullptr;
|
| - m_handle = nullptr;
|
| + cancelConsumer();
|
| }
|
|
|
| void BodyStreamBuffer::error()
|
| {
|
| controller()->error(DOMException::create(NetworkError, "network error"));
|
| - m_reader = nullptr;
|
| - m_handle = nullptr;
|
| + cancelConsumer();
|
| +}
|
| +
|
| +void BodyStreamBuffer::cancelConsumer()
|
| +{
|
| + if (m_consumer) {
|
| + m_consumer->cancel();
|
| + m_consumer = nullptr;
|
| + }
|
| }
|
|
|
| void BodyStreamBuffer::processData()
|
| {
|
| - ASSERT(m_reader);
|
| + DCHECK(m_consumer);
|
| while (m_streamNeedsMore) {
|
| - const void* buffer;
|
| - size_t available;
|
| - WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available);
|
| - switch (result) {
|
| - case WebDataConsumerHandle::Ok: {
|
| - DOMUint8Array* array = DOMUint8Array::create(static_cast<const unsigned char*>(buffer), available);
|
| - m_reader->endRead(available);
|
| + const char* buffer = nullptr;
|
| + size_t available = 0;
|
| +
|
| + switch (m_consumer->beginRead(&buffer, &available)) {
|
| + case BytesConsumer::Result::Ok: {
|
| + DOMUint8Array* array = DOMUint8Array::create(reinterpret_cast<const unsigned char*>(buffer), available);
|
| + if (m_consumer->endRead(available) != BytesConsumer::Result::Ok) {
|
| + error();
|
| + return;
|
| + }
|
| // Clear m_streamNeedsMore in order to detect a pull call.
|
| m_streamNeedsMore = false;
|
| controller()->enqueue(array);
|
| @@ -327,16 +329,12 @@ void BodyStreamBuffer::processData()
|
| m_streamNeedsMore = controller()->desiredSize() > 0;
|
| break;
|
| }
|
| - case WebDataConsumerHandle::Done:
|
| - close();
|
| + case BytesConsumer::Result::ShouldWait:
|
| return;
|
| -
|
| - case WebDataConsumerHandle::ShouldWait:
|
| + case BytesConsumer::Result::Done:
|
| + close();
|
| return;
|
| -
|
| - case WebDataConsumerHandle::Busy:
|
| - case WebDataConsumerHandle::ResourceExhausted:
|
| - case WebDataConsumerHandle::UnexpectedError:
|
| + case BytesConsumer::Result::Error:
|
| error();
|
| return;
|
| }
|
| @@ -357,7 +355,7 @@ void BodyStreamBuffer::stopLoading()
|
| m_loader = nullptr;
|
| }
|
|
|
| -std::unique_ptr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle()
|
| +BytesConsumer* BodyStreamBuffer::releaseHandle()
|
| {
|
| DCHECK(!isStreamLocked());
|
| DCHECK(!isStreamDisturbed());
|
| @@ -372,25 +370,26 @@ std::unique_ptr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle()
|
| // , we don't need to keep the reader explicitly.
|
| NonThrowableExceptionState exceptionState;
|
| ScriptValue reader = ReadableStreamOperations::getReader(m_scriptState.get(), stream(), exceptionState);
|
| - return ReadableStreamDataConsumerHandle::create(m_scriptState.get(), reader);
|
| + return new BytesConsumerForDataConsumerHandle(m_scriptState->getExecutionContext(), ReadableStreamDataConsumerHandle::create(m_scriptState.get(), reader));
|
| }
|
| // We need to call these before calling closeAndLockAndDisturb.
|
| const bool isClosed = isStreamClosed();
|
| const bool isErrored = isStreamErrored();
|
| - std::unique_ptr<FetchDataConsumerHandle> handle = std::move(m_handle);
|
| + BytesConsumer* consumer = m_consumer.release();
|
|
|
| closeAndLockAndDisturb();
|
|
|
| if (isClosed) {
|
| // Note that the stream cannot be "draining", because it doesn't have
|
| // the internal buffer.
|
| - return createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle());
|
| + return new BytesConsumerForDataConsumerHandle(m_scriptState->getExecutionContext(), createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle()));
|
| }
|
| if (isErrored)
|
| - return createFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorDataConsumerHandle());
|
| + return new BytesConsumerForDataConsumerHandle(m_scriptState->getExecutionContext(), createFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorDataConsumerHandle()));
|
|
|
| - DCHECK(handle);
|
| - return handle;
|
| + DCHECK(consumer);
|
| + consumer->clearClient();
|
| + return consumer;
|
| }
|
|
|
| } // namespace blink
|
|
|