| Index: content/browser/service_worker/service_worker_data_pipe_reader.cc
 | 
| diff --git a/content/browser/service_worker/service_worker_data_pipe_reader.cc b/content/browser/service_worker/service_worker_data_pipe_reader.cc
 | 
| new file mode 100644
 | 
| index 0000000000000000000000000000000000000000..d31543018d30cbc330f888f6061170ecfd06be66
 | 
| --- /dev/null
 | 
| +++ b/content/browser/service_worker/service_worker_data_pipe_reader.cc
 | 
| @@ -0,0 +1,198 @@
 | 
| +// Copyright 2017 The Chromium Authors. All rights reserved.
 | 
| +// Use of this source code is governed by a BSD-style license that can be
 | 
| +// found in the LICENSE file.
 | 
| +
 | 
| +#include "content/browser/service_worker/service_worker_data_pipe_reader.h"
 | 
| +
 | 
| +#include "base/trace_event/trace_event.h"
 | 
| +#include "content/browser/service_worker/service_worker_url_request_job.h"
 | 
| +#include "content/browser/service_worker/service_worker_version.h"
 | 
| +#include "net/base/io_buffer.h"
 | 
| +
 | 
| +namespace content {
 | 
| +
 | 
| +ServiceWorkerDataPipeReader::ServiceWorkerDataPipeReader(
 | 
| +    ServiceWorkerURLRequestJob* owner,
 | 
| +    scoped_refptr<ServiceWorkerVersion> streaming_version,
 | 
| +    blink::mojom::ServiceWorkerStreamHandlePtr stream_handle)
 | 
| +    : owner_(owner),
 | 
| +      streaming_version_(streaming_version),
 | 
| +      stream_pending_buffer_size_(0),
 | 
| +      handle_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC),
 | 
| +      stream_(std::move(stream_handle->stream)),
 | 
| +      binding_(this, std::move(stream_handle->callback_request)),
 | 
| +      producer_state_(State::kStreaming) {
 | 
| +  TRACE_EVENT_ASYNC_BEGIN1("ServiceWorker", "ServiceWorkerDataPipeReader", this,
 | 
| +                           "Url", owner->request()->url().spec());
 | 
| +  streaming_version_->AddStreamingURLRequestJob(owner_);
 | 
| +  binding_.set_connection_error_handler(base::Bind(
 | 
| +      &ServiceWorkerDataPipeReader::OnAborted, base::Unretained(this)));
 | 
| +}
 | 
| +
 | 
| +ServiceWorkerDataPipeReader::~ServiceWorkerDataPipeReader() {
 | 
| +  DCHECK(streaming_version_);
 | 
| +  streaming_version_->RemoveStreamingURLRequestJob(owner_);
 | 
| +  streaming_version_ = nullptr;
 | 
| +
 | 
| +  TRACE_EVENT_ASYNC_END0("ServiceWorker", "ServiceWorkerDataPipeReader", this);
 | 
| +}
 | 
| +
 | 
| +void ServiceWorkerDataPipeReader::Start() {
 | 
| +  TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader",
 | 
| +                               this, "Start");
 | 
| +  handle_watcher_.Watch(
 | 
| +      stream_.get(),
 | 
| +      MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
 | 
| +      base::Bind(&ServiceWorkerDataPipeReader::OnHandleGotSignal,
 | 
| +                 base::Unretained(this)));
 | 
| +  owner_->OnResponseStarted();
 | 
| +}
 | 
| +
 | 
| +void ServiceWorkerDataPipeReader::OnHandleGotSignal(MojoResult) {
 | 
| +  TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader",
 | 
| +                               this, "OnHandleGotSignal");
 | 
| +  // Do nothing if stream_pending_buffer_ is empty, i.e. there's no ReadRawData
 | 
| +  // operation waiting for IO completion.
 | 
| +  if (!stream_pending_buffer_)
 | 
| +    return;
 | 
| +
 | 
| +  // If state() is not STREAMING, it means the data pipe was disconnected and
 | 
| +  // OnCompleted/OnAborted has already been called.
 | 
| +  if (state() != State::kStreaming) {
 | 
| +    handle_watcher_.Cancel();
 | 
| +    AsyncComplete();
 | 
| +  }
 | 
| +
 | 
| +  // |stream_pending_buffer_| is set to the IOBuffer instance provided to
 | 
| +  // ReadRawData() by URLRequestJob.
 | 
| +  uint32_t size_to_pass = stream_pending_buffer_size_;
 | 
| +  MojoResult mojo_result =
 | 
| +      mojo::ReadDataRaw(stream_.get(), stream_pending_buffer_->data(),
 | 
| +                        &size_to_pass, MOJO_READ_DATA_FLAG_NONE);
 | 
| +
 | 
| +  switch (mojo_result) {
 | 
| +    case MOJO_RESULT_OK:
 | 
| +      stream_pending_buffer_ = nullptr;
 | 
| +      stream_pending_buffer_size_ = 0;
 | 
| +      owner_->OnReadRawDataComplete(size_to_pass);
 | 
| +      return;
 | 
| +    case MOJO_RESULT_FAILED_PRECONDITION:
 | 
| +      stream_.reset();
 | 
| +      handle_watcher_.Cancel();
 | 
| +      // If OnCompleted/OnAborted has already been called, let this request
 | 
| +      // complete.
 | 
| +      if (state() != State::kStreaming)
 | 
| +        AsyncComplete();
 | 
| +      return;
 | 
| +    case MOJO_RESULT_SHOULD_WAIT:
 | 
| +      return;
 | 
| +    case MOJO_RESULT_INVALID_ARGUMENT:
 | 
| +    case MOJO_RESULT_OUT_OF_RANGE:
 | 
| +    case MOJO_RESULT_BUSY:
 | 
| +      break;
 | 
| +  }
 | 
