Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1090)

Unified Diff: net/websockets/websocket_throttle.cc

Issue 669157: Refactor WebSocket throttling feature. (Closed)
Patch Set: Fix for tyoshino's comment Created 10 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « net/websockets/websocket_throttle.h ('k') | net/websockets/websocket_throttle_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/websockets/websocket_throttle.cc
diff --git a/net/websockets/websocket_throttle.cc b/net/websockets/websocket_throttle.cc
index 8d0d1fbff528493c17facf7e5a10b4e49f515398..e2e98c3ad0cd22b86d1779cae5253161d6527ed7 100644
--- a/net/websockets/websocket_throttle.cc
+++ b/net/websockets/websocket_throttle.cc
@@ -13,6 +13,7 @@
#include "net/base/io_buffer.h"
#include "net/base/sys_addrinfo.h"
#include "net/socket_stream/socket_stream.h"
+#include "net/websockets/websocket_job.h"
namespace net {
@@ -41,119 +42,7 @@ static std::string AddrinfoToHashkey(const struct addrinfo* addrinfo) {
}
}
-// State for WebSocket protocol on each SocketStream.
-// This is owned in SocketStream as UserData keyed by WebSocketState::kKeyName.
-// This is alive between connection starts and handshake is finished.
-// In this class, it doesn't check actual handshake finishes, but only checks
-// end of header is found in read data.
-class WebSocketThrottle::WebSocketState : public SocketStream::UserData {
- public:
- explicit WebSocketState(const AddressList& addrs)
- : address_list_(addrs),
- callback_(NULL),
- waiting_(false),
- handshake_finished_(false),
- buffer_(NULL) {
- }
- ~WebSocketState() {}
-
- int OnStartOpenConnection(CompletionCallback* callback) {
- DCHECK(!callback_);
- if (!waiting_)
- return OK;
- callback_ = callback;
- return ERR_IO_PENDING;
- }
-
- int OnRead(const char* data, int len, CompletionCallback* callback) {
- DCHECK(!waiting_);
- DCHECK(!callback_);
- DCHECK(!handshake_finished_);
- static const int kBufferSize = 8129;
-
- if (!buffer_) {
- // Fast path.
- int eoh = HttpUtil::LocateEndOfHeaders(data, len, 0);
- if (eoh > 0) {
- handshake_finished_ = true;
- return OK;
- }
- buffer_ = new GrowableIOBuffer();
- buffer_->SetCapacity(kBufferSize);
- } else if (buffer_->RemainingCapacity() < len) {
- buffer_->SetCapacity(buffer_->capacity() + kBufferSize);
- }
- memcpy(buffer_->data(), data, len);
- buffer_->set_offset(buffer_->offset() + len);
-
- int eoh = HttpUtil::LocateEndOfHeaders(buffer_->StartOfBuffer(),
- buffer_->offset(), 0);
- handshake_finished_ = (eoh > 0);
- return OK;
- }
-
- const AddressList& address_list() const { return address_list_; }
- void SetWaiting() { waiting_ = true; }
- bool IsWaiting() const { return waiting_; }
- bool HandshakeFinished() const { return handshake_finished_; }
- void Wakeup() {
- waiting_ = false;
- // We wrap |callback_| to keep this alive while this is released.
- scoped_refptr<CompletionCallbackRunner> runner =
- new CompletionCallbackRunner(callback_);
- callback_ = NULL;
- MessageLoopForIO::current()->PostTask(
- FROM_HERE,
- NewRunnableMethod(runner.get(),
- &CompletionCallbackRunner::Run));
- }
-
- static const char* kKeyName;
-
- private:
- class CompletionCallbackRunner
- : public base::RefCountedThreadSafe<CompletionCallbackRunner> {
- public:
- explicit CompletionCallbackRunner(CompletionCallback* callback)
- : callback_(callback) {
- DCHECK(callback_);
- }
- void Run() {
- callback_->Run(OK);
- }
- private:
- friend class base::RefCountedThreadSafe<CompletionCallbackRunner>;
-
- virtual ~CompletionCallbackRunner() {}
-
- CompletionCallback* callback_;
-
- DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner);
- };
-
- const AddressList& address_list_;
-
- CompletionCallback* callback_;
- // True if waiting another websocket connection is established.
- // False if the websocket is performing handshaking.
- bool waiting_;
-
- // True if the websocket handshake is completed.
- // If true, it will be removed from queue and deleted from the SocketStream
- // UserData soon.
- bool handshake_finished_;
-
- // Buffer for read data to check handshake response message.
- scoped_refptr<GrowableIOBuffer> buffer_;
-
- DISALLOW_COPY_AND_ASSIGN(WebSocketState);
-};
-
-const char* WebSocketThrottle::WebSocketState::kKeyName = "WebSocketState";
-
WebSocketThrottle::WebSocketThrottle() {
- SocketStreamThrottle::RegisterSocketStreamThrottle("ws", this);
- SocketStreamThrottle::RegisterSocketStreamThrottle("wss", this);
}
WebSocketThrottle::~WebSocketThrottle() {
@@ -161,51 +50,9 @@ WebSocketThrottle::~WebSocketThrottle() {
DCHECK(addr_map_.empty());
}
-int WebSocketThrottle::OnStartOpenConnection(
- SocketStream* socket, CompletionCallback* callback) {
- WebSocketState* state = new WebSocketState(socket->address_list());
- PutInQueue(socket, state);
- return state->OnStartOpenConnection(callback);
-}
-
-int WebSocketThrottle::OnRead(SocketStream* socket,
- const char* data, int len,
- CompletionCallback* callback) {
- WebSocketState* state = static_cast<WebSocketState*>(
- socket->GetUserData(WebSocketState::kKeyName));
- // If no state, handshake was already completed. Do nothing.
- if (!state)
- return OK;
-
- int result = state->OnRead(data, len, callback);
- if (state->HandshakeFinished()) {
- RemoveFromQueue(socket, state);
- WakeupSocketIfNecessary();
- }
- return result;
-}
-
-int WebSocketThrottle::OnWrite(SocketStream* socket,
- const char* data, int len,
- CompletionCallback* callback) {
- // Do nothing.
- return OK;
-}
-
-void WebSocketThrottle::OnClose(SocketStream* socket) {
- WebSocketState* state = static_cast<WebSocketState*>(
- socket->GetUserData(WebSocketState::kKeyName));
- if (!state)
- return;
- RemoveFromQueue(socket, state);
- WakeupSocketIfNecessary();
-}
-
-void WebSocketThrottle::PutInQueue(SocketStream* socket,
- WebSocketState* state) {
- socket->SetUserData(WebSocketState::kKeyName, state);
- queue_.push_back(state);
- const AddressList& address_list = socket->address_list();
+void WebSocketThrottle::PutInQueue(WebSocketJob* job) {
+ queue_.push_back(job);
+ const AddressList& address_list = job->address_list();
for (const struct addrinfo* addrinfo = address_list.head();
addrinfo != NULL;
addrinfo = addrinfo->ai_next) {
@@ -213,18 +60,29 @@ void WebSocketThrottle::PutInQueue(SocketStream* socket,
ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
if (iter == addr_map_.end()) {
ConnectingQueue* queue = new ConnectingQueue();
- queue->push_back(state);
+ queue->push_back(job);
addr_map_[addrkey] = queue;
} else {
- iter->second->push_back(state);
- state->SetWaiting();
+ iter->second->push_back(job);
+ job->SetWaiting();
}
}
}
-void WebSocketThrottle::RemoveFromQueue(SocketStream* socket,
- WebSocketState* state) {
- const AddressList& address_list = socket->address_list();
+void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) {
+ bool in_queue = false;
+ for (ConnectingQueue::iterator iter = queue_.begin();
+ iter != queue_.end();
+ ++iter) {
+ if (*iter == job) {
+ queue_.erase(iter);
+ in_queue = true;
+ break;
+ }
+ }
+ if (!in_queue)
+ return;
+ const AddressList& address_list = job->address_list();
for (const struct addrinfo* addrinfo = address_list.head();
addrinfo != NULL;
addrinfo = addrinfo->ai_next) {
@@ -232,34 +90,32 @@ void WebSocketThrottle::RemoveFromQueue(SocketStream* socket,
ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
DCHECK(iter != addr_map_.end());
ConnectingQueue* queue = iter->second;
- DCHECK(state == queue->front());
- queue->pop_front();
+ // Job may not be front of queue when job is closed early while waiting.
+ for (ConnectingQueue::iterator iter = queue->begin();
+ iter != queue->end();
+ ++iter) {
+ if (*iter == job) {
+ queue->erase(iter);
+ break;
+ }
+ }
if (queue->empty()) {
delete queue;
addr_map_.erase(iter);
}
}
- for (ConnectingQueue::iterator iter = queue_.begin();
- iter != queue_.end();
- ++iter) {
- if (*iter == state) {
- queue_.erase(iter);
- break;
- }
- }
- socket->SetUserData(WebSocketState::kKeyName, NULL);
}
void WebSocketThrottle::WakeupSocketIfNecessary() {
for (ConnectingQueue::iterator iter = queue_.begin();
iter != queue_.end();
++iter) {
- WebSocketState* state = *iter;
- if (!state->IsWaiting())
+ WebSocketJob* job = *iter;
+ if (!job->IsWaiting())
continue;
bool should_wakeup = true;
- const AddressList& address_list = state->address_list();
+ const AddressList& address_list = job->address_list();
for (const struct addrinfo* addrinfo = address_list.head();
addrinfo != NULL;
addrinfo = addrinfo->ai_next) {
@@ -267,19 +123,14 @@ void WebSocketThrottle::WakeupSocketIfNecessary() {
ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
DCHECK(iter != addr_map_.end());
ConnectingQueue* queue = iter->second;
- if (state != queue->front()) {
+ if (job != queue->front()) {
should_wakeup = false;
break;
}
}
if (should_wakeup)
- state->Wakeup();
+ job->Wakeup();
}
}
-/* static */
-void WebSocketThrottle::Init() {
- Singleton<WebSocketThrottle>::get();
-}
-
} // namespace net
« no previous file with comments | « net/websockets/websocket_throttle.h ('k') | net/websockets/websocket_throttle_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698