Index: net/websockets/websocket_throttle.cc |
diff --git a/net/websockets/websocket_throttle.cc b/net/websockets/websocket_throttle.cc |
index 8d0d1fbff528493c17facf7e5a10b4e49f515398..e2e98c3ad0cd22b86d1779cae5253161d6527ed7 100644 |
--- a/net/websockets/websocket_throttle.cc |
+++ b/net/websockets/websocket_throttle.cc |
@@ -13,6 +13,7 @@ |
#include "net/base/io_buffer.h" |
#include "net/base/sys_addrinfo.h" |
#include "net/socket_stream/socket_stream.h" |
+#include "net/websockets/websocket_job.h" |
namespace net { |
@@ -41,119 +42,7 @@ static std::string AddrinfoToHashkey(const struct addrinfo* addrinfo) { |
} |
} |
-// State for WebSocket protocol on each SocketStream. |
-// This is owned in SocketStream as UserData keyed by WebSocketState::kKeyName. |
-// This is alive between connection starts and handshake is finished. |
-// In this class, it doesn't check actual handshake finishes, but only checks |
-// end of header is found in read data. |
-class WebSocketThrottle::WebSocketState : public SocketStream::UserData { |
- public: |
- explicit WebSocketState(const AddressList& addrs) |
- : address_list_(addrs), |
- callback_(NULL), |
- waiting_(false), |
- handshake_finished_(false), |
- buffer_(NULL) { |
- } |
- ~WebSocketState() {} |
- |
- int OnStartOpenConnection(CompletionCallback* callback) { |
- DCHECK(!callback_); |
- if (!waiting_) |
- return OK; |
- callback_ = callback; |
- return ERR_IO_PENDING; |
- } |
- |
- int OnRead(const char* data, int len, CompletionCallback* callback) { |
- DCHECK(!waiting_); |
- DCHECK(!callback_); |
- DCHECK(!handshake_finished_); |
- static const int kBufferSize = 8129; |
- |
- if (!buffer_) { |
- // Fast path. |
- int eoh = HttpUtil::LocateEndOfHeaders(data, len, 0); |
- if (eoh > 0) { |
- handshake_finished_ = true; |
- return OK; |
- } |
- buffer_ = new GrowableIOBuffer(); |
- buffer_->SetCapacity(kBufferSize); |
- } else if (buffer_->RemainingCapacity() < len) { |
- buffer_->SetCapacity(buffer_->capacity() + kBufferSize); |
- } |
- memcpy(buffer_->data(), data, len); |
- buffer_->set_offset(buffer_->offset() + len); |
- |
- int eoh = HttpUtil::LocateEndOfHeaders(buffer_->StartOfBuffer(), |
- buffer_->offset(), 0); |
- handshake_finished_ = (eoh > 0); |
- return OK; |
- } |
- |
- const AddressList& address_list() const { return address_list_; } |
- void SetWaiting() { waiting_ = true; } |
- bool IsWaiting() const { return waiting_; } |
- bool HandshakeFinished() const { return handshake_finished_; } |
- void Wakeup() { |
- waiting_ = false; |
- // We wrap |callback_| to keep this alive while this is released. |
- scoped_refptr<CompletionCallbackRunner> runner = |
- new CompletionCallbackRunner(callback_); |
- callback_ = NULL; |
- MessageLoopForIO::current()->PostTask( |
- FROM_HERE, |
- NewRunnableMethod(runner.get(), |
- &CompletionCallbackRunner::Run)); |
- } |
- |
- static const char* kKeyName; |
- |
- private: |
- class CompletionCallbackRunner |
- : public base::RefCountedThreadSafe<CompletionCallbackRunner> { |
- public: |
- explicit CompletionCallbackRunner(CompletionCallback* callback) |
- : callback_(callback) { |
- DCHECK(callback_); |
- } |
- void Run() { |
- callback_->Run(OK); |
- } |
- private: |
- friend class base::RefCountedThreadSafe<CompletionCallbackRunner>; |
- |
- virtual ~CompletionCallbackRunner() {} |
- |
- CompletionCallback* callback_; |
- |
- DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner); |
- }; |
- |
- const AddressList& address_list_; |
- |
- CompletionCallback* callback_; |
- // True if waiting another websocket connection is established. |
- // False if the websocket is performing handshaking. |
- bool waiting_; |
- |
- // True if the websocket handshake is completed. |
- // If true, it will be removed from queue and deleted from the SocketStream |
- // UserData soon. |
- bool handshake_finished_; |
- |
- // Buffer for read data to check handshake response message. |
- scoped_refptr<GrowableIOBuffer> buffer_; |
- |
- DISALLOW_COPY_AND_ASSIGN(WebSocketState); |
-}; |
- |
-const char* WebSocketThrottle::WebSocketState::kKeyName = "WebSocketState"; |
- |
WebSocketThrottle::WebSocketThrottle() { |
- SocketStreamThrottle::RegisterSocketStreamThrottle("ws", this); |
- SocketStreamThrottle::RegisterSocketStreamThrottle("wss", this); |
} |
WebSocketThrottle::~WebSocketThrottle() { |
@@ -161,51 +50,9 @@ WebSocketThrottle::~WebSocketThrottle() { |
DCHECK(addr_map_.empty()); |
} |
-int WebSocketThrottle::OnStartOpenConnection( |
- SocketStream* socket, CompletionCallback* callback) { |
- WebSocketState* state = new WebSocketState(socket->address_list()); |
- PutInQueue(socket, state); |
- return state->OnStartOpenConnection(callback); |
-} |
- |
-int WebSocketThrottle::OnRead(SocketStream* socket, |
- const char* data, int len, |
- CompletionCallback* callback) { |
- WebSocketState* state = static_cast<WebSocketState*>( |
- socket->GetUserData(WebSocketState::kKeyName)); |
- // If no state, handshake was already completed. Do nothing. |
- if (!state) |
- return OK; |
- |
- int result = state->OnRead(data, len, callback); |
- if (state->HandshakeFinished()) { |
- RemoveFromQueue(socket, state); |
- WakeupSocketIfNecessary(); |
- } |
- return result; |
-} |
- |
-int WebSocketThrottle::OnWrite(SocketStream* socket, |
- const char* data, int len, |
- CompletionCallback* callback) { |
- // Do nothing. |
- return OK; |
-} |
- |
-void WebSocketThrottle::OnClose(SocketStream* socket) { |
- WebSocketState* state = static_cast<WebSocketState*>( |
- socket->GetUserData(WebSocketState::kKeyName)); |
- if (!state) |
- return; |
- RemoveFromQueue(socket, state); |
- WakeupSocketIfNecessary(); |
-} |
- |
-void WebSocketThrottle::PutInQueue(SocketStream* socket, |
- WebSocketState* state) { |
- socket->SetUserData(WebSocketState::kKeyName, state); |
- queue_.push_back(state); |
- const AddressList& address_list = socket->address_list(); |
+void WebSocketThrottle::PutInQueue(WebSocketJob* job) { |
+ queue_.push_back(job); |
+ const AddressList& address_list = job->address_list(); |
for (const struct addrinfo* addrinfo = address_list.head(); |
addrinfo != NULL; |
addrinfo = addrinfo->ai_next) { |
@@ -213,18 +60,29 @@ void WebSocketThrottle::PutInQueue(SocketStream* socket, |
ConnectingAddressMap::iterator iter = addr_map_.find(addrkey); |
if (iter == addr_map_.end()) { |
ConnectingQueue* queue = new ConnectingQueue(); |
- queue->push_back(state); |
+ queue->push_back(job); |
addr_map_[addrkey] = queue; |
} else { |
- iter->second->push_back(state); |
- state->SetWaiting(); |
+ iter->second->push_back(job); |
+ job->SetWaiting(); |
} |
} |
} |
-void WebSocketThrottle::RemoveFromQueue(SocketStream* socket, |
- WebSocketState* state) { |
- const AddressList& address_list = socket->address_list(); |
+void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) { |
+ bool in_queue = false; |
+ for (ConnectingQueue::iterator iter = queue_.begin(); |
+ iter != queue_.end(); |
+ ++iter) { |
+ if (*iter == job) { |
+ queue_.erase(iter); |
+ in_queue = true; |
+ break; |
+ } |
+ } |
+ if (!in_queue) |
+ return; |
+ const AddressList& address_list = job->address_list(); |
for (const struct addrinfo* addrinfo = address_list.head(); |
addrinfo != NULL; |
addrinfo = addrinfo->ai_next) { |
@@ -232,34 +90,32 @@ void WebSocketThrottle::RemoveFromQueue(SocketStream* socket, |
ConnectingAddressMap::iterator iter = addr_map_.find(addrkey); |
DCHECK(iter != addr_map_.end()); |
ConnectingQueue* queue = iter->second; |
- DCHECK(state == queue->front()); |
- queue->pop_front(); |
+ // Job may not be front of queue when job is closed early while waiting. |
+ for (ConnectingQueue::iterator iter = queue->begin(); |
+ iter != queue->end(); |
+ ++iter) { |
+ if (*iter == job) { |
+ queue->erase(iter); |
+ break; |
+ } |
+ } |
if (queue->empty()) { |
delete queue; |
addr_map_.erase(iter); |
} |
} |
- for (ConnectingQueue::iterator iter = queue_.begin(); |
- iter != queue_.end(); |
- ++iter) { |
- if (*iter == state) { |
- queue_.erase(iter); |
- break; |
- } |
- } |
- socket->SetUserData(WebSocketState::kKeyName, NULL); |
} |
void WebSocketThrottle::WakeupSocketIfNecessary() { |
for (ConnectingQueue::iterator iter = queue_.begin(); |
iter != queue_.end(); |
++iter) { |
- WebSocketState* state = *iter; |
- if (!state->IsWaiting()) |
+ WebSocketJob* job = *iter; |
+ if (!job->IsWaiting()) |
continue; |
bool should_wakeup = true; |
- const AddressList& address_list = state->address_list(); |
+ const AddressList& address_list = job->address_list(); |
for (const struct addrinfo* addrinfo = address_list.head(); |
addrinfo != NULL; |
addrinfo = addrinfo->ai_next) { |
@@ -267,19 +123,14 @@ void WebSocketThrottle::WakeupSocketIfNecessary() { |
ConnectingAddressMap::iterator iter = addr_map_.find(addrkey); |
DCHECK(iter != addr_map_.end()); |
ConnectingQueue* queue = iter->second; |
- if (state != queue->front()) { |
+ if (job != queue->front()) { |
should_wakeup = false; |
break; |
} |
} |
if (should_wakeup) |
- state->Wakeup(); |
+ job->Wakeup(); |
} |
} |
-/* static */ |
-void WebSocketThrottle::Init() { |
- Singleton<WebSocketThrottle>::get(); |
-} |
- |
} // namespace net |