| +  NOTREACHED();
 | 
| +}
 | 
| +
 | 
| +int ServiceWorkerDataPipeReader::ReadRawData(net::IOBuffer* buf, int buf_size) {
 | 
| +  TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader",
 | 
| +                               this, "ReadRawData");
 | 
| +  DCHECK(!stream_pending_buffer_);
 | 
| +  // If state() is not STREAMING, it means the data pipe was disconnected and
 | 
| +  // OnCompleted/OnAborted has already been called.
 | 
| +  if (state() != State::kStreaming)
 | 
| +    return SyncComplete();
 | 
| +
 | 
| +  uint32_t size_to_pass = buf_size;
 | 
| +  MojoResult mojo_result = mojo::ReadDataRaw(
 | 
| +      stream_.get(), buf->data(), &size_to_pass, MOJO_READ_DATA_FLAG_NONE);
 | 
| +  switch (mojo_result) {
 | 
| +    case MOJO_RESULT_OK:
 | 
| +      return size_to_pass;
 | 
| +    case MOJO_RESULT_FAILED_PRECONDITION:
 | 
| +      stream_.reset();
 | 
| +      handle_watcher_.Cancel();
 | 
| +      // Complete/Abort asynchronously if OnCompleted/OnAborted has not been
 | 
| +      // called yet.
 | 
| +      if (state() == State::kStreaming) {
 | 
| +        stream_pending_buffer_ = buf;
 | 
| +        stream_pending_buffer_size_ = buf_size;
 | 
| +        return net::ERR_IO_PENDING;
 | 
| +      }
 | 
| +      return SyncComplete();
 | 
| +    case MOJO_RESULT_SHOULD_WAIT:
 | 
| +      stream_pending_buffer_ = buf;
 | 
| +      stream_pending_buffer_size_ = buf_size;
 | 
| +      return net::ERR_IO_PENDING;
 | 
| +    case MOJO_RESULT_INVALID_ARGUMENT:
 | 
| +    case MOJO_RESULT_OUT_OF_RANGE:
 | 
| +    case MOJO_RESULT_BUSY:
 | 
| +      break;
 | 
| +  }
 | 
| +  NOTREACHED();
 | 
| +  return net::ERR_FAILED;
 | 
| +}
 | 
| +
 | 
| +void ServiceWorkerDataPipeReader::OnCompleted() {
 | 
| +  producer_state_ = State::kCompleted;
 | 
| +  if (stream_pending_buffer_ && state() != State::kStreaming)
 | 
| +    AsyncComplete();
 | 
| +}
 | 
| +
 | 
| +void ServiceWorkerDataPipeReader::OnAborted() {
 | 
| +  producer_state_ = State::kAborted;
 | 
| +  if (stream_pending_buffer_ && state() != State::kStreaming)
 | 
| +    AsyncComplete();
 | 
| +}
 | 
| +
 | 
| +void ServiceWorkerDataPipeReader::AsyncComplete() {
 | 
| +  // This works only after ReadRawData returns net::ERR_IO_PENDING.
 | 
| +  DCHECK(stream_pending_buffer_);
 | 
| +
 | 
| +  switch (state()) {
 | 
| +    case State::kStreaming:
 | 
| +      NOTREACHED();
 | 
| +    case State::kCompleted:
 | 
| +      stream_pending_buffer_ = nullptr;
 | 
| +      stream_pending_buffer_size_ = 0;
 | 
| +      handle_watcher_.Cancel();
 | 
| +      owner_->RecordResult(ServiceWorkerMetrics::REQUEST_JOB_STREAM_RESPONSE);
 | 
| +      owner_->OnReadRawDataComplete(net::OK);
 | 
| +      return;
 | 
| +    case State::kAborted:
 | 
| +      stream_pending_buffer_ = nullptr;
 | 
| +      stream_pending_buffer_size_ = 0;
 | 
| +      handle_watcher_.Cancel();
 | 
| +      owner_->RecordResult(
 | 
| +          ServiceWorkerMetrics::REQUEST_JOB_ERROR_STREAM_ABORTED);
 | 
| +      owner_->OnReadRawDataComplete(net::ERR_CONNECTION_RESET);
 | 
| +      return;
 | 
| +  }
 | 
| +}
 | 
| +
 | 
| +int ServiceWorkerDataPipeReader::SyncComplete() {
 | 
| +  // This works only in ReadRawData.
 | 
| +  DCHECK(!stream_pending_buffer_);
 | 
| +
 | 
| +  switch (state()) {
 | 
| +    case State::kStreaming:
 | 
| +      break;
 | 
| +    case State::kCompleted:
 | 
| +      owner_->RecordResult(ServiceWorkerMetrics::REQUEST_JOB_STREAM_RESPONSE);
 | 
| +      return net::OK;
 | 
| +    case State::kAborted:
 | 
| +      owner_->RecordResult(
 | 
| +          ServiceWorkerMetrics::REQUEST_JOB_ERROR_STREAM_ABORTED);
 | 
| +      return net::ERR_CONNECTION_RESET;
 | 
| +  }
 | 
| +  NOTREACHED();
 | 
| +  return net::ERR_FAILED;
 | 
| +}
 | 
| +
 | 
| +ServiceWorkerDataPipeReader::State ServiceWorkerDataPipeReader::state() {
 | 
| +  if (!stream_.is_valid())
 | 
| +    return producer_state_;
 | 
| +  return State::kStreaming;
 | 
| +}
 | 
| +
 | 
| +}  // namespace content
 | 
| 
 |