Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(977)

Unified Diff: extensions/browser/api/socket/tcp_socket.cc

Issue 494573002: A change for the setPause() api in chrome.sockets.tcp: Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Cosmetics and commentary. Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: extensions/browser/api/socket/tcp_socket.cc
diff --git a/extensions/browser/api/socket/tcp_socket.cc b/extensions/browser/api/socket/tcp_socket.cc
index 8d5257b45c3007ba9e7c0bb2bde5dc6efb2792f6..764ed6ed7bafeecbdae8776db82cf9aba89cb99b 100644
--- a/extensions/browser/api/socket/tcp_socket.cc
+++ b/extensions/browser/api/socket/tcp_socket.cc
@@ -4,6 +4,11 @@
#include "extensions/browser/api/socket/tcp_socket.h"
+#include <ctype.h>
+#include <algorithm>
+#include <iomanip>
+#include <sstream>
+
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/macros.h"
@@ -42,6 +47,226 @@ ApiResourceManager<ResumableTCPServerSocket>::GetFactoryInstance() {
return g_server_factory.Pointer();
}
+SocketPauseBuffer::SocketPauseBuffer(
+ const SocketPauseBuffer::DownstreamReadCallback& callback)
+ : read_issued_(false),
+ downstream_read_cb_(callback),
+ downstream_read_buffer_(new net::GrowableIOBuffer),
+ upstream_data_returned_(0),
+ upstream_read_buffer_offset_(0),
+ prior_error_code_(0),
+ buffering_disabled_(false) {}
+
+SocketPauseBuffer::~SocketPauseBuffer() {}
+
+std::string SocketPauseBuffer::StatusDescription() {
+ std::stringstream status;
+ status << "\n";
+ status << static_cast<void*>(this);
+ status << " < " << upstream_data_returned_ << " | "
+ << downstream_read_buffer_->offset() << " | "
+ << downstream_read_buffer_->capacity() << ">: "
+ << ", buffered: " << BufferedDataCount() << ", "
+ << (read_issued_ ? "READING" : "not reading") << ", cb: "
+ << (upstream_read_callback_.is_null() ? "null" : "not null");
+ return status.str();
+}
+
+int SocketPauseBuffer::BufferedDataCount() const {
+ return downstream_read_buffer_->offset() - upstream_data_returned_;
+}
+
+void SocketPauseBuffer::CreditIncomingData(int count) {
+ DCHECK_GE(count, 0);
+ downstream_read_buffer_->set_offset(downstream_read_buffer_->offset() +
+ count);
+}
+
+void SocketPauseBuffer::ReturnDataInBuffer(net::IOBuffer* dest, int count) {
+ DCHECK_GE(count, 0);
+ memcpy(dest->data(),
+ downstream_read_buffer_->StartOfBuffer() + upstream_data_returned_,
+ count);
+ upstream_data_returned_ += count;
+}
+
+void SocketPauseBuffer::FreeReturnedBufferSpace() {
+ DCHECK(!read_issued_);
+ const int amount_to_free = upstream_data_returned_;
+
+ memmove(downstream_read_buffer_->StartOfBuffer(),
+ downstream_read_buffer_->StartOfBuffer() + upstream_data_returned_,
+ amount_to_free);
+ downstream_read_buffer_->set_offset(downstream_read_buffer_->offset() -
+ amount_to_free);
+ upstream_data_returned_ -= amount_to_free;
+}
+
+// Called whenever the client's callback (either upstream_read_callback_ or
+// the |callback| argument to Read()) is invoked. Clears out state relevant
+// to the Read.
+void SocketPauseBuffer::ResetUpstreamState() {
+ upstream_read_buffer_offset_ = 0;
+ upstream_read_buffer_ = NULL;
+ upstream_read_callback_.Reset();
+}
+
+void SocketPauseBuffer::InvokeCallback(int result) {
+ DCHECK(!upstream_read_callback_.is_null());
+ net::CompletionCallback cb = upstream_read_callback_;
+ ResetUpstreamState();
+ cb.Run(result);
+}
+
+void SocketPauseBuffer::InsureBufferCanHold(int num_bytes) {
+ if (num_bytes > (downstream_read_buffer_->capacity() -
+ downstream_read_buffer_->offset())) {
+ downstream_read_buffer_->SetCapacity(num_bytes +
+ downstream_read_buffer_->offset());
+ }
+}
+
+bool SocketPauseBuffer::Pause() {
+ if (buffering_disabled_) {
+ return false;
+ } else if (!upstream_read_callback_.is_null()) {
+ InvokeCallback(net::ERR_ABORTED);
+ }
+ return true;
+}
+
+void SocketPauseBuffer::DisableBuffering() {
+ buffering_disabled_ = true;
+}
+
+int SocketPauseBuffer::Read(net::IOBuffer* buffer,
+ int buf_len,
+ const net::CompletionCallback& callback) {
+ DVLOG(1) << "Read(" << buf_len << ") START " << StatusDescription();
+
+ if (buffering_disabled_ && BufferedDataCount() == 0) {
+ return downstream_read_cb_.Run(buffer, buf_len, callback);
+ }
+
+ // Can't buffer data and have a pending downstream Read() simultaneously.
+ DCHECK(BufferedDataCount() == 0 || !read_issued_);
+
+ // Only one upstream Read() is allowed at a time.
+ if (!upstream_read_callback_.is_null()) {
+ DVLOG(1) << "Read(" << buf_len << "): - Returning ERR_INVALID_ARGUMENT "
+ << StatusDescription();
+ DCHECK(read_issued_);
+ return net::ERR_INVALID_ARGUMENT; // 'this' is the invalid argument.
+ }
+
+ if (buf_len < 0) {
+ DVLOG(1) << "Read(" << buf_len << "): - Returning ERR_INVALID_ARGUMENT "
+ << StatusDescription();
+ return net::ERR_INVALID_ARGUMENT;
+ }
+
+ if (BufferedDataCount() > 0) {
+ const int num_returned_bytes = std::min(buf_len, BufferedDataCount());
+ ReturnDataInBuffer(buffer, num_returned_bytes);
+ // Release our internal buffer once we're no longer using it.
+ if (buffering_disabled_ && BufferedDataCount() == 0) {
+ downstream_read_buffer_ = nullptr;
+ }
+ return num_returned_bytes;
+ } else {
+ DCHECK(!buffering_disabled_);
+ upstream_read_callback_ = callback;
+ upstream_read_request_size_ = buf_len;
+ upstream_read_buffer_ = buffer;
+ if (!read_issued_) {
+ // Issue a new downstream read for this request.
+ FreeReturnedBufferSpace();
+ InsureBufferCanHold(buf_len);
+ read_issued_ = true;
+ int result = downstream_read_cb_.Run(
+ downstream_read_buffer_.get(), buf_len,
+ base::Bind(&SocketPauseBuffer::ReadComplete, base::Unretained(this)));
+ if (result > 0) {
+ read_issued_ = false;
+ CreditIncomingData(result);
+ ReturnDataInBuffer(buffer, result);
+ ResetUpstreamState();
+ }
+ return result;
+ } else {
+ // We already have a downstream read pending, and now it's plumbed to
+ // invoke the callback argument.
+ return net::ERR_IO_PENDING;
+ }
+ }
+}
+
+void SocketPauseBuffer::ReadComplete(int count) {
+ read_issued_ = false;
+ if (count > 0) {
+ CreditIncomingData(count);
+ }
+ if (upstream_read_callback_.is_null()) {
+ prior_error_code_ = std::min(0, count);
+ } else {
+ int num_returned_bytes = std::min(count, upstream_read_request_size_);
+ ReturnDataInBuffer(upstream_read_buffer_.get(), num_returned_bytes);
+ InvokeCallback(num_returned_bytes);
+ }
+}
+
+BufferingStreamSocket::BufferingStreamSocket(
+ scoped_ptr<net::StreamSocket> socket)
+ : Passthrough<net::StreamSocket>(socket.Pass()),
+ pause_buffer_(base::Bind(&net::StreamSocket::Read,
+ base::Unretained(base_socket()))) {}
+
+BufferingStreamSocket::~BufferingStreamSocket() {}
+
+bool BufferingStreamSocket::Pause() {
+ return pause_buffer_.Pause();
+}
+
+void BufferingStreamSocket::DisableBuffering() {
+ pause_buffer_.DisableBuffering();
+}
+
+int BufferingStreamSocket::Read(net::IOBuffer* buffer,
+ int buf_len,
+ const net::CompletionCallback& callback) {
+ return pause_buffer_.Read(buffer, buf_len, callback);
+}
+
+BufferingTCPClientSocket::BufferingTCPClientSocket(
+ scoped_ptr<net::TCPClientSocket> socket)
+ : Passthrough<net::TCPClientSocket>(socket.Pass()),
+ pause_buffer_(base::Bind(&net::TCPClientSocket::Read,
+ base::Unretained(base_socket()))) {}
+
+BufferingTCPClientSocket::~BufferingTCPClientSocket() {}
+
+bool BufferingTCPClientSocket::Pause() {
+ return pause_buffer_.Pause();
+}
+
+void BufferingTCPClientSocket::DisableBuffering() {
+ pause_buffer_.DisableBuffering();
+}
+
+int BufferingTCPClientSocket::Read(net::IOBuffer* buffer,
+ int buf_len,
+ const net::CompletionCallback& callback) {
+ return pause_buffer_.Read(buffer, buf_len, callback);
+}
+
+bool BufferingTCPClientSocket::SetKeepAlive(bool enable, int delay) {
+ return base_socket()->SetKeepAlive(enable, delay);
+}
+
+bool BufferingTCPClientSocket::SetNoDelay(bool no_delay) {
+ return base_socket()->SetNoDelay(no_delay);
+}
+
TCPSocket::TCPSocket(const std::string& owner_extension_id)
: Socket(owner_extension_id), socket_mode_(UNKNOWN) {}
@@ -49,8 +274,9 @@ TCPSocket::TCPSocket(net::TCPClientSocket* tcp_client_socket,
const std::string& owner_extension_id,
bool is_connected)
: Socket(owner_extension_id),
- socket_(tcp_client_socket),
socket_mode_(CLIENT) {
+ scoped_ptr<net::TCPClientSocket> p(tcp_client_socket);
+ socket_.reset(new BufferingTCPClientSocket(p.Pass()));
this->is_connected_ = is_connected;
}
@@ -94,9 +320,11 @@ void TCPSocket::Connect(const net::AddressList& address,
if (is_connected_)
break;
- socket_.reset(
+ scoped_ptr<net::TCPClientSocket> internal_sock(
new net::TCPClientSocket(address, NULL, net::NetLog::Source()));
+ socket_.reset(new BufferingTCPClientSocket(internal_sock.Pass()));
+
connect_callback_ = callback;
result = socket_->Connect(
base::Bind(&TCPSocket::OnConnectComplete, base::Unretained(this)));
@@ -316,7 +544,7 @@ void TCPSocket::Release() {
ignore_result(socket_.release());
}
-net::TCPClientSocket* TCPSocket::ClientStream() {
+BufferingTCPClientSocket* TCPSocket::ClientStream() {
if (socket_mode_ != CLIENT || GetSocketType() != TYPE_TCP)
return NULL;
return socket_.get();
@@ -340,8 +568,17 @@ ResumableTCPSocket::ResumableTCPSocket(net::TCPClientSocket* tcp_client_socket,
buffer_size_(0),
paused_(false) {}
+ResumableTCPSocket::~ResumableTCPSocket() {}
+
bool ResumableTCPSocket::IsPersistent() const { return persistent(); }
+void ResumableTCPSocket::ApiPauseComplete() {
+ if (!pause_callback_.is_null()) {
+ pause_callback_.Run();
+ pause_callback_.Reset();
+ }
+}
+
ResumableTCPServerSocket::ResumableTCPServerSocket(
const std::string& owner_extension_id)
: TCPSocket(owner_extension_id), persistent_(false), paused_(false) {}

Powered by Google App Engine
This is Rietveld 408576698