Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(95)

Side by Side Diff: Source/modules/websockets/WorkerThreadableWebSocketChannel.cpp

Issue 311993006: [WebSocket] bufferedAmount should not decrease inside a task. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved. 2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved.
3 * 3 *
4 * Redistribution and use in source and binary forms, with or without 4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are 5 * modification, are permitted provided that the following conditions are
6 * met: 6 * met:
7 * 7 *
8 * * Redistributions of source code must retain the above copyright 8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer. 9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above 10 * * Redistributions in binary form must reproduce the above
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
67 67
68 // All setters are called on the main thread. 68 // All setters are called on the main thread.
69 void setConnectRequestResult(bool connectRequestResult) 69 void setConnectRequestResult(bool connectRequestResult)
70 { 70 {
71 m_connectRequestResult = connectRequestResult; 71 m_connectRequestResult = connectRequestResult;
72 } 72 }
73 void setSendRequestResult(WebSocketChannel::SendResult sendRequestResult) 73 void setSendRequestResult(WebSocketChannel::SendResult sendRequestResult)
74 { 74 {
75 m_sendRequestResult = sendRequestResult; 75 m_sendRequestResult = sendRequestResult;
76 } 76 }
77 void setBufferedAmount(unsigned long bufferedAmount)
78 {
79 m_bufferedAmount = bufferedAmount;
80 }
81 77
82 // All getter are called on the worker thread. 78 // All getter are called on the worker thread.
83 bool connectRequestResult() const 79 bool connectRequestResult() const
84 { 80 {
85 return m_connectRequestResult; 81 return m_connectRequestResult;
86 } 82 }
87 WebSocketChannel::SendResult sendRequestResult() const 83 WebSocketChannel::SendResult sendRequestResult() const
88 { 84 {
89 return m_sendRequestResult; 85 return m_sendRequestResult;
90 } 86 }
91 unsigned long bufferedAmount() const
92 {
93 return m_bufferedAmount;
94 }
95 87
96 // This should be called after all setters are called and before any 88 // This should be called after all setters are called and before any
97 // getters are called. 89 // getters are called.
98 void signalWorkerThread() 90 void signalWorkerThread()
99 { 91 {
100 m_event->signal(); 92 m_event->signal();
101 } 93 }
102 94
103 blink::WebWaitableEvent* event() const 95 blink::WebWaitableEvent* event() const
104 { 96 {
105 return m_event.get(); 97 return m_event.get();
106 } 98 }
107 99
108 private: 100 private:
109 ThreadableWebSocketChannelSyncHelper(PassOwnPtr<blink::WebWaitableEvent> eve nt) 101 ThreadableWebSocketChannelSyncHelper(PassOwnPtr<blink::WebWaitableEvent> eve nt)
110 : m_event(event) 102 : m_event(event)
111 , m_connectRequestResult(false) 103 , m_connectRequestResult(false)
112 , m_sendRequestResult(WebSocketChannel::SendFail) 104 , m_sendRequestResult(WebSocketChannel::SendFail)
113 , m_bufferedAmount(0)
114 { 105 {
115 } 106 }
116 107
117 OwnPtr<blink::WebWaitableEvent> m_event; 108 OwnPtr<blink::WebWaitableEvent> m_event;
118 bool m_connectRequestResult; 109 bool m_connectRequestResult;
119 WebSocketChannel::SendResult m_sendRequestResult; 110 WebSocketChannel::SendResult m_sendRequestResult;
120 unsigned long m_bufferedAmount;
121 }; 111 };
122 112
123 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalS cope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL , unsigned lineNumber) 113 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalS cope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL , unsigned lineNumber)
124 : m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(clie nt)) 114 : m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(clie nt))
125 , m_bridge(Bridge::create(m_workerClientWrapper, workerGlobalScope)) 115 , m_bridge(Bridge::create(m_workerClientWrapper, workerGlobalScope))
126 , m_sourceURLAtConnection(sourceURL) 116 , m_sourceURLAtConnection(sourceURL)
127 , m_lineNumberAtConnection(lineNumber) 117 , m_lineNumberAtConnection(lineNumber)
128 { 118 {
129 ASSERT(m_workerClientWrapper.get()); 119 ASSERT(m_workerClientWrapper.get());
130 m_bridge->initialize(sourceURL, lineNumber); 120 m_bridge->initialize(sourceURL, lineNumber);
(...skipping 26 matching lines...) Expand all
157 return m_bridge->send(binaryData, byteOffset, byteLength); 147 return m_bridge->send(binaryData, byteOffset, byteLength);
158 } 148 }
159 149
160 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(PassRefPtr<B lobDataHandle> blobData) 150 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(PassRefPtr<B lobDataHandle> blobData)
161 { 151 {
162 if (!m_bridge) 152 if (!m_bridge)
163 return WebSocketChannel::SendFail; 153 return WebSocketChannel::SendFail;
164 return m_bridge->send(blobData); 154 return m_bridge->send(blobData);
165 } 155 }
166 156
167 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
168 {
169 if (!m_bridge)
170 return 0;
171 return m_bridge->bufferedAmount();
172 }
173
174 void WorkerThreadableWebSocketChannel::close(int code, const String& reason) 157 void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
175 { 158 {
176 if (m_bridge) 159 if (m_bridge)
177 m_bridge->close(code, reason); 160 m_bridge->close(code, reason);
178 } 161 }
179 162
180 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel l evel, const String& sourceURL, unsigned lineNumber) 163 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel l evel, const String& sourceURL, unsigned lineNumber)
181 { 164 {
182 if (!m_bridge) 165 if (!m_bridge)
183 return; 166 return;
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
290 ASSERT(isMainThread()); 273 ASSERT(isMainThread());
291 if (!m_mainWebSocketChannel) { 274 if (!m_mainWebSocketChannel) {
292 m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail); 275 m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
293 } else { 276 } else {
294 WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel- >send(blobData); 277 WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel- >send(blobData);
295 m_syncHelper->setSendRequestResult(sendRequestResult); 278 m_syncHelper->setSendRequestResult(sendRequestResult);
296 } 279 }
297 m_syncHelper->signalWorkerThread(); 280 m_syncHelper->signalWorkerThread();
298 } 281 }
299 282
300 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
301 {
302 ASSERT(isMainThread());
303 if (!m_mainWebSocketChannel) {
304 m_syncHelper->setBufferedAmount(0);
305 } else {
306 unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
307 m_syncHelper->setBufferedAmount(bufferedAmount);
308 }
309 m_syncHelper->signalWorkerThread();
310 }
311
312 void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reaso n) 283 void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reaso n)
313 { 284 {
314 ASSERT(isMainThread()); 285 ASSERT(isMainThread());
315 if (!m_mainWebSocketChannel) 286 if (!m_mainWebSocketChannel)
316 return; 287 return;
317 m_mainWebSocketChannel->close(code, reason); 288 m_mainWebSocketChannel->close(code, reason);
318 } 289 }
319 290
320 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason, MessageL evel level, const String& sourceURL, unsigned lineNumber) 291 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason, MessageL evel level, const String& sourceURL, unsigned lineNumber)
321 { 292 {
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
363 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 334 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
364 workerClientWrapper->didReceiveBinaryData(binaryData); 335 workerClientWrapper->didReceiveBinaryData(binaryData);
365 } 336 }
366 337
367 void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vec tor<char> > binaryData) 338 void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vec tor<char> > binaryData)
368 { 339 {
369 ASSERT(isMainThread()); 340 ASSERT(isMainThread());
370 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidReceiveBinaryData, m_workerClientWrapper.get(), binaryData)); 341 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidReceiveBinaryData, m_workerClientWrapper.get(), binaryData));
371 } 342 }
372 343
373 static void workerGlobalScopeDidUpdateBufferedAmount(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrap per, unsigned long bufferedAmount) 344 static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWra pper, unsigned long consumed)
374 { 345 {
375 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 346 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
376 workerClientWrapper->didUpdateBufferedAmount(bufferedAmount); 347 workerClientWrapper->didConsumeBufferedAmount(consumed);
377 } 348 }
378 349
379 void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned lo ng bufferedAmount) 350 void WorkerThreadableWebSocketChannel::Peer::didConsumeBufferedAmount(unsigned l ong consumed)
380 { 351 {
381 ASSERT(isMainThread()); 352 ASSERT(isMainThread());
382 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidUpdateBufferedAmount, m_workerClientWrapper.get(), bufferedAmount)); 353 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidConsumeBufferedAmount, m_workerClientWrapper.get(), consumed));
383 } 354 }
384 355
385 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWra pper) 356 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWra pper)
386 { 357 {
387 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 358 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
388 workerClientWrapper->didStartClosingHandshake(); 359 workerClientWrapper->didStartClosingHandshake();
389 } 360 }
390 361
391 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake() 362 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
392 { 363 {
393 ASSERT(isMainThread()); 364 ASSERT(isMainThread());
394 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidStartClosingHandshake, m_workerClientWrapper.get())); 365 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidStartClosingHandshake, m_workerClientWrapper.get()));
395 } 366 }
396 367
397 static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtrWillB eRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned l ong unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionS tatus closingHandshakeCompletion, unsigned short code, const String& reason) 368 static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtrWillB eRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WebSocketC hannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsig ned short code, const String& reason)
398 { 369 {
399 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 370 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
400 workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompl etion, code, reason); 371 workerClientWrapper->didClose(closingHandshakeCompletion, code, reason);
401 } 372 }
402 373
403 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBuf feredAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsign ed short code, const String& reason) 374 void WorkerThreadableWebSocketChannel::Peer::didClose(ClosingHandshakeCompletion Status closingHandshakeCompletion, unsigned short code, const String& reason)
404 { 375 {
405 ASSERT(isMainThread()); 376 ASSERT(isMainThread());
406 m_mainWebSocketChannel = nullptr; 377 m_mainWebSocketChannel = nullptr;
407 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidClose, m_workerClientWrapper.get(), unhandledBufferedAmount, closingHandsh akeCompletion, code, reason)); 378 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidClose, m_workerClientWrapper.get(), closingHandshakeCompletion, code, reas on));
408 } 379 }
409 380
410 static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, P assRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapp er) 381 static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, P assRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapp er)
411 { 382 {
412 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 383 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
413 workerClientWrapper->didReceiveMessageError(); 384 workerClientWrapper->didReceiveMessageError();
414 } 385 }
415 386
416 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError() 387 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
417 { 388 {
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
495 if (hasTerminatedPeer()) 466 if (hasTerminatedPeer())
496 return WebSocketChannel::SendFail; 467 return WebSocketChannel::SendFail;
497 468
498 RefPtr<Bridge> protect(this); 469 RefPtr<Bridge> protect(this);
499 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendBlob, m _peer, data)))) 470 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendBlob, m _peer, data))))
500 return WebSocketChannel::SendFail; 471 return WebSocketChannel::SendFail;
501 472
502 return m_syncHelper->sendRequestResult(); 473 return m_syncHelper->sendRequestResult();
503 } 474 }
504 475
505 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
506 {
507 if (hasTerminatedPeer())
508 return 0;
509
510 RefPtr<Bridge> protect(this);
511 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::bufferedAmo unt, m_peer))))
512 return 0;
513
514 return m_syncHelper->bufferedAmount();
515 }
516
517 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& rea son) 476 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& rea son)
518 { 477 {
519 if (hasTerminatedPeer()) 478 if (hasTerminatedPeer())
520 return; 479 return;
521 480
522 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::close, m_ peer, code, reason.isolatedCopy()))); 481 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::close, m_ peer, code, reason.isolatedCopy())));
523 } 482 }
524 483
525 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, Messag eLevel level, const String& sourceURL, unsigned lineNumber) 484 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, Messag eLevel level, const String& sourceURL, unsigned lineNumber)
526 { 485 {
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
565 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::destroy, m_peer))); 524 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::destroy, m_peer)));
566 // Peer::destroy() deletes m_peer and then m_syncHelper will be released. 525 // Peer::destroy() deletes m_peer and then m_syncHelper will be released.
567 // We must not touch m_syncHelper any more. 526 // We must not touch m_syncHelper any more.
568 m_syncHelper = 0; 527 m_syncHelper = 0;
569 528
570 // We won't use this any more. 529 // We won't use this any more.
571 m_workerGlobalScope = nullptr; 530 m_workerGlobalScope = nullptr;
572 } 531 }
573 532
574 } // namespace WebCore 533 } // namespace WebCore
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698