Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2294)

Unified Diff: Source/modules/websockets/WorkerThreadableWebSocketChannel.cpp

Issue 350763007: [WebSocket] Create Peer on the worker thread. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 6 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: Source/modules/websockets/WorkerThreadableWebSocketChannel.cpp
diff --git a/Source/modules/websockets/WorkerThreadableWebSocketChannel.cpp b/Source/modules/websockets/WorkerThreadableWebSocketChannel.cpp
index 5340c467f353adfbb22fa2fcc46fe29fd3933598..d7785b86e4c680a749ba664ea56d31b1f64a6dc3 100644
--- a/Source/modules/websockets/WorkerThreadableWebSocketChannel.cpp
+++ b/Source/modules/websockets/WorkerThreadableWebSocketChannel.cpp
@@ -201,66 +201,35 @@ void WorkerThreadableWebSocketChannel::trace(Visitor* visitor)
WebSocketChannel::trace(visitor);
}
-#if ENABLE(OILPAN)
-Peer::Peer(RawPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ExecutionContext* context, const String& sourceURL, unsigned lineNumber, RawPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
-#else
-Peer::Peer(PassRefPtr<WeakReference<Peer> > reference, PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ExecutionContext* context, const String& sourceURL, unsigned lineNumber, PassRefPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
-#endif
+Peer::Peer(PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
: m_workerClientWrapper(clientWrapper)
, m_loaderProxy(loaderProxy)
, m_mainWebSocketChannel(nullptr)
, m_syncHelper(syncHelper)
-#if ENABLE(OILPAN)
- , m_keepAlive(this)
-#else
- , m_weakFactory(reference, this)
-#endif
{
- ASSERT(isMainThread());
- ASSERT(m_workerClientWrapper.get());
-
- Document* document = toDocument(context);
- if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) {
- m_mainWebSocketChannel = NewWebSocketChannelImpl::create(document, this, sourceURL, lineNumber);
- } else {
- m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber);
- }
+ ASSERT(!isMainThread());
}
Peer::~Peer()
{
- ASSERT(isMainThread());
+ ASSERT(!isMainThread());
}
-#if ENABLE(OILPAN)
-void Peer::initialize(ExecutionContext* context, WeakMember<Peer>* reference, WorkerLoaderProxy* loaderProxy, RawPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, const String& sourceURLAtConnection, unsigned lineNumberAtConnection, RawPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
+PassOwnPtrWillBeRawPtr<Peer> Peer::create(PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
{
- // The caller must call destroy() to free the peer.
- *reference = new Peer(clientWrapper, *loaderProxy, context, sourceURLAtConnection, lineNumberAtConnection, syncHelper);
- syncHelper->signalWorkerThread();
+ return adoptPtrWillBeNoop(new Peer(clientWrapper, loaderProxy, syncHelper));
}
-#else
-void Peer::initialize(ExecutionContext* context, PassRefPtr<WeakReference<Peer> > reference, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, const String& sourceURLAtConnection, unsigned lineNumberAtConnection, PassRefPtr<ThreadableWebSocketChannelSyncHelper> prpSyncHelper)
-{
- RefPtr<ThreadableWebSocketChannelSyncHelper> syncHelper = prpSyncHelper;
- // The caller must call destroy() to free the peer.
- new Peer(reference, clientWrapper, *loaderProxy, context, sourceURLAtConnection, lineNumberAtConnection, syncHelper);
- syncHelper->signalWorkerThread();
-}
-#endif
-void Peer::destroy()
+void Peer::initializeInternal(ExecutionContext* context, const String& sourceURL, unsigned lineNumber)
{
ASSERT(isMainThread());
- disconnect();
-
-#if ENABLE(OILPAN)
- m_keepAlive = nullptr;
+ Document* document = toDocument(context);
+ if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) {
+ m_mainWebSocketChannel = NewWebSocketChannelImpl::create(document, this, sourceURL, lineNumber);
+ } else {
+ m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber);
+ }
m_syncHelper->signalWorkerThread();
- m_syncHelper = nullptr;
-#else
- delete this;
-#endif
}
void Peer::connect(const KURL& url, const String& protocol)
@@ -337,10 +306,11 @@ void Peer::disconnect()
{
ASSERT(isMainThread());
ASSERT(m_syncHelper);
- if (!m_mainWebSocketChannel)
- return;
- m_mainWebSocketChannel->disconnect();
- m_mainWebSocketChannel = nullptr;
+ if (m_mainWebSocketChannel) {
+ m_mainWebSocketChannel->disconnect();
+ m_mainWebSocketChannel = nullptr;
+ }
+ m_syncHelper->signalWorkerThread();
}
static void workerGlobalScopeDidConnect(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& subprotocol, const String& extensions)
@@ -354,7 +324,7 @@ void Peer::didConnect(const String& subprotocol, const String& extensions)
ASSERT(isMainThread());
// It is important to seprate task creation from posting
// the task. See the above comment.
- OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidConnect, m_workerClientWrapper.get(), subprotocol, extensions);
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidConnect, m_workerClientWrapper, subprotocol, extensions);
m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
@@ -369,7 +339,7 @@ void Peer::didReceiveMessage(const String& message)
ASSERT(isMainThread());
// It is important to seprate task creation from posting
// the task. See the above comment.
- OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidReceiveMessage, m_workerClientWrapper.get(), message);
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidReceiveMessage, m_workerClientWrapper, message);
m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
@@ -384,7 +354,7 @@ void Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
ASSERT(isMainThread());
// It is important to seprate task creation from posting
// the task. See the above comment.
- OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidReceiveBinaryData, m_workerClientWrapper.get(), binaryData);
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidReceiveBinaryData, m_workerClientWrapper, binaryData);
m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
@@ -399,7 +369,7 @@ void Peer::didConsumeBufferedAmount(unsigned long consumed)
ASSERT(isMainThread());
// It is important to seprate task creation from posting
// the task. See the above comment.
- OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidConsumeBufferedAmount, m_workerClientWrapper.get(), consumed);
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidConsumeBufferedAmount, m_workerClientWrapper, consumed);
m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
@@ -414,7 +384,7 @@ void Peer::didStartClosingHandshake()
ASSERT(isMainThread());
// It is important to seprate task creation from posting
// the task. See the above comment.
- OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidStartClosingHandshake, m_workerClientWrapper.get());
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidStartClosingHandshake, m_workerClientWrapper);
m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
@@ -430,7 +400,7 @@ void Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion,
m_mainWebSocketChannel = nullptr;
// It is important to seprate task creation from posting
// the task. See the above comment.
- OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidClose, m_workerClientWrapper.get(), closingHandshakeCompletion, code, reason);
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidClose, m_workerClientWrapper, closingHandshakeCompletion, code, reason);
m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
@@ -445,7 +415,7 @@ void Peer::didReceiveMessageError()
ASSERT(isMainThread());
// It is important to seprate task creation from posting
// the task. See the above comment.
- OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper.get());
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper);
m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
}
@@ -462,38 +432,28 @@ Bridge::Bridge(PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> w
, m_workerGlobalScope(workerGlobalScope)
, m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
, m_syncHelper(nullptr)
- , m_peer(nullptr)
+ , m_peer(Peer::create(m_workerClientWrapper, m_loaderProxy, m_syncHelper))
{
ASSERT(m_workerClientWrapper.get());
}
Bridge::~Bridge()
{
- ASSERT(hasTerminatedPeer());
+ ASSERT(!hasPeer());
}
void Bridge::initialize(const String& sourceURL, unsigned lineNumber)
{
-#if !ENABLE(OILPAN)
- RefPtr<WeakReference<Peer> > reference = WeakReference<Peer>::createUnbound();
- m_peer = WeakPtr<Peer>(reference);
-#endif
-
RefPtrWillBeRawPtr<ThreadableWebSocketChannelSyncHelper> syncHelper = ThreadableWebSocketChannelSyncHelper::create(adoptPtr(blink::Platform::current()->createWaitableEvent()));
// This pointer is guaranteed to be valid until we call terminatePeer.
m_syncHelper = syncHelper.get();
RefPtrWillBeRawPtr<Bridge> protect(this);
-#if ENABLE(OILPAN)
// In order to assure all temporary objects to be destroyed before
// posting the task, we separate task creation and posting.
// In other words, it is dangerous to have a complicated expression
// as a waitForMethodCompletion argument.
- OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&Peer::initialize, &m_peer, AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper.get(), sourceURL, lineNumber, syncHelper.get());
-#else
- // See the above comment.
- OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&Peer::initialize, reference, AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper.get(), sourceURL, lineNumber, syncHelper.get());
-#endif
+ OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&Peer::initialize, AllowCrossThreadAccess(m_peer.get()), sourceURL, lineNumber);
if (!waitForMethodCompletion(task.release())) {
// The worker thread has been signalled to shutdown before method completion.
disconnect();
@@ -502,11 +462,11 @@ void Bridge::initialize(const String& sourceURL, unsigned lineNumber)
bool Bridge::connect(const KURL& url, const String& protocol)
{
- if (hasTerminatedPeer())
+ if (!hasPeer())
return false;
RefPtrWillBeRawPtr<Bridge> protect(this);
- if (!waitForMethodCompletion(createCrossThreadTask(&Peer::connect, m_peer, url, protocol)))
+ if (!waitForMethodCompletion(createCrossThreadTask(&Peer::connect, m_peer.get(), url, protocol)))
return false;
return m_syncHelper->connectRequestResult();
@@ -514,11 +474,11 @@ bool Bridge::connect(const KURL& url, const String& protocol)
WebSocketChannel::SendResult Bridge::send(const String& message)
{
- if (hasTerminatedPeer())
+ if (!hasPeer())
return WebSocketChannel::SendFail;
RefPtrWillBeRawPtr<Bridge> protect(this);
- if (!waitForMethodCompletion(createCrossThreadTask(&Peer::send, m_peer, message)))
+ if (!waitForMethodCompletion(createCrossThreadTask(&Peer::send, m_peer.get(), message)))
return WebSocketChannel::SendFail;
return m_syncHelper->sendRequestResult();
@@ -526,7 +486,7 @@ WebSocketChannel::SendResult Bridge::send(const String& message)
WebSocketChannel::SendResult Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
{
- if (hasTerminatedPeer())
+ if (!hasPeer())
return WebSocketChannel::SendFail;
// ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
@@ -535,7 +495,7 @@ WebSocketChannel::SendResult Bridge::send(const ArrayBuffer& binaryData, unsigne
memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
RefPtrWillBeRawPtr<Bridge> protect(this);
- if (!waitForMethodCompletion(createCrossThreadTask(&Peer::sendArrayBuffer, m_peer, data.release())))
+ if (!waitForMethodCompletion(createCrossThreadTask(&Peer::sendArrayBuffer, m_peer.get(), data.release())))
return WebSocketChannel::SendFail;
return m_syncHelper->sendRequestResult();
@@ -543,11 +503,11 @@ WebSocketChannel::SendResult Bridge::send(const ArrayBuffer& binaryData, unsigne
WebSocketChannel::SendResult Bridge::send(PassRefPtr<BlobDataHandle> data)
{
- if (hasTerminatedPeer())
+ if (!hasPeer())
return WebSocketChannel::SendFail;
RefPtrWillBeRawPtr<Bridge> protect(this);
- if (!waitForMethodCompletion(createCrossThreadTask(&Peer::sendBlob, m_peer, data)))
+ if (!waitForMethodCompletion(createCrossThreadTask(&Peer::sendBlob, m_peer.get(), data)))
return WebSocketChannel::SendFail;
return m_syncHelper->sendRequestResult();
@@ -555,32 +515,32 @@ WebSocketChannel::SendResult Bridge::send(PassRefPtr<BlobDataHandle> data)
void Bridge::close(int code, const String& reason)
{
- if (hasTerminatedPeer())
+ if (!hasPeer())
return;
- m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::close, m_peer, code, reason));
+ m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::close, m_peer.get(), code, reason));
}
void Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
{
- if (hasTerminatedPeer())
+ if (!hasPeer())
return;
- m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::fail, m_peer, reason, level, sourceURL, lineNumber));
+ m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::fail, m_peer.get(), reason, level, sourceURL, lineNumber));
}
void Bridge::disconnect()
{
- if (hasTerminatedPeer())
+ if (!hasPeer())
return;
- clearClientWrapper();
- terminatePeer();
-}
+ waitForMethodCompletion(createCrossThreadTask(&Peer::disconnect, m_peer.get()));
+ // Here |m_peer| is detached from the main thread and we can delete it.
-void Bridge::clearClientWrapper()
-{
- m_workerClientWrapper->clearClient();
+ m_peer = nullptr;
+ m_syncHelper = nullptr;
+ // We won't use this any more.
+ m_workerGlobalScope.clear();
}
// Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
@@ -602,28 +562,6 @@ bool Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
return !m_workerGlobalScope->thread()->terminated();
}
-void Bridge::terminatePeer()
-{
- ASSERT(!hasTerminatedPeer());
-
-#if ENABLE(OILPAN)
- // The worker thread has to wait for the main thread to complete Peer::destroy,
- // because the worker thread has to make sure that the main thread does not have any
- // references to on-heap objects allocated in the thread heap of the worker thread
- // before the worker thread shuts down.
- waitForMethodCompletion(createCrossThreadTask(&Peer::destroy, m_peer));
-#else
- m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::destroy, m_peer));
-#endif
-
- // Peer::destroy() deletes m_peer and then m_syncHelper will be released.
- // We must not touch m_syncHelper any more.
- m_syncHelper = nullptr;
-
- // We won't use this any more.
- m_workerGlobalScope = nullptr;
-}
-
void Bridge::trace(Visitor* visitor)
{
visitor->trace(m_workerClientWrapper);

Powered by Google App Engine
This is Rietveld 408576698