| Index: third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp
 | 
| diff --git a/third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp b/third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp
 | 
| index 19532566a97182fa72f84e1df62944cff5282127..ac601a3b59e6087428c7204e09ad12a236334bb9 100644
 | 
| --- a/third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp
 | 
| +++ b/third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp
 | 
| @@ -7,6 +7,8 @@
 | 
|  #include <memory>
 | 
|  #include "core/html/parser/TextResourceDecoder.h"
 | 
|  #include "modules/fetch/BytesConsumer.h"
 | 
| +#include "mojo/public/cpp/system/simple_watcher.h"
 | 
| +#include "platform/wtf/Functional.h"
 | 
|  #include "platform/wtf/PtrUtil.h"
 | 
|  #include "platform/wtf/text/StringBuilder.h"
 | 
|  #include "platform/wtf/text/WTFString.h"
 | 
| @@ -231,39 +233,61 @@ class FetchDataLoaderAsString final : public FetchDataLoader,
 | 
|    StringBuilder builder_;
 | 
|  };
 | 
|  
 | 
| -class FetchDataLoaderAsStream final : public FetchDataLoader,
 | 
| -                                      public BytesConsumer::Client {
 | 
| -  USING_GARBAGE_COLLECTED_MIXIN(FetchDataLoaderAsStream);
 | 
| +class FetchDataLoaderAsDataPipe final : public FetchDataLoader,
 | 
| +                                        public BytesConsumer::Client {
 | 
| +  USING_GARBAGE_COLLECTED_MIXIN(FetchDataLoaderAsDataPipe);
 | 
|  
 | 
|   public:
 | 
| -  explicit FetchDataLoaderAsStream(Stream* out_stream)
 | 
| -      : out_stream_(out_stream) {}
 | 
| +  explicit FetchDataLoaderAsDataPipe(
 | 
| +      mojo::ScopedDataPipeProducerHandle out_data_pipe)
 | 
| +      : out_data_pipe_(std::move(out_data_pipe)),
 | 
| +        data_pipe_watcher_(FROM_HERE,
 | 
| +                           mojo::SimpleWatcher::ArmingPolicy::MANUAL) {}
 | 
| +  ~FetchDataLoaderAsDataPipe() override {}
 | 
|  
 | 
|    void Start(BytesConsumer* consumer,
 | 
|               FetchDataLoader::Client* client) override {
 | 
|      DCHECK(!client_);
 | 
|      DCHECK(!consumer_);
 | 
| +    data_pipe_watcher_.Watch(
 | 
| +        out_data_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
 | 
| +        ConvertToBaseCallback(WTF::Bind(&FetchDataLoaderAsDataPipe::OnWritable,
 | 
| +                                        WrapWeakPersistent(this))));
 | 
| +    data_pipe_watcher_.ArmOrNotify();
 | 
|      client_ = client;
 | 
|      consumer_ = consumer;
 | 
|      consumer_->SetClient(this);
 | 
| -    OnStateChange();
 | 
|    }
 | 
|  
 | 
| +  void OnWritable(MojoResult) { OnStateChange(); }
 | 
| +
 | 
| +  // Implements BytesConsumer::Client.
 | 
|    void OnStateChange() override {
 | 
| -    bool need_to_flush = false;
 | 
| -    while (true) {
 | 
| +    bool should_wait = false;
 | 
| +    while (!should_wait) {
 | 
|        const char* buffer;
 | 
|        size_t available;
 | 
|        auto result = consumer_->BeginRead(&buffer, &available);
 | 
| -      if (result == BytesConsumer::Result::kShouldWait) {
 | 
| -        if (need_to_flush)
 | 
| -          out_stream_->Flush();
 | 
| +      if (result == BytesConsumer::Result::kShouldWait)
 | 
|          return;
 | 
| -      }
 | 
|        if (result == BytesConsumer::Result::kOk) {
 | 
| -        out_stream_->AddData(buffer, available);
 | 
| -        need_to_flush = true;
 | 
| -        result = consumer_->EndRead(available);
 | 
| +        DCHECK_GT(available, 0UL);
 | 
| +        uint32_t num_bytes = available;
 | 
| +        MojoResult mojo_result =
 | 
| +            mojo::WriteDataRaw(out_data_pipe_.get(), buffer, &num_bytes,
 | 
| +                               MOJO_WRITE_DATA_FLAG_NONE);
 | 
| +        if (mojo_result == MOJO_RESULT_OK) {
 | 
| +          result = consumer_->EndRead(num_bytes);
 | 
| +        } else if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
 | 
| +          result = consumer_->EndRead(0);
 | 
| +          should_wait = true;
 | 
| +          data_pipe_watcher_.ArmOrNotify();
 | 
| +        } else {
 | 
| +          result = consumer_->EndRead(0);
 | 
| +          StopInternal();
 | 
| +          client_->DidFetchDataLoadFailed();
 | 
| +          return;
 | 
| +        }
 | 
|        }
 | 
|        switch (result) {
 | 
|          case BytesConsumer::Result::kOk:
 | 
| @@ -272,37 +296,38 @@ class FetchDataLoaderAsStream final : public FetchDataLoader,
 | 
|            NOTREACHED();
 | 
|            return;
 | 
|          case BytesConsumer::Result::kDone:
 | 
| -          if (need_to_flush)
 | 
| -            out_stream_->Flush();
 | 
| -          out_stream_->Finalize();
 | 
| -          client_->DidFetchDataLoadedStream();
 | 
| +          StopInternal();
 | 
| +          client_->DidFetchDataLoadedDataPipe();
 | 
|            return;
 | 
|          case BytesConsumer::Result::kError:
 | 
| -          // If the stream is aborted soon after the stream is registered
 | 
| -          // to the StreamRegistry, ServiceWorkerURLRequestJob may not
 | 
| -          // notice the error and continue waiting forever.
 | 
| -          // TODO(yhirano): Add new message to report the error to the
 | 
| -          // browser process.
 | 
| -          out_stream_->Abort();
 | 
| +          StopInternal();
 | 
|            client_->DidFetchDataLoadFailed();
 | 
|            return;
 | 
|        }
 | 
|      }
 | 
|    }
 | 
|  
 | 
| -  void Cancel() override { consumer_->Cancel(); }
 | 
| +  void Cancel() override { StopInternal(); }
 | 
|  
 | 
|    DEFINE_INLINE_TRACE() {
 | 
|      visitor->Trace(consumer_);
 | 
|      visitor->Trace(client_);
 | 
| -    visitor->Trace(out_stream_);
 | 
|      FetchDataLoader::Trace(visitor);
 | 
|      BytesConsumer::Client::Trace(visitor);
 | 
|    }
 | 
|  
 | 
| + private:
 | 
| +  void StopInternal() {
 | 
| +    consumer_->Cancel();
 | 
| +    data_pipe_watcher_.Cancel();
 | 
| +    out_data_pipe_.reset();
 | 
| +  }
 | 
| +
 | 
|    Member<BytesConsumer> consumer_;
 | 
|    Member<FetchDataLoader::Client> client_;
 | 
| -  Member<Stream> out_stream_;
 | 
| +
 | 
| +  mojo::ScopedDataPipeProducerHandle out_data_pipe_;
 | 
| +  mojo::SimpleWatcher data_pipe_watcher_;
 | 
|  };
 | 
|  
 | 
|  }  // namespace
 | 
| @@ -320,8 +345,9 @@ FetchDataLoader* FetchDataLoader::CreateLoaderAsString() {
 | 
|    return new FetchDataLoaderAsString();
 | 
|  }
 | 
|  
 | 
| -FetchDataLoader* FetchDataLoader::CreateLoaderAsStream(Stream* out_stream) {
 | 
| -  return new FetchDataLoaderAsStream(out_stream);
 | 
| +FetchDataLoader* FetchDataLoader::CreateLoaderAsDataPipe(
 | 
| +    mojo::ScopedDataPipeProducerHandle out_data_pipe) {
 | 
| +  return new FetchDataLoaderAsDataPipe(std::move(out_data_pipe));
 | 
|  }
 | 
|  
 | 
|  }  // namespace blink
 | 
| 
 |