Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(34)

Side by Side Diff: Source/modules/websockets/WorkerThreadableWebSocketChannel.cpp

Issue 350763007: [WebSocket] Create Peer on the worker thread. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 6 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Source/modules/websockets/WorkerThreadableWebSocketChannel.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved. 2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved.
3 * 3 *
4 * Redistribution and use in source and binary forms, with or without 4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are 5 * modification, are permitted provided that the following conditions are
6 * met: 6 * met:
7 * 7 *
8 * * Redistributions of source code must retain the above copyright 8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer. 9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above 10 * * Redistributions in binary form must reproduce the above
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
90 { 90 {
91 return m_sendRequestResult; 91 return m_sendRequestResult;
92 } 92 }
93 93
94 // This should be called after all setters are called and before any 94 // This should be called after all setters are called and before any
95 // getters are called. 95 // getters are called.
96 void signalWorkerThread() 96 void signalWorkerThread()
97 { 97 {
98 m_event->signal(); 98 m_event->signal();
99 } 99 }
100 100 void wait()
101 blink::WebWaitableEvent* event() const
102 { 101 {
103 return m_event.get(); 102 m_event->wait();
104 } 103 }
105 104
106 void trace(Visitor* visitor) { } 105 void trace(Visitor* visitor) { }
107 106
108 private: 107 private:
109 explicit ThreadableWebSocketChannelSyncHelper(PassOwnPtr<blink::WebWaitableE vent> event) 108 explicit ThreadableWebSocketChannelSyncHelper(PassOwnPtr<blink::WebWaitableE vent> event)
110 : m_event(event) 109 : m_event(event)
111 , m_connectRequestResult(false) 110 , m_connectRequestResult(false)
112 , m_sendRequestResult(WebSocketChannel::SendFail) 111 , m_sendRequestResult(WebSocketChannel::SendFail)
113 { 112 {
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
194 m_bridge.clear(); 193 m_bridge.clear();
195 } 194 }
196 195
197 void WorkerThreadableWebSocketChannel::trace(Visitor* visitor) 196 void WorkerThreadableWebSocketChannel::trace(Visitor* visitor)
198 { 197 {
199 visitor->trace(m_bridge); 198 visitor->trace(m_bridge);
200 visitor->trace(m_workerClientWrapper); 199 visitor->trace(m_workerClientWrapper);
201 WebSocketChannel::trace(visitor); 200 WebSocketChannel::trace(visitor);
202 } 201 }
203 202
204 #if ENABLE(OILPAN) 203 Peer::Peer(PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> clien tWrapper, WorkerLoaderProxy& loaderProxy, PassRefPtrWillBeRawPtr<ThreadableWebSo cketChannelSyncHelper> syncHelper)
205 Peer::Peer(RawPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, Worker LoaderProxy& loaderProxy, ExecutionContext* context, const String& sourceURL, un signed lineNumber, RawPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
206 #else
207 Peer::Peer(PassRefPtr<WeakReference<Peer> > reference, PassRefPtr<ThreadableWebS ocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, Execut ionContext* context, const String& sourceURL, unsigned lineNumber, PassRefPtr<Th readableWebSocketChannelSyncHelper> syncHelper)
208 #endif
209 : m_workerClientWrapper(clientWrapper) 204 : m_workerClientWrapper(clientWrapper)
210 , m_loaderProxy(loaderProxy) 205 , m_loaderProxy(loaderProxy)
211 , m_mainWebSocketChannel(nullptr) 206 , m_mainWebSocketChannel(nullptr)
212 , m_syncHelper(syncHelper) 207 , m_syncHelper(syncHelper)
213 #if ENABLE(OILPAN) 208 {
214 , m_keepAlive(this) 209 ASSERT(!isMainThread());
215 #else 210 }
216 , m_weakFactory(reference, this) 211
217 #endif 212 Peer::~Peer()
213 {
214 ASSERT(!isMainThread());
215 }
216
217 PassOwnPtrWillBeRawPtr<Peer> Peer::create(PassRefPtrWillBeRawPtr<ThreadableWebSo cketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, PassRef PtrWillBeRawPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
218 {
219 return adoptPtrWillBeNoop(new Peer(clientWrapper, loaderProxy, syncHelper));
220 }
221
222 void Peer::initializeInternal(ExecutionContext* context, const String& sourceURL , unsigned lineNumber)
218 { 223 {
219 ASSERT(isMainThread()); 224 ASSERT(isMainThread());
220 ASSERT(m_workerClientWrapper.get());
221
222 Document* document = toDocument(context); 225 Document* document = toDocument(context);
223 if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) { 226 if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) {
224 m_mainWebSocketChannel = NewWebSocketChannelImpl::create(document, this, sourceURL, lineNumber); 227 m_mainWebSocketChannel = NewWebSocketChannelImpl::create(document, this, sourceURL, lineNumber);
225 } else { 228 } else {
226 m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, th is, sourceURL, lineNumber); 229 m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, th is, sourceURL, lineNumber);
227 } 230 }
228 }
229
230 Peer::~Peer()
231 {
232 ASSERT(isMainThread());
233 }
234
235 #if ENABLE(OILPAN)
236 void Peer::initialize(ExecutionContext* context, WeakMember<Peer>* reference, Wo rkerLoaderProxy* loaderProxy, RawPtr<ThreadableWebSocketChannelClientWrapper> cl ientWrapper, const String& sourceURLAtConnection, unsigned lineNumberAtConnectio n, RawPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
237 {
238 // The caller must call destroy() to free the peer.
239 *reference = new Peer(clientWrapper, *loaderProxy, context, sourceURLAtConne ction, lineNumberAtConnection, syncHelper);
240 syncHelper->signalWorkerThread();
241 }
242 #else
243 void Peer::initialize(ExecutionContext* context, PassRefPtr<WeakReference<Peer> > reference, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChann elClientWrapper> clientWrapper, const String& sourceURLAtConnection, unsigned li neNumberAtConnection, PassRefPtr<ThreadableWebSocketChannelSyncHelper> prpSyncHe lper)
244 {
245 RefPtr<ThreadableWebSocketChannelSyncHelper> syncHelper = prpSyncHelper;
246 // The caller must call destroy() to free the peer.
247 new Peer(reference, clientWrapper, *loaderProxy, context, sourceURLAtConnect ion, lineNumberAtConnection, syncHelper);
248 syncHelper->signalWorkerThread();
249 }
250 #endif
251
252 void Peer::destroy()
253 {
254 ASSERT(isMainThread());
255 disconnect();
256
257 #if ENABLE(OILPAN)
258 m_keepAlive = nullptr;
259 m_syncHelper->signalWorkerThread(); 231 m_syncHelper->signalWorkerThread();
260 m_syncHelper = nullptr;
261 #else
262 delete this;
263 #endif
264 } 232 }
265 233
266 void Peer::connect(const KURL& url, const String& protocol) 234 void Peer::connect(const KURL& url, const String& protocol)
267 { 235 {
268 ASSERT(isMainThread()); 236 ASSERT(isMainThread());
269 ASSERT(m_syncHelper); 237 ASSERT(m_syncHelper);
270 if (!m_mainWebSocketChannel) { 238 if (!m_mainWebSocketChannel) {
271 m_syncHelper->setConnectRequestResult(false); 239 m_syncHelper->setConnectRequestResult(false);
272 } else { 240 } else {
273 bool connectRequestResult = m_mainWebSocketChannel->connect(url, protoco l); 241 bool connectRequestResult = m_mainWebSocketChannel->connect(url, protoco l);
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
330 ASSERT(m_syncHelper); 298 ASSERT(m_syncHelper);
331 if (!m_mainWebSocketChannel) 299 if (!m_mainWebSocketChannel)
332 return; 300 return;
333 m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber); 301 m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber);
334 } 302 }
335 303
336 void Peer::disconnect() 304 void Peer::disconnect()
337 { 305 {
338 ASSERT(isMainThread()); 306 ASSERT(isMainThread());
339 ASSERT(m_syncHelper); 307 ASSERT(m_syncHelper);
340 if (!m_mainWebSocketChannel) 308 if (m_mainWebSocketChannel) {
341 return; 309 m_mainWebSocketChannel->disconnect();
342 m_mainWebSocketChannel->disconnect(); 310 m_mainWebSocketChannel = nullptr;
343 m_mainWebSocketChannel = nullptr; 311 }
312 m_syncHelper->signalWorkerThread();
344 } 313 }
345 314
346 static void workerGlobalScopeDidConnect(ExecutionContext* context, PassRefPtrWil lBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const St ring& subprotocol, const String& extensions) 315 static void workerGlobalScopeDidConnect(ExecutionContext* context, PassRefPtrWil lBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const St ring& subprotocol, const String& extensions)
347 { 316 {
348 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 317 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
349 workerClientWrapper->didConnect(subprotocol, extensions); 318 workerClientWrapper->didConnect(subprotocol, extensions);
350 } 319 }
351 320
352 void Peer::didConnect(const String& subprotocol, const String& extensions) 321 void Peer::didConnect(const String& subprotocol, const String& extensions)
353 { 322 {
354 ASSERT(isMainThread()); 323 ASSERT(isMainThread());
355 // It is important to seprate task creation from posting 324 // It is important to seprate task creation from posting
356 // the task. See the above comment. 325 // the task. See the above comment.
357 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidConnect, m_workerClientWrapper.get(), subprotocol, extensions); 326 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidConnect, m_workerClientWrapper, subprotocol, extensions);
358 m_loaderProxy.postTaskToWorkerGlobalScope(task.release()); 327 m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
359 } 328 }
360 329
361 static void workerGlobalScopeDidReceiveMessage(ExecutionContext* context, PassRe fPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, c onst String& message) 330 static void workerGlobalScopeDidReceiveMessage(ExecutionContext* context, PassRe fPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, c onst String& message)
362 { 331 {
363 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 332 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
364 workerClientWrapper->didReceiveMessage(message); 333 workerClientWrapper->didReceiveMessage(message);
365 } 334 }
366 335
367 void Peer::didReceiveMessage(const String& message) 336 void Peer::didReceiveMessage(const String& message)
368 { 337 {
369 ASSERT(isMainThread()); 338 ASSERT(isMainThread());
370 // It is important to seprate task creation from posting 339 // It is important to seprate task creation from posting
371 // the task. See the above comment. 340 // the task. See the above comment.
372 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidReceiveMessage, m_workerClientWrapper.get(), message); 341 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidReceiveMessage, m_workerClientWrapper, message);
373 m_loaderProxy.postTaskToWorkerGlobalScope(task.release()); 342 m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
374 } 343 }
375 344
376 static void workerGlobalScopeDidReceiveBinaryData(ExecutionContext* context, Pas sRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper , PassOwnPtr<Vector<char> > binaryData) 345 static void workerGlobalScopeDidReceiveBinaryData(ExecutionContext* context, Pas sRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper , PassOwnPtr<Vector<char> > binaryData)
377 { 346 {
378 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 347 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
379 workerClientWrapper->didReceiveBinaryData(binaryData); 348 workerClientWrapper->didReceiveBinaryData(binaryData);
380 } 349 }
381 350
382 void Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData) 351 void Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
383 { 352 {
384 ASSERT(isMainThread()); 353 ASSERT(isMainThread());
385 // It is important to seprate task creation from posting 354 // It is important to seprate task creation from posting
386 // the task. See the above comment. 355 // the task. See the above comment.
387 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidReceiveBinaryData, m_workerClientWrapper.get(), binaryData); 356 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidReceiveBinaryData, m_workerClientWrapper, binaryData);
388 m_loaderProxy.postTaskToWorkerGlobalScope(task.release()); 357 m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
389 } 358 }
390 359
391 static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWra pper, unsigned long consumed) 360 static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWra pper, unsigned long consumed)
392 { 361 {
393 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 362 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
394 workerClientWrapper->didConsumeBufferedAmount(consumed); 363 workerClientWrapper->didConsumeBufferedAmount(consumed);
395 } 364 }
396 365
397 void Peer::didConsumeBufferedAmount(unsigned long consumed) 366 void Peer::didConsumeBufferedAmount(unsigned long consumed)
398 { 367 {
399 ASSERT(isMainThread()); 368 ASSERT(isMainThread());
400 // It is important to seprate task creation from posting 369 // It is important to seprate task creation from posting
401 // the task. See the above comment. 370 // the task. See the above comment.
402 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidConsumeBufferedAmount, m_workerClientWrapper.get(), consumed); 371 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidConsumeBufferedAmount, m_workerClientWrapper, consumed);
403 m_loaderProxy.postTaskToWorkerGlobalScope(task.release()); 372 m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
404 } 373 }
405 374
406 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWra pper) 375 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWra pper)
407 { 376 {
408 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 377 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
409 workerClientWrapper->didStartClosingHandshake(); 378 workerClientWrapper->didStartClosingHandshake();
410 } 379 }
411 380
412 void Peer::didStartClosingHandshake() 381 void Peer::didStartClosingHandshake()
413 { 382 {
414 ASSERT(isMainThread()); 383 ASSERT(isMainThread());
415 // It is important to seprate task creation from posting 384 // It is important to seprate task creation from posting
416 // the task. See the above comment. 385 // the task. See the above comment.
417 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidStartClosingHandshake, m_workerClientWrapper.get()); 386 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidStartClosingHandshake, m_workerClientWrapper);
418 m_loaderProxy.postTaskToWorkerGlobalScope(task.release()); 387 m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
419 } 388 }
420 389
421 static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtrWillB eRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WebSocketC hannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsig ned short code, const String& reason) 390 static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtrWillB eRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WebSocketC hannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsig ned short code, const String& reason)
422 { 391 {
423 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 392 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
424 workerClientWrapper->didClose(closingHandshakeCompletion, code, reason); 393 workerClientWrapper->didClose(closingHandshakeCompletion, code, reason);
425 } 394 }
426 395
427 void Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason) 396 void Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
428 { 397 {
429 ASSERT(isMainThread()); 398 ASSERT(isMainThread());
430 m_mainWebSocketChannel = nullptr; 399 if (m_mainWebSocketChannel) {
400 m_mainWebSocketChannel->disconnect();
401 m_mainWebSocketChannel = nullptr;
402 }
431 // It is important to seprate task creation from posting 403 // It is important to seprate task creation from posting
432 // the task. See the above comment. 404 // the task. See the above comment.
433 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidClose, m_workerClientWrapper.get(), closingHandshakeCompletion, code, reason) ; 405 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidClose, m_workerClientWrapper, closingHandshakeCompletion, code, reason);
434 m_loaderProxy.postTaskToWorkerGlobalScope(task.release()); 406 m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
435 } 407 }
436 408
437 static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, P assRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapp er) 409 static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, P assRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapp er)
438 { 410 {
439 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 411 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
440 workerClientWrapper->didReceiveMessageError(); 412 workerClientWrapper->didReceiveMessageError();
441 } 413 }
442 414
443 void Peer::didReceiveMessageError() 415 void Peer::didReceiveMessageError()
444 { 416 {
445 ASSERT(isMainThread()); 417 ASSERT(isMainThread());
446 // It is important to seprate task creation from posting 418 // It is important to seprate task creation from posting
447 // the task. See the above comment. 419 // the task. See the above comment.
448 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidReceiveMessageError, m_workerClientWrapper.get()); 420 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&workerGlobalScope DidReceiveMessageError, m_workerClientWrapper);
449 m_loaderProxy.postTaskToWorkerGlobalScope(task.release()); 421 m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
450 } 422 }
451 423
452 void Peer::trace(Visitor* visitor) 424 void Peer::trace(Visitor* visitor)
453 { 425 {
454 visitor->trace(m_workerClientWrapper); 426 visitor->trace(m_workerClientWrapper);
455 visitor->trace(m_mainWebSocketChannel); 427 visitor->trace(m_mainWebSocketChannel);
456 visitor->trace(m_syncHelper); 428 visitor->trace(m_syncHelper);
457 WebSocketChannelClient::trace(visitor); 429 WebSocketChannelClient::trace(visitor);
458 } 430 }
459 431
460 Bridge::Bridge(PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> w orkerClientWrapper, WorkerGlobalScope& workerGlobalScope) 432 Bridge::Bridge(PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> w orkerClientWrapper, WorkerGlobalScope& workerGlobalScope)
461 : m_workerClientWrapper(workerClientWrapper) 433 : m_workerClientWrapper(workerClientWrapper)
462 , m_workerGlobalScope(workerGlobalScope) 434 , m_workerGlobalScope(workerGlobalScope)
463 , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy()) 435 , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
464 , m_syncHelper(nullptr) 436 , m_syncHelper(ThreadableWebSocketChannelSyncHelper::create(adoptPtr(blink:: Platform::current()->createWaitableEvent())))
465 , m_peer(nullptr) 437 , m_peer(Peer::create(m_workerClientWrapper, m_loaderProxy, m_syncHelper))
466 { 438 {
467 ASSERT(m_workerClientWrapper.get()); 439 ASSERT(m_workerClientWrapper.get());
468 } 440 }
469 441
470 Bridge::~Bridge() 442 Bridge::~Bridge()
471 { 443 {
472 ASSERT(hasTerminatedPeer()); 444 ASSERT(!m_peer);
473 } 445 }
474 446
475 void Bridge::initialize(const String& sourceURL, unsigned lineNumber) 447 void Bridge::initialize(const String& sourceURL, unsigned lineNumber)
476 { 448 {
477 #if !ENABLE(OILPAN)
478 RefPtr<WeakReference<Peer> > reference = WeakReference<Peer>::createUnbound( );
479 m_peer = WeakPtr<Peer>(reference);
480 #endif
481
482 RefPtrWillBeRawPtr<ThreadableWebSocketChannelSyncHelper> syncHelper = Thread ableWebSocketChannelSyncHelper::create(adoptPtr(blink::Platform::current()->crea teWaitableEvent()));
483 // This pointer is guaranteed to be valid until we call terminatePeer.
484 m_syncHelper = syncHelper.get();
485
486 RefPtrWillBeRawPtr<Bridge> protect(this); 449 RefPtrWillBeRawPtr<Bridge> protect(this);
487 #if ENABLE(OILPAN)
488 // In order to assure all temporary objects to be destroyed before 450 // In order to assure all temporary objects to be destroyed before
489 // posting the task, we separate task creation and posting. 451 // posting the task, we separate task creation and posting.
490 // In other words, it is dangerous to have a complicated expression 452 // In other words, it is dangerous to have a complicated expression
491 // as a waitForMethodCompletion argument. 453 // as a waitForMethodCompletion argument.
492 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&Peer::initialize, &m_peer, AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper.get(), s ourceURL, lineNumber, syncHelper.get()); 454 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&Peer::initialize, AllowCrossThreadAccess(m_peer.get()), sourceURL, lineNumber);
493 #else
494 // See the above comment.
495 OwnPtr<ExecutionContextTask> task = createCrossThreadTask(&Peer::initialize, reference, AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper.get(), sourceURL, lineNumber, syncHelper.get());
496 #endif
497 if (!waitForMethodCompletion(task.release())) { 455 if (!waitForMethodCompletion(task.release())) {
498 // The worker thread has been signalled to shutdown before method comple tion. 456 // The worker thread has been signalled to shutdown before method comple tion.
499 disconnect(); 457 disconnect();
500 } 458 }
501 } 459 }
502 460
503 bool Bridge::connect(const KURL& url, const String& protocol) 461 bool Bridge::connect(const KURL& url, const String& protocol)
504 { 462 {
505 if (hasTerminatedPeer()) 463 if (!m_peer)
506 return false; 464 return false;
507 465
508 RefPtrWillBeRawPtr<Bridge> protect(this); 466 RefPtrWillBeRawPtr<Bridge> protect(this);
509 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::connect, m_peer, u rl, protocol))) 467 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::connect, m_peer.ge t(), url, protocol)))
510 return false; 468 return false;
511 469
512 return m_syncHelper->connectRequestResult(); 470 return m_syncHelper->connectRequestResult();
513 } 471 }
514 472
515 WebSocketChannel::SendResult Bridge::send(const String& message) 473 WebSocketChannel::SendResult Bridge::send(const String& message)
516 { 474 {
517 if (hasTerminatedPeer()) 475 if (!m_peer)
518 return WebSocketChannel::SendFail; 476 return WebSocketChannel::SendFail;
519 477
520 RefPtrWillBeRawPtr<Bridge> protect(this); 478 RefPtrWillBeRawPtr<Bridge> protect(this);
521 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::send, m_peer, mess age))) 479 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::send, m_peer.get() , message)))
522 return WebSocketChannel::SendFail; 480 return WebSocketChannel::SendFail;
523 481
524 return m_syncHelper->sendRequestResult(); 482 return m_syncHelper->sendRequestResult();
525 } 483 }
526 484
527 WebSocketChannel::SendResult Bridge::send(const ArrayBuffer& binaryData, unsigne d byteOffset, unsigned byteLength) 485 WebSocketChannel::SendResult Bridge::send(const ArrayBuffer& binaryData, unsigne d byteOffset, unsigned byteLength)
528 { 486 {
529 if (hasTerminatedPeer()) 487 if (!m_peer)
530 return WebSocketChannel::SendFail; 488 return WebSocketChannel::SendFail;
531 489
532 // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>. 490 // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
533 OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength)); 491 OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
534 if (binaryData.byteLength()) 492 if (binaryData.byteLength())
535 memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteO ffset, byteLength); 493 memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteO ffset, byteLength);
536 494
537 RefPtrWillBeRawPtr<Bridge> protect(this); 495 RefPtrWillBeRawPtr<Bridge> protect(this);
538 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::sendArrayBuffer, m _peer, data.release()))) 496 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::sendArrayBuffer, m _peer.get(), data.release())))
539 return WebSocketChannel::SendFail; 497 return WebSocketChannel::SendFail;
540 498
541 return m_syncHelper->sendRequestResult(); 499 return m_syncHelper->sendRequestResult();
542 } 500 }
543 501
544 WebSocketChannel::SendResult Bridge::send(PassRefPtr<BlobDataHandle> data) 502 WebSocketChannel::SendResult Bridge::send(PassRefPtr<BlobDataHandle> data)
545 { 503 {
546 if (hasTerminatedPeer()) 504 if (!m_peer)
547 return WebSocketChannel::SendFail; 505 return WebSocketChannel::SendFail;
548 506
549 RefPtrWillBeRawPtr<Bridge> protect(this); 507 RefPtrWillBeRawPtr<Bridge> protect(this);
550 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::sendBlob, m_peer, data))) 508 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::sendBlob, m_peer.g et(), data)))
551 return WebSocketChannel::SendFail; 509 return WebSocketChannel::SendFail;
552 510
553 return m_syncHelper->sendRequestResult(); 511 return m_syncHelper->sendRequestResult();
554 } 512 }
555 513
556 void Bridge::close(int code, const String& reason) 514 void Bridge::close(int code, const String& reason)
557 { 515 {
558 if (hasTerminatedPeer()) 516 if (!m_peer)
559 return; 517 return;
560 518
561 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::close, m_peer, c ode, reason)); 519 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::close, m_peer.ge t(), code, reason));
562 } 520 }
563 521
564 void Bridge::fail(const String& reason, MessageLevel level, const String& source URL, unsigned lineNumber) 522 void Bridge::fail(const String& reason, MessageLevel level, const String& source URL, unsigned lineNumber)
565 { 523 {
566 if (hasTerminatedPeer()) 524 if (!m_peer)
567 return; 525 return;
568 526
569 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::fail, m_peer, re ason, level, sourceURL, lineNumber)); 527 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::fail, m_peer.get (), reason, level, sourceURL, lineNumber));
570 } 528 }
571 529
572 void Bridge::disconnect() 530 void Bridge::disconnect()
573 { 531 {
574 if (hasTerminatedPeer()) 532 if (!m_peer)
575 return; 533 return;
576 534
577 clearClientWrapper(); 535 m_workerClientWrapper->clearClient();
578 terminatePeer(); 536 waitForMethodCompletion(createCrossThreadTask(&Peer::disconnect, m_peer.get( )));
579 } 537 // Here |m_peer| is detached from the main thread and we can delete it.
580 538
581 void Bridge::clearClientWrapper() 539 m_peer = nullptr;
582 { 540 m_syncHelper = nullptr;
583 m_workerClientWrapper->clearClient(); 541 // We won't use this any more.
542 m_workerGlobalScope.clear();
584 } 543 }
585 544
586 // Caller of this function should hold a reference to the bridge, because this f unction may call WebSocket::didClose() in the end, 545 // Caller of this function should hold a reference to the bridge, because this f unction may call WebSocket::didClose() in the end,
587 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference. 546 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
588 bool Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task) 547 bool Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
589 { 548 {
590 ASSERT(m_workerGlobalScope); 549 ASSERT(m_workerGlobalScope);
591 ASSERT(m_syncHelper); 550 ASSERT(m_syncHelper);
592 551
593 m_loaderProxy.postTaskToLoader(task); 552 m_loaderProxy.postTaskToLoader(task);
594 553
595 // We wait for the syncHelper event even if a shutdown event is fired. 554 // We wait for the syncHelper event even if a shutdown event is fired.
596 // See https://codereview.chromium.org/267323004/#msg43 for why we need to w ait this. 555 // See https://codereview.chromium.org/267323004/#msg43 for why we need to w ait this.
597 Vector<blink::WebWaitableEvent*> events;
598 events.append(m_syncHelper->event());
599 ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack); 556 ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack);
600 blink::Platform::current()->waitMultipleEvents(events); 557 m_syncHelper->wait();
601 // This is checking whether a shutdown event is fired or not. 558 // This is checking whether a shutdown event is fired or not.
602 return !m_workerGlobalScope->thread()->terminated(); 559 return !m_workerGlobalScope->thread()->terminated();
603 } 560 }
604 561
605 void Bridge::terminatePeer()
606 {
607 ASSERT(!hasTerminatedPeer());
608
609 #if ENABLE(OILPAN)
610 // The worker thread has to wait for the main thread to complete Peer::destr oy,
611 // because the worker thread has to make sure that the main thread does not have any
612 // references to on-heap objects allocated in the thread heap of the worker thread
613 // before the worker thread shuts down.
614 waitForMethodCompletion(createCrossThreadTask(&Peer::destroy, m_peer));
615 #else
616 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::destroy, m_peer) );
617 #endif
618
619 // Peer::destroy() deletes m_peer and then m_syncHelper will be released.
620 // We must not touch m_syncHelper any more.
621 m_syncHelper = nullptr;
622
623 // We won't use this any more.
624 m_workerGlobalScope = nullptr;
625 }
626
627 void Bridge::trace(Visitor* visitor) 562 void Bridge::trace(Visitor* visitor)
628 { 563 {
629 visitor->trace(m_workerClientWrapper); 564 visitor->trace(m_workerClientWrapper);
630 visitor->trace(m_workerGlobalScope); 565 visitor->trace(m_workerGlobalScope);
631 visitor->trace(m_syncHelper); 566 visitor->trace(m_syncHelper);
632 visitor->trace(m_peer); 567 visitor->trace(m_peer);
633 } 568 }
634 569
635 } // namespace blink 570 } // namespace blink
OLDNEW
« no previous file with comments | « Source/modules/websockets/WorkerThreadableWebSocketChannel.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698