Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(103)

Side by Side Diff: trunk/Source/modules/websockets/WorkerThreadableWebSocketChannel.cpp

Issue 338243006: Revert 176298 "[WebSocket] bufferedAmount should not decrease in..." (Closed) Base URL: svn://svn.chromium.org/blink/
Patch Set: Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 /* 1 /*
2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved. 2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved.
3 * 3 *
4 * Redistribution and use in source and binary forms, with or without 4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are 5 * modification, are permitted provided that the following conditions are
6 * met: 6 * met:
7 * 7 *
8 * * Redistributions of source code must retain the above copyright 8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer. 9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above 10 * * Redistributions in binary form must reproduce the above
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
67 67
68 // All setters are called on the main thread. 68 // All setters are called on the main thread.
69 void setConnectRequestResult(bool connectRequestResult) 69 void setConnectRequestResult(bool connectRequestResult)
70 { 70 {
71 m_connectRequestResult = connectRequestResult; 71 m_connectRequestResult = connectRequestResult;
72 } 72 }
73 void setSendRequestResult(WebSocketChannel::SendResult sendRequestResult) 73 void setSendRequestResult(WebSocketChannel::SendResult sendRequestResult)
74 { 74 {
75 m_sendRequestResult = sendRequestResult; 75 m_sendRequestResult = sendRequestResult;
76 } 76 }
77 void setBufferedAmount(unsigned long bufferedAmount)
78 {
79 m_bufferedAmount = bufferedAmount;
80 }
77 81
78 // All getter are called on the worker thread. 82 // All getter are called on the worker thread.
79 bool connectRequestResult() const 83 bool connectRequestResult() const
80 { 84 {
81 return m_connectRequestResult; 85 return m_connectRequestResult;
82 } 86 }
83 WebSocketChannel::SendResult sendRequestResult() const 87 WebSocketChannel::SendResult sendRequestResult() const
84 { 88 {
85 return m_sendRequestResult; 89 return m_sendRequestResult;
86 } 90 }
91 unsigned long bufferedAmount() const
92 {
93 return m_bufferedAmount;
94 }
87 95
88 // This should be called after all setters are called and before any 96 // This should be called after all setters are called and before any
89 // getters are called. 97 // getters are called.
90 void signalWorkerThread() 98 void signalWorkerThread()
91 { 99 {
92 m_event->signal(); 100 m_event->signal();
93 } 101 }
94 102
95 blink::WebWaitableEvent* event() const 103 blink::WebWaitableEvent* event() const
96 { 104 {
97 return m_event.get(); 105 return m_event.get();
98 } 106 }
99 107
100 private: 108 private:
101 ThreadableWebSocketChannelSyncHelper(PassOwnPtr<blink::WebWaitableEvent> eve nt) 109 ThreadableWebSocketChannelSyncHelper(PassOwnPtr<blink::WebWaitableEvent> eve nt)
102 : m_event(event) 110 : m_event(event)
103 , m_connectRequestResult(false) 111 , m_connectRequestResult(false)
104 , m_sendRequestResult(WebSocketChannel::SendFail) 112 , m_sendRequestResult(WebSocketChannel::SendFail)
113 , m_bufferedAmount(0)
105 { 114 {
106 } 115 }
107 116
108 OwnPtr<blink::WebWaitableEvent> m_event; 117 OwnPtr<blink::WebWaitableEvent> m_event;
109 bool m_connectRequestResult; 118 bool m_connectRequestResult;
110 WebSocketChannel::SendResult m_sendRequestResult; 119 WebSocketChannel::SendResult m_sendRequestResult;
120 unsigned long m_bufferedAmount;
111 }; 121 };
112 122
113 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalS cope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL , unsigned lineNumber) 123 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalS cope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL , unsigned lineNumber)
114 : m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(clie nt)) 124 : m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(clie nt))
115 , m_bridge(Bridge::create(m_workerClientWrapper, workerGlobalScope)) 125 , m_bridge(Bridge::create(m_workerClientWrapper, workerGlobalScope))
116 , m_sourceURLAtConnection(sourceURL) 126 , m_sourceURLAtConnection(sourceURL)
117 , m_lineNumberAtConnection(lineNumber) 127 , m_lineNumberAtConnection(lineNumber)
118 { 128 {
119 ASSERT(m_workerClientWrapper.get()); 129 ASSERT(m_workerClientWrapper.get());
120 m_bridge->initialize(sourceURL, lineNumber); 130 m_bridge->initialize(sourceURL, lineNumber);
(...skipping 26 matching lines...) Expand all
147 return m_bridge->send(binaryData, byteOffset, byteLength); 157 return m_bridge->send(binaryData, byteOffset, byteLength);
148 } 158 }
149 159
150 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(PassRefPtr<B lobDataHandle> blobData) 160 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(PassRefPtr<B lobDataHandle> blobData)
151 { 161 {
152 if (!m_bridge) 162 if (!m_bridge)
153 return WebSocketChannel::SendFail; 163 return WebSocketChannel::SendFail;
154 return m_bridge->send(blobData); 164 return m_bridge->send(blobData);
155 } 165 }
156 166
167 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
168 {
169 if (!m_bridge)
170 return 0;
171 return m_bridge->bufferedAmount();
172 }
173
157 void WorkerThreadableWebSocketChannel::close(int code, const String& reason) 174 void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
158 { 175 {
159 if (m_bridge) 176 if (m_bridge)
160 m_bridge->close(code, reason); 177 m_bridge->close(code, reason);
161 } 178 }
162 179
163 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel l evel, const String& sourceURL, unsigned lineNumber) 180 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel l evel, const String& sourceURL, unsigned lineNumber)
164 { 181 {
165 if (!m_bridge) 182 if (!m_bridge)
166 return; 183 return;
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after
287 ASSERT(isMainThread()); 304 ASSERT(isMainThread());
288 if (!m_mainWebSocketChannel) { 305 if (!m_mainWebSocketChannel) {
289 m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail); 306 m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
290 } else { 307 } else {
291 WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel- >send(blobData); 308 WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel- >send(blobData);
292 m_syncHelper->setSendRequestResult(sendRequestResult); 309 m_syncHelper->setSendRequestResult(sendRequestResult);
293 } 310 }
294 m_syncHelper->signalWorkerThread(); 311 m_syncHelper->signalWorkerThread();
295 } 312 }
296 313
314 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
315 {
316 ASSERT(isMainThread());
317 if (!m_mainWebSocketChannel) {
318 m_syncHelper->setBufferedAmount(0);
319 } else {
320 unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
321 m_syncHelper->setBufferedAmount(bufferedAmount);
322 }
323 m_syncHelper->signalWorkerThread();
324 }
325
297 void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reaso n) 326 void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reaso n)
298 { 327 {
299 ASSERT(isMainThread()); 328 ASSERT(isMainThread());
300 if (!m_mainWebSocketChannel) 329 if (!m_mainWebSocketChannel)
301 return; 330 return;
302 m_mainWebSocketChannel->close(code, reason); 331 m_mainWebSocketChannel->close(code, reason);
303 } 332 }
304 333
305 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason, MessageL evel level, const String& sourceURL, unsigned lineNumber) 334 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason, MessageL evel level, const String& sourceURL, unsigned lineNumber)
306 { 335 {
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
364 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 393 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
365 workerClientWrapper->didReceiveBinaryData(binaryData); 394 workerClientWrapper->didReceiveBinaryData(binaryData);
366 } 395 }
367 396
368 void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vec tor<char> > binaryData) 397 void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vec tor<char> > binaryData)
369 { 398 {
370 ASSERT(isMainThread()); 399 ASSERT(isMainThread());
371 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidReceiveBinaryData, m_workerClientWrapper.get(), binaryData)); 400 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidReceiveBinaryData, m_workerClientWrapper.get(), binaryData));
372 } 401 }
373 402
374 static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWra pper, unsigned long consumed) 403 static void workerGlobalScopeDidUpdateBufferedAmount(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrap per, unsigned long bufferedAmount)
375 { 404 {
376 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 405 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
377 workerClientWrapper->didConsumeBufferedAmount(consumed); 406 workerClientWrapper->didUpdateBufferedAmount(bufferedAmount);
378 } 407 }
379 408
380 void WorkerThreadableWebSocketChannel::Peer::didConsumeBufferedAmount(unsigned l ong consumed) 409 void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned lo ng bufferedAmount)
381 { 410 {
382 ASSERT(isMainThread()); 411 ASSERT(isMainThread());
383 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidConsumeBufferedAmount, m_workerClientWrapper.get(), consumed)); 412 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidUpdateBufferedAmount, m_workerClientWrapper.get(), bufferedAmount));
384 } 413 }
385 414
386 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWra pper) 415 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWra pper)
387 { 416 {
388 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 417 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
389 workerClientWrapper->didStartClosingHandshake(); 418 workerClientWrapper->didStartClosingHandshake();
390 } 419 }
391 420
392 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake() 421 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
393 { 422 {
394 ASSERT(isMainThread()); 423 ASSERT(isMainThread());
395 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidStartClosingHandshake, m_workerClientWrapper.get())); 424 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidStartClosingHandshake, m_workerClientWrapper.get()));
396 } 425 }
397 426
398 static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtrWillB eRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WebSocketC hannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsig ned short code, const String& reason) 427 static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtrWillB eRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned l ong unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionS tatus closingHandshakeCompletion, unsigned short code, const String& reason)
399 { 428 {
400 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 429 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
401 workerClientWrapper->didClose(closingHandshakeCompletion, code, reason); 430 workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompl etion, code, reason);
402 } 431 }
403 432
404 void WorkerThreadableWebSocketChannel::Peer::didClose(ClosingHandshakeCompletion Status closingHandshakeCompletion, unsigned short code, const String& reason) 433 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBuf feredAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsign ed short code, const String& reason)
405 { 434 {
406 ASSERT(isMainThread()); 435 ASSERT(isMainThread());
407 m_mainWebSocketChannel = nullptr; 436 m_mainWebSocketChannel = nullptr;
408 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidClose, m_workerClientWrapper.get(), closingHandshakeCompletion, code, reas on)); 437 m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalSc opeDidClose, m_workerClientWrapper.get(), unhandledBufferedAmount, closingHandsh akeCompletion, code, reason));
409 } 438 }
410 439
411 static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, P assRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapp er) 440 static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, P assRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapp er)
412 { 441 {
413 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 442 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
414 workerClientWrapper->didReceiveMessageError(); 443 workerClientWrapper->didReceiveMessageError();
415 } 444 }
416 445
417 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError() 446 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
418 { 447 {
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
496 if (hasTerminatedPeer()) 525 if (hasTerminatedPeer())
497 return WebSocketChannel::SendFail; 526 return WebSocketChannel::SendFail;
498 527
499 RefPtr<Bridge> protect(this); 528 RefPtr<Bridge> protect(this);
500 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendBlob, m _peer, data)))) 529 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendBlob, m _peer, data))))
501 return WebSocketChannel::SendFail; 530 return WebSocketChannel::SendFail;
502 531
503 return m_syncHelper->sendRequestResult(); 532 return m_syncHelper->sendRequestResult();
504 } 533 }
505 534
535 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
536 {
537 if (hasTerminatedPeer())
538 return 0;
539
540 RefPtr<Bridge> protect(this);
541 if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::bufferedAmo unt, m_peer))))
542 return 0;
543
544 return m_syncHelper->bufferedAmount();
545 }
546
506 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& rea son) 547 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& rea son)
507 { 548 {
508 if (hasTerminatedPeer()) 549 if (hasTerminatedPeer())
509 return; 550 return;
510 551
511 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::close, m_ peer, code, reason.isolatedCopy()))); 552 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::close, m_ peer, code, reason.isolatedCopy())));
512 } 553 }
513 554
514 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, Messag eLevel level, const String& sourceURL, unsigned lineNumber) 555 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, Messag eLevel level, const String& sourceURL, unsigned lineNumber)
515 { 556 {
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
570 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::destroy, m_peer))); 611 m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::destroy, m_peer)));
571 // Peer::destroy() deletes m_peer and then m_syncHelper will be released. 612 // Peer::destroy() deletes m_peer and then m_syncHelper will be released.
572 // We must not touch m_syncHelper any more. 613 // We must not touch m_syncHelper any more.
573 m_syncHelper = 0; 614 m_syncHelper = 0;
574 615
575 // We won't use this any more. 616 // We won't use this any more.
576 m_workerGlobalScope = nullptr; 617 m_workerGlobalScope = nullptr;
577 } 618 }
578 619
579 } // namespace WebCore 620 } // namespace WebCore
OLDNEW
« no previous file with comments | « trunk/Source/modules/websockets/WorkerThreadableWebSocketChannel.h ('k') | trunk/Source/platform/network/SocketStreamHandle.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698