Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2775)

Side by Side Diff: Source/modules/websockets/WorkerThreadableWebSocketChannel.cpp

Issue 701753002: [WebSocket] Remove "Threadable" from class names. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
6 * met:
7 *
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above
11 * copyright notice, this list of conditions and the following disclaimer
12 * in the documentation and/or other materials provided with the
13 * distribution.
14 * * Neither the name of Google Inc. nor the names of its
15 * contributors may be used to endorse or promote products derived from
16 * this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "config.h"
32
33 #include "modules/websockets/WorkerThreadableWebSocketChannel.h"
34
35 #include "bindings/core/v8/ScriptCallStackFactory.h"
36 #include "core/dom/CrossThreadTask.h"
37 #include "core/dom/Document.h"
38 #include "core/dom/ExecutionContext.h"
39 #include "core/dom/ExecutionContextTask.h"
40 #include "core/fileapi/Blob.h"
41 #include "core/inspector/ScriptCallFrame.h"
42 #include "core/inspector/ScriptCallStack.h"
43 #include "core/workers/WorkerGlobalScope.h"
44 #include "core/workers/WorkerLoaderProxy.h"
45 #include "core/workers/WorkerThread.h"
46 #include "modules/websockets/DocumentWebSocketChannel.h"
47 #include "public/platform/Platform.h"
48 #include "public/platform/WebWaitableEvent.h"
49 #include "wtf/ArrayBuffer.h"
50 #include "wtf/Assertions.h"
51 #include "wtf/Functional.h"
52 #include "wtf/MainThread.h"
53 #include "wtf/text/WTFString.h"
54
55 namespace blink {
56
57 typedef WorkerThreadableWebSocketChannel::Bridge Bridge;
58 typedef WorkerThreadableWebSocketChannel::Peer Peer;
59
60 // Created and destroyed on the worker thread. All setters of this class are
61 // called on the main thread, while all getters are called on the worker
62 // thread. signalWorkerThread() must be called before any getters are called.
63 class ThreadableWebSocketChannelSyncHelper : public GarbageCollectedFinalized<Th readableWebSocketChannelSyncHelper> {
64 public:
65 static ThreadableWebSocketChannelSyncHelper* create(PassOwnPtr<WebWaitableEv ent> event)
66 {
67 return new ThreadableWebSocketChannelSyncHelper(event);
68 }
69
70 ~ThreadableWebSocketChannelSyncHelper()
71 {
72 }
73
74 // All setters are called on the main thread.
75 void setConnectRequestResult(bool connectRequestResult)
76 {
77 m_connectRequestResult = connectRequestResult;
78 }
79
80 // All getter are called on the worker thread.
81 bool connectRequestResult() const
82 {
83 return m_connectRequestResult;
84 }
85
86 // This should be called after all setters are called and before any
87 // getters are called.
88 void signalWorkerThread()
89 {
90 m_event->signal();
91 }
92 void wait()
93 {
94 m_event->wait();
95 }
96
97 void trace(Visitor* visitor) { }
98
99 private:
100 explicit ThreadableWebSocketChannelSyncHelper(PassOwnPtr<WebWaitableEvent> e vent)
101 : m_event(event)
102 , m_connectRequestResult(false)
103 {
104 }
105
106 OwnPtr<WebWaitableEvent> m_event;
107 bool m_connectRequestResult;
108 };
109
110 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalS cope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL , unsigned lineNumber)
111 : m_bridge(new Bridge(client, workerGlobalScope))
112 , m_sourceURLAtConnection(sourceURL)
113 , m_lineNumberAtConnection(lineNumber)
114 {
115 m_bridge->initialize(sourceURL, lineNumber);
116 }
117
118 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
119 {
120 ASSERT(!m_bridge);
121 }
122
123 bool WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& pr otocol)
124 {
125 ASSERT(m_bridge);
126 return m_bridge->connect(url, protocol);
127 }
128
129 void WorkerThreadableWebSocketChannel::send(const String& message)
130 {
131 ASSERT(m_bridge);
132 m_bridge->send(message);
133 }
134
135 void WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsig ned byteOffset, unsigned byteLength)
136 {
137 ASSERT(m_bridge);
138 m_bridge->send(binaryData, byteOffset, byteLength);
139 }
140
141 void WorkerThreadableWebSocketChannel::send(PassRefPtr<BlobDataHandle> blobData)
142 {
143 ASSERT(m_bridge);
144 m_bridge->send(blobData);
145 }
146
147 void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
148 {
149 ASSERT(m_bridge);
150 m_bridge->close(code, reason);
151 }
152
153 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel l evel, const String& sourceURL, unsigned lineNumber)
154 {
155 if (!m_bridge)
156 return;
157
158 RefPtrWillBeRawPtr<ScriptCallStack> callStack = createScriptCallStack(1, tru e);
159 if (callStack && callStack->size()) {
160 // In order to emulate the ConsoleMessage behavior,
161 // we should ignore the specified url and line number if
162 // we can get the JavaScript context.
163 m_bridge->fail(reason, level, callStack->at(0).sourceURL(), callStack->a t(0).lineNumber());
164 } else if (sourceURL.isEmpty() && !lineNumber) {
165 // No information is specified by the caller - use the url
166 // and the line number at the connection.
167 m_bridge->fail(reason, level, m_sourceURLAtConnection, m_lineNumberAtCon nection);
168 } else {
169 // Use the specified information.
170 m_bridge->fail(reason, level, sourceURL, lineNumber);
171 }
172 }
173
174 void WorkerThreadableWebSocketChannel::disconnect()
175 {
176 m_bridge->disconnect();
177 m_bridge.clear();
178 }
179
180 void WorkerThreadableWebSocketChannel::trace(Visitor* visitor)
181 {
182 visitor->trace(m_bridge);
183 WebSocketChannel::trace(visitor);
184 }
185
186 Peer::Peer(Bridge* bridge, WorkerLoaderProxy& loaderProxy, ThreadableWebSocketCh annelSyncHelper* syncHelper)
187 : m_bridge(bridge)
188 , m_loaderProxy(loaderProxy)
189 , m_mainWebSocketChannel(nullptr)
190 , m_syncHelper(syncHelper)
191 {
192 ASSERT(!isMainThread());
193 }
194
195 Peer::~Peer()
196 {
197 ASSERT(!isMainThread());
198 }
199
200 void Peer::initializeInternal(ExecutionContext* context, const String& sourceURL , unsigned lineNumber)
201 {
202 ASSERT(isMainThread());
203 Document* document = toDocument(context);
204 m_mainWebSocketChannel = DocumentWebSocketChannel::create(document, this, so urceURL, lineNumber);
205 m_syncHelper->signalWorkerThread();
206 }
207
208 void Peer::connect(const KURL& url, const String& protocol)
209 {
210 ASSERT(isMainThread());
211 ASSERT(m_syncHelper);
212 if (!m_mainWebSocketChannel) {
213 m_syncHelper->setConnectRequestResult(false);
214 } else {
215 bool connectRequestResult = m_mainWebSocketChannel->connect(url, protoco l);
216 m_syncHelper->setConnectRequestResult(connectRequestResult);
217 }
218 m_syncHelper->signalWorkerThread();
219 }
220
221 void Peer::send(const String& message)
222 {
223 ASSERT(isMainThread());
224 if (m_mainWebSocketChannel)
225 m_mainWebSocketChannel->send(message);
226 }
227
228 void Peer::sendArrayBuffer(PassOwnPtr<Vector<char> > data)
229 {
230 ASSERT(isMainThread());
231 if (m_mainWebSocketChannel)
232 m_mainWebSocketChannel->send(data);
233 }
234
235 void Peer::sendBlob(PassRefPtr<BlobDataHandle> blobData)
236 {
237 ASSERT(isMainThread());
238 if (m_mainWebSocketChannel)
239 m_mainWebSocketChannel->send(blobData);
240 }
241
242 void Peer::close(int code, const String& reason)
243 {
244 ASSERT(isMainThread());
245 ASSERT(m_syncHelper);
246 if (!m_mainWebSocketChannel)
247 return;
248 m_mainWebSocketChannel->close(code, reason);
249 }
250
251 void Peer::fail(const String& reason, MessageLevel level, const String& sourceUR L, unsigned lineNumber)
252 {
253 ASSERT(isMainThread());
254 ASSERT(m_syncHelper);
255 if (!m_mainWebSocketChannel)
256 return;
257 m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber);
258 }
259
260 void Peer::disconnect()
261 {
262 ASSERT(isMainThread());
263 ASSERT(m_syncHelper);
264 if (m_mainWebSocketChannel) {
265 m_mainWebSocketChannel->disconnect();
266 m_mainWebSocketChannel = nullptr;
267 }
268 m_syncHelper->signalWorkerThread();
269 }
270
271 static void workerGlobalScopeDidConnect(ExecutionContext* context, Bridge* bridg e, const String& subprotocol, const String& extensions)
272 {
273 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
274 if (bridge->client())
275 bridge->client()->didConnect(subprotocol, extensions);
276 }
277
278 void Peer::didConnect(const String& subprotocol, const String& extensions)
279 {
280 ASSERT(isMainThread());
281 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba lScopeDidConnect, m_bridge, subprotocol, extensions));
282 }
283
284 static void workerGlobalScopeDidReceiveTextMessage(ExecutionContext* context, Br idge* bridge, const String& payload)
285 {
286 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
287 if (bridge->client())
288 bridge->client()->didReceiveTextMessage(payload);
289 }
290
291 void Peer::didReceiveTextMessage(const String& payload)
292 {
293 ASSERT(isMainThread());
294 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba lScopeDidReceiveTextMessage, m_bridge, payload));
295 }
296
297 static void workerGlobalScopeDidReceiveBinaryMessage(ExecutionContext* context, Bridge* bridge, PassOwnPtr<Vector<char> > payload)
298 {
299 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
300 if (bridge->client())
301 bridge->client()->didReceiveBinaryMessage(payload);
302 }
303
304 void Peer::didReceiveBinaryMessage(PassOwnPtr<Vector<char> > payload)
305 {
306 ASSERT(isMainThread());
307 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba lScopeDidReceiveBinaryMessage, m_bridge, payload));
308 }
309
310 static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, Bridge* bridge, unsigned long consumed)
311 {
312 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
313 if (bridge->client())
314 bridge->client()->didConsumeBufferedAmount(consumed);
315 }
316
317 void Peer::didConsumeBufferedAmount(unsigned long consumed)
318 {
319 ASSERT(isMainThread());
320 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba lScopeDidConsumeBufferedAmount, m_bridge, consumed));
321 }
322
323 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, Bridge* bridge)
324 {
325 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
326 if (bridge->client())
327 bridge->client()->didStartClosingHandshake();
328 }
329
330 void Peer::didStartClosingHandshake()
331 {
332 ASSERT(isMainThread());
333 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba lScopeDidStartClosingHandshake, m_bridge));
334 }
335
336 static void workerGlobalScopeDidClose(ExecutionContext* context, Bridge* bridge, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeComple tion, unsigned short code, const String& reason)
337 {
338 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
339 if (bridge->client())
340 bridge->client()->didClose(closingHandshakeCompletion, code, reason);
341 }
342
343 void Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
344 {
345 ASSERT(isMainThread());
346 if (m_mainWebSocketChannel) {
347 m_mainWebSocketChannel->disconnect();
348 m_mainWebSocketChannel = nullptr;
349 }
350 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba lScopeDidClose, m_bridge, closingHandshakeCompletion, code, reason));
351 }
352
353 static void workerGlobalScopeDidError(ExecutionContext* context, Bridge* bridge)
354 {
355 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
356 if (bridge->client())
357 bridge->client()->didError();
358 }
359
360 void Peer::didError()
361 {
362 ASSERT(isMainThread());
363 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba lScopeDidError, m_bridge));
364 }
365
366 void Peer::trace(Visitor* visitor)
367 {
368 visitor->trace(m_bridge);
369 visitor->trace(m_mainWebSocketChannel);
370 visitor->trace(m_syncHelper);
371 WebSocketChannelClient::trace(visitor);
372 }
373
374 Bridge::Bridge(WebSocketChannelClient* client, WorkerGlobalScope& workerGlobalSc ope)
375 : m_client(client)
376 , m_workerGlobalScope(workerGlobalScope)
377 , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
378 , m_syncHelper(ThreadableWebSocketChannelSyncHelper::create(adoptPtr(Platfor m::current()->createWaitableEvent())))
379 , m_peer(new Peer(this, m_loaderProxy, m_syncHelper))
380 {
381 }
382
383 Bridge::~Bridge()
384 {
385 ASSERT(!m_peer);
386 }
387
388 void Bridge::initialize(const String& sourceURL, unsigned lineNumber)
389 {
390 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::initialize, AllowC rossThreadAccess(m_peer.get()), sourceURL, lineNumber))) {
391 // The worker thread has been signalled to shutdown before method comple tion.
392 disconnect();
393 }
394 }
395
396 bool Bridge::connect(const KURL& url, const String& protocol)
397 {
398 if (!m_peer)
399 return false;
400
401 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::connect, m_peer.ge t(), url, protocol)))
402 return false;
403
404 return m_syncHelper->connectRequestResult();
405 }
406
407 void Bridge::send(const String& message)
408 {
409 ASSERT(m_peer);
410 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::send, m_peer.get (), message));
411 }
412
413 void Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned b yteLength)
414 {
415 ASSERT(m_peer);
416 // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
417 OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
418 if (binaryData.byteLength())
419 memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteO ffset, byteLength);
420
421 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendArrayBuffer, m_peer.get(), data.release()));
422 }
423
424 void Bridge::send(PassRefPtr<BlobDataHandle> data)
425 {
426 ASSERT(m_peer);
427 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendBlob, m_peer .get(), data));
428 }
429
430 void Bridge::close(int code, const String& reason)
431 {
432 ASSERT(m_peer);
433 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::close, m_peer.ge t(), code, reason));
434 }
435
436 void Bridge::fail(const String& reason, MessageLevel level, const String& source URL, unsigned lineNumber)
437 {
438 ASSERT(m_peer);
439 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::fail, m_peer.get (), reason, level, sourceURL, lineNumber));
440 }
441
442 void Bridge::disconnect()
443 {
444 if (!m_peer)
445 return;
446
447 waitForMethodCompletion(createCrossThreadTask(&Peer::disconnect, m_peer.get( )));
448 // Here |m_peer| is detached from the main thread and we can delete it.
449
450 m_client = nullptr;
451 m_peer = nullptr;
452 m_syncHelper = nullptr;
453 // We won't use this any more.
454 m_workerGlobalScope.clear();
455 }
456
457 // Caller of this function should hold a reference to the bridge, because this f unction may call WebSocket::didClose() in the end,
458 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
459 bool Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
460 {
461 ASSERT(m_workerGlobalScope);
462 ASSERT(m_syncHelper);
463
464 m_loaderProxy.postTaskToLoader(task);
465
466 // We wait for the syncHelper event even if a shutdown event is fired.
467 // See https://codereview.chromium.org/267323004/#msg43 for why we need to w ait this.
468 ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack);
469 m_syncHelper->wait();
470 // This is checking whether a shutdown event is fired or not.
471 return !m_workerGlobalScope->thread()->terminated();
472 }
473
474 void Bridge::trace(Visitor* visitor)
475 {
476 visitor->trace(m_client);
477 visitor->trace(m_workerGlobalScope);
478 visitor->trace(m_syncHelper);
479 visitor->trace(m_peer);
480 }
481
482 } // namespace blink
OLDNEW
« no previous file with comments | « Source/modules/websockets/WorkerThreadableWebSocketChannel.h ('k') | Source/modules/websockets/WorkerWebSocketChannel.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698