Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1146)

Side by Side Diff: content/browser/service_worker/service_worker_data_pipe_reader.cc

Issue 2703343002: ServiceWorker: Use mojo's data pipe for respondWith(stream) (Closed)
Patch Set: Addressed comments from kinuko and haraken Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/browser/service_worker/service_worker_data_pipe_reader.h"
6
7 #include "base/trace_event/trace_event.h"
8 #include "content/browser/service_worker/service_worker_url_request_job.h"
9 #include "content/browser/service_worker/service_worker_version.h"
10 #include "net/base/io_buffer.h"
11
12 namespace content {
13
14 ServiceWorkerDataPipeReader::ServiceWorkerDataPipeReader(
15 ServiceWorkerURLRequestJob* owner,
16 scoped_refptr<ServiceWorkerVersion> streaming_version,
17 blink::mojom::ServiceWorkerStreamHandlePtr stream_handle)
18 : owner_(owner),
19 streaming_version_(streaming_version),
20 stream_pending_buffer_size_(0),
21 handle_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC),
22 stream_(std::move(stream_handle->stream)),
23 binding_(this, std::move(stream_handle->callback_request)),
24 producer_state_(State::kStreaming) {
25 TRACE_EVENT_ASYNC_BEGIN1("ServiceWorker", "ServiceWorkerDataPipeReader", this,
26 "Url", owner->request()->url().spec());
27 streaming_version_->AddStreamingURLRequestJob(owner_);
28 binding_.set_connection_error_handler(base::Bind(
29 &ServiceWorkerDataPipeReader::OnAborted, base::Unretained(this)));
30 }
31
32 ServiceWorkerDataPipeReader::~ServiceWorkerDataPipeReader() {
33 DCHECK(streaming_version_);
34 streaming_version_->RemoveStreamingURLRequestJob(owner_);
35 streaming_version_ = nullptr;
36
37 TRACE_EVENT_ASYNC_END0("ServiceWorker", "ServiceWorkerDataPipeReader", this);
38 }
39
40 void ServiceWorkerDataPipeReader::Start() {
41 TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader",
42 this, "Start");
43 handle_watcher_.Watch(
44 stream_.get(),
45 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
46 base::Bind(&ServiceWorkerDataPipeReader::OnHandleGotSignal,
47 base::Unretained(this)));
48 owner_->OnResponseStarted();
49 }
50
51 void ServiceWorkerDataPipeReader::OnHandleGotSignal(MojoResult) {
52 TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader",
53 this, "OnHandleGotSignal");
54 // Do nothing if stream_pending_buffer_ is empty, i.e. there's no ReadRawData
55 // operation waiting for IO completion.
56 if (!stream_pending_buffer_)
57 return;
58
59 // If state() is not STREAMING, it means the data pipe was disconnected and
60 // OnCompleted/OnAborted has already been called.
61 if (state() != State::kStreaming) {
62 handle_watcher_.Cancel();
63 AsyncComplete();
64 }
65
66 // |stream_pending_buffer_| is set to the IOBuffer instance provided to
67 // ReadRawData() by URLRequestJob.
68 uint32_t size_to_pass = stream_pending_buffer_size_;
69 MojoResult mojo_result =
70 mojo::ReadDataRaw(stream_.get(), stream_pending_buffer_->data(),
71 &size_to_pass, MOJO_READ_DATA_FLAG_NONE);
72
73 switch (mojo_result) {
74 case MOJO_RESULT_OK:
75 stream_pending_buffer_ = nullptr;
76 stream_pending_buffer_size_ = 0;
77 owner_->OnReadRawDataComplete(size_to_pass);
78 return;
79 case MOJO_RESULT_FAILED_PRECONDITION:
80 stream_.reset();
81 handle_watcher_.Cancel();
82 // If OnCompleted/OnAborted has already been called, let this request
83 // complete.
84 if (state() != State::kStreaming)
85 AsyncComplete();
86 return;
87 case MOJO_RESULT_SHOULD_WAIT:
88 return;
89 case MOJO_RESULT_INVALID_ARGUMENT:
90 case MOJO_RESULT_OUT_OF_RANGE:
91 case MOJO_RESULT_BUSY:
92 break;
93 }
94 NOTREACHED();
95 }
96
97 int ServiceWorkerDataPipeReader::ReadRawData(net::IOBuffer* buf, int buf_size) {
98 TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader",
99 this, "ReadRawData");
100 DCHECK(!stream_pending_buffer_);
101 // If state() is not STREAMING, it means the data pipe was disconnected and
102 // OnCompleted/OnAborted has already been called.
103 if (state() != State::kStreaming)
104 return SyncComplete();
105
106 uint32_t size_to_pass = buf_size;
107 MojoResult mojo_result = mojo::ReadDataRaw(
108 stream_.get(), buf->data(), &size_to_pass, MOJO_READ_DATA_FLAG_NONE);
109 switch (mojo_result) {
110 case MOJO_RESULT_OK:
111 return size_to_pass;
112 case MOJO_RESULT_FAILED_PRECONDITION:
113 stream_.reset();
114 handle_watcher_.Cancel();
115 // Complete/Abort asynchronously if OnCompleted/OnAborted has not been
116 // called yet.
117 if (state() == State::kStreaming) {
118 stream_pending_buffer_ = buf;
119 stream_pending_buffer_size_ = buf_size;
120 return net::ERR_IO_PENDING;
121 }
122 return SyncComplete();
123 case MOJO_RESULT_SHOULD_WAIT:
124 stream_pending_buffer_ = buf;
125 stream_pending_buffer_size_ = buf_size;
126 return net::ERR_IO_PENDING;
127 case MOJO_RESULT_INVALID_ARGUMENT:
128 case MOJO_RESULT_OUT_OF_RANGE:
129 case MOJO_RESULT_BUSY:
130 break;
131 }
132 NOTREACHED();
133 return net::ERR_FAILED;
134 }
135
136 void ServiceWorkerDataPipeReader::OnCompleted() {
137 producer_state_ = State::kCompleted;
138 if (stream_pending_buffer_ && state() != State::kStreaming)
139 AsyncComplete();
140 }
141
142 void ServiceWorkerDataPipeReader::OnAborted() {
143 producer_state_ = State::kAborted;
144 if (stream_pending_buffer_ && state() != State::kStreaming)
145 AsyncComplete();
146 }
147
148 void ServiceWorkerDataPipeReader::AsyncComplete() {
149 // This works only after ReadRawData returns net::ERR_IO_PENDING.
150 DCHECK(stream_pending_buffer_);
151
152 switch (state()) {
153 case State::kStreaming:
154 NOTREACHED();
155 case State::kCompleted:
156 stream_pending_buffer_ = nullptr;
157 stream_pending_buffer_size_ = 0;
158 handle_watcher_.Cancel();
159 owner_->RecordResult(ServiceWorkerMetrics::REQUEST_JOB_STREAM_RESPONSE);
160 owner_->OnReadRawDataComplete(net::OK);
161 return;
162 case State::kAborted:
163 stream_pending_buffer_ = nullptr;
164 stream_pending_buffer_size_ = 0;
165 handle_watcher_.Cancel();
166 owner_->RecordResult(
167 ServiceWorkerMetrics::REQUEST_JOB_ERROR_STREAM_ABORTED);
168 owner_->OnReadRawDataComplete(net::ERR_CONNECTION_RESET);
169 return;
170 }
171 }
172
173 int ServiceWorkerDataPipeReader::SyncComplete() {
174 // This works only in ReadRawData.
175 DCHECK(!stream_pending_buffer_);
176
177 switch (state()) {
178 case State::kStreaming:
179 break;
180 case State::kCompleted:
181 owner_->RecordResult(ServiceWorkerMetrics::REQUEST_JOB_STREAM_RESPONSE);
182 return net::OK;
183 case State::kAborted:
184 owner_->RecordResult(
185 ServiceWorkerMetrics::REQUEST_JOB_ERROR_STREAM_ABORTED);
186 return net::ERR_CONNECTION_RESET;
187 }
188 NOTREACHED();
189 return net::ERR_FAILED;
190 }
191
192 ServiceWorkerDataPipeReader::State ServiceWorkerDataPipeReader::state() {
193 if (!stream_.is_valid())
194 return producer_state_;
195 return State::kStreaming;
196 }
197
198 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698