OLD | NEW |
| (Empty) |
1 /* | |
2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved. | |
3 * | |
4 * Redistribution and use in source and binary forms, with or without | |
5 * modification, are permitted provided that the following conditions are | |
6 * met: | |
7 * | |
8 * * Redistributions of source code must retain the above copyright | |
9 * notice, this list of conditions and the following disclaimer. | |
10 * * Redistributions in binary form must reproduce the above | |
11 * copyright notice, this list of conditions and the following disclaimer | |
12 * in the documentation and/or other materials provided with the | |
13 * distribution. | |
14 * * Neither the name of Google Inc. nor the names of its | |
15 * contributors may be used to endorse or promote products derived from | |
16 * this software without specific prior written permission. | |
17 * | |
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | |
19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | |
20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | |
21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | |
22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | |
23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | |
24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | |
25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | |
26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | |
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | |
28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
29 */ | |
30 | |
31 #include "config.h" | |
32 | |
33 #include "modules/websockets/WorkerThreadableWebSocketChannel.h" | |
34 | |
35 #include "bindings/core/v8/ScriptCallStackFactory.h" | |
36 #include "core/dom/CrossThreadTask.h" | |
37 #include "core/dom/Document.h" | |
38 #include "core/dom/ExecutionContext.h" | |
39 #include "core/dom/ExecutionContextTask.h" | |
40 #include "core/fileapi/Blob.h" | |
41 #include "core/inspector/ScriptCallFrame.h" | |
42 #include "core/inspector/ScriptCallStack.h" | |
43 #include "core/workers/WorkerGlobalScope.h" | |
44 #include "core/workers/WorkerLoaderProxy.h" | |
45 #include "core/workers/WorkerThread.h" | |
46 #include "modules/websockets/DocumentWebSocketChannel.h" | |
47 #include "public/platform/Platform.h" | |
48 #include "public/platform/WebWaitableEvent.h" | |
49 #include "wtf/ArrayBuffer.h" | |
50 #include "wtf/Assertions.h" | |
51 #include "wtf/Functional.h" | |
52 #include "wtf/MainThread.h" | |
53 #include "wtf/text/WTFString.h" | |
54 | |
55 namespace blink { | |
56 | |
57 typedef WorkerThreadableWebSocketChannel::Bridge Bridge; | |
58 typedef WorkerThreadableWebSocketChannel::Peer Peer; | |
59 | |
60 // Created and destroyed on the worker thread. All setters of this class are | |
61 // called on the main thread, while all getters are called on the worker | |
62 // thread. signalWorkerThread() must be called before any getters are called. | |
63 class ThreadableWebSocketChannelSyncHelper : public GarbageCollectedFinalized<Th
readableWebSocketChannelSyncHelper> { | |
64 public: | |
65 static ThreadableWebSocketChannelSyncHelper* create(PassOwnPtr<WebWaitableEv
ent> event) | |
66 { | |
67 return new ThreadableWebSocketChannelSyncHelper(event); | |
68 } | |
69 | |
70 ~ThreadableWebSocketChannelSyncHelper() | |
71 { | |
72 } | |
73 | |
74 // All setters are called on the main thread. | |
75 void setConnectRequestResult(bool connectRequestResult) | |
76 { | |
77 m_connectRequestResult = connectRequestResult; | |
78 } | |
79 | |
80 // All getter are called on the worker thread. | |
81 bool connectRequestResult() const | |
82 { | |
83 return m_connectRequestResult; | |
84 } | |
85 | |
86 // This should be called after all setters are called and before any | |
87 // getters are called. | |
88 void signalWorkerThread() | |
89 { | |
90 m_event->signal(); | |
91 } | |
92 void wait() | |
93 { | |
94 m_event->wait(); | |
95 } | |
96 | |
97 void trace(Visitor* visitor) { } | |
98 | |
99 private: | |
100 explicit ThreadableWebSocketChannelSyncHelper(PassOwnPtr<WebWaitableEvent> e
vent) | |
101 : m_event(event) | |
102 , m_connectRequestResult(false) | |
103 { | |
104 } | |
105 | |
106 OwnPtr<WebWaitableEvent> m_event; | |
107 bool m_connectRequestResult; | |
108 }; | |
109 | |
110 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalS
cope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL
, unsigned lineNumber) | |
111 : m_bridge(new Bridge(client, workerGlobalScope)) | |
112 , m_sourceURLAtConnection(sourceURL) | |
113 , m_lineNumberAtConnection(lineNumber) | |
114 { | |
115 m_bridge->initialize(sourceURL, lineNumber); | |
116 } | |
117 | |
118 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel() | |
119 { | |
120 ASSERT(!m_bridge); | |
121 } | |
122 | |
123 bool WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& pr
otocol) | |
124 { | |
125 ASSERT(m_bridge); | |
126 return m_bridge->connect(url, protocol); | |
127 } | |
128 | |
129 void WorkerThreadableWebSocketChannel::send(const String& message) | |
130 { | |
131 ASSERT(m_bridge); | |
132 m_bridge->send(message); | |
133 } | |
134 | |
135 void WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsig
ned byteOffset, unsigned byteLength) | |
136 { | |
137 ASSERT(m_bridge); | |
138 m_bridge->send(binaryData, byteOffset, byteLength); | |
139 } | |
140 | |
141 void WorkerThreadableWebSocketChannel::send(PassRefPtr<BlobDataHandle> blobData) | |
142 { | |
143 ASSERT(m_bridge); | |
144 m_bridge->send(blobData); | |
145 } | |
146 | |
147 void WorkerThreadableWebSocketChannel::close(int code, const String& reason) | |
148 { | |
149 ASSERT(m_bridge); | |
150 m_bridge->close(code, reason); | |
151 } | |
152 | |
153 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel l
evel, const String& sourceURL, unsigned lineNumber) | |
154 { | |
155 if (!m_bridge) | |
156 return; | |
157 | |
158 RefPtrWillBeRawPtr<ScriptCallStack> callStack = createScriptCallStack(1, tru
e); | |
159 if (callStack && callStack->size()) { | |
160 // In order to emulate the ConsoleMessage behavior, | |
161 // we should ignore the specified url and line number if | |
162 // we can get the JavaScript context. | |
163 m_bridge->fail(reason, level, callStack->at(0).sourceURL(), callStack->a
t(0).lineNumber()); | |
164 } else if (sourceURL.isEmpty() && !lineNumber) { | |
165 // No information is specified by the caller - use the url | |
166 // and the line number at the connection. | |
167 m_bridge->fail(reason, level, m_sourceURLAtConnection, m_lineNumberAtCon
nection); | |
168 } else { | |
169 // Use the specified information. | |
170 m_bridge->fail(reason, level, sourceURL, lineNumber); | |
171 } | |
172 } | |
173 | |
174 void WorkerThreadableWebSocketChannel::disconnect() | |
175 { | |
176 m_bridge->disconnect(); | |
177 m_bridge.clear(); | |
178 } | |
179 | |
180 void WorkerThreadableWebSocketChannel::trace(Visitor* visitor) | |
181 { | |
182 visitor->trace(m_bridge); | |
183 WebSocketChannel::trace(visitor); | |
184 } | |
185 | |
186 Peer::Peer(Bridge* bridge, WorkerLoaderProxy& loaderProxy, ThreadableWebSocketCh
annelSyncHelper* syncHelper) | |
187 : m_bridge(bridge) | |
188 , m_loaderProxy(loaderProxy) | |
189 , m_mainWebSocketChannel(nullptr) | |
190 , m_syncHelper(syncHelper) | |
191 { | |
192 ASSERT(!isMainThread()); | |
193 } | |
194 | |
195 Peer::~Peer() | |
196 { | |
197 ASSERT(!isMainThread()); | |
198 } | |
199 | |
200 void Peer::initializeInternal(ExecutionContext* context, const String& sourceURL
, unsigned lineNumber) | |
201 { | |
202 ASSERT(isMainThread()); | |
203 Document* document = toDocument(context); | |
204 m_mainWebSocketChannel = DocumentWebSocketChannel::create(document, this, so
urceURL, lineNumber); | |
205 m_syncHelper->signalWorkerThread(); | |
206 } | |
207 | |
208 void Peer::connect(const KURL& url, const String& protocol) | |
209 { | |
210 ASSERT(isMainThread()); | |
211 ASSERT(m_syncHelper); | |
212 if (!m_mainWebSocketChannel) { | |
213 m_syncHelper->setConnectRequestResult(false); | |
214 } else { | |
215 bool connectRequestResult = m_mainWebSocketChannel->connect(url, protoco
l); | |
216 m_syncHelper->setConnectRequestResult(connectRequestResult); | |
217 } | |
218 m_syncHelper->signalWorkerThread(); | |
219 } | |
220 | |
221 void Peer::send(const String& message) | |
222 { | |
223 ASSERT(isMainThread()); | |
224 if (m_mainWebSocketChannel) | |
225 m_mainWebSocketChannel->send(message); | |
226 } | |
227 | |
228 void Peer::sendArrayBuffer(PassOwnPtr<Vector<char> > data) | |
229 { | |
230 ASSERT(isMainThread()); | |
231 if (m_mainWebSocketChannel) | |
232 m_mainWebSocketChannel->send(data); | |
233 } | |
234 | |
235 void Peer::sendBlob(PassRefPtr<BlobDataHandle> blobData) | |
236 { | |
237 ASSERT(isMainThread()); | |
238 if (m_mainWebSocketChannel) | |
239 m_mainWebSocketChannel->send(blobData); | |
240 } | |
241 | |
242 void Peer::close(int code, const String& reason) | |
243 { | |
244 ASSERT(isMainThread()); | |
245 ASSERT(m_syncHelper); | |
246 if (!m_mainWebSocketChannel) | |
247 return; | |
248 m_mainWebSocketChannel->close(code, reason); | |
249 } | |
250 | |
251 void Peer::fail(const String& reason, MessageLevel level, const String& sourceUR
L, unsigned lineNumber) | |
252 { | |
253 ASSERT(isMainThread()); | |
254 ASSERT(m_syncHelper); | |
255 if (!m_mainWebSocketChannel) | |
256 return; | |
257 m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber); | |
258 } | |
259 | |
260 void Peer::disconnect() | |
261 { | |
262 ASSERT(isMainThread()); | |
263 ASSERT(m_syncHelper); | |
264 if (m_mainWebSocketChannel) { | |
265 m_mainWebSocketChannel->disconnect(); | |
266 m_mainWebSocketChannel = nullptr; | |
267 } | |
268 m_syncHelper->signalWorkerThread(); | |
269 } | |
270 | |
271 static void workerGlobalScopeDidConnect(ExecutionContext* context, Bridge* bridg
e, const String& subprotocol, const String& extensions) | |
272 { | |
273 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); | |
274 if (bridge->client()) | |
275 bridge->client()->didConnect(subprotocol, extensions); | |
276 } | |
277 | |
278 void Peer::didConnect(const String& subprotocol, const String& extensions) | |
279 { | |
280 ASSERT(isMainThread()); | |
281 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba
lScopeDidConnect, m_bridge, subprotocol, extensions)); | |
282 } | |
283 | |
284 static void workerGlobalScopeDidReceiveTextMessage(ExecutionContext* context, Br
idge* bridge, const String& payload) | |
285 { | |
286 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); | |
287 if (bridge->client()) | |
288 bridge->client()->didReceiveTextMessage(payload); | |
289 } | |
290 | |
291 void Peer::didReceiveTextMessage(const String& payload) | |
292 { | |
293 ASSERT(isMainThread()); | |
294 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba
lScopeDidReceiveTextMessage, m_bridge, payload)); | |
295 } | |
296 | |
297 static void workerGlobalScopeDidReceiveBinaryMessage(ExecutionContext* context,
Bridge* bridge, PassOwnPtr<Vector<char> > payload) | |
298 { | |
299 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); | |
300 if (bridge->client()) | |
301 bridge->client()->didReceiveBinaryMessage(payload); | |
302 } | |
303 | |
304 void Peer::didReceiveBinaryMessage(PassOwnPtr<Vector<char> > payload) | |
305 { | |
306 ASSERT(isMainThread()); | |
307 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba
lScopeDidReceiveBinaryMessage, m_bridge, payload)); | |
308 } | |
309 | |
310 static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context,
Bridge* bridge, unsigned long consumed) | |
311 { | |
312 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); | |
313 if (bridge->client()) | |
314 bridge->client()->didConsumeBufferedAmount(consumed); | |
315 } | |
316 | |
317 void Peer::didConsumeBufferedAmount(unsigned long consumed) | |
318 { | |
319 ASSERT(isMainThread()); | |
320 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba
lScopeDidConsumeBufferedAmount, m_bridge, consumed)); | |
321 } | |
322 | |
323 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context,
Bridge* bridge) | |
324 { | |
325 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); | |
326 if (bridge->client()) | |
327 bridge->client()->didStartClosingHandshake(); | |
328 } | |
329 | |
330 void Peer::didStartClosingHandshake() | |
331 { | |
332 ASSERT(isMainThread()); | |
333 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba
lScopeDidStartClosingHandshake, m_bridge)); | |
334 } | |
335 | |
336 static void workerGlobalScopeDidClose(ExecutionContext* context, Bridge* bridge,
WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeComple
tion, unsigned short code, const String& reason) | |
337 { | |
338 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); | |
339 if (bridge->client()) | |
340 bridge->client()->didClose(closingHandshakeCompletion, code, reason); | |
341 } | |
342 | |
343 void Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion,
unsigned short code, const String& reason) | |
344 { | |
345 ASSERT(isMainThread()); | |
346 if (m_mainWebSocketChannel) { | |
347 m_mainWebSocketChannel->disconnect(); | |
348 m_mainWebSocketChannel = nullptr; | |
349 } | |
350 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba
lScopeDidClose, m_bridge, closingHandshakeCompletion, code, reason)); | |
351 } | |
352 | |
353 static void workerGlobalScopeDidError(ExecutionContext* context, Bridge* bridge) | |
354 { | |
355 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); | |
356 if (bridge->client()) | |
357 bridge->client()->didError(); | |
358 } | |
359 | |
360 void Peer::didError() | |
361 { | |
362 ASSERT(isMainThread()); | |
363 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGloba
lScopeDidError, m_bridge)); | |
364 } | |
365 | |
366 void Peer::trace(Visitor* visitor) | |
367 { | |
368 visitor->trace(m_bridge); | |
369 visitor->trace(m_mainWebSocketChannel); | |
370 visitor->trace(m_syncHelper); | |
371 WebSocketChannelClient::trace(visitor); | |
372 } | |
373 | |
374 Bridge::Bridge(WebSocketChannelClient* client, WorkerGlobalScope& workerGlobalSc
ope) | |
375 : m_client(client) | |
376 , m_workerGlobalScope(workerGlobalScope) | |
377 , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy()) | |
378 , m_syncHelper(ThreadableWebSocketChannelSyncHelper::create(adoptPtr(Platfor
m::current()->createWaitableEvent()))) | |
379 , m_peer(new Peer(this, m_loaderProxy, m_syncHelper)) | |
380 { | |
381 } | |
382 | |
383 Bridge::~Bridge() | |
384 { | |
385 ASSERT(!m_peer); | |
386 } | |
387 | |
388 void Bridge::initialize(const String& sourceURL, unsigned lineNumber) | |
389 { | |
390 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::initialize, AllowC
rossThreadAccess(m_peer.get()), sourceURL, lineNumber))) { | |
391 // The worker thread has been signalled to shutdown before method comple
tion. | |
392 disconnect(); | |
393 } | |
394 } | |
395 | |
396 bool Bridge::connect(const KURL& url, const String& protocol) | |
397 { | |
398 if (!m_peer) | |
399 return false; | |
400 | |
401 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::connect, m_peer.ge
t(), url, protocol))) | |
402 return false; | |
403 | |
404 return m_syncHelper->connectRequestResult(); | |
405 } | |
406 | |
407 void Bridge::send(const String& message) | |
408 { | |
409 ASSERT(m_peer); | |
410 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::send, m_peer.get
(), message)); | |
411 } | |
412 | |
413 void Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned b
yteLength) | |
414 { | |
415 ASSERT(m_peer); | |
416 // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied
into Vector<char>. | |
417 OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength)); | |
418 if (binaryData.byteLength()) | |
419 memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteO
ffset, byteLength); | |
420 | |
421 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendArrayBuffer,
m_peer.get(), data.release())); | |
422 } | |
423 | |
424 void Bridge::send(PassRefPtr<BlobDataHandle> data) | |
425 { | |
426 ASSERT(m_peer); | |
427 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendBlob, m_peer
.get(), data)); | |
428 } | |
429 | |
430 void Bridge::close(int code, const String& reason) | |
431 { | |
432 ASSERT(m_peer); | |
433 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::close, m_peer.ge
t(), code, reason)); | |
434 } | |
435 | |
436 void Bridge::fail(const String& reason, MessageLevel level, const String& source
URL, unsigned lineNumber) | |
437 { | |
438 ASSERT(m_peer); | |
439 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::fail, m_peer.get
(), reason, level, sourceURL, lineNumber)); | |
440 } | |
441 | |
442 void Bridge::disconnect() | |
443 { | |
444 if (!m_peer) | |
445 return; | |
446 | |
447 waitForMethodCompletion(createCrossThreadTask(&Peer::disconnect, m_peer.get(
))); | |
448 // Here |m_peer| is detached from the main thread and we can delete it. | |
449 | |
450 m_client = nullptr; | |
451 m_peer = nullptr; | |
452 m_syncHelper = nullptr; | |
453 // We won't use this any more. | |
454 m_workerGlobalScope.clear(); | |
455 } | |
456 | |
457 // Caller of this function should hold a reference to the bridge, because this f
unction may call WebSocket::didClose() in the end, | |
458 // which causes the bridge to get disconnected from the WebSocket and deleted if
there is no other reference. | |
459 bool Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task) | |
460 { | |
461 ASSERT(m_workerGlobalScope); | |
462 ASSERT(m_syncHelper); | |
463 | |
464 m_loaderProxy.postTaskToLoader(task); | |
465 | |
466 // We wait for the syncHelper event even if a shutdown event is fired. | |
467 // See https://codereview.chromium.org/267323004/#msg43 for why we need to w
ait this. | |
468 ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack); | |
469 m_syncHelper->wait(); | |
470 // This is checking whether a shutdown event is fired or not. | |
471 return !m_workerGlobalScope->thread()->terminated(); | |
472 } | |
473 | |
474 void Bridge::trace(Visitor* visitor) | |
475 { | |
476 visitor->trace(m_client); | |
477 visitor->trace(m_workerGlobalScope); | |
478 visitor->trace(m_syncHelper); | |
479 visitor->trace(m_peer); | |
480 } | |
481 | |
482 } // namespace blink | |
OLD | NEW |