| Index: Source/modules/fetch/BodyStreamBuffer.cpp
|
| diff --git a/Source/modules/fetch/BodyStreamBuffer.cpp b/Source/modules/fetch/BodyStreamBuffer.cpp
|
| index 3c98e3ee5d588101a847150e7632d971969ca05e..8ad9502e33a28f9df144f9cee852867b8bd68b61 100644
|
| --- a/Source/modules/fetch/BodyStreamBuffer.cpp
|
| +++ b/Source/modules/fetch/BodyStreamBuffer.cpp
|
| @@ -10,308 +10,157 @@
|
|
|
| namespace blink {
|
|
|
| -namespace {
|
| +BodyStreamBuffer* BodyStreamBuffer::createEmpty()
|
| +{
|
| + return BodyStreamBuffer::create(createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle()));
|
| +}
|
| +
|
| +FetchDataConsumerHandle* BodyStreamBuffer::handle() const
|
| +{
|
| + ASSERT(!m_fetchDataLoader);
|
| + ASSERT(!m_drainingStreamNotificationClient);
|
| + return m_handle.get();
|
| +}
|
| +
|
| +PassOwnPtr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle()
|
| +{
|
| + ASSERT(!m_fetchDataLoader);
|
| + ASSERT(!m_drainingStreamNotificationClient);
|
| + return m_handle.release();
|
| +}
|
|
|
| -class BlobCreator final : public BodyStreamBuffer::Observer {
|
| +class ClientWithFinishNotification final : public GarbageCollectedFinalized<ClientWithFinishNotification>, public FetchDataLoader::Client {
|
| + USING_GARBAGE_COLLECTED_MIXIN(ClientWithFinishNotification);
|
| public:
|
| - BlobCreator(BodyStreamBuffer* buffer, const String& contentType, BodyStreamBuffer::BlobHandleCreatorClient* client)
|
| + ClientWithFinishNotification(BodyStreamBuffer* buffer, FetchDataLoader::Client* client)
|
| : m_buffer(buffer)
|
| , m_client(client)
|
| - , m_blobData(BlobData::create())
|
| {
|
| - m_blobData->setContentType(contentType);
|
| }
|
| - ~BlobCreator() override { }
|
| +
|
| DEFINE_INLINE_VIRTUAL_TRACE()
|
| {
|
| visitor->trace(m_buffer);
|
| visitor->trace(m_client);
|
| - BodyStreamBuffer::Observer::trace(visitor);
|
| - }
|
| - void onWrite() override
|
| - {
|
| - ASSERT(m_buffer);
|
| - while (RefPtr<DOMArrayBuffer> buf = m_buffer->read()) {
|
| - m_blobData->appendBytes(buf->data(), buf->byteLength());
|
| - }
|
| - }
|
| - void onClose() override
|
| - {
|
| - ASSERT(m_buffer);
|
| - const long long size = m_blobData->length();
|
| - m_client->didCreateBlobHandle(BlobDataHandle::create(m_blobData.release(), size));
|
| - cleanup();
|
| - }
|
| - void onError() override
|
| - {
|
| - ASSERT(m_buffer);
|
| - m_client->didFail(m_buffer->exception());
|
| - cleanup();
|
| - }
|
| - void start()
|
| - {
|
| - ASSERT(!m_buffer->isObserverRegistered());
|
| - m_buffer->registerObserver(this);
|
| - onWrite();
|
| - if (m_buffer->hasError()) {
|
| - return onError();
|
| - }
|
| - if (m_buffer->isClosed())
|
| - return onClose();
|
| - }
|
| - void cleanup()
|
| - {
|
| - m_buffer->unregisterObserver();
|
| - m_buffer.clear();
|
| - m_client.clear();
|
| - m_blobData.clear();
|
| - }
|
| -private:
|
| - Member<BodyStreamBuffer> m_buffer;
|
| - Member<BodyStreamBuffer::BlobHandleCreatorClient> m_client;
|
| - OwnPtr<BlobData> m_blobData;
|
| -};
|
| -
|
| -class StreamTeePump : public BodyStreamBuffer::Observer {
|
| -public:
|
| - StreamTeePump(BodyStreamBuffer* inBuffer, BodyStreamBuffer* outBuffer1, BodyStreamBuffer* outBuffer2)
|
| - : m_inBuffer(inBuffer)
|
| - , m_outBuffer1(outBuffer1)
|
| - , m_outBuffer2(outBuffer2)
|
| - {
|
| - }
|
| - void onWrite() override
|
| - {
|
| - while (RefPtr<DOMArrayBuffer> buf = m_inBuffer->read()) {
|
| - if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
|
| - m_outBuffer1->write(buf);
|
| - if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
|
| - m_outBuffer2->write(buf);
|
| - }
|
| - }
|
| - void onClose() override
|
| - {
|
| - if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
|
| - m_outBuffer1->close();
|
| - if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
|
| - m_outBuffer2->close();
|
| - cleanup();
|
| - }
|
| - void onError() override
|
| - {
|
| - if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
|
| - m_outBuffer1->error(m_inBuffer->exception());
|
| - if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
|
| - m_outBuffer2->error(m_inBuffer->exception());
|
| - cleanup();
|
| - }
|
| - DEFINE_INLINE_VIRTUAL_TRACE()
|
| - {
|
| - BodyStreamBuffer::Observer::trace(visitor);
|
| - visitor->trace(m_inBuffer);
|
| - visitor->trace(m_outBuffer1);
|
| - visitor->trace(m_outBuffer2);
|
| - }
|
| - void start()
|
| - {
|
| - m_inBuffer->registerObserver(this);
|
| - onWrite();
|
| - if (m_inBuffer->hasError())
|
| - return onError();
|
| - if (m_inBuffer->isClosed())
|
| - return onClose();
|
| + FetchDataLoader::Client::trace(visitor);
|
| }
|
|
|
| private:
|
| - void cleanup()
|
| + void didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandle) override
|
| {
|
| - m_inBuffer->unregisterObserver();
|
| - m_inBuffer.clear();
|
| - m_outBuffer1.clear();
|
| - m_outBuffer2.clear();
|
| + if (m_client)
|
| + m_client->didFetchDataLoadedBlobHandle(blobDataHandle);
|
| + m_buffer->didFetchDataLoadFinished();
|
| }
|
| - Member<BodyStreamBuffer> m_inBuffer;
|
| - Member<BodyStreamBuffer> m_outBuffer1;
|
| - Member<BodyStreamBuffer> m_outBuffer2;
|
| -};
|
| -
|
| -// WebDataConsumerHandleAdapter is used to migrate incrementally
|
| -// from BodyStreamBuffer to FetchDataConsumerHandle and will be removed
|
| -// after the migration.
|
| -class WebDataConsumerHandleAdapter
|
| - : public GarbageCollectedFinalized<WebDataConsumerHandleAdapter>
|
| - , public WebDataConsumerHandle::Client {
|
| -public:
|
| - WebDataConsumerHandleAdapter(PassOwnPtr<WebDataConsumerHandle> handle, const String& failureMessage)
|
| - : m_reader(handle->obtainReader(this))
|
| - , m_failureMessage(failureMessage)
|
| - , m_outputBuffer(new BodyStreamBuffer(new Canceller(this)))
|
| + void didFetchDataLoadedArrayBuffer(PassRefPtr<DOMArrayBuffer> arrayBuffer) override
|
| {
|
| - ASSERT(m_reader);
|
| + if (m_client)
|
| + m_client->didFetchDataLoadedArrayBuffer(arrayBuffer);
|
| + m_buffer->didFetchDataLoadFinished();
|
| }
|
| -
|
| - BodyStreamBuffer* outputBuffer() { return m_outputBuffer; }
|
| -
|
| - DEFINE_INLINE_TRACE()
|
| + void didFetchDataLoadedString(const String& str) override
|
| {
|
| - visitor->trace(m_outputBuffer);
|
| + if (m_client)
|
| + m_client->didFetchDataLoadedString(str);
|
| + m_buffer->didFetchDataLoadFinished();
|
| }
|
| -
|
| -private:
|
| - class Canceller : public BodyStreamBuffer::Canceller {
|
| - public:
|
| - explicit Canceller(WebDataConsumerHandleAdapter* source) : m_source(source) { }
|
| -
|
| - void cancel() override
|
| - {
|
| - m_source->close();
|
| - }
|
| -
|
| - DEFINE_INLINE_VIRTUAL_TRACE()
|
| - {
|
| - BodyStreamBuffer::Canceller::trace(visitor);
|
| - visitor->trace(m_source);
|
| - }
|
| -
|
| - private:
|
| - Member<WebDataConsumerHandleAdapter> m_source;
|
| - };
|
| -
|
| - void didGetReadable() override
|
| - {
|
| - while (true) {
|
| - const void* buffer;
|
| - size_t available;
|
| - WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available);
|
| - switch (result) {
|
| - case WebDataConsumerHandle::Ok:
|
| - m_outputBuffer->write(DOMArrayBuffer::create(buffer, available));
|
| - m_reader->endRead(available);
|
| - break;
|
| -
|
| - case WebDataConsumerHandle::Done:
|
| - close();
|
| - return;
|
| -
|
| - case WebDataConsumerHandle::ShouldWait:
|
| - return;
|
| -
|
| - case WebDataConsumerHandle::Busy:
|
| - case WebDataConsumerHandle::ResourceExhausted:
|
| - case WebDataConsumerHandle::UnexpectedError:
|
| - error();
|
| - return;
|
| - }
|
| - }
|
| - }
|
| -
|
| - void error()
|
| + void didFetchDataLoadedStream() override
|
| {
|
| - m_reader.clear();
|
| - m_outputBuffer->error(DOMException::create(NetworkError, m_failureMessage));
|
| - m_outputBuffer.clear();
|
| + if (m_client)
|
| + m_client->didFetchDataLoadedStream();
|
| + m_buffer->didFetchDataLoadFinished();
|
| }
|
| -
|
| - void close()
|
| + void didFetchDataLoadFailed() override
|
| {
|
| - m_reader.clear();
|
| - m_outputBuffer->close();
|
| - m_outputBuffer.clear();
|
| + if (m_client)
|
| + m_client->didFetchDataLoadFailed();
|
| + m_buffer->didFetchDataLoadFinished();
|
| }
|
|
|
| - OwnPtr<WebDataConsumerHandle::Reader> m_reader;
|
| - String m_failureMessage;
|
| -
|
| - Member<BodyStreamBuffer> m_outputBuffer;
|
| + Member<BodyStreamBuffer> m_buffer;
|
| + Member<FetchDataLoader::Client> m_client;
|
| };
|
|
|
| -
|
| -} // namespace
|
| -
|
| -PassRefPtr<DOMArrayBuffer> BodyStreamBuffer::read()
|
| +void BodyStreamBuffer::setDrainingStreamNotificationClient(DrainingStreamNotificationClient* client)
|
| {
|
| - if (m_queue.isEmpty())
|
| - return PassRefPtr<DOMArrayBuffer>();
|
| - return m_queue.takeFirst();
|
| + ASSERT(!m_fetchDataLoader);
|
| + ASSERT(!m_drainingStreamNotificationClient);
|
| + m_drainingStreamNotificationClient = client;
|
| }
|
|
|
| -void BodyStreamBuffer::write(PassRefPtr<DOMArrayBuffer> chunk)
|
| +void BodyStreamBuffer::startLoading(FetchDataLoader* fetchDataLoader, FetchDataLoader::Client* client)
|
| {
|
| - ASSERT(!m_isClosed);
|
| - ASSERT(!m_exception);
|
| - ASSERT(chunk);
|
| - m_queue.append(chunk);
|
| - if (m_observer)
|
| - m_observer->onWrite();
|
| + ASSERT(!m_fetchDataLoader);
|
| + m_fetchDataLoader = fetchDataLoader;
|
| + m_fetchDataLoader->start(m_handle.get(), new ClientWithFinishNotification(this, client));
|
| }
|
|
|
| -void BodyStreamBuffer::close()
|
| +void BodyStreamBuffer::doDrainingStreamNotification()
|
| {
|
| - ASSERT(!m_isClosed);
|
| - ASSERT(!m_exception);
|
| - m_isClosed = true;
|
| - if (m_observer)
|
| - m_observer->onClose();
|
| + ASSERT(!m_fetchDataLoader);
|
| + DrainingStreamNotificationClient* client = m_drainingStreamNotificationClient;
|
| + m_drainingStreamNotificationClient.clear();
|
| + if (client)
|
| + client->didFetchDataLoadFinishedFromDrainingStream();
|
| }
|
|
|
| -void BodyStreamBuffer::error(DOMException* exception)
|
| +void BodyStreamBuffer::clearDrainingStreamNotification()
|
| {
|
| - ASSERT(exception);
|
| - ASSERT(!m_isClosed);
|
| - ASSERT(!m_exception);
|
| - m_exception = exception;
|
| - if (m_observer)
|
| - m_observer->onError();
|
| + ASSERT(!m_fetchDataLoader);
|
| + m_drainingStreamNotificationClient.clear();
|
| }
|
|
|
| -bool BodyStreamBuffer::readAllAndCreateBlobHandle(const String& contentType, BlobHandleCreatorClient* client)
|
| +void BodyStreamBuffer::didFetchDataLoadFinished()
|
| {
|
| - if (m_observer)
|
| - return false;
|
| - BlobCreator* blobCreator = new BlobCreator(this, contentType, client);
|
| - blobCreator->start();
|
| - return true;
|
| + ASSERT(m_fetchDataLoader);
|
| + m_fetchDataLoader.clear();
|
| + doDrainingStreamNotification();
|
| }
|
|
|
| -bool BodyStreamBuffer::startTee(BodyStreamBuffer* out1, BodyStreamBuffer* out2)
|
| +DrainingBodyStreamBuffer::~DrainingBodyStreamBuffer()
|
| {
|
| - if (m_observer)
|
| - return false;
|
| - StreamTeePump* teePump = new StreamTeePump(this, out1, out2);
|
| - teePump->start();
|
| - return true;
|
| + if (m_buffer)
|
| + m_buffer->doDrainingStreamNotification();
|
| }
|
|
|
| -bool BodyStreamBuffer::registerObserver(Observer* observer)
|
| +void DrainingBodyStreamBuffer::startLoading(FetchDataLoader* fetchDataLoader, FetchDataLoader::Client* client)
|
| {
|
| - if (m_observer)
|
| - return false;
|
| - ASSERT(observer);
|
| - m_observer = observer;
|
| - return true;
|
| -}
|
| + if (!m_buffer)
|
| + return;
|
|
|
| -void BodyStreamBuffer::unregisterObserver()
|
| -{
|
| - m_observer.clear();
|
| + m_buffer->startLoading(fetchDataLoader, client);
|
| + m_buffer.clear();
|
| }
|
|
|
| -DEFINE_TRACE(BodyStreamBuffer)
|
| +BodyStreamBuffer* DrainingBodyStreamBuffer::leakBuffer()
|
| {
|
| - visitor->trace(m_exception);
|
| - visitor->trace(m_observer);
|
| - visitor->trace(m_canceller);
|
| + if (!m_buffer)
|
| + return nullptr;
|
| +
|
| + m_buffer->clearDrainingStreamNotification();
|
| + BodyStreamBuffer* buffer = m_buffer;
|
| + m_buffer.clear();
|
| + return buffer;
|
| }
|
|
|
| -BodyStreamBuffer::BodyStreamBuffer(Canceller* canceller)
|
| - : m_isClosed(false)
|
| - , m_canceller(canceller)
|
| +PassRefPtr<BlobDataHandle> DrainingBodyStreamBuffer::drainAsBlobDataHandle(FetchDataConsumerHandle::Reader::BlobSizePolicy blobSizePolicy)
|
| {
|
| + if (!m_buffer)
|
| + return nullptr;
|
| +
|
| + RefPtr<BlobDataHandle> blobDataHandle = m_buffer->m_handle->obtainReader(nullptr)->drainAsBlobDataHandle(blobSizePolicy);
|
| + if (!blobDataHandle)
|
| + return nullptr;
|
| + m_buffer->doDrainingStreamNotification();
|
| + m_buffer.clear();
|
| + return blobDataHandle.release();
|
| }
|
|
|
| -BodyStreamBuffer* BodyStreamBuffer::create(PassOwnPtr<WebDataConsumerHandle> handle, const String& failureMessage)
|
| +DrainingBodyStreamBuffer::DrainingBodyStreamBuffer(BodyStreamBuffer* buffer, BodyStreamBuffer::DrainingStreamNotificationClient* client)
|
| + : m_buffer(buffer)
|
| {
|
| - return (new WebDataConsumerHandleAdapter(handle, failureMessage))->outputBuffer();
|
| + if (client)
|
| + m_buffer->setDrainingStreamNotificationClient(client);
|
| }
|
|
|
| } // namespace blink
|
|
|