Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(950)

Unified Diff: third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp

Issue 2277143002: Use BytesConsumer in BodyStreamBuffer (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@bytes-consumer-tee
Patch Set: fix Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp
diff --git a/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp b/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp
index 11b50f742a10d067ed13b596b2655d051a6fef77..ead09bb7b0976fb911441c2be1cb21cb132e8d3a 100644
--- a/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp
+++ b/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp
@@ -16,7 +16,6 @@
#include "modules/fetch/Body.h"
#include "modules/fetch/BytesConsumerForDataConsumerHandle.h"
#include "modules/fetch/DataConsumerHandleUtil.h"
-#include "modules/fetch/DataConsumerTee.h"
#include "modules/fetch/ReadableStreamDataConsumerHandle.h"
#include "platform/blob/BlobData.h"
#include "platform/network/EncodedFormData.h"
@@ -99,22 +98,24 @@ private:
};
BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, std::unique_ptr<FetchDataConsumerHandle> handle)
+ : BodyStreamBuffer(scriptState, new BytesConsumerForDataConsumerHandle(std::move(handle)))
+{
+}
+
+BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, BytesConsumer* consumer)
: UnderlyingSourceBase(scriptState)
, m_scriptState(scriptState)
- , m_handle(std::move(handle))
- , m_reader(m_handle->obtainFetchDataReader(this))
+ , m_consumer(consumer)
, m_madeFromReadableStream(false)
{
if (isTerminating(scriptState)) {
- m_reader = nullptr;
- m_handle = nullptr;
+ m_consumer->cancel();
return;
}
v8::Local<v8::Value> bodyValue = toV8(this, scriptState);
if (bodyValue.IsEmpty()) {
DCHECK(isTerminating(scriptState));
- m_reader = nullptr;
- m_handle = nullptr;
+ m_consumer->cancel();
return;
}
DCHECK(bodyValue->IsObject());
@@ -123,11 +124,11 @@ BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, std::unique_ptr<Fet
ScriptValue readableStream = ReadableStreamOperations::createReadableStream(
scriptState, this, ReadableStreamOperations::createCountQueuingStrategy(scriptState, 0));
if (isTerminating(scriptState)) {
- m_reader = nullptr;
- m_handle = nullptr;
+ m_consumer->cancel();
return;
}
V8HiddenValue::setHiddenValue(scriptState, body, V8HiddenValue::internalBodyStream(scriptState->isolate()), readableStream.v8Value());
+ m_consumer->setClient(this);
}
BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, ScriptValue stream)
@@ -164,7 +165,7 @@ ScriptValue BodyStreamBuffer::stream()
return ScriptValue(m_scriptState.get(), V8HiddenValue::getHiddenValue(m_scriptState.get(), body, V8HiddenValue::internalBodyStream(m_scriptState->isolate())));
}
-PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(FetchDataConsumerHandle::Reader::BlobSizePolicy policy)
+PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(BytesConsumer::BlobSizePolicy policy)
{
ASSERT(!isStreamLocked());
ASSERT(!isStreamDisturbed());
@@ -174,7 +175,7 @@ PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(FetchDataCons
if (m_madeFromReadableStream)
return nullptr;
- RefPtr<BlobDataHandle> blobDataHandle = m_reader->drainAsBlobDataHandle(policy);
+ RefPtr<BlobDataHandle> blobDataHandle = m_consumer->drainAsBlobDataHandle(policy);
if (blobDataHandle) {
closeAndLockAndDisturb();
return blobDataHandle.release();
@@ -192,7 +193,7 @@ PassRefPtr<EncodedFormData> BodyStreamBuffer::drainAsFormData()
if (m_madeFromReadableStream)
return nullptr;
- RefPtr<EncodedFormData> formData = m_reader->drainAsFormData();
+ RefPtr<EncodedFormData> formData = m_consumer->drainAsFormData();
if (formData) {
closeAndLockAndDisturb();
return formData.release();
@@ -204,9 +205,8 @@ void BodyStreamBuffer::startLoading(FetchDataLoader* loader, FetchDataLoader::Cl
{
ASSERT(!m_loader);
ASSERT(m_scriptState->contextIsValid());
- std::unique_ptr<FetchDataConsumerHandle> handle = releaseHandle();
m_loader = loader;
- loader->start(new BytesConsumerForDataConsumerHandle(std::move(handle)), new LoaderClient(m_scriptState->getExecutionContext(), this, client));
+ loader->start(releaseHandle(), new LoaderClient(m_scriptState->getExecutionContext(), this, client));
}
void BodyStreamBuffer::tee(BodyStreamBuffer** branch1, BodyStreamBuffer** branch2)
@@ -223,11 +223,11 @@ void BodyStreamBuffer::tee(BodyStreamBuffer** branch1, BodyStreamBuffer** branch
*branch2 = new BodyStreamBuffer(m_scriptState.get(), stream2);
return;
}
- std::unique_ptr<FetchDataConsumerHandle> handle = releaseHandle();
- std::unique_ptr<FetchDataConsumerHandle> handle1, handle2;
- DataConsumerTee::create(m_scriptState->getExecutionContext(), std::move(handle), &handle1, &handle2);
- *branch1 = new BodyStreamBuffer(m_scriptState.get(), std::move(handle1));
- *branch2 = new BodyStreamBuffer(m_scriptState.get(), std::move(handle2));
+ BytesConsumer* dest1 = nullptr;
+ BytesConsumer* dest2 = nullptr;
+ BytesConsumer::tee(m_scriptState->getExecutionContext(), releaseHandle(), &dest1, &dest2);
+ *branch1 = new BodyStreamBuffer(m_scriptState.get(), dest1);
+ *branch2 = new BodyStreamBuffer(m_scriptState.get(), dest2);
}
ScriptPromise BodyStreamBuffer::pull(ScriptState* scriptState)
@@ -247,28 +247,19 @@ ScriptPromise BodyStreamBuffer::cancel(ScriptState* scriptState, ScriptValue rea
return ScriptPromise::castUndefined(scriptState);
}
-void BodyStreamBuffer::didGetReadable()
+void BodyStreamBuffer::onStateChange()
{
- if (!m_reader || !getExecutionContext() || getExecutionContext()->activeDOMObjectsAreStopped())
+ if (!m_consumer || !getExecutionContext() || getExecutionContext()->activeDOMObjectsAreStopped())
return;
- if (!m_streamNeedsMore) {
- // Perform zero-length read to call close()/error() early.
- size_t readSize;
- WebDataConsumerHandle::Result result = m_reader->read(nullptr, 0, WebDataConsumerHandle::FlagNone, &readSize);
- switch (result) {
- case WebDataConsumerHandle::Ok:
- case WebDataConsumerHandle::ShouldWait:
- return;
- case WebDataConsumerHandle::Done:
- close();
- return;
- case WebDataConsumerHandle::Busy:
- case WebDataConsumerHandle::ResourceExhausted:
- case WebDataConsumerHandle::UnexpectedError:
- error();
- return;
- }
+ switch (m_consumer->getPublicState()) {
+ case BytesConsumer::PublicState::ReadableOrWaiting:
+ break;
+ case BytesConsumer::PublicState::Closed:
+ close();
+ return;
+ case BytesConsumer::PublicState::Errored:
+ error();
return;
}
processData();
@@ -283,8 +274,9 @@ bool BodyStreamBuffer::hasPendingActivity() const
void BodyStreamBuffer::stop()
{
- m_reader = nullptr;
- m_handle = nullptr;
+ if (m_consumer)
+ m_consumer->cancel();
+ m_consumer = nullptr;
UnderlyingSourceBase::stop();
}
@@ -335,28 +327,32 @@ void BodyStreamBuffer::closeAndLockAndDisturb()
void BodyStreamBuffer::close()
{
controller()->close();
- m_reader = nullptr;
- m_handle = nullptr;
+ if (m_consumer) {
+ m_consumer->cancel();
+ m_consumer = nullptr;
+ }
}
void BodyStreamBuffer::error()
{
controller()->error(DOMException::create(NetworkError, "network error"));
- m_reader = nullptr;
- m_handle = nullptr;
+ if (m_consumer) {
+ m_consumer->cancel();
+ m_consumer = nullptr;
+ }
}
void BodyStreamBuffer::processData()
{
- ASSERT(m_reader);
+ DCHECK(m_consumer);
while (m_streamNeedsMore) {
- const void* buffer;
- size_t available;
- WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available);
- switch (result) {
- case WebDataConsumerHandle::Ok: {
- DOMUint8Array* array = DOMUint8Array::create(static_cast<const unsigned char*>(buffer), available);
- m_reader->endRead(available);
+ const char* buffer = nullptr;
+ size_t available = 0;
+
+ switch (m_consumer->beginRead(&buffer, &available)) {
+ case BytesConsumer::Result::Ok: {
+ DOMUint8Array* array = DOMUint8Array::create(reinterpret_cast<const unsigned char*>(buffer), available);
+ m_consumer->endRead(available);
hiroshige 2016/09/07 09:25:54 We have to check the return value of endRead().
yhirano 2016/09/08 01:41:05 Done.
// Clear m_streamNeedsMore in order to detect a pull call.
m_streamNeedsMore = false;
controller()->enqueue(array);
@@ -367,16 +363,12 @@ void BodyStreamBuffer::processData()
m_streamNeedsMore = controller()->desiredSize() > 0;
break;
}
- case WebDataConsumerHandle::Done:
- close();
+ case BytesConsumer::Result::ShouldWait:
return;
-
- case WebDataConsumerHandle::ShouldWait:
+ case BytesConsumer::Result::Done:
+ close();
return;
-
- case WebDataConsumerHandle::Busy:
- case WebDataConsumerHandle::ResourceExhausted:
- case WebDataConsumerHandle::UnexpectedError:
+ case BytesConsumer::Result::Error:
error();
return;
}
@@ -397,7 +389,7 @@ void BodyStreamBuffer::stopLoading()
m_loader = nullptr;
}
-std::unique_ptr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle()
+BytesConsumer* BodyStreamBuffer::releaseHandle()
{
DCHECK(!isStreamLocked());
DCHECK(!isStreamDisturbed());
@@ -412,25 +404,26 @@ std::unique_ptr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle()
// , we don't need to keep the reader explicitly.
NonThrowableExceptionState exceptionState;
ScriptValue reader = ReadableStreamOperations::getReader(m_scriptState.get(), stream(), exceptionState);
- return ReadableStreamDataConsumerHandle::create(m_scriptState.get(), reader);
+ return new BytesConsumerForDataConsumerHandle(ReadableStreamDataConsumerHandle::create(m_scriptState.get(), reader));
}
// We need to call these before calling closeAndLockAndDisturb.
const bool isClosed = isStreamClosed();
const bool isErrored = isStreamErrored();
- std::unique_ptr<FetchDataConsumerHandle> handle = std::move(m_handle);
+ BytesConsumer* consumer = m_consumer.release();
closeAndLockAndDisturb();
if (isClosed) {
// Note that the stream cannot be "draining", because it doesn't have
// the internal buffer.
- return createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle());
+ return new BytesConsumerForDataConsumerHandle(createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle()));
}
if (isErrored)
- return createFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorDataConsumerHandle());
+ return new BytesConsumerForDataConsumerHandle(createFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorDataConsumerHandle()));
- DCHECK(handle);
- return handle;
+ DCHECK(consumer);
+ consumer->clearClient();
+ return consumer;
}
} // namespace blink

Powered by Google App Engine
This is Rietveld 408576698