Index: third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp |
diff --git a/third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp b/third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp |
index e5b6ce557785ad9816bf866ffd45be9bc671477d..82397a0b0691637339a9fc732ae5e23be068ca3a 100644 |
--- a/third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp |
+++ b/third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp |
@@ -4,13 +4,15 @@ |
#include "modules/fetch/FetchDataLoader.h" |
+#include <memory> |
#include "core/html/parser/TextResourceDecoder.h" |
#include "modules/fetch/BytesConsumer.h" |
+#include "mojo/public/cpp/system/simple_watcher.h" |
+#include "platform/instrumentation/tracing/TraceEvent.h" |
#include "wtf/PtrUtil.h" |
#include "wtf/text/StringBuilder.h" |
#include "wtf/text/WTFString.h" |
#include "wtf/typed_arrays/ArrayBufferBuilder.h" |
-#include <memory> |
namespace blink { |
@@ -305,6 +307,117 @@ class FetchDataLoaderAsStream final : public FetchDataLoader, |
Member<Stream> m_outStream; |
}; |
+class FetchDataLoaderAsDataPipe final : public FetchDataLoader, |
+ public BytesConsumer::Client { |
+ USING_GARBAGE_COLLECTED_MIXIN(FetchDataLoaderAsDataPipe); |
+ |
+ public: |
+ explicit FetchDataLoaderAsDataPipe(mojo::ScopedDataPipeProducerHandle handle) |
+ : m_handle(std::move(handle)), |
+ m_handleWatcher(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL) { |
+ TRACE_EVENT0("ServiceWorker", |
+ "FetchDataLoaderAsDataPipe::FetchDataLoaderAsDataPipe"); |
+ } |
+ ~FetchDataLoaderAsDataPipe() override {} |
+ void start(BytesConsumer* consumer, |
+ FetchDataLoader::Client* client) override { |
+ TRACE_EVENT0("ServiceWorker", "FetchDataLoaderAsDataPipe::start"); |
+ DCHECK(!m_client); |
+ DCHECK(!m_consumer); |
+ m_handleWatcher.Watch(m_handle.get(), MOJO_HANDLE_SIGNAL_WRITABLE, |
+ base::Bind(&FetchDataLoaderAsDataPipe::OnWritable, |
+ base::Unretained(this))); |
+ m_handleWatcher.ArmOrNotify(); |
+ m_client = client; |
+ m_consumer = consumer; |
+ m_consumer->setClient(this); |
+ onStateChange(); |
+ } |
+ |
+ void OnWritable(MojoResult result) { |
+ // LOG(ERROR) << "OnWritable"; |
+ onStateChange(); |
+ } |
+ void onStateChange() override { |
+ TRACE_EVENT0("ServiceWorker", "FetchDataLoaderAsDataPipe::onStateChange"); |
+ // LOG(ERROR) << "onStateChange"; |
+ bool should_wait = false; |
+ while (!should_wait) { |
+ const char* buffer; |
+ size_t available; |
+ auto result = m_consumer->beginRead(&buffer, &available); |
+ if (result == BytesConsumer::Result::ShouldWait) |
+ return; |
+ if (result == BytesConsumer::Result::Ok) { |
+ // LOG(ERROR) << "available " << available; |
+ if (available > 0) { |
+ uint32_t num_bytes = available; |
+ MojoResult mojo_result = WriteDataRaw( |
+ m_handle.get(), buffer, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); |
+ // LOG(ERROR) << "mojo_result " << mojo_result; |
+ // LOG(ERROR) << "num_bytes " << num_bytes; |
+ // LOG(ERROR) << " data: [" << std::string(buffer, num_bytes) << "]"; |
+ if (mojo_result == MOJO_RESULT_OK) { |
+ // LOG(ERROR) << " MOJO_RESULT_OK"; |
+ TRACE_EVENT1("ServiceWorker", |
+ "FetchDataLoaderAsDataPipe::onStateChange endRead", |
+ "bytes", num_bytes); |
+ result = m_consumer->endRead(num_bytes); |
+ } else { |
+ result = m_consumer->endRead(0); |
+ if (mojo_result == MOJO_RESULT_SHOULD_WAIT) { |
+ // LOG(ERROR) << " MOJO_RESULT_SHOULD_WAIT"; |
+ should_wait = true; |
+ m_handleWatcher.ArmOrNotify(); |
+ } else { |
+ // LOG(ERROR) << " mojo_result != MOJO_RESULT_OK"; |
+ m_consumer->cancel(); |
+ m_handle.reset(); |
+ m_handleWatcher.Cancel(); |
+ m_client->didFetchDataLoadFailed(); |
+ return; |
+ } |
+ } |
+ } else { |
+ result = m_consumer->endRead(available); |
+ } |
+ } |
+ switch (result) { |
+ case BytesConsumer::Result::Ok: |
+ break; |
+ case BytesConsumer::Result::ShouldWait: |
+ NOTREACHED(); |
+ return; |
+ case BytesConsumer::Result::Done: |
+ // LOG(ERROR) << "BytesConsumer::Result::Done"; |
+ m_handle.reset(); |
+ m_handleWatcher.Cancel(); |
+ m_client->didFetchDataLoadedStream(); |
+ return; |
+ case BytesConsumer::Result::Error: |
+ LOG(ERROR) << "BytesConsumer::Result::Error"; |
+ m_client->didFetchDataLoadFailed(); |
+ return; |
+ } |
+ } |
+ } |
+ |
+ void cancel() override { m_consumer->cancel(); } |
+ |
+ DEFINE_INLINE_TRACE() { |
+ visitor->trace(m_consumer); |
+ visitor->trace(m_client); |
+ FetchDataLoader::trace(visitor); |
+ BytesConsumer::Client::trace(visitor); |
+ } |
+ |
+ Member<BytesConsumer> m_consumer; |
+ Member<FetchDataLoader::Client> m_client; |
+ |
+ mojo::ScopedDataPipeProducerHandle m_handle; |
+ mojo::SimpleWatcher m_handleWatcher; |
+}; |
+ |
} // namespace |
FetchDataLoader* FetchDataLoader::createLoaderAsBlobHandle( |
@@ -324,4 +437,9 @@ FetchDataLoader* FetchDataLoader::createLoaderAsStream(Stream* outStream) { |
return new FetchDataLoaderAsStream(outStream); |
} |
+FetchDataLoader* FetchDataLoader::createLoaderAsDataPipe( |
+ mojo::ScopedDataPipeProducerHandle handle) { |
+ return new FetchDataLoaderAsDataPipe(std::move(handle)); |
+} |
+ |
} // namespace blink |