Index: net/websockets/websocket_job.cc |
diff --git a/net/websockets/websocket_job.cc b/net/websockets/websocket_job.cc |
index 59acfc5ef8f62bc5729456e2613daa9271c240f0..62a62b7246d988e168477b11d401ddf9af72e960 100644 |
--- a/net/websockets/websocket_job.cc |
+++ b/net/websockets/websocket_job.cc |
@@ -10,6 +10,31 @@ |
#include "net/base/cookie_store.h" |
#include "net/http/http_util.h" |
#include "net/url_request/url_request_context.h" |
+#include "net/websockets/websocket_throttle.h" |
+ |
+namespace { |
+ |
+class CompletionCallbackRunner |
+ : public base::RefCountedThreadSafe<CompletionCallbackRunner> { |
+ public: |
+ explicit CompletionCallbackRunner(net::CompletionCallback* callback) |
+ : callback_(callback) { |
+ DCHECK(callback_); |
+ } |
+ void Run() { |
+ callback_->Run(net::OK); |
+ } |
+ private: |
+ friend class base::RefCountedThreadSafe<CompletionCallbackRunner>; |
+ |
+ virtual ~CompletionCallbackRunner() {} |
+ |
+ net::CompletionCallback* callback_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner); |
+}; |
+ |
+} |
namespace net { |
@@ -75,6 +100,8 @@ void WebSocketJob::EnsureInit() { |
WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate) |
: delegate_(delegate), |
state_(INITIALIZED), |
+ waiting_(false), |
+ callback_(NULL), |
handshake_request_sent_(0), |
handshake_response_header_length_(0), |
response_cookies_save_index_(0), |
@@ -128,12 +155,27 @@ void WebSocketJob::RestartWithAuth( |
void WebSocketJob::DetachDelegate() { |
state_ = CLOSED; |
+ Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this); |
+ Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary(); |
+ |
delegate_ = NULL; |
if (socket_) |
socket_->DetachDelegate(); |
socket_ = NULL; |
} |
+int WebSocketJob::OnStartOpenConnection( |
+ SocketStream* socket, CompletionCallback* callback) { |
+ DCHECK(!callback_); |
+ state_ = CONNECTING; |
+ addresses_.Copy(socket->address_list().head(), true); |
+ Singleton<WebSocketThrottle>::get()->PutInQueue(this); |
+ if (!waiting_) |
+ return OK; |
+ callback_ = callback; |
+ return ERR_IO_PENDING; |
+} |
+ |
void WebSocketJob::OnConnected( |
SocketStream* socket, int max_pending_send_allowed) { |
if (delegate_) |
@@ -161,6 +203,9 @@ void WebSocketJob::OnReceivedData( |
void WebSocketJob::OnClose(SocketStream* socket) { |
state_ = CLOSED; |
+ Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this); |
+ Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary(); |
+ |
SocketStream::Delegate* delegate = delegate_; |
delegate_ = NULL; |
socket_ = NULL; |
@@ -325,6 +370,9 @@ void WebSocketJob::SaveNextCookie() { |
"\r\n" + |
remaining_data; |
state_ = OPEN; |
+ Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this); |
+ Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary(); |
+ |
if (delegate_) |
delegate_->OnReceivedData(socket_, |
received_data.data(), received_data.size()); |
@@ -376,4 +424,29 @@ GURL WebSocketJob::GetURLForCookies() const { |
return url.ReplaceComponents(replacements); |
} |
+const AddressList& WebSocketJob::address_list() const { |
+ return addresses_; |
+} |
+ |
+void WebSocketJob::SetWaiting() { |
+ waiting_ = true; |
+} |
+ |
+bool WebSocketJob::IsWaiting() const { |
+ return waiting_; |
+} |
+ |
+void WebSocketJob::Wakeup() { |
+ waiting_ = false; |
+ DCHECK(callback_); |
+ // We wrap |callback_| to keep this alive while this is released. |
+ scoped_refptr<CompletionCallbackRunner> runner = |
+ new CompletionCallbackRunner(callback_); |
+ callback_ = NULL; |
+ MessageLoopForIO::current()->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(runner.get(), |
+ &CompletionCallbackRunner::Run)); |
+} |
+ |
} // namespace net |