Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(829)

Unified Diff: content/browser/service_worker/service_worker_data_pipe_reader.cc

Issue 2703343002: ServiceWorker: Use mojo's data pipe for respondWith(stream) (Closed)
Patch Set: Make SWDataPipeReader::State private Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: content/browser/service_worker/service_worker_data_pipe_reader.cc
diff --git a/content/browser/service_worker/service_worker_data_pipe_reader.cc b/content/browser/service_worker/service_worker_data_pipe_reader.cc
new file mode 100644
index 0000000000000000000000000000000000000000..b6329138ed1e005f7d5ec883aa31ee46a3418627
--- /dev/null
+++ b/content/browser/service_worker/service_worker_data_pipe_reader.cc
@@ -0,0 +1,200 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/service_worker/service_worker_data_pipe_reader.h"
+
+#include "base/trace_event/trace_event.h"
+#include "content/browser/service_worker/service_worker_url_request_job.h"
+#include "content/browser/service_worker/service_worker_version.h"
+#include "net/base/io_buffer.h"
+
+namespace content {
+
+ServiceWorkerDataPipeReader::ServiceWorkerDataPipeReader(
+ ServiceWorkerURLRequestJob* owner,
+ scoped_refptr<ServiceWorkerVersion> streaming_version,
+ blink::mojom::ServiceWorkerStreamHandlePtr stream_handle)
+ : owner_(owner),
+ streaming_version_(streaming_version),
+ handle_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC),
+ stream_(std::move(stream_handle->stream)),
+ binding_(this, std::move(stream_handle->callback_request)),
+ producer_state_(State::STREAMING) {
+ TRACE_EVENT_ASYNC_BEGIN1("ServiceWorker", "ServiceWorkerDataPipeReader", this,
+ "Url", owner->request()->url().spec());
+ streaming_version_->AddStreamingURLRequestJob(owner_);
+ binding_.set_connection_error_handler(base::Bind(
+ &ServiceWorkerDataPipeReader::OnAborted, base::Unretained(this)));
+}
+
+ServiceWorkerDataPipeReader::~ServiceWorkerDataPipeReader() {
+ DCHECK(streaming_version_);
+ streaming_version_->RemoveStreamingURLRequestJob(owner_);
+ streaming_version_ = nullptr;
+
+ TRACE_EVENT_ASYNC_END0("ServiceWorker", "ServiceWorkerDataPipeReader", this);
+}
+
+void ServiceWorkerDataPipeReader::Start() {
+ TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader",
+ this, "Start");
+ handle_watcher_.Watch(
+ stream_.get(),
+ MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ base::Bind(&ServiceWorkerDataPipeReader::OnHandleGotSignal,
+ base::Unretained(this)));
+ owner_->OnResponseStarted();
+}
+
+void ServiceWorkerDataPipeReader::OnHandleGotSignal(MojoResult) {
+ TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader",
+ this, "OnHandleGotReadable");
+ // Do nothing if stream_pending_buffer_ is empty, i.e. there's no ReadRawData
+ // operation waiting for IO completion.
+ if (!stream_pending_buffer_)
+ return;
+
+ // If state() is not STREAMING, it means the data pipe was disconnected and
+ // OnCompleted/OnAborted has already been called.
+ if (state() != State::STREAMING) {
+ handle_watcher_.Cancel();
+ AsyncComplete();
+ }
+
+ // |stream_pending_buffer_| is set to the IOBuffer instance provided to
+ // ReadRawData() by URLRequestJob.
+ uint32_t size_to_pass = stream_pending_buffer_size_;
+ MojoResult mojo_result =
+ mojo::ReadDataRaw(stream_.get(), stream_pending_buffer_->data(),
+ &size_to_pass, MOJO_READ_DATA_FLAG_NONE);
+
+ switch (mojo_result) {
+ case MOJO_RESULT_OK:
+ stream_pending_buffer_ = nullptr;
+ stream_pending_buffer_size_ = 0;
+ owner_->OnReadRawDataComplete(size_to_pass);
+ return;
+ case MOJO_RESULT_FAILED_PRECONDITION:
+ stream_.reset();
+ handle_watcher_.Cancel();
+ // If OnCompleted/OnAborted has already been called, let this request
+ // complete.
+ if (state() != State::STREAMING)
+ AsyncComplete();
+ return;
+ case MOJO_RESULT_SHOULD_WAIT:
+ return;
+ case MOJO_RESULT_BUSY:
+ case MOJO_RESULT_INVALID_ARGUMENT:
+ stream_pending_buffer_ = nullptr;
+ stream_pending_buffer_size_ = 0;
+ stream_.reset();
+ handle_watcher_.Cancel();
+ owner_->OnReadRawDataComplete(net::ERR_FAILED);
+ return;
+ }
+ NOTREACHED();
+}
+
+int ServiceWorkerDataPipeReader::ReadRawData(net::IOBuffer* buf, int buf_size) {
+ TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader",
+ this, "ReadRawData");
+ DCHECK(!stream_pending_buffer_);
+ // If state() is not STREAMING, it means the data pipe was disconnected and
+ // OnCompleted/OnAborted has already been called.
+ if (state() != State::STREAMING)
+ return SyncComplete();
+
+ uint32_t size_to_pass = buf_size;
+ MojoResult mojo_result = mojo::ReadDataRaw(
+ stream_.get(), buf->data(), &size_to_pass, MOJO_READ_DATA_FLAG_NONE);
+ switch (mojo_result) {
+ case MOJO_RESULT_OK:
+ return size_to_pass;
+ case MOJO_RESULT_FAILED_PRECONDITION:
+ stream_.reset();
+ handle_watcher_.Cancel();
+ // Complete/Abort asynchronously if OnCompleted/OnAborted has not been
+ // called yet.
falken 2017/04/18 05:05:33 Where does this async completion happen? In OnHand
shimazu 2017/04/19 05:49:46 This state means |stream_| has already been discon
+ if (state() == State::STREAMING) {
+ stream_pending_buffer_ = buf;
+ stream_pending_buffer_size_ = buf_size;
+ return net::ERR_IO_PENDING;
+ }
+ return SyncComplete();
+ case MOJO_RESULT_SHOULD_WAIT:
+ stream_pending_buffer_ = buf;
+ stream_pending_buffer_size_ = buf_size;
+ return net::ERR_IO_PENDING;
+ case MOJO_RESULT_BUSY:
+ case MOJO_RESULT_INVALID_ARGUMENT:
+ return net::ERR_FAILED;
+ }
+ NOTREACHED();
+ return net::ERR_FAILED;
+}
+
+void ServiceWorkerDataPipeReader::OnCompleted() {
+ producer_state_ = State::COMPLETED;
+ if (stream_pending_buffer_ && state() != State::STREAMING)
+ AsyncComplete();
+}
+
+void ServiceWorkerDataPipeReader::OnAborted() {
+ producer_state_ = State::ABORTED;
+ if (stream_pending_buffer_ && state() != State::STREAMING)
+ AsyncComplete();
+}
+
+void ServiceWorkerDataPipeReader::AsyncComplete() {
+ // This works only after ReadRawData returns net::ERR_IO_PENDING.
+ DCHECK(stream_pending_buffer_);
+
+ switch (state()) {
+ case State::STREAMING:
+ NOTREACHED();
+ case State::COMPLETED:
+ stream_pending_buffer_ = nullptr;
+ stream_pending_buffer_size_ = 0;
+ handle_watcher_.Cancel();
+ owner_->RecordResult(ServiceWorkerMetrics::REQUEST_JOB_STREAM_RESPONSE);
+ owner_->OnReadRawDataComplete(net::OK);
+ return;
+ case State::ABORTED:
+ stream_pending_buffer_ = nullptr;
+ stream_pending_buffer_size_ = 0;
+ handle_watcher_.Cancel();
+ owner_->RecordResult(
+ ServiceWorkerMetrics::REQUEST_JOB_ERROR_STREAM_ABORTED);
+ owner_->OnReadRawDataComplete(net::ERR_CONNECTION_RESET);
+ return;
+ }
+}
+
+int ServiceWorkerDataPipeReader::SyncComplete() {
+ // This works only in ReadRawData.
+ DCHECK(!stream_pending_buffer_);
+
+ switch (state()) {
+ case State::STREAMING:
+ break;
+ case State::COMPLETED:
+ owner_->RecordResult(ServiceWorkerMetrics::REQUEST_JOB_STREAM_RESPONSE);
+ return net::OK;
+ case State::ABORTED:
+ owner_->RecordResult(
+ ServiceWorkerMetrics::REQUEST_JOB_ERROR_STREAM_ABORTED);
+ return net::ERR_CONNECTION_RESET;
+ }
+ NOTREACHED();
+ return net::ERR_FAILED;
+}
+
+ServiceWorkerDataPipeReader::State ServiceWorkerDataPipeReader::state() {
+ if (!stream_.is_valid())
+ return producer_state_;
+ return State::STREAMING;
+}
+
+} // namespace content

Powered by Google App Engine
This is Rietveld 408576698