Chromium Code Reviews| Index: content/browser/service_worker/service_worker_data_pipe_reader.cc |
| diff --git a/content/browser/service_worker/service_worker_data_pipe_reader.cc b/content/browser/service_worker/service_worker_data_pipe_reader.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..8237e2b443a27059a3cf047e4fba63eaf0e5ce6f |
| --- /dev/null |
| +++ b/content/browser/service_worker/service_worker_data_pipe_reader.cc |
| @@ -0,0 +1,201 @@ |
| +// Copyright 2017 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "content/browser/service_worker/service_worker_data_pipe_reader.h" |
| + |
| +#include "base/trace_event/trace_event.h" |
| +#include "content/browser/service_worker/service_worker_url_request_job.h" |
| +#include "content/browser/service_worker/service_worker_version.h" |
| +#include "net/base/io_buffer.h" |
| + |
| +namespace content { |
| + |
| +ServiceWorkerDataPipeReader::ServiceWorkerDataPipeReader( |
| + ServiceWorkerURLRequestJob* owner, |
| + scoped_refptr<ServiceWorkerVersion> streaming_version, |
| + blink::mojom::ServiceWorkerStreamHandlePtr stream_handle) |
| + : owner_(owner), |
| + streaming_version_(streaming_version), |
| + stream_pending_buffer_size_(0), |
| + handle_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC), |
| + stream_(std::move(stream_handle->stream)), |
| + binding_(this, std::move(stream_handle->callback_request)), |
| + producer_state_(State::STREAMING) { |
| + TRACE_EVENT_ASYNC_BEGIN1("ServiceWorker", "ServiceWorkerDataPipeReader", this, |
| + "Url", owner->request()->url().spec()); |
| + streaming_version_->AddStreamingURLRequestJob(owner_); |
| + binding_.set_connection_error_handler(base::Bind( |
| + &ServiceWorkerDataPipeReader::OnAborted, base::Unretained(this))); |
| +} |
| + |
| +ServiceWorkerDataPipeReader::~ServiceWorkerDataPipeReader() { |
| + DCHECK(streaming_version_); |
| + streaming_version_->RemoveStreamingURLRequestJob(owner_); |
| + streaming_version_ = nullptr; |
| + |
| + TRACE_EVENT_ASYNC_END0("ServiceWorker", "ServiceWorkerDataPipeReader", this); |
| +} |
| + |
| +void ServiceWorkerDataPipeReader::Start() { |
| + TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader", |
| + this, "Start"); |
| + handle_watcher_.Watch( |
| + stream_.get(), |
| + MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
| + base::Bind(&ServiceWorkerDataPipeReader::OnHandleGotSignal, |
| + base::Unretained(this))); |
| + owner_->OnResponseStarted(); |
| +} |
| + |
| +void ServiceWorkerDataPipeReader::OnHandleGotSignal(MojoResult) { |
|
falken
2017/04/19 06:56:52
It's a little surprising MojoResult is not used. I
shimazu
2017/04/19 08:32:16
Yes, it is. MojoResult indicates the state of the
|
| + TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader", |
| + this, "OnHandleGotReadable"); |
|
falken
2017/04/19 06:56:52
OnHandleGotSignal?
shimazu
2017/04/19 08:32:16
Oops, thanks. Done.
|
| + // Do nothing if stream_pending_buffer_ is empty, i.e. there's no ReadRawData |
| + // operation waiting for IO completion. |
| + if (!stream_pending_buffer_) |
| + return; |
| + |
| + // If state() is not STREAMING, it means the data pipe was disconnected and |
| + // OnCompleted/OnAborted has already been called. |
| + if (state() != State::STREAMING) { |
| + handle_watcher_.Cancel(); |
| + AsyncComplete(); |
| + } |
| + |
| + // |stream_pending_buffer_| is set to the IOBuffer instance provided to |
| + // ReadRawData() by URLRequestJob. |
| + uint32_t size_to_pass = stream_pending_buffer_size_; |
| + MojoResult mojo_result = |
| + mojo::ReadDataRaw(stream_.get(), stream_pending_buffer_->data(), |
| + &size_to_pass, MOJO_READ_DATA_FLAG_NONE); |
| + |
| + switch (mojo_result) { |
| + case MOJO_RESULT_OK: |
| + stream_pending_buffer_ = nullptr; |
| + stream_pending_buffer_size_ = 0; |
| + owner_->OnReadRawDataComplete(size_to_pass); |
| + return; |
| + case MOJO_RESULT_FAILED_PRECONDITION: |
| + stream_.reset(); |
| + handle_watcher_.Cancel(); |
| + // If OnCompleted/OnAborted has already been called, let this request |
| + // complete. |
| + if (state() != State::STREAMING) |
| + AsyncComplete(); |
| + return; |
| + case MOJO_RESULT_SHOULD_WAIT: |
| + return; |
| + case MOJO_RESULT_BUSY: |
|
kinuko
2017/04/19 09:17:47
Why is this handled as an error, is that because t
shimazu
2017/04/20 04:20:41
Yes, this won't happen. NOTREACHED might be prefer
|
| + case MOJO_RESULT_INVALID_ARGUMENT: |
| + stream_pending_buffer_ = nullptr; |
| + stream_pending_buffer_size_ = 0; |
| + stream_.reset(); |
| + handle_watcher_.Cancel(); |
| + owner_->OnReadRawDataComplete(net::ERR_FAILED); |
| + return; |
| + } |
| + NOTREACHED(); |
| +} |
| + |
| +int ServiceWorkerDataPipeReader::ReadRawData(net::IOBuffer* buf, int buf_size) { |
| + TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader", |
| + this, "ReadRawData"); |
| + DCHECK(!stream_pending_buffer_); |
| + // If state() is not STREAMING, it means the data pipe was disconnected and |
| + // OnCompleted/OnAborted has already been called. |
| + if (state() != State::STREAMING) |
| + return SyncComplete(); |
| + |
| + uint32_t size_to_pass = buf_size; |
| + MojoResult mojo_result = mojo::ReadDataRaw( |
| + stream_.get(), buf->data(), &size_to_pass, MOJO_READ_DATA_FLAG_NONE); |
| + switch (mojo_result) { |
| + case MOJO_RESULT_OK: |
| + return size_to_pass; |
| + case MOJO_RESULT_FAILED_PRECONDITION: |
| + stream_.reset(); |
| + handle_watcher_.Cancel(); |
| + // Complete/Abort asynchronously if OnCompleted/OnAborted has not been |
| + // called yet. |
| + if (state() == State::STREAMING) { |
| + stream_pending_buffer_ = buf; |
| + stream_pending_buffer_size_ = buf_size; |
| + return net::ERR_IO_PENDING; |
| + } |
| + return SyncComplete(); |
| + case MOJO_RESULT_SHOULD_WAIT: |
| + stream_pending_buffer_ = buf; |
| + stream_pending_buffer_size_ = buf_size; |
| + return net::ERR_IO_PENDING; |
| + case MOJO_RESULT_BUSY: |
| + case MOJO_RESULT_INVALID_ARGUMENT: |
| + return net::ERR_FAILED; |
| + } |
| + NOTREACHED(); |
| + return net::ERR_FAILED; |
| +} |
| + |
| +void ServiceWorkerDataPipeReader::OnCompleted() { |
| + producer_state_ = State::COMPLETED; |
| + if (stream_pending_buffer_ && state() != State::STREAMING) |
| + AsyncComplete(); |
| +} |
| + |
| +void ServiceWorkerDataPipeReader::OnAborted() { |
| + producer_state_ = State::ABORTED; |
| + if (stream_pending_buffer_ && state() != State::STREAMING) |
| + AsyncComplete(); |
| +} |
| + |
| +void ServiceWorkerDataPipeReader::AsyncComplete() { |
| + // This works only after ReadRawData returns net::ERR_IO_PENDING. |
| + DCHECK(stream_pending_buffer_); |
| + |
| + switch (state()) { |
| + case State::STREAMING: |
| + NOTREACHED(); |
| + case State::COMPLETED: |
| + stream_pending_buffer_ = nullptr; |
| + stream_pending_buffer_size_ = 0; |
| + handle_watcher_.Cancel(); |
| + owner_->RecordResult(ServiceWorkerMetrics::REQUEST_JOB_STREAM_RESPONSE); |
| + owner_->OnReadRawDataComplete(net::OK); |
| + return; |
| + case State::ABORTED: |
| + stream_pending_buffer_ = nullptr; |
| + stream_pending_buffer_size_ = 0; |
| + handle_watcher_.Cancel(); |
| + owner_->RecordResult( |
| + ServiceWorkerMetrics::REQUEST_JOB_ERROR_STREAM_ABORTED); |
| + owner_->OnReadRawDataComplete(net::ERR_CONNECTION_RESET); |
| + return; |
| + } |
| +} |
| + |
| +int ServiceWorkerDataPipeReader::SyncComplete() { |
| + // This works only in ReadRawData. |
| + DCHECK(!stream_pending_buffer_); |
| + |
| + switch (state()) { |
| + case State::STREAMING: |
| + break; |
| + case State::COMPLETED: |
| + owner_->RecordResult(ServiceWorkerMetrics::REQUEST_JOB_STREAM_RESPONSE); |
| + return net::OK; |
| + case State::ABORTED: |
| + owner_->RecordResult( |
| + ServiceWorkerMetrics::REQUEST_JOB_ERROR_STREAM_ABORTED); |
| + return net::ERR_CONNECTION_RESET; |
| + } |
| + NOTREACHED(); |
| + return net::ERR_FAILED; |
| +} |
| + |
| +ServiceWorkerDataPipeReader::State ServiceWorkerDataPipeReader::state() { |
| + if (!stream_.is_valid()) |
| + return producer_state_; |
| + return State::STREAMING; |
| +} |
| + |
| +} // namespace content |