Chromium Code Reviews| Index: net/server/http_connection.cc |
| diff --git a/net/server/http_connection.cc b/net/server/http_connection.cc |
| index d433012cd651c7b7462972c3835cb818a9a78d8d..5b1ada243d66949d13a419ddc76fae54afa23f01 100644 |
| --- a/net/server/http_connection.cc |
| +++ b/net/server/http_connection.cc |
| @@ -4,44 +4,283 @@ |
| #include "net/server/http_connection.h" |
| +#include <queue> |
| + |
| +#include "base/bind.h" |
| +#include "base/location.h" |
| +#include "base/message_loop/message_loop_proxy.h" |
| +#include "net/base/net_errors.h" |
| #include "net/server/http_server.h" |
| #include "net/server/http_server_response_info.h" |
| #include "net/server/web_socket.h" |
| -#include "net/socket/stream_listen_socket.h" |
| +#include "net/socket/stream_socket.h" |
| namespace net { |
| -int HttpConnection::last_id_ = 0; |
| +namespace { |
| + |
| +const size_t kInitialReadBufSize = 1024; |
| +const size_t kMinimumReadBufSize = 128; |
| +const int kCapacityIncreaseFactor = 2; |
| + |
| +const size_t kPendingDataLimit = 1 * 1024 * 1024; // 1 Mbytes. |
| + |
| +} // namespace |
| + |
| +HttpConnection::ReadIOBuffer::ReadIOBuffer() |
| + : base_(new GrowableIOBuffer()) { |
| + SetCapacity(kInitialReadBufSize); |
| +} |
| + |
| +HttpConnection::ReadIOBuffer::~ReadIOBuffer() { |
| + data_ = NULL; // base_ owns data_. |
| +} |
| + |
| +IOBuffer* HttpConnection::ReadIOBuffer::GetUnusedIOBuffer() const { |
| + return base_.get(); |
| +} |
| + |
| +size_t HttpConnection::ReadIOBuffer::GetUnusedCapacity() const { |
| + return static_cast<size_t>(base_->RemainingCapacity()); |
| +} |
| + |
| +size_t HttpConnection::ReadIOBuffer::GetCapacity() const { |
| + return static_cast<size_t>(base_->capacity()); |
| +} |
| + |
| +void HttpConnection::ReadIOBuffer::SetCapacity(size_t capacity) { |
| + base_->SetCapacity(static_cast<int>(capacity)); |
| + data_ = base_->StartOfBuffer(); |
| +} |
| + |
| +size_t HttpConnection::ReadIOBuffer::GetUnconsumedSize() const { |
| + DCHECK_GE(base_->data(), data_); |
| + return static_cast<size_t>(base_->data() - data_); |
| +} |
| + |
| +void HttpConnection::ReadIOBuffer::DidRead(size_t bytes) { |
| + DCHECK_LE(bytes, GetUnusedCapacity()); |
| + base_->set_offset(base_->offset() + bytes); |
| +} |
| + |
| +void HttpConnection::ReadIOBuffer::DidConsume(size_t bytes) { |
| + DCHECK_LE(bytes, GetUnconsumedSize()); |
| + if (bytes < GetUnconsumedSize()) { |
| + data_ += bytes; |
| + return; |
| + } |
| + // No need to keep consumed data because no data will be moved. |
| + // If capacity is too big, reduce it. |
| + if (base_->capacity() > static_cast<int>(kMinimumReadBufSize) |
| + && base_->capacity() > base_->offset() * kCapacityIncreaseFactor) { |
| + SetCapacity(base_->capacity() / kCapacityIncreaseFactor); |
| + } |
| + base_->set_offset(0); |
| +} |
| + |
| +// IOBuffer of pending data which has a queue of pending data. Each pending data |
| +// is stored in std::string. data() is the data of first std::string stored. |
| +class HttpConnection::PendingWriteIOBuffer : public IOBuffer { |
| + public: |
| + PendingWriteIOBuffer(); |
| + |
| + // Whether or not pending data exists. |
| + bool IsEmpty() const; |
| + |
| + // Appends new pending data. Changes data() if this is the first pending data. |
| + void Append(const std::string& data); |
| + // Consumes data and changes data() accordingly. |
| + void DidConsume(size_t size); |
| + |
| + // Gets size of data to write this time. It is NOT total data size. |
| + size_t GetSizeToWrite() const; |
| + |
| + size_t total_size() const { return total_size_; } |
| + |
| + private: |
| + virtual ~PendingWriteIOBuffer(); |
| + |
| + std::queue<std::string> pending_data_; |
| + size_t total_size_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(PendingWriteIOBuffer); |
| +}; |
| + |
| +HttpConnection::PendingWriteIOBuffer::PendingWriteIOBuffer() |
| + : total_size_(0) { |
| +} |
| + |
| +HttpConnection::PendingWriteIOBuffer::~PendingWriteIOBuffer() { |
| +} |
| + |
| +bool HttpConnection::PendingWriteIOBuffer::IsEmpty() const { |
| + return pending_data_.empty(); |
| +} |
| + |
| +void HttpConnection::PendingWriteIOBuffer::Append(const std::string& data) { |
| + if (data.empty()) { |
| + return; |
| + } |
| + |
| + pending_data_.push(data); |
|
mmenke
2014/05/23 19:20:58
Can't we just take IOBuffers instead, to avoid thi
byungchul
2014/05/28 01:19:35
To construct IOBuffer from std::string in any poin
Ryan Sleevi
2014/05/28 01:36:26
Copy-on-write is forbidden behaviour in C++11, and
byungchul
2014/05/30 00:19:02
Now, HttpServer uses this buffer directly. Buildin
|
| + total_size_ += data.size(); |
| + |
| + // If new data is the first pending data, updates data_. |
| + if (pending_data_.size() == 1) { |
| + data_ = const_cast<char*>(pending_data_.front().data()); |
| + } |
| +} |
| + |
| +void HttpConnection::PendingWriteIOBuffer::DidConsume(size_t size) { |
| + DCHECK_LE(size, total_size_); |
| + DCHECK_LE(size, GetSizeToWrite()); |
| + if (size == 0) { |
| + return; |
| + } |
| + |
| + if (size < GetSizeToWrite()) { |
| + data_ += size; |
| + } else { // size == GetSizeToWrite(). Updates data_ to next pending data. |
| + pending_data_.pop(); |
| + data_ = IsEmpty() ? NULL : const_cast<char*>(pending_data_.front().data()); |
| + } |
| + total_size_ -= size; |
| +} |
| + |
| +size_t HttpConnection::PendingWriteIOBuffer::GetSizeToWrite() const { |
| + if (IsEmpty()) { |
| + DCHECK_EQ(total_size_, 0U); |
| + return 0; |
| + } |
| + DCHECK_GE(data_, pending_data_.front().data()); |
| + size_t consumed = static_cast<size_t>(data_ - pending_data_.front().data()); |
| + DCHECK_GT(pending_data_.front().size(), consumed); |
|
mmenke
2014/05/23 19:20:58
This class may be may effort than it's worth, as-i
mmenke
2014/05/23 19:20:58
Actually...Why don't we do something simpler, like
byungchul
2014/05/28 01:19:35
Any one needs to store date to write anyway. My im
Ryan Sleevi
2014/05/28 01:36:26
Normally, in net/ code, we leave buffering up to t
byungchul
2014/05/28 05:01:33
While keeping current HttpServer apis, I will try
byungchul
2014/05/30 00:19:02
Done.
|
| + return pending_data_.front().size() - consumed; |
| +} |
| + |
| +HttpConnection::HttpConnection(int id, |
| + scoped_ptr<StreamSocket> socket, |
| + Delegate* delegate) |
| + : id_(id), |
| + socket_(socket.Pass()), |
| + delegate_(delegate), |
| + read_buf_(new ReadIOBuffer()), |
| + pending_write_buf_(new PendingWriteIOBuffer()) { |
| + DoReadLoop(OK); |
| +} |
| + |
| +HttpConnection::~HttpConnection() { |
| +} |
| + |
| +void HttpConnection::DoReadLoop(int rv) { |
|
mmenke
2014/05/23 19:20:58
Taking in an "rv" that's always OK seems a little
byungchul
2014/05/30 00:19:02
Done and moved to HttpServer.
|
| + while (rv == OK) { |
| + if (read_buf_->GetUnusedCapacity() == 0) { |
| + if (read_buf_->GetCapacity() > kPendingDataLimit) { |
| + LOG(ERROR) << "Too large read data is pending: capacity=" |
| + << read_buf_->GetCapacity() |
| + << ", consumed=" |
| + << read_buf_->GetCapacity() - read_buf_->GetUnconsumedSize(); |
| + Close(); |
| + return; |
| + } |
| + read_buf_->SetCapacity(read_buf_->GetCapacity() * kMinimumReadBufSize); |
| + } |
| + rv = socket_->Read(read_buf_->GetUnusedIOBuffer(), |
| + read_buf_->GetUnusedCapacity(), |
| + base::Bind(&HttpConnection::OnReadCompleted, |
| + AsWeakPtr())); |
|
mmenke
2014/05/23 19:20:58
We don't need weak pointers here - we own the sock
byungchul
2014/05/30 00:19:02
Replaced with connection id in http server.
|
| + if (rv == ERR_IO_PENDING) { |
| + break; |
| + } |
| + rv = DidRead(rv); |
| + } |
| +} |
| + |
| +void HttpConnection::OnReadCompleted(int rv) { |
| + if (DidRead(rv) == OK) { |
| + DoReadLoop(OK); |
| + } |
| +} |
| + |
| +int HttpConnection::DidRead(int rv) { |
| + if (rv <= 0) { |
| + Close(); |
| + return rv == 0 ? ERR_CONNECTION_CLOSED : rv; |
| + } |
| + |
| + read_buf_->DidRead(static_cast<size_t>(rv)); |
| + delegate_->DidRead(this); |
| + return OK; |
| +} |
| void HttpConnection::Send(const std::string& data) { |
| - if (!socket_.get()) |
| + if (pending_write_buf_->total_size() + data.size() > kPendingDataLimit) { |
|
pfeldman
2014/05/26 09:21:31
Before we go further, could you elaborate on this
mmenke
2014/05/27 14:32:08
It's buffering 100 MB payloads in RAM? That just
byungchul
2014/05/30 00:19:02
Increased websocket buffer to 100Mb in cast of dev
|
| + LOG(ERROR) << "Too large write data is pending: size=" |
| + << pending_write_buf_->total_size() + data.size(); |
| + Close(); |
| return; |
| - socket_->Send(data); |
| + } |
| + bool writing_in_progress = !pending_write_buf_->IsEmpty(); |
| + pending_write_buf_->Append(data); |
| + if (!writing_in_progress) { |
| + DoWriteLoop(OK); |
| + } |
| } |
| void HttpConnection::Send(const char* bytes, int len) { |
| - if (!socket_.get()) |
| - return; |
| - socket_->Send(bytes, len); |
| + Send(std::string(bytes, len)); |
|
mmenke
2014/05/23 19:20:58
This copy really shouldn't be needed.
byungchul
2014/05/30 00:19:02
This is used only by net/server/web_socket.cc. One
|
| } |
| void HttpConnection::Send(const HttpServerResponseInfo& response) { |
| Send(response.Serialize()); |
| } |
| -HttpConnection::HttpConnection(HttpServer* server, |
| - scoped_ptr<StreamListenSocket> sock) |
| - : server_(server), |
| - socket_(sock.Pass()) { |
| - id_ = last_id_++; |
| +void HttpConnection::DoWriteLoop(int rv) { |
|
mmenke
2014/05/23 19:20:58
Again, rv is always "OK", so doesn't seem like it
byungchul
2014/05/30 00:19:02
Done.
|
| + while (rv == OK && pending_write_buf_->GetSizeToWrite() > 0) { |
| + rv = socket_->Write(pending_write_buf_.get(), |
| + pending_write_buf_->GetSizeToWrite(), |
| + base::Bind(&HttpConnection::OnWriteCompleted, |
| + AsWeakPtr())); |
|
mmenke
2014/05/23 19:20:58
WeakPtr not needed.
byungchul
2014/05/30 00:19:02
Done.
|
| + if (rv == ERR_IO_PENDING || rv == OK) { |
| + break; |
| + } |
| + rv = DidWrite(rv); |
| + } |
| } |
| -HttpConnection::~HttpConnection() { |
| - server_->delegate_->OnClose(id_); |
| +void HttpConnection::OnWriteCompleted(int rv) { |
| + if (DidWrite(rv) == OK) { |
| + DoWriteLoop(OK); |
| + } |
| +} |
| + |
| +int HttpConnection::DidWrite(int rv) { |
| + if (rv < 0) { |
| + Close(); |
| + return rv; |
| + } |
| + |
| + pending_write_buf_->DidConsume(rv); |
| + return OK; |
| +} |
| + |
| +void HttpConnection::Close() { |
|
mmenke
2014/05/23 19:20:58
This function can end up being called twice: Once
byungchul
2014/05/28 01:19:35
That's why I use a weak ptr.
Ryan Sleevi
2014/05/28 01:36:26
I don't really think that's an ideal solution.
No
byungchul
2014/05/30 00:19:02
HttpServer handles it and/so never calls Close() t
|
| + // DidClose() may delete this object. Call it in next run loop to make sure |
| + // any DidRead()/DidWrite() callbacks in the stack return. |
| + base::MessageLoopProxy::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&HttpConnection::CloseInNextRunLoop, AsWeakPtr())); |
|
mmenke
2014/05/23 19:20:58
So we always call back Delegate::DidClose asynchro
byungchul
2014/05/28 01:19:35
They are different. DidClose() could be called for
|
| +} |
| + |
| +void HttpConnection::CloseInNextRunLoop() { |
|
mmenke
2014/05/23 19:20:58
I think this and Close() are misnamed. Close() sh
byungchul
2014/05/30 00:19:02
Not necessary. Removed.
|
| + socket_->Disconnect(); |
| + delegate_->DidClose(this); |
| } |
| -void HttpConnection::Shift(int num_bytes) { |
| - recv_data_ = recv_data_.substr(num_bytes); |
| +void HttpConnection::UpgradeToWebSocket(const HttpServerRequestInfo& request, |
| + size_t* pos) { |
| + DCHECK(!web_socket_); |
| + web_socket_.reset(WebSocket::CreateWebSocket(this, request, pos)); |
| } |
| } // namespace net |