| Index: third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp
|
| diff --git a/third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp b/third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp
|
| index e5b6ce557785ad9816bf866ffd45be9bc671477d..82397a0b0691637339a9fc732ae5e23be068ca3a 100644
|
| --- a/third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp
|
| +++ b/third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp
|
| @@ -4,13 +4,15 @@
|
|
|
| #include "modules/fetch/FetchDataLoader.h"
|
|
|
| +#include <memory>
|
| #include "core/html/parser/TextResourceDecoder.h"
|
| #include "modules/fetch/BytesConsumer.h"
|
| +#include "mojo/public/cpp/system/simple_watcher.h"
|
| +#include "platform/instrumentation/tracing/TraceEvent.h"
|
| #include "wtf/PtrUtil.h"
|
| #include "wtf/text/StringBuilder.h"
|
| #include "wtf/text/WTFString.h"
|
| #include "wtf/typed_arrays/ArrayBufferBuilder.h"
|
| -#include <memory>
|
|
|
| namespace blink {
|
|
|
| @@ -305,6 +307,117 @@ class FetchDataLoaderAsStream final : public FetchDataLoader,
|
| Member<Stream> m_outStream;
|
| };
|
|
|
| +class FetchDataLoaderAsDataPipe final : public FetchDataLoader,
|
| + public BytesConsumer::Client {
|
| + USING_GARBAGE_COLLECTED_MIXIN(FetchDataLoaderAsDataPipe);
|
| +
|
| + public:
|
| + explicit FetchDataLoaderAsDataPipe(mojo::ScopedDataPipeProducerHandle handle)
|
| + : m_handle(std::move(handle)),
|
| + m_handleWatcher(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL) {
|
| + TRACE_EVENT0("ServiceWorker",
|
| + "FetchDataLoaderAsDataPipe::FetchDataLoaderAsDataPipe");
|
| + }
|
| + ~FetchDataLoaderAsDataPipe() override {}
|
| + void start(BytesConsumer* consumer,
|
| + FetchDataLoader::Client* client) override {
|
| + TRACE_EVENT0("ServiceWorker", "FetchDataLoaderAsDataPipe::start");
|
| + DCHECK(!m_client);
|
| + DCHECK(!m_consumer);
|
| + m_handleWatcher.Watch(m_handle.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
|
| + base::Bind(&FetchDataLoaderAsDataPipe::OnWritable,
|
| + base::Unretained(this)));
|
| + m_handleWatcher.ArmOrNotify();
|
| + m_client = client;
|
| + m_consumer = consumer;
|
| + m_consumer->setClient(this);
|
| + onStateChange();
|
| + }
|
| +
|
| + void OnWritable(MojoResult result) {
|
| + // LOG(ERROR) << "OnWritable";
|
| + onStateChange();
|
| + }
|
| + void onStateChange() override {
|
| + TRACE_EVENT0("ServiceWorker", "FetchDataLoaderAsDataPipe::onStateChange");
|
| + // LOG(ERROR) << "onStateChange";
|
| + bool should_wait = false;
|
| + while (!should_wait) {
|
| + const char* buffer;
|
| + size_t available;
|
| + auto result = m_consumer->beginRead(&buffer, &available);
|
| + if (result == BytesConsumer::Result::ShouldWait)
|
| + return;
|
| + if (result == BytesConsumer::Result::Ok) {
|
| + // LOG(ERROR) << "available " << available;
|
| + if (available > 0) {
|
| + uint32_t num_bytes = available;
|
| + MojoResult mojo_result = WriteDataRaw(
|
| + m_handle.get(), buffer, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
|
| + // LOG(ERROR) << "mojo_result " << mojo_result;
|
| + // LOG(ERROR) << "num_bytes " << num_bytes;
|
| + // LOG(ERROR) << " data: [" << std::string(buffer, num_bytes) << "]";
|
| + if (mojo_result == MOJO_RESULT_OK) {
|
| + // LOG(ERROR) << " MOJO_RESULT_OK";
|
| + TRACE_EVENT1("ServiceWorker",
|
| + "FetchDataLoaderAsDataPipe::onStateChange endRead",
|
| + "bytes", num_bytes);
|
| + result = m_consumer->endRead(num_bytes);
|
| + } else {
|
| + result = m_consumer->endRead(0);
|
| + if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
|
| + // LOG(ERROR) << " MOJO_RESULT_SHOULD_WAIT";
|
| + should_wait = true;
|
| + m_handleWatcher.ArmOrNotify();
|
| + } else {
|
| + // LOG(ERROR) << " mojo_result != MOJO_RESULT_OK";
|
| + m_consumer->cancel();
|
| + m_handle.reset();
|
| + m_handleWatcher.Cancel();
|
| + m_client->didFetchDataLoadFailed();
|
| + return;
|
| + }
|
| + }
|
| + } else {
|
| + result = m_consumer->endRead(available);
|
| + }
|
| + }
|
| + switch (result) {
|
| + case BytesConsumer::Result::Ok:
|
| + break;
|
| + case BytesConsumer::Result::ShouldWait:
|
| + NOTREACHED();
|
| + return;
|
| + case BytesConsumer::Result::Done:
|
| + // LOG(ERROR) << "BytesConsumer::Result::Done";
|
| + m_handle.reset();
|
| + m_handleWatcher.Cancel();
|
| + m_client->didFetchDataLoadedStream();
|
| + return;
|
| + case BytesConsumer::Result::Error:
|
| + LOG(ERROR) << "BytesConsumer::Result::Error";
|
| + m_client->didFetchDataLoadFailed();
|
| + return;
|
| + }
|
| + }
|
| + }
|
| +
|
| + void cancel() override { m_consumer->cancel(); }
|
| +
|
| + DEFINE_INLINE_TRACE() {
|
| + visitor->trace(m_consumer);
|
| + visitor->trace(m_client);
|
| + FetchDataLoader::trace(visitor);
|
| + BytesConsumer::Client::trace(visitor);
|
| + }
|
| +
|
| + Member<BytesConsumer> m_consumer;
|
| + Member<FetchDataLoader::Client> m_client;
|
| +
|
| + mojo::ScopedDataPipeProducerHandle m_handle;
|
| + mojo::SimpleWatcher m_handleWatcher;
|
| +};
|
| +
|
| } // namespace
|
|
|
| FetchDataLoader* FetchDataLoader::createLoaderAsBlobHandle(
|
| @@ -324,4 +437,9 @@ FetchDataLoader* FetchDataLoader::createLoaderAsStream(Stream* outStream) {
|
| return new FetchDataLoaderAsStream(outStream);
|
| }
|
|
|
| +FetchDataLoader* FetchDataLoader::createLoaderAsDataPipe(
|
| + mojo::ScopedDataPipeProducerHandle handle) {
|
| + return new FetchDataLoaderAsDataPipe(std::move(handle));
|
| +}
|
| +
|
| } // namespace blink
|
|
|