Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(104)

Side by Side Diff: Source/modules/websockets/WorkerThreadableWebSocketChannel.cpp

Issue 265713004: Cleanup WorkerThreadableWebSocketChannel's logic to wait for the main thread (Closed) Base URL: svn://svn.chromium.org/blink/trunk
Patch Set: Addressed #2 Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « Source/modules/websockets/WorkerThreadableWebSocketChannel.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved. 2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved.
3 * 3 *
4 * Redistribution and use in source and binary forms, with or without 4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are 5 * modification, are permitted provided that the following conditions are
6 * met: 6 * met:
7 * 7 *
8 * * Redistributions of source code must retain the above copyright 8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer. 9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above 10 * * Redistributions in binary form must reproduce the above
(...skipping 19 matching lines...) Expand all
30 30
31 #include "config.h" 31 #include "config.h"
32 32
33 #include "modules/websockets/WorkerThreadableWebSocketChannel.h" 33 #include "modules/websockets/WorkerThreadableWebSocketChannel.h"
34 34
35 #include "RuntimeEnabledFeatures.h" 35 #include "RuntimeEnabledFeatures.h"
36 #include "bindings/v8/ScriptCallStackFactory.h" 36 #include "bindings/v8/ScriptCallStackFactory.h"
37 #include "core/dom/CrossThreadTask.h" 37 #include "core/dom/CrossThreadTask.h"
38 #include "core/dom/Document.h" 38 #include "core/dom/Document.h"
39 #include "core/dom/ExecutionContext.h" 39 #include "core/dom/ExecutionContext.h"
40 #include "core/dom/ExecutionContextTask.h"
41 #include "core/fileapi/Blob.h" 40 #include "core/fileapi/Blob.h"
42 #include "core/inspector/ScriptCallFrame.h" 41 #include "core/inspector/ScriptCallFrame.h"
43 #include "core/inspector/ScriptCallStack.h" 42 #include "core/inspector/ScriptCallStack.h"
44 #include "core/workers/WorkerLoaderProxy.h" 43 #include "core/workers/WorkerLoaderProxy.h"
45 #include "core/workers/WorkerRunLoop.h" 44 #include "core/workers/WorkerRunLoop.h"
46 #include "core/workers/WorkerThread.h" 45 #include "core/workers/WorkerThread.h"
47 #include "modules/websockets/MainThreadWebSocketChannel.h" 46 #include "modules/websockets/MainThreadWebSocketChannel.h"
48 #include "modules/websockets/NewWebSocketChannelImpl.h" 47 #include "modules/websockets/NewWebSocketChannelImpl.h"
49 #include "modules/websockets/ThreadableWebSocketChannelClientWrapper.h" 48 #include "modules/websockets/ThreadableWebSocketChannelClientWrapper.h"
50 #include "public/platform/Platform.h" 49 #include "public/platform/Platform.h"
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
113 , m_bufferedAmount(0) 112 , m_bufferedAmount(0)
114 { 113 {
115 } 114 }
116 115
117 OwnPtr<blink::WebWaitableEvent> m_event; 116 OwnPtr<blink::WebWaitableEvent> m_event;
118 bool m_connectRequestResult; 117 bool m_connectRequestResult;
119 WebSocketChannel::SendResult m_sendRequestResult; 118 WebSocketChannel::SendResult m_sendRequestResult;
120 unsigned long m_bufferedAmount; 119 unsigned long m_bufferedAmount;
121 }; 120 };
122 121
123 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalS cope& context, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber) 122 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalS cope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL , unsigned lineNumber)
124 : m_workerGlobalScope(context) 123 : m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(clie nt))
125 , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(clie nt)) 124 , m_bridge(Bridge::create(m_workerClientWrapper, workerGlobalScope))
126 , m_bridge(Bridge::create(m_workerClientWrapper, m_workerGlobalScope))
127 , m_sourceURLAtConnection(sourceURL) 125 , m_sourceURLAtConnection(sourceURL)
128 , m_lineNumberAtConnection(lineNumber) 126 , m_lineNumberAtConnection(lineNumber)
129 { 127 {
130 ASSERT(m_workerClientWrapper.get()); 128 ASSERT(m_workerClientWrapper.get());
131 m_bridge->initialize(sourceURL, lineNumber); 129 m_bridge->initialize(sourceURL, lineNumber);
132 } 130 }
133 131
134 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel() 132 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
135 { 133 {
136 if (m_bridge) 134 if (m_bridge)
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
222 m_bridge->suspend(); 220 m_bridge->suspend();
223 } 221 }
224 222
225 void WorkerThreadableWebSocketChannel::resume() 223 void WorkerThreadableWebSocketChannel::resume()
226 { 224 {
227 m_workerClientWrapper->resume(); 225 m_workerClientWrapper->resume();
228 if (m_bridge) 226 if (m_bridge)
229 m_bridge->resume(); 227 m_bridge->resume();
230 } 228 }
231 229
232 void WorkerThreadableWebSocketChannel::trace(Visitor* visitor)
233 {
234 visitor->trace(m_workerGlobalScope);
235 WebSocketChannel::trace(visitor);
236 }
237
238 WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<WeakReference<Peer> > re ference, PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, Work erLoaderProxy& loaderProxy, ExecutionContext* context, const String& sourceURL, unsigned lineNumber, PassOwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper ) 230 WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<WeakReference<Peer> > re ference, PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, Work erLoaderProxy& loaderProxy, ExecutionContext* context, const String& sourceURL, unsigned lineNumber, PassOwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper )
239 : m_workerClientWrapper(clientWrapper) 231 : m_workerClientWrapper(clientWrapper)
240 , m_loaderProxy(loaderProxy) 232 , m_loaderProxy(loaderProxy)
241 , m_mainWebSocketChannel(nullptr) 233 , m_mainWebSocketChannel(nullptr)
242 , m_syncHelper(syncHelper) 234 , m_syncHelper(syncHelper)
243 , m_weakFactory(reference, this) 235 , m_weakFactory(reference, this)
244 { 236 {
245 ASSERT(isMainThread()); 237 ASSERT(isMainThread());
246 ASSERT(m_workerClientWrapper.get()); 238 ASSERT(m_workerClientWrapper.get());
247 239
(...skipping 208 matching lines...) Expand 10 before | Expand all | Expand 10 after
456 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 448 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
457 workerClientWrapper->didReceiveMessageError(); 449 workerClientWrapper->didReceiveMessageError();
458 } 450 }
459 451
460 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError() 452 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
461 { 453 {
462 ASSERT(isMainThread()); 454 ASSERT(isMainThread());
463 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidReceiveMessageError, m_workerClientWrapper)); 455 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidReceiveMessageError, m_workerClientWrapper));
464 } 456 }
465 457
466 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketC hannelClientWrapper> workerClientWrapper, PassRefPtrWillBeRawPtr<WorkerGlobalSco pe> workerGlobalScope) 458 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketC hannelClientWrapper> workerClientWrapper, WorkerGlobalScope& workerGlobalScope)
467 : m_workerClientWrapper(workerClientWrapper) 459 : m_workerClientWrapper(workerClientWrapper)
468 , m_workerGlobalScope(workerGlobalScope) 460 , m_workerGlobalScope(workerGlobalScope)
469 , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy()) 461 , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
470 , m_syncHelper(0) 462 , m_syncHelper(0)
471 { 463 {
472 ASSERT(m_workerClientWrapper.get()); 464 ASSERT(m_workerClientWrapper.get());
473 } 465 }
474 466
475 WorkerThreadableWebSocketChannel::Bridge::~Bridge() 467 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
476 { 468 {
477 disconnect(); 469 disconnect();
478 } 470 }
479 471
480 void WorkerThreadableWebSocketChannel::Bridge::initialize(const String& sourceUR L, unsigned lineNumber) 472 void WorkerThreadableWebSocketChannel::Bridge::initialize(const String& sourceUR L, unsigned lineNumber)
481 { 473 {
482 RefPtr<WeakReference<Peer> > reference = WeakReference<Peer>::createUnbound( ); 474 RefPtr<WeakReference<Peer> > reference = WeakReference<Peer>::createUnbound( );
483 m_peer = WeakPtr<Peer>(reference); 475 m_peer = WeakPtr<Peer>(reference);
484 476
485 OwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper = ThreadableWebSocke tChannelSyncHelper::create(adoptPtr(blink::Platform::current()->createWaitableEv ent())); 477 OwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper = ThreadableWebSocke tChannelSyncHelper::create(adoptPtr(blink::Platform::current()->createWaitableEv ent()));
486 // This pointer is guaranteed to be valid until we call terminatePeer. 478 // This pointer is guaranteed to be valid until we call terminatePeer.
487 m_syncHelper = syncHelper.get(); 479 m_syncHelper = syncHelper.get();
488 480
489 RefPtr<Bridge> protect(this); 481 RefPtr<Bridge> protect(this);
490 m_loaderProxy.postTaskToLoader(createCallbackTask(&Peer::initialize, referen ce.release(), AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper, sou rceURL, lineNumber, syncHelper.release())); 482 if (!waitForMethodCompletion(createCallbackTask(&Peer::initialize, reference .release(), AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper, sourc eURL, lineNumber, syncHelper.release()))) {
491 if (!waitForMethodCompletion()) {
492 // The worker thread has been signalled to shutdown before method comple tion. 483 // The worker thread has been signalled to shutdown before method comple tion.
493 terminatePeer(); 484 terminatePeer();
494 } 485 }
495 } 486 }
496 487
497 bool WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const St ring& protocol) 488 bool WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const St ring& protocol)
498 { 489 {
499 if (!m_workerGlobalScope) 490 if (hasTerminatedPeer())
500 return false; 491 return false;
501 ASSERT(m_syncHelper); 492
502 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::connect, m_peer, url.copy(), protocol.isolatedCopy())));
503 RefPtr<Bridge> protect(this); 493 RefPtr<Bridge> protect(this);
504 waitForMethodCompletion(); 494 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::connect, m_ peer, url.copy(), protocol.isolatedCopy()))))
495 return false;
496
505 return m_syncHelper->connectRequestResult(); 497 return m_syncHelper->connectRequestResult();
506 } 498 }
507 499
508 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(cons t String& message) 500 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(cons t String& message)
509 { 501 {
510 if (!m_workerGlobalScope) 502 if (hasTerminatedPeer())
511 return WebSocketChannel::SendFail; 503 return WebSocketChannel::SendFail;
512 ASSERT(m_syncHelper); 504
513 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::send, m_p eer, message.isolatedCopy())));
514 RefPtr<Bridge> protect(this); 505 RefPtr<Bridge> protect(this);
515 waitForMethodCompletion(); 506 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::send, m_pee r, message.isolatedCopy()))))
507 return WebSocketChannel::SendFail;
508
516 return m_syncHelper->sendRequestResult(); 509 return m_syncHelper->sendRequestResult();
517 } 510 }
518 511
519 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(cons t ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength) 512 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(cons t ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
520 { 513 {
521 if (!m_workerGlobalScope) 514 if (hasTerminatedPeer())
522 return WebSocketChannel::SendFail; 515 return WebSocketChannel::SendFail;
523 ASSERT(m_syncHelper); 516
524 // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>. 517 // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
525 OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength)); 518 OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
526 if (binaryData.byteLength()) 519 if (binaryData.byteLength())
527 memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteO ffset, byteLength); 520 memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteO ffset, byteLength);
528 521
529 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::sendArray Buffer, m_peer, data.release())));
530 RefPtr<Bridge> protect(this); 522 RefPtr<Bridge> protect(this);
531 waitForMethodCompletion(); 523 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendArrayBu ffer, m_peer, data.release()))))
524 return WebSocketChannel::SendFail;
525
532 return m_syncHelper->sendRequestResult(); 526 return m_syncHelper->sendRequestResult();
533 } 527 }
534 528
535 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(Pass RefPtr<BlobDataHandle> data) 529 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(Pass RefPtr<BlobDataHandle> data)
536 { 530 {
537 if (!m_workerGlobalScope) 531 if (hasTerminatedPeer())
538 return WebSocketChannel::SendFail; 532 return WebSocketChannel::SendFail;
539 ASSERT(m_syncHelper); 533
540 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::sendBlob, m_peer, data)));
541 RefPtr<Bridge> protect(this); 534 RefPtr<Bridge> protect(this);
542 waitForMethodCompletion(); 535 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendBlob, m _peer, data))))
536 return WebSocketChannel::SendFail;
537
543 return m_syncHelper->sendRequestResult(); 538 return m_syncHelper->sendRequestResult();
544 } 539 }
545 540
546 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount() 541 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
547 { 542 {
548 if (!m_workerGlobalScope) 543 if (hasTerminatedPeer())
549 return 0; 544 return 0;
550 ASSERT(m_syncHelper); 545
551 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::bufferedA mount, m_peer)));
552 RefPtr<Bridge> protect(this); 546 RefPtr<Bridge> protect(this);
553 waitForMethodCompletion(); 547 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::bufferedAmo unt, m_peer))))
548 return 0;
549
554 return m_syncHelper->bufferedAmount(); 550 return m_syncHelper->bufferedAmount();
555 } 551 }
556 552
557 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& rea son) 553 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& rea son)
558 { 554 {
555 if (hasTerminatedPeer())
556 return;
557
559 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::close, m_ peer, code, reason.isolatedCopy()))); 558 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::close, m_ peer, code, reason.isolatedCopy())));
560 } 559 }
561 560
562 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, Messag eLevel level, const String& sourceURL, unsigned lineNumber) 561 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, Messag eLevel level, const String& sourceURL, unsigned lineNumber)
563 { 562 {
563 if (hasTerminatedPeer())
564 return;
565
564 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::fail, m_p eer, reason.isolatedCopy(), level, sourceURL.isolatedCopy(), lineNumber))); 566 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::fail, m_p eer, reason.isolatedCopy(), level, sourceURL.isolatedCopy(), lineNumber)));
565 } 567 }
566 568
567 void WorkerThreadableWebSocketChannel::Bridge::disconnect() 569 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
568 { 570 {
569 clearClientWrapper(); 571 clearClientWrapper();
570 terminatePeer(); 572 terminatePeer();
571 } 573 }
572 574
573 void WorkerThreadableWebSocketChannel::Bridge::suspend() 575 void WorkerThreadableWebSocketChannel::Bridge::suspend()
574 { 576 {
577 if (hasTerminatedPeer())
578 return;
579
575 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::suspend, m_peer))); 580 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::suspend, m_peer)));
576 } 581 }
577 582
578 void WorkerThreadableWebSocketChannel::Bridge::resume() 583 void WorkerThreadableWebSocketChannel::Bridge::resume()
579 { 584 {
585 if (hasTerminatedPeer())
586 return;
587
580 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::resume, m _peer))); 588 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::resume, m _peer)));
581 } 589 }
582 590
583 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper() 591 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
584 { 592 {
585 m_workerClientWrapper->clearClient(); 593 m_workerClientWrapper->clearClient();
586 } 594 }
587 595
588 // Caller of this function should hold a reference to the bridge, because this f unction may call WebSocket::didClose() in the end, 596 // Caller of this function should hold a reference to the bridge, because this f unction may call WebSocket::didClose() in the end,
589 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference. 597 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
590 bool WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion() 598 bool WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion(PassOwnPt r<ExecutionContextTask> task)
591 { 599 {
592 if (!m_syncHelper) 600 ASSERT(m_workerGlobalScope);
593 return true; 601 ASSERT(m_syncHelper);
602
603 m_loaderProxy.postTaskToLoader(task);
594 604
595 blink::WebWaitableEvent* shutdownEvent = m_workerGlobalScope->thread()->shut downEvent(); 605 blink::WebWaitableEvent* shutdownEvent = m_workerGlobalScope->thread()->shut downEvent();
596 Vector<blink::WebWaitableEvent*> events; 606 Vector<blink::WebWaitableEvent*> events;
597 events.append(shutdownEvent); 607 events.append(shutdownEvent);
598 events.append(m_syncHelper->event()); 608 events.append(m_syncHelper->event());
599 609
600 ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack); 610 ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack);
601 blink::WebWaitableEvent* signalled = blink::Platform::current()->waitMultipl eEvents(events); 611 blink::WebWaitableEvent* signalled = blink::Platform::current()->waitMultipl eEvents(events);
602 return signalled != shutdownEvent; 612 return signalled != shutdownEvent;
603 } 613 }
604 614
605 void WorkerThreadableWebSocketChannel::Bridge::terminatePeer() 615 void WorkerThreadableWebSocketChannel::Bridge::terminatePeer()
606 { 616 {
607 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::destroy, m_peer))); 617 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::destroy, m_peer)));
618 // Peer::destroy() deletes m_peer and then m_syncHelper will be released.
619 // We must not touch m_syncHelper any more.
620 m_syncHelper = 0;
621
622 // We won't use this any more.
608 m_workerGlobalScope = nullptr; 623 m_workerGlobalScope = nullptr;
609 m_syncHelper = 0;
610 } 624 }
611 625
612 } // namespace WebCore 626 } // namespace WebCore
OLDNEW
« no previous file with comments | « Source/modules/websockets/WorkerThreadableWebSocketChannel.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698