Chromium Code Reviews| Index: third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp |
| diff --git a/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp b/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp |
| index 11b50f742a10d067ed13b596b2655d051a6fef77..ead09bb7b0976fb911441c2be1cb21cb132e8d3a 100644 |
| --- a/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp |
| +++ b/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp |
| @@ -16,7 +16,6 @@ |
| #include "modules/fetch/Body.h" |
| #include "modules/fetch/BytesConsumerForDataConsumerHandle.h" |
| #include "modules/fetch/DataConsumerHandleUtil.h" |
| -#include "modules/fetch/DataConsumerTee.h" |
| #include "modules/fetch/ReadableStreamDataConsumerHandle.h" |
| #include "platform/blob/BlobData.h" |
| #include "platform/network/EncodedFormData.h" |
| @@ -99,22 +98,24 @@ private: |
| }; |
| BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, std::unique_ptr<FetchDataConsumerHandle> handle) |
| + : BodyStreamBuffer(scriptState, new BytesConsumerForDataConsumerHandle(std::move(handle))) |
| +{ |
| +} |
| + |
| +BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, BytesConsumer* consumer) |
| : UnderlyingSourceBase(scriptState) |
| , m_scriptState(scriptState) |
| - , m_handle(std::move(handle)) |
| - , m_reader(m_handle->obtainFetchDataReader(this)) |
| + , m_consumer(consumer) |
| , m_madeFromReadableStream(false) |
| { |
| if (isTerminating(scriptState)) { |
| - m_reader = nullptr; |
| - m_handle = nullptr; |
| + m_consumer->cancel(); |
| return; |
| } |
| v8::Local<v8::Value> bodyValue = toV8(this, scriptState); |
| if (bodyValue.IsEmpty()) { |
| DCHECK(isTerminating(scriptState)); |
| - m_reader = nullptr; |
| - m_handle = nullptr; |
| + m_consumer->cancel(); |
| return; |
| } |
| DCHECK(bodyValue->IsObject()); |
| @@ -123,11 +124,11 @@ BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, std::unique_ptr<Fet |
| ScriptValue readableStream = ReadableStreamOperations::createReadableStream( |
| scriptState, this, ReadableStreamOperations::createCountQueuingStrategy(scriptState, 0)); |
| if (isTerminating(scriptState)) { |
| - m_reader = nullptr; |
| - m_handle = nullptr; |
| + m_consumer->cancel(); |
| return; |
| } |
| V8HiddenValue::setHiddenValue(scriptState, body, V8HiddenValue::internalBodyStream(scriptState->isolate()), readableStream.v8Value()); |
| + m_consumer->setClient(this); |
| } |
| BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, ScriptValue stream) |
| @@ -164,7 +165,7 @@ ScriptValue BodyStreamBuffer::stream() |
| return ScriptValue(m_scriptState.get(), V8HiddenValue::getHiddenValue(m_scriptState.get(), body, V8HiddenValue::internalBodyStream(m_scriptState->isolate()))); |
| } |
| -PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(FetchDataConsumerHandle::Reader::BlobSizePolicy policy) |
| +PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(BytesConsumer::BlobSizePolicy policy) |
| { |
| ASSERT(!isStreamLocked()); |
| ASSERT(!isStreamDisturbed()); |
| @@ -174,7 +175,7 @@ PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(FetchDataCons |
| if (m_madeFromReadableStream) |
| return nullptr; |
| - RefPtr<BlobDataHandle> blobDataHandle = m_reader->drainAsBlobDataHandle(policy); |
| + RefPtr<BlobDataHandle> blobDataHandle = m_consumer->drainAsBlobDataHandle(policy); |
| if (blobDataHandle) { |
| closeAndLockAndDisturb(); |
| return blobDataHandle.release(); |
| @@ -192,7 +193,7 @@ PassRefPtr<EncodedFormData> BodyStreamBuffer::drainAsFormData() |
| if (m_madeFromReadableStream) |
| return nullptr; |
| - RefPtr<EncodedFormData> formData = m_reader->drainAsFormData(); |
| + RefPtr<EncodedFormData> formData = m_consumer->drainAsFormData(); |
| if (formData) { |
| closeAndLockAndDisturb(); |
| return formData.release(); |
| @@ -204,9 +205,8 @@ void BodyStreamBuffer::startLoading(FetchDataLoader* loader, FetchDataLoader::Cl |
| { |
| ASSERT(!m_loader); |
| ASSERT(m_scriptState->contextIsValid()); |
| - std::unique_ptr<FetchDataConsumerHandle> handle = releaseHandle(); |
| m_loader = loader; |
| - loader->start(new BytesConsumerForDataConsumerHandle(std::move(handle)), new LoaderClient(m_scriptState->getExecutionContext(), this, client)); |
| + loader->start(releaseHandle(), new LoaderClient(m_scriptState->getExecutionContext(), this, client)); |
| } |
| void BodyStreamBuffer::tee(BodyStreamBuffer** branch1, BodyStreamBuffer** branch2) |
| @@ -223,11 +223,11 @@ void BodyStreamBuffer::tee(BodyStreamBuffer** branch1, BodyStreamBuffer** branch |
| *branch2 = new BodyStreamBuffer(m_scriptState.get(), stream2); |
| return; |
| } |
| - std::unique_ptr<FetchDataConsumerHandle> handle = releaseHandle(); |
| - std::unique_ptr<FetchDataConsumerHandle> handle1, handle2; |
| - DataConsumerTee::create(m_scriptState->getExecutionContext(), std::move(handle), &handle1, &handle2); |
| - *branch1 = new BodyStreamBuffer(m_scriptState.get(), std::move(handle1)); |
| - *branch2 = new BodyStreamBuffer(m_scriptState.get(), std::move(handle2)); |
| + BytesConsumer* dest1 = nullptr; |
| + BytesConsumer* dest2 = nullptr; |
| + BytesConsumer::tee(m_scriptState->getExecutionContext(), releaseHandle(), &dest1, &dest2); |
| + *branch1 = new BodyStreamBuffer(m_scriptState.get(), dest1); |
| + *branch2 = new BodyStreamBuffer(m_scriptState.get(), dest2); |
| } |
| ScriptPromise BodyStreamBuffer::pull(ScriptState* scriptState) |
| @@ -247,28 +247,19 @@ ScriptPromise BodyStreamBuffer::cancel(ScriptState* scriptState, ScriptValue rea |
| return ScriptPromise::castUndefined(scriptState); |
| } |
| -void BodyStreamBuffer::didGetReadable() |
| +void BodyStreamBuffer::onStateChange() |
| { |
| - if (!m_reader || !getExecutionContext() || getExecutionContext()->activeDOMObjectsAreStopped()) |
| + if (!m_consumer || !getExecutionContext() || getExecutionContext()->activeDOMObjectsAreStopped()) |
| return; |
| - if (!m_streamNeedsMore) { |
| - // Perform zero-length read to call close()/error() early. |
| - size_t readSize; |
| - WebDataConsumerHandle::Result result = m_reader->read(nullptr, 0, WebDataConsumerHandle::FlagNone, &readSize); |
| - switch (result) { |
| - case WebDataConsumerHandle::Ok: |
| - case WebDataConsumerHandle::ShouldWait: |
| - return; |
| - case WebDataConsumerHandle::Done: |
| - close(); |
| - return; |
| - case WebDataConsumerHandle::Busy: |
| - case WebDataConsumerHandle::ResourceExhausted: |
| - case WebDataConsumerHandle::UnexpectedError: |
| - error(); |
| - return; |
| - } |
| + switch (m_consumer->getPublicState()) { |
| + case BytesConsumer::PublicState::ReadableOrWaiting: |
| + break; |
| + case BytesConsumer::PublicState::Closed: |
| + close(); |
| + return; |
| + case BytesConsumer::PublicState::Errored: |
| + error(); |
| return; |
| } |
| processData(); |
| @@ -283,8 +274,9 @@ bool BodyStreamBuffer::hasPendingActivity() const |
| void BodyStreamBuffer::stop() |
| { |
| - m_reader = nullptr; |
| - m_handle = nullptr; |
| + if (m_consumer) |
| + m_consumer->cancel(); |
| + m_consumer = nullptr; |
| UnderlyingSourceBase::stop(); |
| } |
| @@ -335,28 +327,32 @@ void BodyStreamBuffer::closeAndLockAndDisturb() |
| void BodyStreamBuffer::close() |
| { |
| controller()->close(); |
| - m_reader = nullptr; |
| - m_handle = nullptr; |
| + if (m_consumer) { |
| + m_consumer->cancel(); |
| + m_consumer = nullptr; |
| + } |
| } |
| void BodyStreamBuffer::error() |
| { |
| controller()->error(DOMException::create(NetworkError, "network error")); |
| - m_reader = nullptr; |
| - m_handle = nullptr; |
| + if (m_consumer) { |
| + m_consumer->cancel(); |
| + m_consumer = nullptr; |
| + } |
| } |
| void BodyStreamBuffer::processData() |
| { |
| - ASSERT(m_reader); |
| + DCHECK(m_consumer); |
| while (m_streamNeedsMore) { |
| - const void* buffer; |
| - size_t available; |
| - WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available); |
| - switch (result) { |
| - case WebDataConsumerHandle::Ok: { |
| - DOMUint8Array* array = DOMUint8Array::create(static_cast<const unsigned char*>(buffer), available); |
| - m_reader->endRead(available); |
| + const char* buffer = nullptr; |
| + size_t available = 0; |
| + |
| + switch (m_consumer->beginRead(&buffer, &available)) { |
| + case BytesConsumer::Result::Ok: { |
| + DOMUint8Array* array = DOMUint8Array::create(reinterpret_cast<const unsigned char*>(buffer), available); |
| + m_consumer->endRead(available); |
|
hiroshige
2016/09/07 09:25:54
We have to check the return value of endRead().
yhirano
2016/09/08 01:41:05
Done.
|
| // Clear m_streamNeedsMore in order to detect a pull call. |
| m_streamNeedsMore = false; |
| controller()->enqueue(array); |
| @@ -367,16 +363,12 @@ void BodyStreamBuffer::processData() |
| m_streamNeedsMore = controller()->desiredSize() > 0; |
| break; |
| } |
| - case WebDataConsumerHandle::Done: |
| - close(); |
| + case BytesConsumer::Result::ShouldWait: |
| return; |
| - |
| - case WebDataConsumerHandle::ShouldWait: |
| + case BytesConsumer::Result::Done: |
| + close(); |
| return; |
| - |
| - case WebDataConsumerHandle::Busy: |
| - case WebDataConsumerHandle::ResourceExhausted: |
| - case WebDataConsumerHandle::UnexpectedError: |
| + case BytesConsumer::Result::Error: |
| error(); |
| return; |
| } |
| @@ -397,7 +389,7 @@ void BodyStreamBuffer::stopLoading() |
| m_loader = nullptr; |
| } |
| -std::unique_ptr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle() |
| +BytesConsumer* BodyStreamBuffer::releaseHandle() |
| { |
| DCHECK(!isStreamLocked()); |
| DCHECK(!isStreamDisturbed()); |
| @@ -412,25 +404,26 @@ std::unique_ptr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle() |
| // , we don't need to keep the reader explicitly. |
| NonThrowableExceptionState exceptionState; |
| ScriptValue reader = ReadableStreamOperations::getReader(m_scriptState.get(), stream(), exceptionState); |
| - return ReadableStreamDataConsumerHandle::create(m_scriptState.get(), reader); |
| + return new BytesConsumerForDataConsumerHandle(ReadableStreamDataConsumerHandle::create(m_scriptState.get(), reader)); |
| } |
| // We need to call these before calling closeAndLockAndDisturb. |
| const bool isClosed = isStreamClosed(); |
| const bool isErrored = isStreamErrored(); |
| - std::unique_ptr<FetchDataConsumerHandle> handle = std::move(m_handle); |
| + BytesConsumer* consumer = m_consumer.release(); |
| closeAndLockAndDisturb(); |
| if (isClosed) { |
| // Note that the stream cannot be "draining", because it doesn't have |
| // the internal buffer. |
| - return createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle()); |
| + return new BytesConsumerForDataConsumerHandle(createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle())); |
| } |
| if (isErrored) |
| - return createFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorDataConsumerHandle()); |
| + return new BytesConsumerForDataConsumerHandle(createFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorDataConsumerHandle())); |
| - DCHECK(handle); |
| - return handle; |
| + DCHECK(consumer); |
| + consumer->clearClient(); |
| + return consumer; |
| } |
| } // namespace blink |