| Index: net/websockets/websocket_throttle.cc
|
| diff --git a/net/websockets/websocket_throttle.cc b/net/websockets/websocket_throttle.cc
|
| index 8d0d1fbff528493c17facf7e5a10b4e49f515398..e2e98c3ad0cd22b86d1779cae5253161d6527ed7 100644
|
| --- a/net/websockets/websocket_throttle.cc
|
| +++ b/net/websockets/websocket_throttle.cc
|
| @@ -13,6 +13,7 @@
|
| #include "net/base/io_buffer.h"
|
| #include "net/base/sys_addrinfo.h"
|
| #include "net/socket_stream/socket_stream.h"
|
| +#include "net/websockets/websocket_job.h"
|
|
|
| namespace net {
|
|
|
| @@ -41,119 +42,7 @@ static std::string AddrinfoToHashkey(const struct addrinfo* addrinfo) {
|
| }
|
| }
|
|
|
| -// State for WebSocket protocol on each SocketStream.
|
| -// This is owned in SocketStream as UserData keyed by WebSocketState::kKeyName.
|
| -// This is alive between connection starts and handshake is finished.
|
| -// In this class, it doesn't check actual handshake finishes, but only checks
|
| -// end of header is found in read data.
|
| -class WebSocketThrottle::WebSocketState : public SocketStream::UserData {
|
| - public:
|
| - explicit WebSocketState(const AddressList& addrs)
|
| - : address_list_(addrs),
|
| - callback_(NULL),
|
| - waiting_(false),
|
| - handshake_finished_(false),
|
| - buffer_(NULL) {
|
| - }
|
| - ~WebSocketState() {}
|
| -
|
| - int OnStartOpenConnection(CompletionCallback* callback) {
|
| - DCHECK(!callback_);
|
| - if (!waiting_)
|
| - return OK;
|
| - callback_ = callback;
|
| - return ERR_IO_PENDING;
|
| - }
|
| -
|
| - int OnRead(const char* data, int len, CompletionCallback* callback) {
|
| - DCHECK(!waiting_);
|
| - DCHECK(!callback_);
|
| - DCHECK(!handshake_finished_);
|
| - static const int kBufferSize = 8129;
|
| -
|
| - if (!buffer_) {
|
| - // Fast path.
|
| - int eoh = HttpUtil::LocateEndOfHeaders(data, len, 0);
|
| - if (eoh > 0) {
|
| - handshake_finished_ = true;
|
| - return OK;
|
| - }
|
| - buffer_ = new GrowableIOBuffer();
|
| - buffer_->SetCapacity(kBufferSize);
|
| - } else if (buffer_->RemainingCapacity() < len) {
|
| - buffer_->SetCapacity(buffer_->capacity() + kBufferSize);
|
| - }
|
| - memcpy(buffer_->data(), data, len);
|
| - buffer_->set_offset(buffer_->offset() + len);
|
| -
|
| - int eoh = HttpUtil::LocateEndOfHeaders(buffer_->StartOfBuffer(),
|
| - buffer_->offset(), 0);
|
| - handshake_finished_ = (eoh > 0);
|
| - return OK;
|
| - }
|
| -
|
| - const AddressList& address_list() const { return address_list_; }
|
| - void SetWaiting() { waiting_ = true; }
|
| - bool IsWaiting() const { return waiting_; }
|
| - bool HandshakeFinished() const { return handshake_finished_; }
|
| - void Wakeup() {
|
| - waiting_ = false;
|
| - // We wrap |callback_| to keep this alive while this is released.
|
| - scoped_refptr<CompletionCallbackRunner> runner =
|
| - new CompletionCallbackRunner(callback_);
|
| - callback_ = NULL;
|
| - MessageLoopForIO::current()->PostTask(
|
| - FROM_HERE,
|
| - NewRunnableMethod(runner.get(),
|
| - &CompletionCallbackRunner::Run));
|
| - }
|
| -
|
| - static const char* kKeyName;
|
| -
|
| - private:
|
| - class CompletionCallbackRunner
|
| - : public base::RefCountedThreadSafe<CompletionCallbackRunner> {
|
| - public:
|
| - explicit CompletionCallbackRunner(CompletionCallback* callback)
|
| - : callback_(callback) {
|
| - DCHECK(callback_);
|
| - }
|
| - void Run() {
|
| - callback_->Run(OK);
|
| - }
|
| - private:
|
| - friend class base::RefCountedThreadSafe<CompletionCallbackRunner>;
|
| -
|
| - virtual ~CompletionCallbackRunner() {}
|
| -
|
| - CompletionCallback* callback_;
|
| -
|
| - DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner);
|
| - };
|
| -
|
| - const AddressList& address_list_;
|
| -
|
| - CompletionCallback* callback_;
|
| - // True if waiting another websocket connection is established.
|
| - // False if the websocket is performing handshaking.
|
| - bool waiting_;
|
| -
|
| - // True if the websocket handshake is completed.
|
| - // If true, it will be removed from queue and deleted from the SocketStream
|
| - // UserData soon.
|
| - bool handshake_finished_;
|
| -
|
| - // Buffer for read data to check handshake response message.
|
| - scoped_refptr<GrowableIOBuffer> buffer_;
|
| -
|
| - DISALLOW_COPY_AND_ASSIGN(WebSocketState);
|
| -};
|
| -
|
| -const char* WebSocketThrottle::WebSocketState::kKeyName = "WebSocketState";
|
| -
|
| WebSocketThrottle::WebSocketThrottle() {
|
| - SocketStreamThrottle::RegisterSocketStreamThrottle("ws", this);
|
| - SocketStreamThrottle::RegisterSocketStreamThrottle("wss", this);
|
| }
|
|
|
| WebSocketThrottle::~WebSocketThrottle() {
|
| @@ -161,51 +50,9 @@ WebSocketThrottle::~WebSocketThrottle() {
|
| DCHECK(addr_map_.empty());
|
| }
|
|
|
| -int WebSocketThrottle::OnStartOpenConnection(
|
| - SocketStream* socket, CompletionCallback* callback) {
|
| - WebSocketState* state = new WebSocketState(socket->address_list());
|
| - PutInQueue(socket, state);
|
| - return state->OnStartOpenConnection(callback);
|
| -}
|
| -
|
| -int WebSocketThrottle::OnRead(SocketStream* socket,
|
| - const char* data, int len,
|
| - CompletionCallback* callback) {
|
| - WebSocketState* state = static_cast<WebSocketState*>(
|
| - socket->GetUserData(WebSocketState::kKeyName));
|
| - // If no state, handshake was already completed. Do nothing.
|
| - if (!state)
|
| - return OK;
|
| -
|
| - int result = state->OnRead(data, len, callback);
|
| - if (state->HandshakeFinished()) {
|
| - RemoveFromQueue(socket, state);
|
| - WakeupSocketIfNecessary();
|
| - }
|
| - return result;
|
| -}
|
| -
|
| -int WebSocketThrottle::OnWrite(SocketStream* socket,
|
| - const char* data, int len,
|
| - CompletionCallback* callback) {
|
| - // Do nothing.
|
| - return OK;
|
| -}
|
| -
|
| -void WebSocketThrottle::OnClose(SocketStream* socket) {
|
| - WebSocketState* state = static_cast<WebSocketState*>(
|
| - socket->GetUserData(WebSocketState::kKeyName));
|
| - if (!state)
|
| - return;
|
| - RemoveFromQueue(socket, state);
|
| - WakeupSocketIfNecessary();
|
| -}
|
| -
|
| -void WebSocketThrottle::PutInQueue(SocketStream* socket,
|
| - WebSocketState* state) {
|
| - socket->SetUserData(WebSocketState::kKeyName, state);
|
| - queue_.push_back(state);
|
| - const AddressList& address_list = socket->address_list();
|
| +void WebSocketThrottle::PutInQueue(WebSocketJob* job) {
|
| + queue_.push_back(job);
|
| + const AddressList& address_list = job->address_list();
|
| for (const struct addrinfo* addrinfo = address_list.head();
|
| addrinfo != NULL;
|
| addrinfo = addrinfo->ai_next) {
|
| @@ -213,18 +60,29 @@ void WebSocketThrottle::PutInQueue(SocketStream* socket,
|
| ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
|
| if (iter == addr_map_.end()) {
|
| ConnectingQueue* queue = new ConnectingQueue();
|
| - queue->push_back(state);
|
| + queue->push_back(job);
|
| addr_map_[addrkey] = queue;
|
| } else {
|
| - iter->second->push_back(state);
|
| - state->SetWaiting();
|
| + iter->second->push_back(job);
|
| + job->SetWaiting();
|
| }
|
| }
|
| }
|
|
|
| -void WebSocketThrottle::RemoveFromQueue(SocketStream* socket,
|
| - WebSocketState* state) {
|
| - const AddressList& address_list = socket->address_list();
|
| +void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) {
|
| + bool in_queue = false;
|
| + for (ConnectingQueue::iterator iter = queue_.begin();
|
| + iter != queue_.end();
|
| + ++iter) {
|
| + if (*iter == job) {
|
| + queue_.erase(iter);
|
| + in_queue = true;
|
| + break;
|
| + }
|
| + }
|
| + if (!in_queue)
|
| + return;
|
| + const AddressList& address_list = job->address_list();
|
| for (const struct addrinfo* addrinfo = address_list.head();
|
| addrinfo != NULL;
|
| addrinfo = addrinfo->ai_next) {
|
| @@ -232,34 +90,32 @@ void WebSocketThrottle::RemoveFromQueue(SocketStream* socket,
|
| ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
|
| DCHECK(iter != addr_map_.end());
|
| ConnectingQueue* queue = iter->second;
|
| - DCHECK(state == queue->front());
|
| - queue->pop_front();
|
| + // Job may not be front of queue when job is closed early while waiting.
|
| + for (ConnectingQueue::iterator iter = queue->begin();
|
| + iter != queue->end();
|
| + ++iter) {
|
| + if (*iter == job) {
|
| + queue->erase(iter);
|
| + break;
|
| + }
|
| + }
|
| if (queue->empty()) {
|
| delete queue;
|
| addr_map_.erase(iter);
|
| }
|
| }
|
| - for (ConnectingQueue::iterator iter = queue_.begin();
|
| - iter != queue_.end();
|
| - ++iter) {
|
| - if (*iter == state) {
|
| - queue_.erase(iter);
|
| - break;
|
| - }
|
| - }
|
| - socket->SetUserData(WebSocketState::kKeyName, NULL);
|
| }
|
|
|
| void WebSocketThrottle::WakeupSocketIfNecessary() {
|
| for (ConnectingQueue::iterator iter = queue_.begin();
|
| iter != queue_.end();
|
| ++iter) {
|
| - WebSocketState* state = *iter;
|
| - if (!state->IsWaiting())
|
| + WebSocketJob* job = *iter;
|
| + if (!job->IsWaiting())
|
| continue;
|
|
|
| bool should_wakeup = true;
|
| - const AddressList& address_list = state->address_list();
|
| + const AddressList& address_list = job->address_list();
|
| for (const struct addrinfo* addrinfo = address_list.head();
|
| addrinfo != NULL;
|
| addrinfo = addrinfo->ai_next) {
|
| @@ -267,19 +123,14 @@ void WebSocketThrottle::WakeupSocketIfNecessary() {
|
| ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
|
| DCHECK(iter != addr_map_.end());
|
| ConnectingQueue* queue = iter->second;
|
| - if (state != queue->front()) {
|
| + if (job != queue->front()) {
|
| should_wakeup = false;
|
| break;
|
| }
|
| }
|
| if (should_wakeup)
|
| - state->Wakeup();
|
| + job->Wakeup();
|
| }
|
| }
|
|
|
| -/* static */
|
| -void WebSocketThrottle::Init() {
|
| - Singleton<WebSocketThrottle>::get();
|
| -}
|
| -
|
| } // namespace net
|
|
|