Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp

Issue 2703343002: ServiceWorker: Use mojo's data pipe for respondWith(stream) (Closed)
Patch Set: IFRAME_URL -> SCOPE Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "modules/fetch/FetchDataLoader.h" 5 #include "modules/fetch/FetchDataLoader.h"
6 6
7 #include <memory> 7 #include <memory>
8 #include "core/html/parser/TextResourceDecoder.h" 8 #include "core/html/parser/TextResourceDecoder.h"
9 #include "modules/fetch/BytesConsumer.h" 9 #include "modules/fetch/BytesConsumer.h"
10 #include "mojo/public/cpp/system/simple_watcher.h"
11 #include "platform/instrumentation/tracing/TraceEvent.h"
12 #include "platform/wtf/Functional.h"
10 #include "platform/wtf/PtrUtil.h" 13 #include "platform/wtf/PtrUtil.h"
11 #include "platform/wtf/text/StringBuilder.h" 14 #include "platform/wtf/text/StringBuilder.h"
12 #include "platform/wtf/text/WTFString.h" 15 #include "platform/wtf/text/WTFString.h"
13 #include "platform/wtf/typed_arrays/ArrayBufferBuilder.h" 16 #include "platform/wtf/typed_arrays/ArrayBufferBuilder.h"
14 17
15 namespace blink { 18 namespace blink {
16 19
17 namespace { 20 namespace {
18 21
19 class FetchDataLoaderAsBlobHandle final : public FetchDataLoader, 22 class FetchDataLoaderAsBlobHandle final : public FetchDataLoader,
(...skipping 204 matching lines...) Expand 10 before | Expand all | Expand 10 after
224 } 227 }
225 228
226 private: 229 private:
227 Member<BytesConsumer> consumer_; 230 Member<BytesConsumer> consumer_;
228 Member<FetchDataLoader::Client> client_; 231 Member<FetchDataLoader::Client> client_;
229 232
230 std::unique_ptr<TextResourceDecoder> decoder_; 233 std::unique_ptr<TextResourceDecoder> decoder_;
231 StringBuilder builder_; 234 StringBuilder builder_;
232 }; 235 };
233 236
234 class FetchDataLoaderAsStream final : public FetchDataLoader, 237 class FetchDataLoaderAsDataPipe final : public FetchDataLoader,
235 public BytesConsumer::Client { 238 public BytesConsumer::Client {
236 USING_GARBAGE_COLLECTED_MIXIN(FetchDataLoaderAsStream); 239 USING_GARBAGE_COLLECTED_MIXIN(FetchDataLoaderAsDataPipe);
237 240
238 public: 241 public:
239 explicit FetchDataLoaderAsStream(Stream* out_stream) 242 explicit FetchDataLoaderAsDataPipe(
240 : out_stream_(out_stream) {} 243 mojo::ScopedDataPipeProducerHandle out_datapipe)
244 : out_datapipe_(std::move(out_datapipe)),
245 datapipe_watcher_(FROM_HERE,
246 mojo::SimpleWatcher::ArmingPolicy::MANUAL) {
247 TRACE_EVENT0("ServiceWorker",
248 "FetchDataLoaderAsDataPipe::FetchDataLoaderAsDataPipe");
249 }
250 ~FetchDataLoaderAsDataPipe() override {}
241 251
242 void Start(BytesConsumer* consumer, 252 void Start(BytesConsumer* consumer,
243 FetchDataLoader::Client* client) override { 253 FetchDataLoader::Client* client) override {
254 TRACE_EVENT_NESTABLE_ASYNC_BEGIN0("ServiceWorker",
255 "FetchDataLoaderAsDataPipe", this);
244 DCHECK(!client_); 256 DCHECK(!client_);
245 DCHECK(!consumer_); 257 DCHECK(!consumer_);
258 datapipe_watcher_.Watch(
259 out_datapipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
260 ConvertToBaseCallback(WTF::Bind(&FetchDataLoaderAsDataPipe::OnWritable,
261 WrapWeakPersistent(this))));
246 client_ = client; 262 client_ = client;
247 consumer_ = consumer; 263 consumer_ = consumer;
248 consumer_->SetClient(this); 264 consumer_->SetClient(this);
249 OnStateChange();
250 } 265 }
251 266
267 void OnWritable(MojoResult) { OnStateChange(); }
268
269 // Implements BytesConsumer::Client.
252 void OnStateChange() override { 270 void OnStateChange() override {
253 bool need_to_flush = false; 271 bool should_wait = false;
254 while (true) { 272 while (!should_wait) {
255 const char* buffer; 273 const char* buffer;
256 size_t available; 274 size_t available;
257 auto result = consumer_->BeginRead(&buffer, &available); 275 auto result = consumer_->BeginRead(&buffer, &available);
258 if (result == BytesConsumer::Result::kShouldWait) { 276 if (result == BytesConsumer::Result::kShouldWait)
259 if (need_to_flush)
260 out_stream_->Flush();
261 return; 277 return;
262 }
263 if (result == BytesConsumer::Result::kOk) { 278 if (result == BytesConsumer::Result::kOk) {
264 out_stream_->AddData(buffer, available); 279 DCHECK_GT(available, 0UL);
265 need_to_flush = true; 280 uint32_t num_bytes = available;
266 result = consumer_->EndRead(available); 281 MojoResult mojo_result = mojo::WriteDataRaw(
282 out_datapipe_.get(), buffer, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
283 if (mojo_result == MOJO_RESULT_OK) {
284 TRACE_EVENT_NESTABLE_ASYNC_INSTANT2(
285 "ServiceWorker", "FetchDataLoaderAsDataPipe", this, "state",
286 "MOJO_RESULT_OK", "bytes", num_bytes);
287 result = consumer_->EndRead(num_bytes);
288 } else if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
289 TRACE_EVENT_NESTABLE_ASYNC_INSTANT1(
290 "ServiceWorker", "FetchDataLoaderAsDataPipe", this, "state",
291 "MOJO_RESULT_SHOULD_WAIT");
292 result = consumer_->EndRead(0);
293 should_wait = true;
294 datapipe_watcher_.ArmOrNotify();
295 } else {
296 TRACE_EVENT_NESTABLE_ASYNC_END1("ServiceWorker",
297 "FetchDataLoaderAsDataPipe", this,
298 "MojoResult", mojo_result);
299 result = consumer_->EndRead(0);
300 StopInternal();
301 client_->DidFetchDataLoadFailed();
302 return;
303 }
267 } 304 }
268 switch (result) { 305 switch (result) {
269 case BytesConsumer::Result::kOk: 306 case BytesConsumer::Result::kOk:
270 break; 307 break;
271 case BytesConsumer::Result::kShouldWait: 308 case BytesConsumer::Result::kShouldWait:
272 NOTREACHED(); 309 NOTREACHED();
273 return; 310 return;
274 case BytesConsumer::Result::kDone: 311 case BytesConsumer::Result::kDone:
275 if (need_to_flush) 312 TRACE_EVENT_NESTABLE_ASYNC_END1(
276 out_stream_->Flush(); 313 "ServiceWorker", "FetchDataLoaderAsDataPipe", this, "state",
277 out_stream_->Finalize(); 314 "BytesConsumer::Result::Done");
278 client_->DidFetchDataLoadedStream(); 315 StopInternal();
316 client_->DidFetchDataLoadedDataPipe();
279 return; 317 return;
280 case BytesConsumer::Result::kError: 318 case BytesConsumer::Result::kError:
281 // If the stream is aborted soon after the stream is registered 319 TRACE_EVENT_NESTABLE_ASYNC_END1(
282 // to the StreamRegistry, ServiceWorkerURLRequestJob may not 320 "ServiceWorker", "FetchDataLoaderAsDataPipe", this, "state",
283 // notice the error and continue waiting forever. 321 "BytesConsumer::Result::Error");
284 // TODO(yhirano): Add new message to report the error to the 322 StopInternal();
285 // browser process.
286 out_stream_->Abort();
287 client_->DidFetchDataLoadFailed(); 323 client_->DidFetchDataLoadFailed();
288 return; 324 return;
289 } 325 }
290 } 326 }
291 } 327 }
292 328
293 void Cancel() override { consumer_->Cancel(); } 329 void Cancel() override { StopInternal(); }
294 330
295 DEFINE_INLINE_TRACE() { 331 DEFINE_INLINE_TRACE() {
296 visitor->Trace(consumer_); 332 visitor->Trace(consumer_);
297 visitor->Trace(client_); 333 visitor->Trace(client_);
298 visitor->Trace(out_stream_);
299 FetchDataLoader::Trace(visitor); 334 FetchDataLoader::Trace(visitor);
300 BytesConsumer::Client::Trace(visitor); 335 BytesConsumer::Client::Trace(visitor);
301 } 336 }
302 337
338 private:
339 void StopInternal() {
340 consumer_->Cancel();
341 datapipe_watcher_.Cancel();
342 out_datapipe_.reset();
343 }
344
303 Member<BytesConsumer> consumer_; 345 Member<BytesConsumer> consumer_;
304 Member<FetchDataLoader::Client> client_; 346 Member<FetchDataLoader::Client> client_;
305 Member<Stream> out_stream_; 347
348 mojo::ScopedDataPipeProducerHandle out_datapipe_;
349 mojo::SimpleWatcher datapipe_watcher_;
falken 2017/04/12 08:00:16 nit: since the CamelCase version would be DataPipe
shimazu 2017/04/18 01:50:05 Done.
306 }; 350 };
307 351
308 } // namespace 352 } // namespace
309 353
310 FetchDataLoader* FetchDataLoader::CreateLoaderAsBlobHandle( 354 FetchDataLoader* FetchDataLoader::CreateLoaderAsBlobHandle(
311 const String& mime_type) { 355 const String& mime_type) {
312 return new FetchDataLoaderAsBlobHandle(mime_type); 356 return new FetchDataLoaderAsBlobHandle(mime_type);
313 } 357 }
314 358
315 FetchDataLoader* FetchDataLoader::CreateLoaderAsArrayBuffer() { 359 FetchDataLoader* FetchDataLoader::CreateLoaderAsArrayBuffer() {
316 return new FetchDataLoaderAsArrayBuffer(); 360 return new FetchDataLoaderAsArrayBuffer();
317 } 361 }
318 362
319 FetchDataLoader* FetchDataLoader::CreateLoaderAsString() { 363 FetchDataLoader* FetchDataLoader::CreateLoaderAsString() {
320 return new FetchDataLoaderAsString(); 364 return new FetchDataLoaderAsString();
321 } 365 }
322 366
323 FetchDataLoader* FetchDataLoader::CreateLoaderAsStream(Stream* out_stream) { 367 FetchDataLoader* FetchDataLoader::CreateLoaderAsDataPipe(
324 return new FetchDataLoaderAsStream(out_stream); 368 mojo::ScopedDataPipeProducerHandle out_datapipe) {
369 return new FetchDataLoaderAsDataPipe(std::move(out_datapipe));
325 } 370 }
326 371
327 } // namespace blink 372 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698