Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(121)

Side by Side Diff: Source/modules/websockets/WorkerThreadableWebSocketChannel.cpp

Issue 311993006: [WebSocket] bufferedAmount should not decrease inside a task. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved. 2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved.
3 * 3 *
4 * Redistribution and use in source and binary forms, with or without 4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are 5 * modification, are permitted provided that the following conditions are
6 * met: 6 * met:
7 * 7 *
8 * * Redistributions of source code must retain the above copyright 8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer. 9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above 10 * * Redistributions in binary form must reproduce the above
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
67 67
68 // All setters are called on the main thread. 68 // All setters are called on the main thread.
69 void setConnectRequestResult(bool connectRequestResult) 69 void setConnectRequestResult(bool connectRequestResult)
70 { 70 {
71 m_connectRequestResult = connectRequestResult; 71 m_connectRequestResult = connectRequestResult;
72 } 72 }
73 void setSendRequestResult(WebSocketChannel::SendResult sendRequestResult) 73 void setSendRequestResult(WebSocketChannel::SendResult sendRequestResult)
74 { 74 {
75 m_sendRequestResult = sendRequestResult; 75 m_sendRequestResult = sendRequestResult;
76 } 76 }
77 void setBufferedAmount(unsigned long bufferedAmount)
78 {
79 m_bufferedAmount = bufferedAmount;
80 }
81 77
82 // All getter are called on the worker thread. 78 // All getter are called on the worker thread.
83 bool connectRequestResult() const 79 bool connectRequestResult() const
84 { 80 {
85 return m_connectRequestResult; 81 return m_connectRequestResult;
86 } 82 }
87 WebSocketChannel::SendResult sendRequestResult() const 83 WebSocketChannel::SendResult sendRequestResult() const
88 { 84 {
89 return m_sendRequestResult; 85 return m_sendRequestResult;
90 } 86 }
91 unsigned long bufferedAmount() const
92 {
93 return m_bufferedAmount;
94 }
95 87
96 // This should be called after all setters are called and before any 88 // This should be called after all setters are called and before any
97 // getters are called. 89 // getters are called.
98 void signalWorkerThread() 90 void signalWorkerThread()
99 { 91 {
100 m_event->signal(); 92 m_event->signal();
101 } 93 }
102 94
103 blink::WebWaitableEvent* event() const 95 blink::WebWaitableEvent* event() const
104 { 96 {
105 return m_event.get(); 97 return m_event.get();
106 } 98 }
107 99
108 private: 100 private:
109 ThreadableWebSocketChannelSyncHelper(PassOwnPtr<blink::WebWaitableEvent> eve nt) 101 ThreadableWebSocketChannelSyncHelper(PassOwnPtr<blink::WebWaitableEvent> eve nt)
110 : m_event(event) 102 : m_event(event)
111 , m_connectRequestResult(false) 103 , m_connectRequestResult(false)
112 , m_sendRequestResult(WebSocketChannel::SendFail) 104 , m_sendRequestResult(WebSocketChannel::SendFail)
113 , m_bufferedAmount(0)
114 { 105 {
115 } 106 }
116 107
117 OwnPtr<blink::WebWaitableEvent> m_event; 108 OwnPtr<blink::WebWaitableEvent> m_event;
118 bool m_connectRequestResult; 109 bool m_connectRequestResult;
119 WebSocketChannel::SendResult m_sendRequestResult; 110 WebSocketChannel::SendResult m_sendRequestResult;
120 unsigned long m_bufferedAmount;
121 }; 111 };
122 112
123 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalS cope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL , unsigned lineNumber) 113 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalS cope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL , unsigned lineNumber)
124 : m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(clie nt)) 114 : m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(clie nt))
125 , m_bridge(Bridge::create(m_workerClientWrapper, workerGlobalScope)) 115 , m_bridge(Bridge::create(m_workerClientWrapper, workerGlobalScope))
126 , m_sourceURLAtConnection(sourceURL) 116 , m_sourceURLAtConnection(sourceURL)
127 , m_lineNumberAtConnection(lineNumber) 117 , m_lineNumberAtConnection(lineNumber)
128 { 118 {
129 ASSERT(m_workerClientWrapper.get()); 119 ASSERT(m_workerClientWrapper.get());
130 m_bridge->initialize(sourceURL, lineNumber); 120 m_bridge->initialize(sourceURL, lineNumber);
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
167 return m_bridge->send(binaryData, byteOffset, byteLength); 157 return m_bridge->send(binaryData, byteOffset, byteLength);
168 } 158 }
169 159
170 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(PassRefPtr<B lobDataHandle> blobData) 160 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(PassRefPtr<B lobDataHandle> blobData)
171 { 161 {
172 if (!m_bridge) 162 if (!m_bridge)
173 return WebSocketChannel::SendFail; 163 return WebSocketChannel::SendFail;
174 return m_bridge->send(blobData); 164 return m_bridge->send(blobData);
175 } 165 }
176 166
177 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
178 {
179 if (!m_bridge)
180 return 0;
181 return m_bridge->bufferedAmount();
182 }
183
184 void WorkerThreadableWebSocketChannel::close(int code, const String& reason) 167 void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
185 { 168 {
186 if (m_bridge) 169 if (m_bridge)
187 m_bridge->close(code, reason); 170 m_bridge->close(code, reason);
188 } 171 }
189 172
190 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel l evel, const String& sourceURL, unsigned lineNumber) 173 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel l evel, const String& sourceURL, unsigned lineNumber)
191 { 174 {
192 if (!m_bridge) 175 if (!m_bridge)
193 return; 176 return;
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after
314 ASSERT(isMainThread()); 297 ASSERT(isMainThread());
315 if (!m_mainWebSocketChannel) { 298 if (!m_mainWebSocketChannel) {
316 m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail); 299 m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
317 } else { 300 } else {
318 WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel- >send(blobData); 301 WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel- >send(blobData);
319 m_syncHelper->setSendRequestResult(sendRequestResult); 302 m_syncHelper->setSendRequestResult(sendRequestResult);
320 } 303 }
321 m_syncHelper->signalWorkerThread(); 304 m_syncHelper->signalWorkerThread();
322 } 305 }
323 306
324 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
325 {
326 ASSERT(isMainThread());
327 if (!m_mainWebSocketChannel) {
328 m_syncHelper->setBufferedAmount(0);
329 } else {
330 unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
331 m_syncHelper->setBufferedAmount(bufferedAmount);
332 }
333 m_syncHelper->signalWorkerThread();
334 }
335
336 void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reaso n) 307 void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reaso n)
337 { 308 {
338 ASSERT(isMainThread()); 309 ASSERT(isMainThread());
339 if (!m_mainWebSocketChannel) 310 if (!m_mainWebSocketChannel)
340 return; 311 return;
341 m_mainWebSocketChannel->close(code, reason); 312 m_mainWebSocketChannel->close(code, reason);
342 } 313 }
343 314
344 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason, MessageL evel level, const String& sourceURL, unsigned lineNumber) 315 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason, MessageL evel level, const String& sourceURL, unsigned lineNumber)
345 { 316 {
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
405 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 376 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
406 workerClientWrapper->didReceiveBinaryData(binaryData); 377 workerClientWrapper->didReceiveBinaryData(binaryData);
407 } 378 }
408 379
409 void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vec tor<char> > binaryData) 380 void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vec tor<char> > binaryData)
410 { 381 {
411 ASSERT(isMainThread()); 382 ASSERT(isMainThread());
412 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidReceiveBinaryData, m_workerClientWrapper.get(), binaryData)); 383 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidReceiveBinaryData, m_workerClientWrapper.get(), binaryData));
413 } 384 }
414 385
415 static void workerGlobalScopeDidUpdateBufferedAmount(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrap per, unsigned long bufferedAmount) 386 static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWra pper, unsigned long consumed)
416 { 387 {
417 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 388 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
418 workerClientWrapper->didUpdateBufferedAmount(bufferedAmount); 389 workerClientWrapper->didConsumeBufferedAmount(consumed);
419 } 390 }
420 391
421 void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned lo ng bufferedAmount) 392 void WorkerThreadableWebSocketChannel::Peer::didConsumeBufferedAmount(unsigned l ong consumed)
422 { 393 {
423 ASSERT(isMainThread()); 394 ASSERT(isMainThread());
424 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidUpdateBufferedAmount, m_workerClientWrapper.get(), bufferedAmount)); 395 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidConsumeBufferedAmount, m_workerClientWrapper.get(), consumed));
425 } 396 }
426 397
427 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWra pper) 398 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWra pper)
428 { 399 {
429 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 400 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
430 workerClientWrapper->didStartClosingHandshake(); 401 workerClientWrapper->didStartClosingHandshake();
431 } 402 }
432 403
433 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake() 404 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
434 { 405 {
435 ASSERT(isMainThread()); 406 ASSERT(isMainThread());
436 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidStartClosingHandshake, m_workerClientWrapper.get())); 407 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidStartClosingHandshake, m_workerClientWrapper.get()));
437 } 408 }
438 409
439 static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtrWillB eRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned l ong unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionS tatus closingHandshakeCompletion, unsigned short code, const String& reason) 410 static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtrWillB eRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WebSocketC hannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsig ned short code, const String& reason)
440 { 411 {
441 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 412 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
442 workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompl etion, code, reason); 413 workerClientWrapper->didClose(closingHandshakeCompletion, code, reason);
443 } 414 }
444 415
445 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBuf feredAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsign ed short code, const String& reason) 416 void WorkerThreadableWebSocketChannel::Peer::didClose(ClosingHandshakeCompletion Status closingHandshakeCompletion, unsigned short code, const String& reason)
446 { 417 {
447 ASSERT(isMainThread()); 418 ASSERT(isMainThread());
448 m_mainWebSocketChannel = nullptr; 419 m_mainWebSocketChannel = nullptr;
449 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidClose, m_workerClientWrapper.get(), unhandledBufferedAmount, closingHandsh akeCompletion, code, reason)); 420 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidClose, m_workerClientWrapper.get(), closingHandshakeCompletion, code, reas on));
450 } 421 }
451 422
452 static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, P assRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapp er) 423 static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, P assRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapp er)
453 { 424 {
454 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 425 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
455 workerClientWrapper->didReceiveMessageError(); 426 workerClientWrapper->didReceiveMessageError();
456 } 427 }
457 428
458 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError() 429 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
459 { 430 {
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
537 if (hasTerminatedPeer()) 508 if (hasTerminatedPeer())
538 return WebSocketChannel::SendFail; 509 return WebSocketChannel::SendFail;
539 510
540 RefPtr<Bridge> protect(this); 511 RefPtr<Bridge> protect(this);
541 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendBlob, m _peer, data)))) 512 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendBlob, m _peer, data))))
542 return WebSocketChannel::SendFail; 513 return WebSocketChannel::SendFail;
543 514
544 return m_syncHelper->sendRequestResult(); 515 return m_syncHelper->sendRequestResult();
545 } 516 }
546 517
547 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
548 {
549 if (hasTerminatedPeer())
550 return 0;
551
552 RefPtr<Bridge> protect(this);
553 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::bufferedAmo unt, m_peer))))
554 return 0;
555
556 return m_syncHelper->bufferedAmount();
557 }
558
559 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& rea son) 518 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& rea son)
560 { 519 {
561 if (hasTerminatedPeer()) 520 if (hasTerminatedPeer())
562 return; 521 return;
563 522
564 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::close, m_ peer, code, reason.isolatedCopy()))); 523 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::close, m_ peer, code, reason.isolatedCopy())));
565 } 524 }
566 525
567 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, Messag eLevel level, const String& sourceURL, unsigned lineNumber) 526 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, Messag eLevel level, const String& sourceURL, unsigned lineNumber)
568 { 527 {
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
623 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::destroy, m_peer))); 582 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::destroy, m_peer)));
624 // Peer::destroy() deletes m_peer and then m_syncHelper will be released. 583 // Peer::destroy() deletes m_peer and then m_syncHelper will be released.
625 // We must not touch m_syncHelper any more. 584 // We must not touch m_syncHelper any more.
626 m_syncHelper = 0; 585 m_syncHelper = 0;
627 586
628 // We won't use this any more. 587 // We won't use this any more.
629 m_workerGlobalScope = nullptr; 588 m_workerGlobalScope = nullptr;
630 } 589 }
631 590
632 } // namespace WebCore 591 } // namespace WebCore
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698