Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(186)

Unified Diff: Source/modules/fetch/BodyStreamBuffer.cpp

Issue 1192913007: Change BodyStreamBuffer to be FetchDataConsumerHandle-based and enable backpressure in Fetch API (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Rebase. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « Source/modules/fetch/BodyStreamBuffer.h ('k') | Source/modules/fetch/BodyStreamBufferTest.cpp » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: Source/modules/fetch/BodyStreamBuffer.cpp
diff --git a/Source/modules/fetch/BodyStreamBuffer.cpp b/Source/modules/fetch/BodyStreamBuffer.cpp
index 3c98e3ee5d588101a847150e7632d971969ca05e..8ad9502e33a28f9df144f9cee852867b8bd68b61 100644
--- a/Source/modules/fetch/BodyStreamBuffer.cpp
+++ b/Source/modules/fetch/BodyStreamBuffer.cpp
@@ -10,308 +10,157 @@
namespace blink {
-namespace {
+BodyStreamBuffer* BodyStreamBuffer::createEmpty()
+{
+ return BodyStreamBuffer::create(createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle()));
+}
+
+FetchDataConsumerHandle* BodyStreamBuffer::handle() const
+{
+ ASSERT(!m_fetchDataLoader);
+ ASSERT(!m_drainingStreamNotificationClient);
+ return m_handle.get();
+}
+
+PassOwnPtr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle()
+{
+ ASSERT(!m_fetchDataLoader);
+ ASSERT(!m_drainingStreamNotificationClient);
+ return m_handle.release();
+}
-class BlobCreator final : public BodyStreamBuffer::Observer {
+class ClientWithFinishNotification final : public GarbageCollectedFinalized<ClientWithFinishNotification>, public FetchDataLoader::Client {
+ USING_GARBAGE_COLLECTED_MIXIN(ClientWithFinishNotification);
public:
- BlobCreator(BodyStreamBuffer* buffer, const String& contentType, BodyStreamBuffer::BlobHandleCreatorClient* client)
+ ClientWithFinishNotification(BodyStreamBuffer* buffer, FetchDataLoader::Client* client)
: m_buffer(buffer)
, m_client(client)
- , m_blobData(BlobData::create())
{
- m_blobData->setContentType(contentType);
}
- ~BlobCreator() override { }
+
DEFINE_INLINE_VIRTUAL_TRACE()
{
visitor->trace(m_buffer);
visitor->trace(m_client);
- BodyStreamBuffer::Observer::trace(visitor);
- }
- void onWrite() override
- {
- ASSERT(m_buffer);
- while (RefPtr<DOMArrayBuffer> buf = m_buffer->read()) {
- m_blobData->appendBytes(buf->data(), buf->byteLength());
- }
- }
- void onClose() override
- {
- ASSERT(m_buffer);
- const long long size = m_blobData->length();
- m_client->didCreateBlobHandle(BlobDataHandle::create(m_blobData.release(), size));
- cleanup();
- }
- void onError() override
- {
- ASSERT(m_buffer);
- m_client->didFail(m_buffer->exception());
- cleanup();
- }
- void start()
- {
- ASSERT(!m_buffer->isObserverRegistered());
- m_buffer->registerObserver(this);
- onWrite();
- if (m_buffer->hasError()) {
- return onError();
- }
- if (m_buffer->isClosed())
- return onClose();
- }
- void cleanup()
- {
- m_buffer->unregisterObserver();
- m_buffer.clear();
- m_client.clear();
- m_blobData.clear();
- }
-private:
- Member<BodyStreamBuffer> m_buffer;
- Member<BodyStreamBuffer::BlobHandleCreatorClient> m_client;
- OwnPtr<BlobData> m_blobData;
-};
-
-class StreamTeePump : public BodyStreamBuffer::Observer {
-public:
- StreamTeePump(BodyStreamBuffer* inBuffer, BodyStreamBuffer* outBuffer1, BodyStreamBuffer* outBuffer2)
- : m_inBuffer(inBuffer)
- , m_outBuffer1(outBuffer1)
- , m_outBuffer2(outBuffer2)
- {
- }
- void onWrite() override
- {
- while (RefPtr<DOMArrayBuffer> buf = m_inBuffer->read()) {
- if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
- m_outBuffer1->write(buf);
- if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
- m_outBuffer2->write(buf);
- }
- }
- void onClose() override
- {
- if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
- m_outBuffer1->close();
- if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
- m_outBuffer2->close();
- cleanup();
- }
- void onError() override
- {
- if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
- m_outBuffer1->error(m_inBuffer->exception());
- if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
- m_outBuffer2->error(m_inBuffer->exception());
- cleanup();
- }
- DEFINE_INLINE_VIRTUAL_TRACE()
- {
- BodyStreamBuffer::Observer::trace(visitor);
- visitor->trace(m_inBuffer);
- visitor->trace(m_outBuffer1);
- visitor->trace(m_outBuffer2);
- }
- void start()
- {
- m_inBuffer->registerObserver(this);
- onWrite();
- if (m_inBuffer->hasError())
- return onError();
- if (m_inBuffer->isClosed())
- return onClose();
+ FetchDataLoader::Client::trace(visitor);
}
private:
- void cleanup()
+ void didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandle) override
{
- m_inBuffer->unregisterObserver();
- m_inBuffer.clear();
- m_outBuffer1.clear();
- m_outBuffer2.clear();
+ if (m_client)
+ m_client->didFetchDataLoadedBlobHandle(blobDataHandle);
+ m_buffer->didFetchDataLoadFinished();
}
- Member<BodyStreamBuffer> m_inBuffer;
- Member<BodyStreamBuffer> m_outBuffer1;
- Member<BodyStreamBuffer> m_outBuffer2;
-};
-
-// WebDataConsumerHandleAdapter is used to migrate incrementally
-// from BodyStreamBuffer to FetchDataConsumerHandle and will be removed
-// after the migration.
-class WebDataConsumerHandleAdapter
- : public GarbageCollectedFinalized<WebDataConsumerHandleAdapter>
- , public WebDataConsumerHandle::Client {
-public:
- WebDataConsumerHandleAdapter(PassOwnPtr<WebDataConsumerHandle> handle, const String& failureMessage)
- : m_reader(handle->obtainReader(this))
- , m_failureMessage(failureMessage)
- , m_outputBuffer(new BodyStreamBuffer(new Canceller(this)))
+ void didFetchDataLoadedArrayBuffer(PassRefPtr<DOMArrayBuffer> arrayBuffer) override
{
- ASSERT(m_reader);
+ if (m_client)
+ m_client->didFetchDataLoadedArrayBuffer(arrayBuffer);
+ m_buffer->didFetchDataLoadFinished();
}
-
- BodyStreamBuffer* outputBuffer() { return m_outputBuffer; }
-
- DEFINE_INLINE_TRACE()
+ void didFetchDataLoadedString(const String& str) override
{
- visitor->trace(m_outputBuffer);
+ if (m_client)
+ m_client->didFetchDataLoadedString(str);
+ m_buffer->didFetchDataLoadFinished();
}
-
-private:
- class Canceller : public BodyStreamBuffer::Canceller {
- public:
- explicit Canceller(WebDataConsumerHandleAdapter* source) : m_source(source) { }
-
- void cancel() override
- {
- m_source->close();
- }
-
- DEFINE_INLINE_VIRTUAL_TRACE()
- {
- BodyStreamBuffer::Canceller::trace(visitor);
- visitor->trace(m_source);
- }
-
- private:
- Member<WebDataConsumerHandleAdapter> m_source;
- };
-
- void didGetReadable() override
- {
- while (true) {
- const void* buffer;
- size_t available;
- WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available);
- switch (result) {
- case WebDataConsumerHandle::Ok:
- m_outputBuffer->write(DOMArrayBuffer::create(buffer, available));
- m_reader->endRead(available);
- break;
-
- case WebDataConsumerHandle::Done:
- close();
- return;
-
- case WebDataConsumerHandle::ShouldWait:
- return;
-
- case WebDataConsumerHandle::Busy:
- case WebDataConsumerHandle::ResourceExhausted:
- case WebDataConsumerHandle::UnexpectedError:
- error();
- return;
- }
- }
- }
-
- void error()
+ void didFetchDataLoadedStream() override
{
- m_reader.clear();
- m_outputBuffer->error(DOMException::create(NetworkError, m_failureMessage));
- m_outputBuffer.clear();
+ if (m_client)
+ m_client->didFetchDataLoadedStream();
+ m_buffer->didFetchDataLoadFinished();
}
-
- void close()
+ void didFetchDataLoadFailed() override
{
- m_reader.clear();
- m_outputBuffer->close();
- m_outputBuffer.clear();
+ if (m_client)
+ m_client->didFetchDataLoadFailed();
+ m_buffer->didFetchDataLoadFinished();
}
- OwnPtr<WebDataConsumerHandle::Reader> m_reader;
- String m_failureMessage;
-
- Member<BodyStreamBuffer> m_outputBuffer;
+ Member<BodyStreamBuffer> m_buffer;
+ Member<FetchDataLoader::Client> m_client;
};
-
-} // namespace
-
-PassRefPtr<DOMArrayBuffer> BodyStreamBuffer::read()
+void BodyStreamBuffer::setDrainingStreamNotificationClient(DrainingStreamNotificationClient* client)
{
- if (m_queue.isEmpty())
- return PassRefPtr<DOMArrayBuffer>();
- return m_queue.takeFirst();
+ ASSERT(!m_fetchDataLoader);
+ ASSERT(!m_drainingStreamNotificationClient);
+ m_drainingStreamNotificationClient = client;
}
-void BodyStreamBuffer::write(PassRefPtr<DOMArrayBuffer> chunk)
+void BodyStreamBuffer::startLoading(FetchDataLoader* fetchDataLoader, FetchDataLoader::Client* client)
{
- ASSERT(!m_isClosed);
- ASSERT(!m_exception);
- ASSERT(chunk);
- m_queue.append(chunk);
- if (m_observer)
- m_observer->onWrite();
+ ASSERT(!m_fetchDataLoader);
+ m_fetchDataLoader = fetchDataLoader;
+ m_fetchDataLoader->start(m_handle.get(), new ClientWithFinishNotification(this, client));
}
-void BodyStreamBuffer::close()
+void BodyStreamBuffer::doDrainingStreamNotification()
{
- ASSERT(!m_isClosed);
- ASSERT(!m_exception);
- m_isClosed = true;
- if (m_observer)
- m_observer->onClose();
+ ASSERT(!m_fetchDataLoader);
+ DrainingStreamNotificationClient* client = m_drainingStreamNotificationClient;
+ m_drainingStreamNotificationClient.clear();
+ if (client)
+ client->didFetchDataLoadFinishedFromDrainingStream();
}
-void BodyStreamBuffer::error(DOMException* exception)
+void BodyStreamBuffer::clearDrainingStreamNotification()
{
- ASSERT(exception);
- ASSERT(!m_isClosed);
- ASSERT(!m_exception);
- m_exception = exception;
- if (m_observer)
- m_observer->onError();
+ ASSERT(!m_fetchDataLoader);
+ m_drainingStreamNotificationClient.clear();
}
-bool BodyStreamBuffer::readAllAndCreateBlobHandle(const String& contentType, BlobHandleCreatorClient* client)
+void BodyStreamBuffer::didFetchDataLoadFinished()
{
- if (m_observer)
- return false;
- BlobCreator* blobCreator = new BlobCreator(this, contentType, client);
- blobCreator->start();
- return true;
+ ASSERT(m_fetchDataLoader);
+ m_fetchDataLoader.clear();
+ doDrainingStreamNotification();
}
-bool BodyStreamBuffer::startTee(BodyStreamBuffer* out1, BodyStreamBuffer* out2)
+DrainingBodyStreamBuffer::~DrainingBodyStreamBuffer()
{
- if (m_observer)
- return false;
- StreamTeePump* teePump = new StreamTeePump(this, out1, out2);
- teePump->start();
- return true;
+ if (m_buffer)
+ m_buffer->doDrainingStreamNotification();
}
-bool BodyStreamBuffer::registerObserver(Observer* observer)
+void DrainingBodyStreamBuffer::startLoading(FetchDataLoader* fetchDataLoader, FetchDataLoader::Client* client)
{
- if (m_observer)
- return false;
- ASSERT(observer);
- m_observer = observer;
- return true;
-}
+ if (!m_buffer)
+ return;
-void BodyStreamBuffer::unregisterObserver()
-{
- m_observer.clear();
+ m_buffer->startLoading(fetchDataLoader, client);
+ m_buffer.clear();
}
-DEFINE_TRACE(BodyStreamBuffer)
+BodyStreamBuffer* DrainingBodyStreamBuffer::leakBuffer()
{
- visitor->trace(m_exception);
- visitor->trace(m_observer);
- visitor->trace(m_canceller);
+ if (!m_buffer)
+ return nullptr;
+
+ m_buffer->clearDrainingStreamNotification();
+ BodyStreamBuffer* buffer = m_buffer;
+ m_buffer.clear();
+ return buffer;
}
-BodyStreamBuffer::BodyStreamBuffer(Canceller* canceller)
- : m_isClosed(false)
- , m_canceller(canceller)
+PassRefPtr<BlobDataHandle> DrainingBodyStreamBuffer::drainAsBlobDataHandle(FetchDataConsumerHandle::Reader::BlobSizePolicy blobSizePolicy)
{
+ if (!m_buffer)
+ return nullptr;
+
+ RefPtr<BlobDataHandle> blobDataHandle = m_buffer->m_handle->obtainReader(nullptr)->drainAsBlobDataHandle(blobSizePolicy);
+ if (!blobDataHandle)
+ return nullptr;
+ m_buffer->doDrainingStreamNotification();
+ m_buffer.clear();
+ return blobDataHandle.release();
}
-BodyStreamBuffer* BodyStreamBuffer::create(PassOwnPtr<WebDataConsumerHandle> handle, const String& failureMessage)
+DrainingBodyStreamBuffer::DrainingBodyStreamBuffer(BodyStreamBuffer* buffer, BodyStreamBuffer::DrainingStreamNotificationClient* client)
+ : m_buffer(buffer)
{
- return (new WebDataConsumerHandleAdapter(handle, failureMessage))->outputBuffer();
+ if (client)
+ m_buffer->setDrainingStreamNotificationClient(client);
}
} // namespace blink
« no previous file with comments | « Source/modules/fetch/BodyStreamBuffer.h ('k') | Source/modules/fetch/BodyStreamBufferTest.cpp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698