Index: net/server/http_connection.cc |
diff --git a/net/server/http_connection.cc b/net/server/http_connection.cc |
index d433012cd651c7b7462972c3835cb818a9a78d8d..5b1ada243d66949d13a419ddc76fae54afa23f01 100644 |
--- a/net/server/http_connection.cc |
+++ b/net/server/http_connection.cc |
@@ -4,44 +4,283 @@ |
#include "net/server/http_connection.h" |
+#include <queue> |
+ |
+#include "base/bind.h" |
+#include "base/location.h" |
+#include "base/message_loop/message_loop_proxy.h" |
+#include "net/base/net_errors.h" |
#include "net/server/http_server.h" |
#include "net/server/http_server_response_info.h" |
#include "net/server/web_socket.h" |
-#include "net/socket/stream_listen_socket.h" |
+#include "net/socket/stream_socket.h" |
namespace net { |
-int HttpConnection::last_id_ = 0; |
+namespace { |
+ |
+const size_t kInitialReadBufSize = 1024; |
+const size_t kMinimumReadBufSize = 128; |
+const int kCapacityIncreaseFactor = 2; |
+ |
+const size_t kPendingDataLimit = 1 * 1024 * 1024; // 1 Mbytes. |
+ |
+} // namespace |
+ |
+HttpConnection::ReadIOBuffer::ReadIOBuffer() |
+ : base_(new GrowableIOBuffer()) { |
+ SetCapacity(kInitialReadBufSize); |
+} |
+ |
+HttpConnection::ReadIOBuffer::~ReadIOBuffer() { |
+ data_ = NULL; // base_ owns data_. |
+} |
+ |
+IOBuffer* HttpConnection::ReadIOBuffer::GetUnusedIOBuffer() const { |
+ return base_.get(); |
+} |
+ |
+size_t HttpConnection::ReadIOBuffer::GetUnusedCapacity() const { |
+ return static_cast<size_t>(base_->RemainingCapacity()); |
+} |
+ |
+size_t HttpConnection::ReadIOBuffer::GetCapacity() const { |
+ return static_cast<size_t>(base_->capacity()); |
+} |
+ |
+void HttpConnection::ReadIOBuffer::SetCapacity(size_t capacity) { |
+ base_->SetCapacity(static_cast<int>(capacity)); |
+ data_ = base_->StartOfBuffer(); |
+} |
+ |
+size_t HttpConnection::ReadIOBuffer::GetUnconsumedSize() const { |
+ DCHECK_GE(base_->data(), data_); |
+ return static_cast<size_t>(base_->data() - data_); |
+} |
+ |
+void HttpConnection::ReadIOBuffer::DidRead(size_t bytes) { |
+ DCHECK_LE(bytes, GetUnusedCapacity()); |
+ base_->set_offset(base_->offset() + bytes); |
+} |
+ |
+void HttpConnection::ReadIOBuffer::DidConsume(size_t bytes) { |
+ DCHECK_LE(bytes, GetUnconsumedSize()); |
+ if (bytes < GetUnconsumedSize()) { |
+ data_ += bytes; |
+ return; |
+ } |
+ // No need to keep consumed data because no data will be moved. |
+ // If capacity is too big, reduce it. |
+ if (base_->capacity() > static_cast<int>(kMinimumReadBufSize) |
+ && base_->capacity() > base_->offset() * kCapacityIncreaseFactor) { |
+ SetCapacity(base_->capacity() / kCapacityIncreaseFactor); |
+ } |
+ base_->set_offset(0); |
+} |
+ |
+// IOBuffer of pending data which has a queue of pending data. Each pending data |
+// is stored in std::string. data() is the data of first std::string stored. |
+class HttpConnection::PendingWriteIOBuffer : public IOBuffer { |
+ public: |
+ PendingWriteIOBuffer(); |
+ |
+ // Whether or not pending data exists. |
+ bool IsEmpty() const; |
+ |
+ // Appends new pending data. Changes data() if this is the first pending data. |
+ void Append(const std::string& data); |
+ // Consumes data and changes data() accordingly. |
+ void DidConsume(size_t size); |
+ |
+ // Gets size of data to write this time. It is NOT total data size. |
+ size_t GetSizeToWrite() const; |
+ |
+ size_t total_size() const { return total_size_; } |
+ |
+ private: |
+ virtual ~PendingWriteIOBuffer(); |
+ |
+ std::queue<std::string> pending_data_; |
+ size_t total_size_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(PendingWriteIOBuffer); |
+}; |
+ |
+HttpConnection::PendingWriteIOBuffer::PendingWriteIOBuffer() |
+ : total_size_(0) { |
+} |
+ |
+HttpConnection::PendingWriteIOBuffer::~PendingWriteIOBuffer() { |
+} |
+ |
+bool HttpConnection::PendingWriteIOBuffer::IsEmpty() const { |
+ return pending_data_.empty(); |
+} |
+ |
+void HttpConnection::PendingWriteIOBuffer::Append(const std::string& data) { |
+ if (data.empty()) { |
+ return; |
+ } |
+ |
+ pending_data_.push(data); |
mmenke
2014/05/23 19:20:58
Can't we just take IOBuffers instead, to avoid thi
byungchul
2014/05/28 01:19:35
To construct IOBuffer from std::string in any poin
Ryan Sleevi
2014/05/28 01:36:26
Copy-on-write is forbidden behaviour in C++11, and
byungchul
2014/05/30 00:19:02
Now, HttpServer uses this buffer directly. Buildin
|
+ total_size_ += data.size(); |
+ |
+ // If new data is the first pending data, updates data_. |
+ if (pending_data_.size() == 1) { |
+ data_ = const_cast<char*>(pending_data_.front().data()); |
+ } |
+} |
+ |
+void HttpConnection::PendingWriteIOBuffer::DidConsume(size_t size) { |
+ DCHECK_LE(size, total_size_); |
+ DCHECK_LE(size, GetSizeToWrite()); |
+ if (size == 0) { |
+ return; |
+ } |
+ |
+ if (size < GetSizeToWrite()) { |
+ data_ += size; |
+ } else { // size == GetSizeToWrite(). Updates data_ to next pending data. |
+ pending_data_.pop(); |
+ data_ = IsEmpty() ? NULL : const_cast<char*>(pending_data_.front().data()); |
+ } |
+ total_size_ -= size; |
+} |
+ |
+size_t HttpConnection::PendingWriteIOBuffer::GetSizeToWrite() const { |
+ if (IsEmpty()) { |
+ DCHECK_EQ(total_size_, 0U); |
+ return 0; |
+ } |
+ DCHECK_GE(data_, pending_data_.front().data()); |
+ size_t consumed = static_cast<size_t>(data_ - pending_data_.front().data()); |
+ DCHECK_GT(pending_data_.front().size(), consumed); |
mmenke
2014/05/23 19:20:58
This class may be may effort than it's worth, as-i
mmenke
2014/05/23 19:20:58
Actually...Why don't we do something simpler, like
byungchul
2014/05/28 01:19:35
Any one needs to store date to write anyway. My im
Ryan Sleevi
2014/05/28 01:36:26
Normally, in net/ code, we leave buffering up to t
byungchul
2014/05/28 05:01:33
While keeping current HttpServer apis, I will try
byungchul
2014/05/30 00:19:02
Done.
|
+ return pending_data_.front().size() - consumed; |
+} |
+ |
+HttpConnection::HttpConnection(int id, |
+ scoped_ptr<StreamSocket> socket, |
+ Delegate* delegate) |
+ : id_(id), |
+ socket_(socket.Pass()), |
+ delegate_(delegate), |
+ read_buf_(new ReadIOBuffer()), |
+ pending_write_buf_(new PendingWriteIOBuffer()) { |
+ DoReadLoop(OK); |
+} |
+ |
+HttpConnection::~HttpConnection() { |
+} |
+ |
+void HttpConnection::DoReadLoop(int rv) { |
mmenke
2014/05/23 19:20:58
Taking in an "rv" that's always OK seems a little
byungchul
2014/05/30 00:19:02
Done and moved to HttpServer.
|
+ while (rv == OK) { |
+ if (read_buf_->GetUnusedCapacity() == 0) { |
+ if (read_buf_->GetCapacity() > kPendingDataLimit) { |
+ LOG(ERROR) << "Too large read data is pending: capacity=" |
+ << read_buf_->GetCapacity() |
+ << ", consumed=" |
+ << read_buf_->GetCapacity() - read_buf_->GetUnconsumedSize(); |
+ Close(); |
+ return; |
+ } |
+ read_buf_->SetCapacity(read_buf_->GetCapacity() * kMinimumReadBufSize); |
+ } |
+ rv = socket_->Read(read_buf_->GetUnusedIOBuffer(), |
+ read_buf_->GetUnusedCapacity(), |
+ base::Bind(&HttpConnection::OnReadCompleted, |
+ AsWeakPtr())); |
mmenke
2014/05/23 19:20:58
We don't need weak pointers here - we own the sock
byungchul
2014/05/30 00:19:02
Replaced with connection id in http server.
|
+ if (rv == ERR_IO_PENDING) { |
+ break; |
+ } |
+ rv = DidRead(rv); |
+ } |
+} |
+ |
+void HttpConnection::OnReadCompleted(int rv) { |
+ if (DidRead(rv) == OK) { |
+ DoReadLoop(OK); |
+ } |
+} |
+ |
+int HttpConnection::DidRead(int rv) { |
+ if (rv <= 0) { |
+ Close(); |
+ return rv == 0 ? ERR_CONNECTION_CLOSED : rv; |
+ } |
+ |
+ read_buf_->DidRead(static_cast<size_t>(rv)); |
+ delegate_->DidRead(this); |
+ return OK; |
+} |
void HttpConnection::Send(const std::string& data) { |
- if (!socket_.get()) |
+ if (pending_write_buf_->total_size() + data.size() > kPendingDataLimit) { |
pfeldman
2014/05/26 09:21:31
Before we go further, could you elaborate on this
mmenke
2014/05/27 14:32:08
It's buffering 100 MB payloads in RAM? That just
byungchul
2014/05/30 00:19:02
Increased websocket buffer to 100Mb in cast of dev
|
+ LOG(ERROR) << "Too large write data is pending: size=" |
+ << pending_write_buf_->total_size() + data.size(); |
+ Close(); |
return; |
- socket_->Send(data); |
+ } |
+ bool writing_in_progress = !pending_write_buf_->IsEmpty(); |
+ pending_write_buf_->Append(data); |
+ if (!writing_in_progress) { |
+ DoWriteLoop(OK); |
+ } |
} |
void HttpConnection::Send(const char* bytes, int len) { |
- if (!socket_.get()) |
- return; |
- socket_->Send(bytes, len); |
+ Send(std::string(bytes, len)); |
mmenke
2014/05/23 19:20:58
This copy really shouldn't be needed.
byungchul
2014/05/30 00:19:02
This is used only by net/server/web_socket.cc. One
|
} |
void HttpConnection::Send(const HttpServerResponseInfo& response) { |
Send(response.Serialize()); |
} |
-HttpConnection::HttpConnection(HttpServer* server, |
- scoped_ptr<StreamListenSocket> sock) |
- : server_(server), |
- socket_(sock.Pass()) { |
- id_ = last_id_++; |
+void HttpConnection::DoWriteLoop(int rv) { |
mmenke
2014/05/23 19:20:58
Again, rv is always "OK", so doesn't seem like it
byungchul
2014/05/30 00:19:02
Done.
|
+ while (rv == OK && pending_write_buf_->GetSizeToWrite() > 0) { |
+ rv = socket_->Write(pending_write_buf_.get(), |
+ pending_write_buf_->GetSizeToWrite(), |
+ base::Bind(&HttpConnection::OnWriteCompleted, |
+ AsWeakPtr())); |
mmenke
2014/05/23 19:20:58
WeakPtr not needed.
byungchul
2014/05/30 00:19:02
Done.
|
+ if (rv == ERR_IO_PENDING || rv == OK) { |
+ break; |
+ } |
+ rv = DidWrite(rv); |
+ } |
} |
-HttpConnection::~HttpConnection() { |
- server_->delegate_->OnClose(id_); |
+void HttpConnection::OnWriteCompleted(int rv) { |
+ if (DidWrite(rv) == OK) { |
+ DoWriteLoop(OK); |
+ } |
+} |
+ |
+int HttpConnection::DidWrite(int rv) { |
+ if (rv < 0) { |
+ Close(); |
+ return rv; |
+ } |
+ |
+ pending_write_buf_->DidConsume(rv); |
+ return OK; |
+} |
+ |
+void HttpConnection::Close() { |
mmenke
2014/05/23 19:20:58
This function can end up being called twice: Once
byungchul
2014/05/28 01:19:35
That's why I use a weak ptr.
Ryan Sleevi
2014/05/28 01:36:26
I don't really think that's an ideal solution.
No
byungchul
2014/05/30 00:19:02
HttpServer handles it and/so never calls Close() t
|
+ // DidClose() may delete this object. Call it in next run loop to make sure |
+ // any DidRead()/DidWrite() callbacks in the stack return. |
+ base::MessageLoopProxy::current()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&HttpConnection::CloseInNextRunLoop, AsWeakPtr())); |
mmenke
2014/05/23 19:20:58
So we always call back Delegate::DidClose asynchro
byungchul
2014/05/28 01:19:35
They are different. DidClose() could be called for
|
+} |
+ |
+void HttpConnection::CloseInNextRunLoop() { |
mmenke
2014/05/23 19:20:58
I think this and Close() are misnamed. Close() sh
byungchul
2014/05/30 00:19:02
Not necessary. Removed.
|
+ socket_->Disconnect(); |
+ delegate_->DidClose(this); |
} |
-void HttpConnection::Shift(int num_bytes) { |
- recv_data_ = recv_data_.substr(num_bytes); |
+void HttpConnection::UpgradeToWebSocket(const HttpServerRequestInfo& request, |
+ size_t* pos) { |
+ DCHECK(!web_socket_); |
+ web_socket_.reset(WebSocket::CreateWebSocket(this, request, pos)); |
} |
} // namespace net |