| Index: net/websockets/websocket_throttle.cc
|
| diff --git a/net/websockets/websocket_throttle.cc b/net/websockets/websocket_throttle.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..fb320b647f092c19915ad00612e040adcea4d148
|
| --- /dev/null
|
| +++ b/net/websockets/websocket_throttle.cc
|
| @@ -0,0 +1,293 @@
|
| +// Copyright (c) 2009 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "net/websockets/websocket_throttle.h"
|
| +
|
| +#if defined(OS_WIN)
|
| +#include <ws2tcpip.h>
|
| +#else
|
| +#include <netdb.h>
|
| +#endif
|
| +
|
| +#include <string>
|
| +
|
| +#include "base/message_loop.h"
|
| +#include "base/ref_counted.h"
|
| +#include "base/singleton.h"
|
| +#include "base/string_util.h"
|
| +#include "net/base/io_buffer.h"
|
| +#include "net/socket_stream/socket_stream.h"
|
| +
|
| +namespace net {
|
| +
|
| +static std::string AddrinfoToHashkey(const struct addrinfo* addrinfo) {
|
| + switch (addrinfo->ai_family) {
|
| + case AF_INET: {
|
| + const struct sockaddr_in* const addr =
|
| + reinterpret_cast<const sockaddr_in*>(addrinfo->ai_addr);
|
| + return StringPrintf("%d:%s",
|
| + addrinfo->ai_family,
|
| + HexEncode(&addr->sin_addr, 4).c_str());
|
| + }
|
| + case AF_INET6: {
|
| + const struct sockaddr_in6* const addr6 =
|
| + reinterpret_cast<const sockaddr_in6*>(addrinfo->ai_addr);
|
| + return StringPrintf("%d:%s",
|
| + addrinfo->ai_family,
|
| + HexEncode(&addr6->sin6_addr,
|
| + sizeof(addr6->sin6_addr)).c_str());
|
| + }
|
| + default:
|
| + return StringPrintf("%d:%s",
|
| + addrinfo->ai_family,
|
| + HexEncode(addrinfo->ai_addr,
|
| + addrinfo->ai_addrlen).c_str());
|
| + }
|
| +}
|
| +
|
| +// State for WebSocket protocol on each SocketStream.
|
| +// This is owned in SocketStream as UserData keyed by WebSocketState::kKeyName.
|
| +// This is alive between connection starts and handshake is finished.
|
| +// In this class, it doesn't check actual handshake finishes, but only checks
|
| +// end of header is found in read data.
|
| +class WebSocketThrottle::WebSocketState : public SocketStream::UserData {
|
| + public:
|
| + explicit WebSocketState(const AddressList& addrs)
|
| + : address_list_(addrs),
|
| + callback_(NULL),
|
| + waiting_(false),
|
| + handshake_finished_(false),
|
| + buffer_(NULL) {
|
| + }
|
| + ~WebSocketState() {}
|
| +
|
| + int OnStartOpenConnection(CompletionCallback* callback) {
|
| + DCHECK(!callback_);
|
| + if (!waiting_)
|
| + return OK;
|
| + callback_ = callback;
|
| + return ERR_IO_PENDING;
|
| + }
|
| +
|
| + int OnRead(const char* data, int len, CompletionCallback* callback) {
|
| + DCHECK(!waiting_);
|
| + DCHECK(!callback_);
|
| + DCHECK(!handshake_finished_);
|
| + static const int kBufferSize = 8129;
|
| +
|
| + if (!buffer_) {
|
| + // Fast path.
|
| + int eoh = HttpUtil::LocateEndOfHeaders(data, len, 0);
|
| + if (eoh > 0) {
|
| + handshake_finished_ = true;
|
| + return OK;
|
| + }
|
| + buffer_ = new GrowableIOBuffer();
|
| + buffer_->SetCapacity(kBufferSize);
|
| + } else {
|
| + if (buffer_->RemainingCapacity() < len) {
|
| + if (!buffer_->SetCapacity(buffer_->capacity() + kBufferSize)) {
|
| + // TODO(ukai): Check more correctly.
|
| + // Seek to the last CR or LF and reduce memory usage.
|
| + LOG(ERROR) << "Too large headers? capacity=" << buffer_->capacity();
|
| + handshake_finished_ = true;
|
| + return OK;
|
| + }
|
| + }
|
| + }
|
| + memcpy(buffer_->data(), data, len);
|
| + buffer_->set_offset(buffer_->offset() + len);
|
| +
|
| + int eoh = HttpUtil::LocateEndOfHeaders(buffer_->StartOfBuffer(),
|
| + buffer_->offset(), 0);
|
| + handshake_finished_ = (eoh > 0);
|
| + return OK;
|
| + }
|
| +
|
| + const AddressList& address_list() const { return address_list_; }
|
| + void SetWaiting() { waiting_ = true; }
|
| + bool IsWaiting() const { return waiting_; }
|
| + bool HandshakeFinished() const { return handshake_finished_; }
|
| + void Wakeup() {
|
| + waiting_ = false;
|
| + // We wrap |callback_| to keep this alive while this is released.
|
| + scoped_refptr<CompletionCallbackRunner> runner =
|
| + new CompletionCallbackRunner(callback_);
|
| + callback_ = NULL;
|
| + MessageLoopForIO::current()->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(runner.get(),
|
| + &CompletionCallbackRunner::Run));
|
| + }
|
| +
|
| + static const char* kKeyName;
|
| +
|
| + private:
|
| + class CompletionCallbackRunner
|
| + : public base::RefCountedThreadSafe<CompletionCallbackRunner> {
|
| + public:
|
| + explicit CompletionCallbackRunner(CompletionCallback* callback)
|
| + : callback_(callback) {
|
| + DCHECK(callback_);
|
| + }
|
| + virtual ~CompletionCallbackRunner() {}
|
| + void Run() {
|
| + callback_->Run(OK);
|
| + }
|
| + private:
|
| + CompletionCallback* callback_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner);
|
| + };
|
| +
|
| + const AddressList& address_list_;
|
| +
|
| + CompletionCallback* callback_;
|
| + // True if waiting another websocket connection is established.
|
| + // False if the websocket is performing handshaking.
|
| + bool waiting_;
|
| +
|
| + // True if the websocket handshake is completed.
|
| + // If true, it will be removed from queue and deleted from the SocketStream
|
| + // UserData soon.
|
| + bool handshake_finished_;
|
| +
|
| + // Buffer for read data to check handshake response message.
|
| + scoped_refptr<GrowableIOBuffer> buffer_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(WebSocketState);
|
| +};
|
| +
|
| +const char* WebSocketThrottle::WebSocketState::kKeyName = "WebSocketState";
|
| +
|
| +WebSocketThrottle::WebSocketThrottle() {
|
| + SocketStreamThrottle::RegisterSocketStreamThrottle("ws", this);
|
| + SocketStreamThrottle::RegisterSocketStreamThrottle("wss", this);
|
| +}
|
| +
|
| +WebSocketThrottle::~WebSocketThrottle() {
|
| + DCHECK(queue_.empty());
|
| + DCHECK(addr_map_.empty());
|
| +}
|
| +
|
| +int WebSocketThrottle::OnStartOpenConnection(
|
| + SocketStream* socket, CompletionCallback* callback) {
|
| + WebSocketState* state = new WebSocketState(socket->address_list());
|
| + PutInQueue(socket, state);
|
| + return state->OnStartOpenConnection(callback);
|
| +}
|
| +
|
| +int WebSocketThrottle::OnRead(SocketStream* socket,
|
| + const char* data, int len,
|
| + CompletionCallback* callback) {
|
| + WebSocketState* state = static_cast<WebSocketState*>(
|
| + socket->GetUserData(WebSocketState::kKeyName));
|
| + // If no state, handshake was already completed. Do nothing.
|
| + if (!state)
|
| + return OK;
|
| +
|
| + int result = state->OnRead(data, len, callback);
|
| + if (state->HandshakeFinished()) {
|
| + RemoveFromQueue(socket, state);
|
| + WakeupSocketIfNecessary();
|
| + }
|
| + return result;
|
| +}
|
| +
|
| +int WebSocketThrottle::OnWrite(SocketStream* socket,
|
| + const char* data, int len,
|
| + CompletionCallback* callback) {
|
| + // Do nothing.
|
| + return OK;
|
| +}
|
| +
|
| +void WebSocketThrottle::OnClose(SocketStream* socket) {
|
| + WebSocketState* state = static_cast<WebSocketState*>(
|
| + socket->GetUserData(WebSocketState::kKeyName));
|
| + if (!state)
|
| + return;
|
| + RemoveFromQueue(socket, state);
|
| + WakeupSocketIfNecessary();
|
| +}
|
| +
|
| +void WebSocketThrottle::PutInQueue(SocketStream* socket,
|
| + WebSocketState* state) {
|
| + socket->SetUserData(WebSocketState::kKeyName, state);
|
| + queue_.push_back(state);
|
| + const AddressList& address_list = socket->address_list();
|
| + for (const struct addrinfo* addrinfo = address_list.head();
|
| + addrinfo != NULL;
|
| + addrinfo = addrinfo->ai_next) {
|
| + std::string addrkey = AddrinfoToHashkey(addrinfo);
|
| + ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
|
| + if (iter == addr_map_.end()) {
|
| + ConnectingQueue* queue = new ConnectingQueue();
|
| + queue->push_back(state);
|
| + addr_map_[addrkey] = queue;
|
| + } else {
|
| + iter->second->push_back(state);
|
| + state->SetWaiting();
|
| + }
|
| + }
|
| +}
|
| +
|
| +void WebSocketThrottle::RemoveFromQueue(SocketStream* socket,
|
| + WebSocketState* state) {
|
| + const AddressList& address_list = socket->address_list();
|
| + for (const struct addrinfo* addrinfo = address_list.head();
|
| + addrinfo != NULL;
|
| + addrinfo = addrinfo->ai_next) {
|
| + std::string addrkey = AddrinfoToHashkey(addrinfo);
|
| + ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
|
| + DCHECK(iter != addr_map_.end());
|
| + ConnectingQueue* queue = iter->second;
|
| + DCHECK(state == queue->front());
|
| + queue->pop_front();
|
| + if (queue->empty())
|
| + addr_map_.erase(iter);
|
| + }
|
| + for (ConnectingQueue::iterator iter = queue_.begin();
|
| + iter != queue_.end();
|
| + ++iter) {
|
| + if (*iter == state) {
|
| + queue_.erase(iter);
|
| + break;
|
| + }
|
| + }
|
| + socket->SetUserData(WebSocketState::kKeyName, NULL);
|
| +}
|
| +
|
| +void WebSocketThrottle::WakeupSocketIfNecessary() {
|
| + for (ConnectingQueue::iterator iter = queue_.begin();
|
| + iter != queue_.end();
|
| + ++iter) {
|
| + WebSocketState* state = *iter;
|
| + if (!state->IsWaiting())
|
| + continue;
|
| +
|
| + bool should_wakeup = true;
|
| + const AddressList& address_list = state->address_list();
|
| + for (const struct addrinfo* addrinfo = address_list.head();
|
| + addrinfo != NULL;
|
| + addrinfo = addrinfo->ai_next) {
|
| + std::string addrkey = AddrinfoToHashkey(addrinfo);
|
| + ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
|
| + DCHECK(iter != addr_map_.end());
|
| + ConnectingQueue* queue = iter->second;
|
| + if (state != queue->front()) {
|
| + should_wakeup = false;
|
| + break;
|
| + }
|
| + }
|
| + if (should_wakeup)
|
| + state->Wakeup();
|
| + }
|
| +}
|
| +
|
| +/* static */
|
| +void WebSocketThrottle::Init() {
|
| + Singleton<WebSocketThrottle>::get();
|
| +}
|
| +
|
| +} // namespace net
|
|
|