Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(12)

Side by Side Diff: third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp

Issue 2703343002: ServiceWorker: Use mojo's data pipe for respondWith(stream) (Closed)
Patch Set: Update comments Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "modules/fetch/FetchDataLoader.h" 5 #include "modules/fetch/FetchDataLoader.h"
6 6
7 #include <memory> 7 #include <memory>
8 #include "core/html/parser/TextResourceDecoder.h" 8 #include "core/html/parser/TextResourceDecoder.h"
9 #include "modules/fetch/BytesConsumer.h" 9 #include "modules/fetch/BytesConsumer.h"
10 #include "mojo/public/cpp/system/simple_watcher.h"
11 #include "platform/instrumentation/tracing/TraceEvent.h"
12 #include "platform/wtf/Functional.h"
10 #include "platform/wtf/PtrUtil.h" 13 #include "platform/wtf/PtrUtil.h"
11 #include "platform/wtf/text/StringBuilder.h" 14 #include "platform/wtf/text/StringBuilder.h"
12 #include "platform/wtf/text/WTFString.h" 15 #include "platform/wtf/text/WTFString.h"
13 #include "platform/wtf/typed_arrays/ArrayBufferBuilder.h" 16 #include "platform/wtf/typed_arrays/ArrayBufferBuilder.h"
14 17
15 namespace blink { 18 namespace blink {
16 19
17 namespace { 20 namespace {
18 21
19 class FetchDataLoaderAsBlobHandle final : public FetchDataLoader, 22 class FetchDataLoaderAsBlobHandle final : public FetchDataLoader,
(...skipping 204 matching lines...) Expand 10 before | Expand all | Expand 10 after
224 } 227 }
225 228
226 private: 229 private:
227 Member<BytesConsumer> consumer_; 230 Member<BytesConsumer> consumer_;
228 Member<FetchDataLoader::Client> client_; 231 Member<FetchDataLoader::Client> client_;
229 232
230 std::unique_ptr<TextResourceDecoder> decoder_; 233 std::unique_ptr<TextResourceDecoder> decoder_;
231 StringBuilder builder_; 234 StringBuilder builder_;
232 }; 235 };
233 236
234 class FetchDataLoaderAsStream final : public FetchDataLoader, 237 class FetchDataLoaderAsDataPipe final : public FetchDataLoader,
235 public BytesConsumer::Client { 238 public BytesConsumer::Client {
236 USING_GARBAGE_COLLECTED_MIXIN(FetchDataLoaderAsStream); 239 USING_GARBAGE_COLLECTED_MIXIN(FetchDataLoaderAsDataPipe);
237 240
238 public: 241 public:
239 explicit FetchDataLoaderAsStream(Stream* out_stream) 242 explicit FetchDataLoaderAsDataPipe(
240 : out_stream_(out_stream) {} 243 mojo::ScopedDataPipeProducerHandle out_data_pipe)
244 : out_data_pipe_(std::move(out_data_pipe)),
245 data_pipe_watcher_(FROM_HERE,
246 mojo::SimpleWatcher::ArmingPolicy::MANUAL) {
247 TRACE_EVENT0("ServiceWorker",
248 "FetchDataLoaderAsDataPipe::FetchDataLoaderAsDataPipe");
haraken 2017/04/19 11:29:10 Is this trace event useful?
shimazu 2017/04/20 04:20:42 Thanks! It's for debug. Removed.
249 }
250 ~FetchDataLoaderAsDataPipe() override {}
241 251
242 void Start(BytesConsumer* consumer, 252 void Start(BytesConsumer* consumer,
243 FetchDataLoader::Client* client) override { 253 FetchDataLoader::Client* client) override {
254 TRACE_EVENT_NESTABLE_ASYNC_BEGIN0("ServiceWorker",
255 "FetchDataLoaderAsDataPipe", this);
244 DCHECK(!client_); 256 DCHECK(!client_);
245 DCHECK(!consumer_); 257 DCHECK(!consumer_);
258 data_pipe_watcher_.Watch(
259 out_data_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
260 ConvertToBaseCallback(WTF::Bind(&FetchDataLoaderAsDataPipe::OnWritable,
261 WrapWeakPersistent(this))));
262 data_pipe_watcher_.ArmOrNotify();
246 client_ = client; 263 client_ = client;
247 consumer_ = consumer; 264 consumer_ = consumer;
248 consumer_->SetClient(this); 265 consumer_->SetClient(this);
249 OnStateChange();
250 } 266 }
251 267
268 void OnWritable(MojoResult) { OnStateChange(); }
269
270 // Implements BytesConsumer::Client.
252 void OnStateChange() override { 271 void OnStateChange() override {
253 bool need_to_flush = false; 272 bool should_wait = false;
254 while (true) { 273 while (!should_wait) {
255 const char* buffer; 274 const char* buffer;
256 size_t available; 275 size_t available;
257 auto result = consumer_->BeginRead(&buffer, &available); 276 auto result = consumer_->BeginRead(&buffer, &available);
258 if (result == BytesConsumer::Result::kShouldWait) { 277 if (result == BytesConsumer::Result::kShouldWait)
259 if (need_to_flush)
260 out_stream_->Flush();
261 return; 278 return;
262 }
263 if (result == BytesConsumer::Result::kOk) { 279 if (result == BytesConsumer::Result::kOk) {
264 out_stream_->AddData(buffer, available); 280 DCHECK_GT(available, 0UL);
265 need_to_flush = true; 281 uint32_t num_bytes = available;
266 result = consumer_->EndRead(available); 282 MojoResult mojo_result =
283 mojo::WriteDataRaw(out_data_pipe_.get(), buffer, &num_bytes,
284 MOJO_WRITE_DATA_FLAG_NONE);
285 if (mojo_result == MOJO_RESULT_OK) {
286 TRACE_EVENT_NESTABLE_ASYNC_INSTANT2(
287 "ServiceWorker", "FetchDataLoaderAsDataPipe", this, "state",
288 "MOJO_RESULT_OK", "bytes", num_bytes);
289 result = consumer_->EndRead(num_bytes);
290 } else if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
291 TRACE_EVENT_NESTABLE_ASYNC_INSTANT1(
292 "ServiceWorker", "FetchDataLoaderAsDataPipe", this, "state",
293 "MOJO_RESULT_SHOULD_WAIT");
294 result = consumer_->EndRead(0);
295 should_wait = true;
296 data_pipe_watcher_.ArmOrNotify();
297 } else {
298 TRACE_EVENT_NESTABLE_ASYNC_END1("ServiceWorker",
299 "FetchDataLoaderAsDataPipe", this,
300 "MojoResult", mojo_result);
301 result = consumer_->EndRead(0);
302 StopInternal();
303 client_->DidFetchDataLoadFailed();
304 return;
305 }
267 } 306 }
268 switch (result) { 307 switch (result) {
269 case BytesConsumer::Result::kOk: 308 case BytesConsumer::Result::kOk:
270 break; 309 break;
271 case BytesConsumer::Result::kShouldWait: 310 case BytesConsumer::Result::kShouldWait:
272 NOTREACHED(); 311 NOTREACHED();
273 return; 312 return;
274 case BytesConsumer::Result::kDone: 313 case BytesConsumer::Result::kDone:
275 if (need_to_flush) 314 TRACE_EVENT_NESTABLE_ASYNC_END1(
276 out_stream_->Flush(); 315 "ServiceWorker", "FetchDataLoaderAsDataPipe", this, "state",
277 out_stream_->Finalize(); 316 "BytesConsumer::Result::Done");
278 client_->DidFetchDataLoadedStream(); 317 StopInternal();
318 client_->DidFetchDataLoadedDataPipe();
279 return; 319 return;
280 case BytesConsumer::Result::kError: 320 case BytesConsumer::Result::kError:
281 // If the stream is aborted soon after the stream is registered 321 TRACE_EVENT_NESTABLE_ASYNC_END1(
282 // to the StreamRegistry, ServiceWorkerURLRequestJob may not 322 "ServiceWorker", "FetchDataLoaderAsDataPipe", this, "state",
283 // notice the error and continue waiting forever. 323 "BytesConsumer::Result::Error");
284 // TODO(yhirano): Add new message to report the error to the 324 StopInternal();
285 // browser process.
286 out_stream_->Abort();
287 client_->DidFetchDataLoadFailed(); 325 client_->DidFetchDataLoadFailed();
288 return; 326 return;
289 } 327 }
290 } 328 }
291 } 329 }
292 330
293 void Cancel() override { consumer_->Cancel(); } 331 void Cancel() override { StopInternal(); }
294 332
295 DEFINE_INLINE_TRACE() { 333 DEFINE_INLINE_TRACE() {
296 visitor->Trace(consumer_); 334 visitor->Trace(consumer_);
297 visitor->Trace(client_); 335 visitor->Trace(client_);
298 visitor->Trace(out_stream_);
299 FetchDataLoader::Trace(visitor); 336 FetchDataLoader::Trace(visitor);
300 BytesConsumer::Client::Trace(visitor); 337 BytesConsumer::Client::Trace(visitor);
301 } 338 }
302 339
340 private:
341 void StopInternal() {
342 consumer_->Cancel();
343 data_pipe_watcher_.Cancel();
344 out_data_pipe_.reset();
345 }
346
303 Member<BytesConsumer> consumer_; 347 Member<BytesConsumer> consumer_;
304 Member<FetchDataLoader::Client> client_; 348 Member<FetchDataLoader::Client> client_;
305 Member<Stream> out_stream_; 349
350 mojo::ScopedDataPipeProducerHandle out_data_pipe_;
351 mojo::SimpleWatcher data_pipe_watcher_;
306 }; 352 };
307 353
308 } // namespace 354 } // namespace
309 355
310 FetchDataLoader* FetchDataLoader::CreateLoaderAsBlobHandle( 356 FetchDataLoader* FetchDataLoader::CreateLoaderAsBlobHandle(
311 const String& mime_type) { 357 const String& mime_type) {
312 return new FetchDataLoaderAsBlobHandle(mime_type); 358 return new FetchDataLoaderAsBlobHandle(mime_type);
313 } 359 }
314 360
315 FetchDataLoader* FetchDataLoader::CreateLoaderAsArrayBuffer() { 361 FetchDataLoader* FetchDataLoader::CreateLoaderAsArrayBuffer() {
316 return new FetchDataLoaderAsArrayBuffer(); 362 return new FetchDataLoaderAsArrayBuffer();
317 } 363 }
318 364
319 FetchDataLoader* FetchDataLoader::CreateLoaderAsString() { 365 FetchDataLoader* FetchDataLoader::CreateLoaderAsString() {
320 return new FetchDataLoaderAsString(); 366 return new FetchDataLoaderAsString();
321 } 367 }
322 368
323 FetchDataLoader* FetchDataLoader::CreateLoaderAsStream(Stream* out_stream) { 369 FetchDataLoader* FetchDataLoader::CreateLoaderAsDataPipe(
324 return new FetchDataLoaderAsStream(out_stream); 370 mojo::ScopedDataPipeProducerHandle out_data_pipe) {
371 return new FetchDataLoaderAsDataPipe(std::move(out_data_pipe));
325 } 372 }
326 373
327 } // namespace blink 374 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698