Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(88)

Side by Side Diff: third_party/WebKit/Source/modules/fetch/FetchDataLoader.cpp

Issue 2703343002: ServiceWorker: Use mojo's data pipe for respondWith(stream) (Closed)
Patch Set: Used TEST_P to test closing the connection first and On{Aborted,Completed} first Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "modules/fetch/FetchDataLoader.h" 5 #include "modules/fetch/FetchDataLoader.h"
6 6
7 #include <memory>
7 #include "core/html/parser/TextResourceDecoder.h" 8 #include "core/html/parser/TextResourceDecoder.h"
8 #include "modules/fetch/BytesConsumer.h" 9 #include "modules/fetch/BytesConsumer.h"
10 #include "mojo/public/cpp/system/simple_watcher.h"
11 #include "platform/instrumentation/tracing/TraceEvent.h"
12 #include "wtf/Functional.h"
9 #include "wtf/PtrUtil.h" 13 #include "wtf/PtrUtil.h"
10 #include "wtf/text/StringBuilder.h" 14 #include "wtf/text/StringBuilder.h"
11 #include "wtf/text/WTFString.h" 15 #include "wtf/text/WTFString.h"
12 #include "wtf/typed_arrays/ArrayBufferBuilder.h" 16 #include "wtf/typed_arrays/ArrayBufferBuilder.h"
13 #include <memory>
14 17
15 namespace blink { 18 namespace blink {
16 19
17 namespace { 20 namespace {
18 21
19 class FetchDataLoaderAsBlobHandle final : public FetchDataLoader, 22 class FetchDataLoaderAsBlobHandle final : public FetchDataLoader,
20 public BytesConsumer::Client { 23 public BytesConsumer::Client {
21 USING_GARBAGE_COLLECTED_MIXIN(FetchDataLoaderAsBlobHandle); 24 USING_GARBAGE_COLLECTED_MIXIN(FetchDataLoaderAsBlobHandle);
22 25
23 public: 26 public:
(...skipping 200 matching lines...) Expand 10 before | Expand all | Expand 10 after
224 } 227 }
225 228
226 private: 229 private:
227 Member<BytesConsumer> m_consumer; 230 Member<BytesConsumer> m_consumer;
228 Member<FetchDataLoader::Client> m_client; 231 Member<FetchDataLoader::Client> m_client;
229 232
230 std::unique_ptr<TextResourceDecoder> m_decoder; 233 std::unique_ptr<TextResourceDecoder> m_decoder;
231 StringBuilder m_builder; 234 StringBuilder m_builder;
232 }; 235 };
233 236
234 class FetchDataLoaderAsStream final : public FetchDataLoader, 237 class FetchDataLoaderAsDataPipe final : public FetchDataLoader,
235 public BytesConsumer::Client { 238 public BytesConsumer::Client {
236 USING_GARBAGE_COLLECTED_MIXIN(FetchDataLoaderAsStream); 239 USING_GARBAGE_COLLECTED_MIXIN(FetchDataLoaderAsDataPipe);
237 240
238 public: 241 public:
239 explicit FetchDataLoaderAsStream(Stream* outStream) 242 explicit FetchDataLoaderAsDataPipe(
240 : m_outStream(outStream) {} 243 mojo::ScopedDataPipeProducerHandle outDataPipe)
241 244 : m_outDataPipe(std::move(outDataPipe)),
245 m_dataPipeWatcher(FROM_HERE,
246 mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) {
247 TRACE_EVENT0("ServiceWorker",
248 "FetchDataLoaderAsDataPipe::FetchDataLoaderAsDataPipe");
249 }
250 ~FetchDataLoaderAsDataPipe() override {}
242 void start(BytesConsumer* consumer, 251 void start(BytesConsumer* consumer,
243 FetchDataLoader::Client* client) override { 252 FetchDataLoader::Client* client) override {
253 TRACE_EVENT_NESTABLE_ASYNC_BEGIN0("ServiceWorker",
254 "FetchDataLoaderAsDataPipe", this);
244 DCHECK(!m_client); 255 DCHECK(!m_client);
245 DCHECK(!m_consumer); 256 DCHECK(!m_consumer);
257 m_dataPipeWatcher.Watch(
258 m_outDataPipe.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
259 convertToBaseCallback(WTF::bind(&FetchDataLoaderAsDataPipe::onWritable,
260 wrapWeakPersistent(this))));
246 m_client = client; 261 m_client = client;
247 m_consumer = consumer; 262 m_consumer = consumer;
248 m_consumer->setClient(this); 263 m_consumer->setClient(this);
264 }
265
266 void onWritable(MojoResult result) {
267 if (result != MOJO_RESULT_OK)
268 cancel();
249 onStateChange(); 269 onStateChange();
250 } 270 }
251 271
272 // Implements BytesConsumer::Client.
252 void onStateChange() override { 273 void onStateChange() override {
253 bool needToFlush = false; 274 bool shouldWait = false;
254 while (true) { 275 while (!shouldWait) {
255 const char* buffer; 276 const char* buffer;
256 size_t available; 277 size_t available;
257 auto result = m_consumer->beginRead(&buffer, &available); 278 auto result = m_consumer->beginRead(&buffer, &available);
258 if (result == BytesConsumer::Result::ShouldWait) { 279 if (result == BytesConsumer::Result::ShouldWait)
259 if (needToFlush)
260 m_outStream->flush();
261 return; 280 return;
262 }
263 if (result == BytesConsumer::Result::Ok) { 281 if (result == BytesConsumer::Result::Ok) {
264 m_outStream->addData(buffer, available); 282 DCHECK_GT(available, 0UL);
265 needToFlush = true; 283 uint32_t numBytes = available;
266 result = m_consumer->endRead(available); 284 MojoResult mojoResult = mojo::WriteDataRaw(
285 m_outDataPipe.get(), buffer, &numBytes, MOJO_WRITE_DATA_FLAG_NONE);
286 if (mojoResult == MOJO_RESULT_OK) {
287 TRACE_EVENT_NESTABLE_ASYNC_INSTANT2(
288 "ServiceWorker", "FetchDataLoaderAsDataPipe", this, "state",
289 "MOJO_RESULT_OK", "bytes", numBytes);
290 result = m_consumer->endRead(numBytes);
291 } else if (mojoResult == MOJO_RESULT_SHOULD_WAIT) {
292 TRACE_EVENT_NESTABLE_ASYNC_INSTANT1(
293 "ServiceWorker", "FetchDataLoaderAsDataPipe", this, "state",
294 "MOJO_RESULT_SHOULD_WAIT");
295 result = m_consumer->endRead(0);
296 shouldWait = true;
297 } else {
298 TRACE_EVENT_NESTABLE_ASYNC_END1("ServiceWorker",
299 "FetchDataLoaderAsDataPipe", this,
300 "MojoResult", mojoResult);
301 result = m_consumer->endRead(0);
302 cancel();
303 m_client->didFetchDataLoadFailed();
304 return;
305 }
267 } 306 }
268 switch (result) { 307 switch (result) {
269 case BytesConsumer::Result::Ok: 308 case BytesConsumer::Result::Ok:
270 break; 309 break;
271 case BytesConsumer::Result::ShouldWait: 310 case BytesConsumer::Result::ShouldWait:
272 NOTREACHED(); 311 NOTREACHED();
273 return; 312 return;
274 case BytesConsumer::Result::Done: 313 case BytesConsumer::Result::Done:
275 if (needToFlush) 314 TRACE_EVENT_NESTABLE_ASYNC_END1(
276 m_outStream->flush(); 315 "ServiceWorker", "FetchDataLoaderAsDataPipe", this, "state",
277 m_outStream->finalize(); 316 "BytesConsumer::Result::Done");
278 m_client->didFetchDataLoadedStream(); 317 cancel();
318 m_client->didFetchDataLoadedDataPipe();
279 return; 319 return;
280 case BytesConsumer::Result::Error: 320 case BytesConsumer::Result::Error:
281 // If the stream is aborted soon after the stream is registered 321 TRACE_EVENT_NESTABLE_ASYNC_END1(
282 // to the StreamRegistry, ServiceWorkerURLRequestJob may not 322 "ServiceWorker", "FetchDataLoaderAsDataPipe", this, "state",
283 // notice the error and continue waiting forever. 323 "BytesConsumer::Result::Error");
284 // TODO(yhirano): Add new message to report the error to the 324 // TODO(shimazu): Some mechanism to notify error state is needed.
285 // browser process. 325 cancel();
286 m_outStream->abort();
287 m_client->didFetchDataLoadFailed(); 326 m_client->didFetchDataLoadFailed();
288 return; 327 return;
289 } 328 }
290 } 329 }
291 } 330 }
292 331
293 void cancel() override { m_consumer->cancel(); } 332 void cancel() override {
333 m_consumer->cancel();
334 m_dataPipeWatcher.Cancel();
335 m_outDataPipe.reset();
336 }
294 337
295 DEFINE_INLINE_TRACE() { 338 DEFINE_INLINE_TRACE() {
296 visitor->trace(m_consumer); 339 visitor->trace(m_consumer);
297 visitor->trace(m_client); 340 visitor->trace(m_client);
298 visitor->trace(m_outStream);
299 FetchDataLoader::trace(visitor); 341 FetchDataLoader::trace(visitor);
300 BytesConsumer::Client::trace(visitor); 342 BytesConsumer::Client::trace(visitor);
301 } 343 }
302 344
345 private:
303 Member<BytesConsumer> m_consumer; 346 Member<BytesConsumer> m_consumer;
304 Member<FetchDataLoader::Client> m_client; 347 Member<FetchDataLoader::Client> m_client;
305 Member<Stream> m_outStream; 348
349 mojo::ScopedDataPipeProducerHandle m_outDataPipe;
350 mojo::SimpleWatcher m_dataPipeWatcher;
306 }; 351 };
307 352
308 } // namespace 353 } // namespace
309 354
310 FetchDataLoader* FetchDataLoader::createLoaderAsBlobHandle( 355 FetchDataLoader* FetchDataLoader::createLoaderAsBlobHandle(
311 const String& mimeType) { 356 const String& mimeType) {
312 return new FetchDataLoaderAsBlobHandle(mimeType); 357 return new FetchDataLoaderAsBlobHandle(mimeType);
313 } 358 }
314 359
315 FetchDataLoader* FetchDataLoader::createLoaderAsArrayBuffer() { 360 FetchDataLoader* FetchDataLoader::createLoaderAsArrayBuffer() {
316 return new FetchDataLoaderAsArrayBuffer(); 361 return new FetchDataLoaderAsArrayBuffer();
317 } 362 }
318 363
319 FetchDataLoader* FetchDataLoader::createLoaderAsString() { 364 FetchDataLoader* FetchDataLoader::createLoaderAsString() {
320 return new FetchDataLoaderAsString(); 365 return new FetchDataLoaderAsString();
321 } 366 }
322 367
323 FetchDataLoader* FetchDataLoader::createLoaderAsStream(Stream* outStream) { 368 FetchDataLoader* FetchDataLoader::createLoaderAsDataPipe(
324 return new FetchDataLoaderAsStream(outStream); 369 mojo::ScopedDataPipeProducerHandle outDataPipe) {
370 return new FetchDataLoaderAsDataPipe(std::move(outDataPipe));
325 } 371 }
326 372
327 } // namespace blink 373 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698