Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(913)

Unified Diff: third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp

Issue 2277143002: Use BytesConsumer in BodyStreamBuffer (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@bytes-consumer-tee
Patch Set: rebase Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp
diff --git a/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp b/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp
index ae6657134064e61f76e26acb2dc8949658ff20af..a02c20617351eedde1d179d073390be56084294b 100644
--- a/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp
+++ b/third_party/WebKit/Source/modules/fetch/BodyStreamBuffer.cpp
@@ -14,7 +14,6 @@
#include "modules/fetch/Body.h"
#include "modules/fetch/BytesConsumerForDataConsumerHandle.h"
#include "modules/fetch/DataConsumerHandleUtil.h"
-#include "modules/fetch/DataConsumerTee.h"
#include "modules/fetch/ReadableStreamDataConsumerHandle.h"
#include "platform/blob/BlobData.h"
#include "platform/network/EncodedFormData.h"
@@ -83,10 +82,14 @@ private:
};
BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, std::unique_ptr<FetchDataConsumerHandle> handle)
+ : BodyStreamBuffer(scriptState, new BytesConsumerForDataConsumerHandle(scriptState->getExecutionContext(), std::move(handle)))
+{
+}
+
+BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, BytesConsumer* consumer)
: UnderlyingSourceBase(scriptState)
, m_scriptState(scriptState)
- , m_handle(std::move(handle))
- , m_reader(m_handle->obtainFetchDataReader(this))
+ , m_consumer(consumer)
, m_madeFromReadableStream(false)
{
v8::Local<v8::Value> bodyValue = toV8(this, scriptState);
@@ -98,6 +101,7 @@ BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, std::unique_ptr<Fet
scriptState, this, ReadableStreamOperations::createCountQueuingStrategy(scriptState, 0));
DCHECK(!readableStream.isEmpty());
V8HiddenValue::setHiddenValue(scriptState, body, V8HiddenValue::internalBodyStream(scriptState->isolate()), readableStream.v8Value());
+ m_consumer->setClient(this);
}
BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, ScriptValue stream)
@@ -124,7 +128,7 @@ ScriptValue BodyStreamBuffer::stream()
return ScriptValue(m_scriptState.get(), V8HiddenValue::getHiddenValue(m_scriptState.get(), body, V8HiddenValue::internalBodyStream(m_scriptState->isolate())));
}
-PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(FetchDataConsumerHandle::Reader::BlobSizePolicy policy)
+PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(BytesConsumer::BlobSizePolicy policy)
{
ASSERT(!isStreamLocked());
ASSERT(!isStreamDisturbed());
@@ -134,7 +138,7 @@ PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(FetchDataCons
if (m_madeFromReadableStream)
return nullptr;
- RefPtr<BlobDataHandle> blobDataHandle = m_reader->drainAsBlobDataHandle(policy);
+ RefPtr<BlobDataHandle> blobDataHandle = m_consumer->drainAsBlobDataHandle(policy);
if (blobDataHandle) {
closeAndLockAndDisturb();
return blobDataHandle.release();
@@ -152,7 +156,7 @@ PassRefPtr<EncodedFormData> BodyStreamBuffer::drainAsFormData()
if (m_madeFromReadableStream)
return nullptr;
- RefPtr<EncodedFormData> formData = m_reader->drainAsFormData();
+ RefPtr<EncodedFormData> formData = m_consumer->drainAsFormData();
if (formData) {
closeAndLockAndDisturb();
return formData.release();
@@ -164,9 +168,8 @@ void BodyStreamBuffer::startLoading(FetchDataLoader* loader, FetchDataLoader::Cl
{
ASSERT(!m_loader);
ASSERT(m_scriptState->contextIsValid());
- std::unique_ptr<FetchDataConsumerHandle> handle = releaseHandle();
m_loader = loader;
- loader->start(new BytesConsumerForDataConsumerHandle(getExecutionContext(), std::move(handle)), new LoaderClient(m_scriptState->getExecutionContext(), this, client));
+ loader->start(releaseHandle(), new LoaderClient(m_scriptState->getExecutionContext(), this, client));
}
void BodyStreamBuffer::tee(BodyStreamBuffer** branch1, BodyStreamBuffer** branch2)
@@ -183,11 +186,11 @@ void BodyStreamBuffer::tee(BodyStreamBuffer** branch1, BodyStreamBuffer** branch
*branch2 = new BodyStreamBuffer(m_scriptState.get(), stream2);
return;
}
- std::unique_ptr<FetchDataConsumerHandle> handle = releaseHandle();
- std::unique_ptr<FetchDataConsumerHandle> handle1, handle2;
- DataConsumerTee::create(m_scriptState->getExecutionContext(), std::move(handle), &handle1, &handle2);
- *branch1 = new BodyStreamBuffer(m_scriptState.get(), std::move(handle1));
- *branch2 = new BodyStreamBuffer(m_scriptState.get(), std::move(handle2));
+ BytesConsumer* dest1 = nullptr;
+ BytesConsumer* dest2 = nullptr;
+ BytesConsumer::tee(m_scriptState->getExecutionContext(), releaseHandle(), &dest1, &dest2);
+ *branch1 = new BodyStreamBuffer(m_scriptState.get(), dest1);
+ *branch2 = new BodyStreamBuffer(m_scriptState.get(), dest2);
}
ScriptPromise BodyStreamBuffer::pull(ScriptState* scriptState)
@@ -207,28 +210,19 @@ ScriptPromise BodyStreamBuffer::cancel(ScriptState* scriptState, ScriptValue rea
return ScriptPromise::castUndefined(scriptState);
}
-void BodyStreamBuffer::didGetReadable()
+void BodyStreamBuffer::onStateChange()
{
- if (!m_reader || !getExecutionContext() || getExecutionContext()->activeDOMObjectsAreStopped())
+ if (!m_consumer || !getExecutionContext() || getExecutionContext()->activeDOMObjectsAreStopped())
return;
- if (!m_streamNeedsMore) {
- // Perform zero-length read to call close()/error() early.
- size_t readSize;
- WebDataConsumerHandle::Result result = m_reader->read(nullptr, 0, WebDataConsumerHandle::FlagNone, &readSize);
- switch (result) {
- case WebDataConsumerHandle::Ok:
- case WebDataConsumerHandle::ShouldWait:
- return;
- case WebDataConsumerHandle::Done:
- close();
- return;
- case WebDataConsumerHandle::Busy:
- case WebDataConsumerHandle::ResourceExhausted:
- case WebDataConsumerHandle::UnexpectedError:
- error();
- return;
- }
+ switch (m_consumer->getPublicState()) {
+ case BytesConsumer::PublicState::ReadableOrWaiting:
+ break;
+ case BytesConsumer::PublicState::Closed:
+ close();
+ return;
+ case BytesConsumer::PublicState::Errored:
+ error();
return;
}
processData();
@@ -243,8 +237,7 @@ bool BodyStreamBuffer::hasPendingActivity() const
void BodyStreamBuffer::stop()
{
- m_reader = nullptr;
- m_handle = nullptr;
+ cancelConsumer();
UnderlyingSourceBase::stop();
}
@@ -295,28 +288,37 @@ void BodyStreamBuffer::closeAndLockAndDisturb()
void BodyStreamBuffer::close()
{
controller()->close();
- m_reader = nullptr;
- m_handle = nullptr;
+ cancelConsumer();
}
void BodyStreamBuffer::error()
{
controller()->error(DOMException::create(NetworkError, "network error"));
- m_reader = nullptr;
- m_handle = nullptr;
+ cancelConsumer();
+}
+
+void BodyStreamBuffer::cancelConsumer()
+{
+ if (m_consumer) {
+ m_consumer->cancel();
+ m_consumer = nullptr;
+ }
}
void BodyStreamBuffer::processData()
{
- ASSERT(m_reader);
+ DCHECK(m_consumer);
while (m_streamNeedsMore) {
- const void* buffer;
- size_t available;
- WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available);
- switch (result) {
- case WebDataConsumerHandle::Ok: {
- DOMUint8Array* array = DOMUint8Array::create(static_cast<const unsigned char*>(buffer), available);
- m_reader->endRead(available);
+ const char* buffer = nullptr;
+ size_t available = 0;
+
+ switch (m_consumer->beginRead(&buffer, &available)) {
+ case BytesConsumer::Result::Ok: {
+ DOMUint8Array* array = DOMUint8Array::create(reinterpret_cast<const unsigned char*>(buffer), available);
+ if (m_consumer->endRead(available) != BytesConsumer::Result::Ok) {
+ error();
+ return;
+ }
// Clear m_streamNeedsMore in order to detect a pull call.
m_streamNeedsMore = false;
controller()->enqueue(array);
@@ -327,16 +329,12 @@ void BodyStreamBuffer::processData()
m_streamNeedsMore = controller()->desiredSize() > 0;
break;
}
- case WebDataConsumerHandle::Done:
- close();
+ case BytesConsumer::Result::ShouldWait:
return;
-
- case WebDataConsumerHandle::ShouldWait:
+ case BytesConsumer::Result::Done:
+ close();
return;
-
- case WebDataConsumerHandle::Busy:
- case WebDataConsumerHandle::ResourceExhausted:
- case WebDataConsumerHandle::UnexpectedError:
+ case BytesConsumer::Result::Error:
error();
return;
}
@@ -357,7 +355,7 @@ void BodyStreamBuffer::stopLoading()
m_loader = nullptr;
}
-std::unique_ptr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle()
+BytesConsumer* BodyStreamBuffer::releaseHandle()
{
DCHECK(!isStreamLocked());
DCHECK(!isStreamDisturbed());
@@ -372,25 +370,26 @@ std::unique_ptr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle()
// , we don't need to keep the reader explicitly.
NonThrowableExceptionState exceptionState;
ScriptValue reader = ReadableStreamOperations::getReader(m_scriptState.get(), stream(), exceptionState);
- return ReadableStreamDataConsumerHandle::create(m_scriptState.get(), reader);
+ return new BytesConsumerForDataConsumerHandle(m_scriptState->getExecutionContext(), ReadableStreamDataConsumerHandle::create(m_scriptState.get(), reader));
}
// We need to call these before calling closeAndLockAndDisturb.
const bool isClosed = isStreamClosed();
const bool isErrored = isStreamErrored();
- std::unique_ptr<FetchDataConsumerHandle> handle = std::move(m_handle);
+ BytesConsumer* consumer = m_consumer.release();
closeAndLockAndDisturb();
if (isClosed) {
// Note that the stream cannot be "draining", because it doesn't have
// the internal buffer.
- return createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle());
+ return new BytesConsumerForDataConsumerHandle(m_scriptState->getExecutionContext(), createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle()));
}
if (isErrored)
- return createFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorDataConsumerHandle());
+ return new BytesConsumerForDataConsumerHandle(m_scriptState->getExecutionContext(), createFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorDataConsumerHandle()));
- DCHECK(handle);
- return handle;
+ DCHECK(consumer);
+ consumer->clearClient();
+ return consumer;
}
} // namespace blink

Powered by Google App Engine
This is Rietveld 408576698