Index: extensions/browser/api/socket/tcp_socket.cc |
diff --git a/extensions/browser/api/socket/tcp_socket.cc b/extensions/browser/api/socket/tcp_socket.cc |
index 8d5257b45c3007ba9e7c0bb2bde5dc6efb2792f6..764ed6ed7bafeecbdae8776db82cf9aba89cb99b 100644 |
--- a/extensions/browser/api/socket/tcp_socket.cc |
+++ b/extensions/browser/api/socket/tcp_socket.cc |
@@ -4,6 +4,11 @@ |
#include "extensions/browser/api/socket/tcp_socket.h" |
+#include <ctype.h> |
+#include <algorithm> |
+#include <iomanip> |
+#include <sstream> |
+ |
#include "base/lazy_instance.h" |
#include "base/logging.h" |
#include "base/macros.h" |
@@ -42,6 +47,226 @@ ApiResourceManager<ResumableTCPServerSocket>::GetFactoryInstance() { |
return g_server_factory.Pointer(); |
} |
+SocketPauseBuffer::SocketPauseBuffer( |
+ const SocketPauseBuffer::DownstreamReadCallback& callback) |
+ : read_issued_(false), |
+ downstream_read_cb_(callback), |
+ downstream_read_buffer_(new net::GrowableIOBuffer), |
+ upstream_data_returned_(0), |
+ upstream_read_buffer_offset_(0), |
+ prior_error_code_(0), |
+ buffering_disabled_(false) {} |
+ |
+SocketPauseBuffer::~SocketPauseBuffer() {} |
+ |
+std::string SocketPauseBuffer::StatusDescription() { |
+ std::stringstream status; |
+ status << "\n"; |
+ status << static_cast<void*>(this); |
+ status << " < " << upstream_data_returned_ << " | " |
+ << downstream_read_buffer_->offset() << " | " |
+ << downstream_read_buffer_->capacity() << ">: " |
+ << ", buffered: " << BufferedDataCount() << ", " |
+ << (read_issued_ ? "READING" : "not reading") << ", cb: " |
+ << (upstream_read_callback_.is_null() ? "null" : "not null"); |
+ return status.str(); |
+} |
+ |
+int SocketPauseBuffer::BufferedDataCount() const { |
+ return downstream_read_buffer_->offset() - upstream_data_returned_; |
+} |
+ |
+void SocketPauseBuffer::CreditIncomingData(int count) { |
+ DCHECK_GE(count, 0); |
+ downstream_read_buffer_->set_offset(downstream_read_buffer_->offset() + |
+ count); |
+} |
+ |
+void SocketPauseBuffer::ReturnDataInBuffer(net::IOBuffer* dest, int count) { |
+ DCHECK_GE(count, 0); |
+ memcpy(dest->data(), |
+ downstream_read_buffer_->StartOfBuffer() + upstream_data_returned_, |
+ count); |
+ upstream_data_returned_ += count; |
+} |
+ |
+void SocketPauseBuffer::FreeReturnedBufferSpace() { |
+ DCHECK(!read_issued_); |
+ const int amount_to_free = upstream_data_returned_; |
+ |
+ memmove(downstream_read_buffer_->StartOfBuffer(), |
+ downstream_read_buffer_->StartOfBuffer() + upstream_data_returned_, |
+ amount_to_free); |
+ downstream_read_buffer_->set_offset(downstream_read_buffer_->offset() - |
+ amount_to_free); |
+ upstream_data_returned_ -= amount_to_free; |
+} |
+ |
+// Called whenever the client's callback (either upstream_read_callback_ or |
+// the |callback| argument to Read()) is invoked. Clears out state relevant |
+// to the Read. |
+void SocketPauseBuffer::ResetUpstreamState() { |
+ upstream_read_buffer_offset_ = 0; |
+ upstream_read_buffer_ = NULL; |
+ upstream_read_callback_.Reset(); |
+} |
+ |
+void SocketPauseBuffer::InvokeCallback(int result) { |
+ DCHECK(!upstream_read_callback_.is_null()); |
+ net::CompletionCallback cb = upstream_read_callback_; |
+ ResetUpstreamState(); |
+ cb.Run(result); |
+} |
+ |
+void SocketPauseBuffer::InsureBufferCanHold(int num_bytes) { |
+ if (num_bytes > (downstream_read_buffer_->capacity() - |
+ downstream_read_buffer_->offset())) { |
+ downstream_read_buffer_->SetCapacity(num_bytes + |
+ downstream_read_buffer_->offset()); |
+ } |
+} |
+ |
+bool SocketPauseBuffer::Pause() { |
+ if (buffering_disabled_) { |
+ return false; |
+ } else if (!upstream_read_callback_.is_null()) { |
+ InvokeCallback(net::ERR_ABORTED); |
+ } |
+ return true; |
+} |
+ |
+void SocketPauseBuffer::DisableBuffering() { |
+ buffering_disabled_ = true; |
+} |
+ |
+int SocketPauseBuffer::Read(net::IOBuffer* buffer, |
+ int buf_len, |
+ const net::CompletionCallback& callback) { |
+ DVLOG(1) << "Read(" << buf_len << ") START " << StatusDescription(); |
+ |
+ if (buffering_disabled_ && BufferedDataCount() == 0) { |
+ return downstream_read_cb_.Run(buffer, buf_len, callback); |
+ } |
+ |
+ // Can't buffer data and have a pending downstream Read() simultaneously. |
+ DCHECK(BufferedDataCount() == 0 || !read_issued_); |
+ |
+ // Only one upstream Read() is allowed at a time. |
+ if (!upstream_read_callback_.is_null()) { |
+ DVLOG(1) << "Read(" << buf_len << "): - Returning ERR_INVALID_ARGUMENT " |
+ << StatusDescription(); |
+ DCHECK(read_issued_); |
+ return net::ERR_INVALID_ARGUMENT; // 'this' is the invalid argument. |
+ } |
+ |
+ if (buf_len < 0) { |
+ DVLOG(1) << "Read(" << buf_len << "): - Returning ERR_INVALID_ARGUMENT " |
+ << StatusDescription(); |
+ return net::ERR_INVALID_ARGUMENT; |
+ } |
+ |
+ if (BufferedDataCount() > 0) { |
+ const int num_returned_bytes = std::min(buf_len, BufferedDataCount()); |
+ ReturnDataInBuffer(buffer, num_returned_bytes); |
+ // Release our internal buffer once we're no longer using it. |
+ if (buffering_disabled_ && BufferedDataCount() == 0) { |
+ downstream_read_buffer_ = nullptr; |
+ } |
+ return num_returned_bytes; |
+ } else { |
+ DCHECK(!buffering_disabled_); |
+ upstream_read_callback_ = callback; |
+ upstream_read_request_size_ = buf_len; |
+ upstream_read_buffer_ = buffer; |
+ if (!read_issued_) { |
+ // Issue a new downstream read for this request. |
+ FreeReturnedBufferSpace(); |
+ InsureBufferCanHold(buf_len); |
+ read_issued_ = true; |
+ int result = downstream_read_cb_.Run( |
+ downstream_read_buffer_.get(), buf_len, |
+ base::Bind(&SocketPauseBuffer::ReadComplete, base::Unretained(this))); |
+ if (result > 0) { |
+ read_issued_ = false; |
+ CreditIncomingData(result); |
+ ReturnDataInBuffer(buffer, result); |
+ ResetUpstreamState(); |
+ } |
+ return result; |
+ } else { |
+ // We already have a downstream read pending, and now it's plumbed to |
+ // invoke the callback argument. |
+ return net::ERR_IO_PENDING; |
+ } |
+ } |
+} |
+ |
+void SocketPauseBuffer::ReadComplete(int count) { |
+ read_issued_ = false; |
+ if (count > 0) { |
+ CreditIncomingData(count); |
+ } |
+ if (upstream_read_callback_.is_null()) { |
+ prior_error_code_ = std::min(0, count); |
+ } else { |
+ int num_returned_bytes = std::min(count, upstream_read_request_size_); |
+ ReturnDataInBuffer(upstream_read_buffer_.get(), num_returned_bytes); |
+ InvokeCallback(num_returned_bytes); |
+ } |
+} |
+ |
+BufferingStreamSocket::BufferingStreamSocket( |
+ scoped_ptr<net::StreamSocket> socket) |
+ : Passthrough<net::StreamSocket>(socket.Pass()), |
+ pause_buffer_(base::Bind(&net::StreamSocket::Read, |
+ base::Unretained(base_socket()))) {} |
+ |
+BufferingStreamSocket::~BufferingStreamSocket() {} |
+ |
+bool BufferingStreamSocket::Pause() { |
+ return pause_buffer_.Pause(); |
+} |
+ |
+void BufferingStreamSocket::DisableBuffering() { |
+ pause_buffer_.DisableBuffering(); |
+} |
+ |
+int BufferingStreamSocket::Read(net::IOBuffer* buffer, |
+ int buf_len, |
+ const net::CompletionCallback& callback) { |
+ return pause_buffer_.Read(buffer, buf_len, callback); |
+} |
+ |
+BufferingTCPClientSocket::BufferingTCPClientSocket( |
+ scoped_ptr<net::TCPClientSocket> socket) |
+ : Passthrough<net::TCPClientSocket>(socket.Pass()), |
+ pause_buffer_(base::Bind(&net::TCPClientSocket::Read, |
+ base::Unretained(base_socket()))) {} |
+ |
+BufferingTCPClientSocket::~BufferingTCPClientSocket() {} |
+ |
+bool BufferingTCPClientSocket::Pause() { |
+ return pause_buffer_.Pause(); |
+} |
+ |
+void BufferingTCPClientSocket::DisableBuffering() { |
+ pause_buffer_.DisableBuffering(); |
+} |
+ |
+int BufferingTCPClientSocket::Read(net::IOBuffer* buffer, |
+ int buf_len, |
+ const net::CompletionCallback& callback) { |
+ return pause_buffer_.Read(buffer, buf_len, callback); |
+} |
+ |
+bool BufferingTCPClientSocket::SetKeepAlive(bool enable, int delay) { |
+ return base_socket()->SetKeepAlive(enable, delay); |
+} |
+ |
+bool BufferingTCPClientSocket::SetNoDelay(bool no_delay) { |
+ return base_socket()->SetNoDelay(no_delay); |
+} |
+ |
TCPSocket::TCPSocket(const std::string& owner_extension_id) |
: Socket(owner_extension_id), socket_mode_(UNKNOWN) {} |
@@ -49,8 +274,9 @@ TCPSocket::TCPSocket(net::TCPClientSocket* tcp_client_socket, |
const std::string& owner_extension_id, |
bool is_connected) |
: Socket(owner_extension_id), |
- socket_(tcp_client_socket), |
socket_mode_(CLIENT) { |
+ scoped_ptr<net::TCPClientSocket> p(tcp_client_socket); |
+ socket_.reset(new BufferingTCPClientSocket(p.Pass())); |
this->is_connected_ = is_connected; |
} |
@@ -94,9 +320,11 @@ void TCPSocket::Connect(const net::AddressList& address, |
if (is_connected_) |
break; |
- socket_.reset( |
+ scoped_ptr<net::TCPClientSocket> internal_sock( |
new net::TCPClientSocket(address, NULL, net::NetLog::Source())); |
+ socket_.reset(new BufferingTCPClientSocket(internal_sock.Pass())); |
+ |
connect_callback_ = callback; |
result = socket_->Connect( |
base::Bind(&TCPSocket::OnConnectComplete, base::Unretained(this))); |
@@ -316,7 +544,7 @@ void TCPSocket::Release() { |
ignore_result(socket_.release()); |
} |
-net::TCPClientSocket* TCPSocket::ClientStream() { |
+BufferingTCPClientSocket* TCPSocket::ClientStream() { |
if (socket_mode_ != CLIENT || GetSocketType() != TYPE_TCP) |
return NULL; |
return socket_.get(); |
@@ -340,8 +568,17 @@ ResumableTCPSocket::ResumableTCPSocket(net::TCPClientSocket* tcp_client_socket, |
buffer_size_(0), |
paused_(false) {} |
+ResumableTCPSocket::~ResumableTCPSocket() {} |
+ |
bool ResumableTCPSocket::IsPersistent() const { return persistent(); } |
+void ResumableTCPSocket::ApiPauseComplete() { |
+ if (!pause_callback_.is_null()) { |
+ pause_callback_.Run(); |
+ pause_callback_.Reset(); |
+ } |
+} |
+ |
ResumableTCPServerSocket::ResumableTCPServerSocket( |
const std::string& owner_extension_id) |
: TCPSocket(owner_extension_id), persistent_(false), paused_(false) {} |