Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1518)

Side by Side Diff: content/browser/service_worker/service_worker_data_pipe_reader.cc

Issue 2703343002: ServiceWorker: Use mojo's data pipe for respondWith(stream) (Closed)
Patch Set: Make SWDataPipeReader::State private Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/browser/service_worker/service_worker_data_pipe_reader.h"
6
7 #include "base/trace_event/trace_event.h"
8 #include "content/browser/service_worker/service_worker_url_request_job.h"
9 #include "content/browser/service_worker/service_worker_version.h"
10 #include "net/base/io_buffer.h"
11
12 namespace content {
13
14 ServiceWorkerDataPipeReader::ServiceWorkerDataPipeReader(
15 ServiceWorkerURLRequestJob* owner,
16 scoped_refptr<ServiceWorkerVersion> streaming_version,
17 blink::mojom::ServiceWorkerStreamHandlePtr stream_handle)
18 : owner_(owner),
19 streaming_version_(streaming_version),
20 handle_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC),
21 stream_(std::move(stream_handle->stream)),
22 binding_(this, std::move(stream_handle->callback_request)),
23 producer_state_(State::STREAMING) {
24 TRACE_EVENT_ASYNC_BEGIN1("ServiceWorker", "ServiceWorkerDataPipeReader", this,
25 "Url", owner->request()->url().spec());
26 streaming_version_->AddStreamingURLRequestJob(owner_);
27 binding_.set_connection_error_handler(base::Bind(
28 &ServiceWorkerDataPipeReader::OnAborted, base::Unretained(this)));
29 }
30
31 ServiceWorkerDataPipeReader::~ServiceWorkerDataPipeReader() {
32 DCHECK(streaming_version_);
33 streaming_version_->RemoveStreamingURLRequestJob(owner_);
34 streaming_version_ = nullptr;
35
36 TRACE_EVENT_ASYNC_END0("ServiceWorker", "ServiceWorkerDataPipeReader", this);
37 }
38
39 void ServiceWorkerDataPipeReader::Start() {
40 TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader",
41 this, "Start");
42 handle_watcher_.Watch(
43 stream_.get(),
44 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
45 base::Bind(&ServiceWorkerDataPipeReader::OnHandleGotSignal,
46 base::Unretained(this)));
47 owner_->OnResponseStarted();
48 }
49
50 void ServiceWorkerDataPipeReader::OnHandleGotSignal(MojoResult) {
51 TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader",
52 this, "OnHandleGotReadable");
53 // Do nothing if stream_pending_buffer_ is empty, i.e. there's no ReadRawData
54 // operation waiting for IO completion.
55 if (!stream_pending_buffer_)
56 return;
57
58 // If state() is not STREAMING, it means the data pipe was disconnected and
59 // OnCompleted/OnAborted has already been called.
60 if (state() != State::STREAMING) {
61 handle_watcher_.Cancel();
62 AsyncComplete();
63 }
64
65 // |stream_pending_buffer_| is set to the IOBuffer instance provided to
66 // ReadRawData() by URLRequestJob.
67 uint32_t size_to_pass = stream_pending_buffer_size_;
68 MojoResult mojo_result =
69 mojo::ReadDataRaw(stream_.get(), stream_pending_buffer_->data(),
70 &size_to_pass, MOJO_READ_DATA_FLAG_NONE);
71
72 switch (mojo_result) {
73 case MOJO_RESULT_OK:
74 stream_pending_buffer_ = nullptr;
75 stream_pending_buffer_size_ = 0;
76 owner_->OnReadRawDataComplete(size_to_pass);
77 return;
78 case MOJO_RESULT_FAILED_PRECONDITION:
79 stream_.reset();
80 handle_watcher_.Cancel();
81 // If OnCompleted/OnAborted has already been called, let this request
82 // complete.
83 if (state() != State::STREAMING)
84 AsyncComplete();
85 return;
86 case MOJO_RESULT_SHOULD_WAIT:
87 return;
88 case MOJO_RESULT_BUSY:
89 case MOJO_RESULT_INVALID_ARGUMENT:
90 stream_pending_buffer_ = nullptr;
91 stream_pending_buffer_size_ = 0;
92 stream_.reset();
93 handle_watcher_.Cancel();
94 owner_->OnReadRawDataComplete(net::ERR_FAILED);
95 return;
96 }
97 NOTREACHED();
98 }
99
100 int ServiceWorkerDataPipeReader::ReadRawData(net::IOBuffer* buf, int buf_size) {
101 TRACE_EVENT_ASYNC_STEP_INTO0("ServiceWorker", "ServiceWorkerDataPipeReader",
102 this, "ReadRawData");
103 DCHECK(!stream_pending_buffer_);
104 // If state() is not STREAMING, it means the data pipe was disconnected and
105 // OnCompleted/OnAborted has already been called.
106 if (state() != State::STREAMING)
107 return SyncComplete();
108
109 uint32_t size_to_pass = buf_size;
110 MojoResult mojo_result = mojo::ReadDataRaw(
111 stream_.get(), buf->data(), &size_to_pass, MOJO_READ_DATA_FLAG_NONE);
112 switch (mojo_result) {
113 case MOJO_RESULT_OK:
114 return size_to_pass;
115 case MOJO_RESULT_FAILED_PRECONDITION:
116 stream_.reset();
117 handle_watcher_.Cancel();
118 // Complete/Abort asynchronously if OnCompleted/OnAborted has not been
119 // called yet.
falken 2017/04/18 05:05:33 Where does this async completion happen? In OnHand
shimazu 2017/04/19 05:49:46 This state means |stream_| has already been discon
120 if (state() == State::STREAMING) {
121 stream_pending_buffer_ = buf;
122 stream_pending_buffer_size_ = buf_size;
123 return net::ERR_IO_PENDING;
124 }
125 return SyncComplete();
126 case MOJO_RESULT_SHOULD_WAIT:
127 stream_pending_buffer_ = buf;
128 stream_pending_buffer_size_ = buf_size;
129 return net::ERR_IO_PENDING;
130 case MOJO_RESULT_BUSY:
131 case MOJO_RESULT_INVALID_ARGUMENT:
132 return net::ERR_FAILED;
133 }
134 NOTREACHED();
135 return net::ERR_FAILED;
136 }
137
138 void ServiceWorkerDataPipeReader::OnCompleted() {
139 producer_state_ = State::COMPLETED;
140 if (stream_pending_buffer_ && state() != State::STREAMING)
141 AsyncComplete();
142 }
143
144 void ServiceWorkerDataPipeReader::OnAborted() {
145 producer_state_ = State::ABORTED;
146 if (stream_pending_buffer_ && state() != State::STREAMING)
147 AsyncComplete();
148 }
149
150 void ServiceWorkerDataPipeReader::AsyncComplete() {
151 // This works only after ReadRawData returns net::ERR_IO_PENDING.
152 DCHECK(stream_pending_buffer_);
153
154 switch (state()) {
155 case State::STREAMING:
156 NOTREACHED();
157 case State::COMPLETED:
158 stream_pending_buffer_ = nullptr;
159 stream_pending_buffer_size_ = 0;
160 handle_watcher_.Cancel();
161 owner_->RecordResult(ServiceWorkerMetrics::REQUEST_JOB_STREAM_RESPONSE);
162 owner_->OnReadRawDataComplete(net::OK);
163 return;
164 case State::ABORTED:
165 stream_pending_buffer_ = nullptr;
166 stream_pending_buffer_size_ = 0;
167 handle_watcher_.Cancel();
168 owner_->RecordResult(
169 ServiceWorkerMetrics::REQUEST_JOB_ERROR_STREAM_ABORTED);
170 owner_->OnReadRawDataComplete(net::ERR_CONNECTION_RESET);
171 return;
172 }
173 }
174
175 int ServiceWorkerDataPipeReader::SyncComplete() {
176 // This works only in ReadRawData.
177 DCHECK(!stream_pending_buffer_);
178
179 switch (state()) {
180 case State::STREAMING:
181 break;
182 case State::COMPLETED:
183 owner_->RecordResult(ServiceWorkerMetrics::REQUEST_JOB_STREAM_RESPONSE);
184 return net::OK;
185 case State::ABORTED:
186 owner_->RecordResult(
187 ServiceWorkerMetrics::REQUEST_JOB_ERROR_STREAM_ABORTED);
188 return net::ERR_CONNECTION_RESET;
189 }
190 NOTREACHED();
191 return net::ERR_FAILED;
192 }
193
194 ServiceWorkerDataPipeReader::State ServiceWorkerDataPipeReader::state() {
195 if (!stream_.is_valid())
196 return producer_state_;
197 return State::STREAMING;
198 }
199
200 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698