Index: Source/modules/fetch/BodyStreamBuffer.cpp |
diff --git a/Source/modules/fetch/BodyStreamBuffer.cpp b/Source/modules/fetch/BodyStreamBuffer.cpp |
index 3c98e3ee5d588101a847150e7632d971969ca05e..8ad9502e33a28f9df144f9cee852867b8bd68b61 100644 |
--- a/Source/modules/fetch/BodyStreamBuffer.cpp |
+++ b/Source/modules/fetch/BodyStreamBuffer.cpp |
@@ -10,308 +10,157 @@ |
namespace blink { |
-namespace { |
+BodyStreamBuffer* BodyStreamBuffer::createEmpty() |
+{ |
+ return BodyStreamBuffer::create(createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle())); |
+} |
+ |
+FetchDataConsumerHandle* BodyStreamBuffer::handle() const |
+{ |
+ ASSERT(!m_fetchDataLoader); |
+ ASSERT(!m_drainingStreamNotificationClient); |
+ return m_handle.get(); |
+} |
+ |
+PassOwnPtr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle() |
+{ |
+ ASSERT(!m_fetchDataLoader); |
+ ASSERT(!m_drainingStreamNotificationClient); |
+ return m_handle.release(); |
+} |
-class BlobCreator final : public BodyStreamBuffer::Observer { |
+class ClientWithFinishNotification final : public GarbageCollectedFinalized<ClientWithFinishNotification>, public FetchDataLoader::Client { |
+ USING_GARBAGE_COLLECTED_MIXIN(ClientWithFinishNotification); |
public: |
- BlobCreator(BodyStreamBuffer* buffer, const String& contentType, BodyStreamBuffer::BlobHandleCreatorClient* client) |
+ ClientWithFinishNotification(BodyStreamBuffer* buffer, FetchDataLoader::Client* client) |
: m_buffer(buffer) |
, m_client(client) |
- , m_blobData(BlobData::create()) |
{ |
- m_blobData->setContentType(contentType); |
} |
- ~BlobCreator() override { } |
+ |
DEFINE_INLINE_VIRTUAL_TRACE() |
{ |
visitor->trace(m_buffer); |
visitor->trace(m_client); |
- BodyStreamBuffer::Observer::trace(visitor); |
- } |
- void onWrite() override |
- { |
- ASSERT(m_buffer); |
- while (RefPtr<DOMArrayBuffer> buf = m_buffer->read()) { |
- m_blobData->appendBytes(buf->data(), buf->byteLength()); |
- } |
- } |
- void onClose() override |
- { |
- ASSERT(m_buffer); |
- const long long size = m_blobData->length(); |
- m_client->didCreateBlobHandle(BlobDataHandle::create(m_blobData.release(), size)); |
- cleanup(); |
- } |
- void onError() override |
- { |
- ASSERT(m_buffer); |
- m_client->didFail(m_buffer->exception()); |
- cleanup(); |
- } |
- void start() |
- { |
- ASSERT(!m_buffer->isObserverRegistered()); |
- m_buffer->registerObserver(this); |
- onWrite(); |
- if (m_buffer->hasError()) { |
- return onError(); |
- } |
- if (m_buffer->isClosed()) |
- return onClose(); |
- } |
- void cleanup() |
- { |
- m_buffer->unregisterObserver(); |
- m_buffer.clear(); |
- m_client.clear(); |
- m_blobData.clear(); |
- } |
-private: |
- Member<BodyStreamBuffer> m_buffer; |
- Member<BodyStreamBuffer::BlobHandleCreatorClient> m_client; |
- OwnPtr<BlobData> m_blobData; |
-}; |
- |
-class StreamTeePump : public BodyStreamBuffer::Observer { |
-public: |
- StreamTeePump(BodyStreamBuffer* inBuffer, BodyStreamBuffer* outBuffer1, BodyStreamBuffer* outBuffer2) |
- : m_inBuffer(inBuffer) |
- , m_outBuffer1(outBuffer1) |
- , m_outBuffer2(outBuffer2) |
- { |
- } |
- void onWrite() override |
- { |
- while (RefPtr<DOMArrayBuffer> buf = m_inBuffer->read()) { |
- if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError()) |
- m_outBuffer1->write(buf); |
- if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError()) |
- m_outBuffer2->write(buf); |
- } |
- } |
- void onClose() override |
- { |
- if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError()) |
- m_outBuffer1->close(); |
- if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError()) |
- m_outBuffer2->close(); |
- cleanup(); |
- } |
- void onError() override |
- { |
- if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError()) |
- m_outBuffer1->error(m_inBuffer->exception()); |
- if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError()) |
- m_outBuffer2->error(m_inBuffer->exception()); |
- cleanup(); |
- } |
- DEFINE_INLINE_VIRTUAL_TRACE() |
- { |
- BodyStreamBuffer::Observer::trace(visitor); |
- visitor->trace(m_inBuffer); |
- visitor->trace(m_outBuffer1); |
- visitor->trace(m_outBuffer2); |
- } |
- void start() |
- { |
- m_inBuffer->registerObserver(this); |
- onWrite(); |
- if (m_inBuffer->hasError()) |
- return onError(); |
- if (m_inBuffer->isClosed()) |
- return onClose(); |
+ FetchDataLoader::Client::trace(visitor); |
} |
private: |
- void cleanup() |
+ void didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandle) override |
{ |
- m_inBuffer->unregisterObserver(); |
- m_inBuffer.clear(); |
- m_outBuffer1.clear(); |
- m_outBuffer2.clear(); |
+ if (m_client) |
+ m_client->didFetchDataLoadedBlobHandle(blobDataHandle); |
+ m_buffer->didFetchDataLoadFinished(); |
} |
- Member<BodyStreamBuffer> m_inBuffer; |
- Member<BodyStreamBuffer> m_outBuffer1; |
- Member<BodyStreamBuffer> m_outBuffer2; |
-}; |
- |
-// WebDataConsumerHandleAdapter is used to migrate incrementally |
-// from BodyStreamBuffer to FetchDataConsumerHandle and will be removed |
-// after the migration. |
-class WebDataConsumerHandleAdapter |
- : public GarbageCollectedFinalized<WebDataConsumerHandleAdapter> |
- , public WebDataConsumerHandle::Client { |
-public: |
- WebDataConsumerHandleAdapter(PassOwnPtr<WebDataConsumerHandle> handle, const String& failureMessage) |
- : m_reader(handle->obtainReader(this)) |
- , m_failureMessage(failureMessage) |
- , m_outputBuffer(new BodyStreamBuffer(new Canceller(this))) |
+ void didFetchDataLoadedArrayBuffer(PassRefPtr<DOMArrayBuffer> arrayBuffer) override |
{ |
- ASSERT(m_reader); |
+ if (m_client) |
+ m_client->didFetchDataLoadedArrayBuffer(arrayBuffer); |
+ m_buffer->didFetchDataLoadFinished(); |
} |
- |
- BodyStreamBuffer* outputBuffer() { return m_outputBuffer; } |
- |
- DEFINE_INLINE_TRACE() |
+ void didFetchDataLoadedString(const String& str) override |
{ |
- visitor->trace(m_outputBuffer); |
+ if (m_client) |
+ m_client->didFetchDataLoadedString(str); |
+ m_buffer->didFetchDataLoadFinished(); |
} |
- |
-private: |
- class Canceller : public BodyStreamBuffer::Canceller { |
- public: |
- explicit Canceller(WebDataConsumerHandleAdapter* source) : m_source(source) { } |
- |
- void cancel() override |
- { |
- m_source->close(); |
- } |
- |
- DEFINE_INLINE_VIRTUAL_TRACE() |
- { |
- BodyStreamBuffer::Canceller::trace(visitor); |
- visitor->trace(m_source); |
- } |
- |
- private: |
- Member<WebDataConsumerHandleAdapter> m_source; |
- }; |
- |
- void didGetReadable() override |
- { |
- while (true) { |
- const void* buffer; |
- size_t available; |
- WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available); |
- switch (result) { |
- case WebDataConsumerHandle::Ok: |
- m_outputBuffer->write(DOMArrayBuffer::create(buffer, available)); |
- m_reader->endRead(available); |
- break; |
- |
- case WebDataConsumerHandle::Done: |
- close(); |
- return; |
- |
- case WebDataConsumerHandle::ShouldWait: |
- return; |
- |
- case WebDataConsumerHandle::Busy: |
- case WebDataConsumerHandle::ResourceExhausted: |
- case WebDataConsumerHandle::UnexpectedError: |
- error(); |
- return; |
- } |
- } |
- } |
- |
- void error() |
+ void didFetchDataLoadedStream() override |
{ |
- m_reader.clear(); |
- m_outputBuffer->error(DOMException::create(NetworkError, m_failureMessage)); |
- m_outputBuffer.clear(); |
+ if (m_client) |
+ m_client->didFetchDataLoadedStream(); |
+ m_buffer->didFetchDataLoadFinished(); |
} |
- |
- void close() |
+ void didFetchDataLoadFailed() override |
{ |
- m_reader.clear(); |
- m_outputBuffer->close(); |
- m_outputBuffer.clear(); |
+ if (m_client) |
+ m_client->didFetchDataLoadFailed(); |
+ m_buffer->didFetchDataLoadFinished(); |
} |
- OwnPtr<WebDataConsumerHandle::Reader> m_reader; |
- String m_failureMessage; |
- |
- Member<BodyStreamBuffer> m_outputBuffer; |
+ Member<BodyStreamBuffer> m_buffer; |
+ Member<FetchDataLoader::Client> m_client; |
}; |
- |
-} // namespace |
- |
-PassRefPtr<DOMArrayBuffer> BodyStreamBuffer::read() |
+void BodyStreamBuffer::setDrainingStreamNotificationClient(DrainingStreamNotificationClient* client) |
{ |
- if (m_queue.isEmpty()) |
- return PassRefPtr<DOMArrayBuffer>(); |
- return m_queue.takeFirst(); |
+ ASSERT(!m_fetchDataLoader); |
+ ASSERT(!m_drainingStreamNotificationClient); |
+ m_drainingStreamNotificationClient = client; |
} |
-void BodyStreamBuffer::write(PassRefPtr<DOMArrayBuffer> chunk) |
+void BodyStreamBuffer::startLoading(FetchDataLoader* fetchDataLoader, FetchDataLoader::Client* client) |
{ |
- ASSERT(!m_isClosed); |
- ASSERT(!m_exception); |
- ASSERT(chunk); |
- m_queue.append(chunk); |
- if (m_observer) |
- m_observer->onWrite(); |
+ ASSERT(!m_fetchDataLoader); |
+ m_fetchDataLoader = fetchDataLoader; |
+ m_fetchDataLoader->start(m_handle.get(), new ClientWithFinishNotification(this, client)); |
} |
-void BodyStreamBuffer::close() |
+void BodyStreamBuffer::doDrainingStreamNotification() |
{ |
- ASSERT(!m_isClosed); |
- ASSERT(!m_exception); |
- m_isClosed = true; |
- if (m_observer) |
- m_observer->onClose(); |
+ ASSERT(!m_fetchDataLoader); |
+ DrainingStreamNotificationClient* client = m_drainingStreamNotificationClient; |
+ m_drainingStreamNotificationClient.clear(); |
+ if (client) |
+ client->didFetchDataLoadFinishedFromDrainingStream(); |
} |
-void BodyStreamBuffer::error(DOMException* exception) |
+void BodyStreamBuffer::clearDrainingStreamNotification() |
{ |
- ASSERT(exception); |
- ASSERT(!m_isClosed); |
- ASSERT(!m_exception); |
- m_exception = exception; |
- if (m_observer) |
- m_observer->onError(); |
+ ASSERT(!m_fetchDataLoader); |
+ m_drainingStreamNotificationClient.clear(); |
} |
-bool BodyStreamBuffer::readAllAndCreateBlobHandle(const String& contentType, BlobHandleCreatorClient* client) |
+void BodyStreamBuffer::didFetchDataLoadFinished() |
{ |
- if (m_observer) |
- return false; |
- BlobCreator* blobCreator = new BlobCreator(this, contentType, client); |
- blobCreator->start(); |
- return true; |
+ ASSERT(m_fetchDataLoader); |
+ m_fetchDataLoader.clear(); |
+ doDrainingStreamNotification(); |
} |
-bool BodyStreamBuffer::startTee(BodyStreamBuffer* out1, BodyStreamBuffer* out2) |
+DrainingBodyStreamBuffer::~DrainingBodyStreamBuffer() |
{ |
- if (m_observer) |
- return false; |
- StreamTeePump* teePump = new StreamTeePump(this, out1, out2); |
- teePump->start(); |
- return true; |
+ if (m_buffer) |
+ m_buffer->doDrainingStreamNotification(); |
} |
-bool BodyStreamBuffer::registerObserver(Observer* observer) |
+void DrainingBodyStreamBuffer::startLoading(FetchDataLoader* fetchDataLoader, FetchDataLoader::Client* client) |
{ |
- if (m_observer) |
- return false; |
- ASSERT(observer); |
- m_observer = observer; |
- return true; |
-} |
+ if (!m_buffer) |
+ return; |
-void BodyStreamBuffer::unregisterObserver() |
-{ |
- m_observer.clear(); |
+ m_buffer->startLoading(fetchDataLoader, client); |
+ m_buffer.clear(); |
} |
-DEFINE_TRACE(BodyStreamBuffer) |
+BodyStreamBuffer* DrainingBodyStreamBuffer::leakBuffer() |
{ |
- visitor->trace(m_exception); |
- visitor->trace(m_observer); |
- visitor->trace(m_canceller); |
+ if (!m_buffer) |
+ return nullptr; |
+ |
+ m_buffer->clearDrainingStreamNotification(); |
+ BodyStreamBuffer* buffer = m_buffer; |
+ m_buffer.clear(); |
+ return buffer; |
} |
-BodyStreamBuffer::BodyStreamBuffer(Canceller* canceller) |
- : m_isClosed(false) |
- , m_canceller(canceller) |
+PassRefPtr<BlobDataHandle> DrainingBodyStreamBuffer::drainAsBlobDataHandle(FetchDataConsumerHandle::Reader::BlobSizePolicy blobSizePolicy) |
{ |
+ if (!m_buffer) |
+ return nullptr; |
+ |
+ RefPtr<BlobDataHandle> blobDataHandle = m_buffer->m_handle->obtainReader(nullptr)->drainAsBlobDataHandle(blobSizePolicy); |
+ if (!blobDataHandle) |
+ return nullptr; |
+ m_buffer->doDrainingStreamNotification(); |
+ m_buffer.clear(); |
+ return blobDataHandle.release(); |
} |
-BodyStreamBuffer* BodyStreamBuffer::create(PassOwnPtr<WebDataConsumerHandle> handle, const String& failureMessage) |
+DrainingBodyStreamBuffer::DrainingBodyStreamBuffer(BodyStreamBuffer* buffer, BodyStreamBuffer::DrainingStreamNotificationClient* client) |
+ : m_buffer(buffer) |
{ |
- return (new WebDataConsumerHandleAdapter(handle, failureMessage))->outputBuffer(); |
+ if (client) |
+ m_buffer->setDrainingStreamNotificationClient(client); |
} |
} // namespace blink |