| Index: extensions/browser/api/socket/tcp_socket.cc
|
| diff --git a/extensions/browser/api/socket/tcp_socket.cc b/extensions/browser/api/socket/tcp_socket.cc
|
| index 8d5257b45c3007ba9e7c0bb2bde5dc6efb2792f6..764ed6ed7bafeecbdae8776db82cf9aba89cb99b 100644
|
| --- a/extensions/browser/api/socket/tcp_socket.cc
|
| +++ b/extensions/browser/api/socket/tcp_socket.cc
|
| @@ -4,6 +4,11 @@
|
|
|
| #include "extensions/browser/api/socket/tcp_socket.h"
|
|
|
| +#include <ctype.h>
|
| +#include <algorithm>
|
| +#include <iomanip>
|
| +#include <sstream>
|
| +
|
| #include "base/lazy_instance.h"
|
| #include "base/logging.h"
|
| #include "base/macros.h"
|
| @@ -42,6 +47,226 @@ ApiResourceManager<ResumableTCPServerSocket>::GetFactoryInstance() {
|
| return g_server_factory.Pointer();
|
| }
|
|
|
| +SocketPauseBuffer::SocketPauseBuffer(
|
| + const SocketPauseBuffer::DownstreamReadCallback& callback)
|
| + : read_issued_(false),
|
| + downstream_read_cb_(callback),
|
| + downstream_read_buffer_(new net::GrowableIOBuffer),
|
| + upstream_data_returned_(0),
|
| + upstream_read_buffer_offset_(0),
|
| + prior_error_code_(0),
|
| + buffering_disabled_(false) {}
|
| +
|
| +SocketPauseBuffer::~SocketPauseBuffer() {}
|
| +
|
| +std::string SocketPauseBuffer::StatusDescription() {
|
| + std::stringstream status;
|
| + status << "\n";
|
| + status << static_cast<void*>(this);
|
| + status << " < " << upstream_data_returned_ << " | "
|
| + << downstream_read_buffer_->offset() << " | "
|
| + << downstream_read_buffer_->capacity() << ">: "
|
| + << ", buffered: " << BufferedDataCount() << ", "
|
| + << (read_issued_ ? "READING" : "not reading") << ", cb: "
|
| + << (upstream_read_callback_.is_null() ? "null" : "not null");
|
| + return status.str();
|
| +}
|
| +
|
| +int SocketPauseBuffer::BufferedDataCount() const {
|
| + return downstream_read_buffer_->offset() - upstream_data_returned_;
|
| +}
|
| +
|
| +void SocketPauseBuffer::CreditIncomingData(int count) {
|
| + DCHECK_GE(count, 0);
|
| + downstream_read_buffer_->set_offset(downstream_read_buffer_->offset() +
|
| + count);
|
| +}
|
| +
|
| +void SocketPauseBuffer::ReturnDataInBuffer(net::IOBuffer* dest, int count) {
|
| + DCHECK_GE(count, 0);
|
| + memcpy(dest->data(),
|
| + downstream_read_buffer_->StartOfBuffer() + upstream_data_returned_,
|
| + count);
|
| + upstream_data_returned_ += count;
|
| +}
|
| +
|
| +void SocketPauseBuffer::FreeReturnedBufferSpace() {
|
| + DCHECK(!read_issued_);
|
| + const int amount_to_free = upstream_data_returned_;
|
| +
|
| + memmove(downstream_read_buffer_->StartOfBuffer(),
|
| + downstream_read_buffer_->StartOfBuffer() + upstream_data_returned_,
|
| + amount_to_free);
|
| + downstream_read_buffer_->set_offset(downstream_read_buffer_->offset() -
|
| + amount_to_free);
|
| + upstream_data_returned_ -= amount_to_free;
|
| +}
|
| +
|
| +// Called whenever the client's callback (either upstream_read_callback_ or
|
| +// the |callback| argument to Read()) is invoked. Clears out state relevant
|
| +// to the Read.
|
| +void SocketPauseBuffer::ResetUpstreamState() {
|
| + upstream_read_buffer_offset_ = 0;
|
| + upstream_read_buffer_ = NULL;
|
| + upstream_read_callback_.Reset();
|
| +}
|
| +
|
| +void SocketPauseBuffer::InvokeCallback(int result) {
|
| + DCHECK(!upstream_read_callback_.is_null());
|
| + net::CompletionCallback cb = upstream_read_callback_;
|
| + ResetUpstreamState();
|
| + cb.Run(result);
|
| +}
|
| +
|
| +void SocketPauseBuffer::InsureBufferCanHold(int num_bytes) {
|
| + if (num_bytes > (downstream_read_buffer_->capacity() -
|
| + downstream_read_buffer_->offset())) {
|
| + downstream_read_buffer_->SetCapacity(num_bytes +
|
| + downstream_read_buffer_->offset());
|
| + }
|
| +}
|
| +
|
| +bool SocketPauseBuffer::Pause() {
|
| + if (buffering_disabled_) {
|
| + return false;
|
| + } else if (!upstream_read_callback_.is_null()) {
|
| + InvokeCallback(net::ERR_ABORTED);
|
| + }
|
| + return true;
|
| +}
|
| +
|
| +void SocketPauseBuffer::DisableBuffering() {
|
| + buffering_disabled_ = true;
|
| +}
|
| +
|
| +int SocketPauseBuffer::Read(net::IOBuffer* buffer,
|
| + int buf_len,
|
| + const net::CompletionCallback& callback) {
|
| + DVLOG(1) << "Read(" << buf_len << ") START " << StatusDescription();
|
| +
|
| + if (buffering_disabled_ && BufferedDataCount() == 0) {
|
| + return downstream_read_cb_.Run(buffer, buf_len, callback);
|
| + }
|
| +
|
| + // Can't buffer data and have a pending downstream Read() simultaneously.
|
| + DCHECK(BufferedDataCount() == 0 || !read_issued_);
|
| +
|
| + // Only one upstream Read() is allowed at a time.
|
| + if (!upstream_read_callback_.is_null()) {
|
| + DVLOG(1) << "Read(" << buf_len << "): - Returning ERR_INVALID_ARGUMENT "
|
| + << StatusDescription();
|
| + DCHECK(read_issued_);
|
| + return net::ERR_INVALID_ARGUMENT; // 'this' is the invalid argument.
|
| + }
|
| +
|
| + if (buf_len < 0) {
|
| + DVLOG(1) << "Read(" << buf_len << "): - Returning ERR_INVALID_ARGUMENT "
|
| + << StatusDescription();
|
| + return net::ERR_INVALID_ARGUMENT;
|
| + }
|
| +
|
| + if (BufferedDataCount() > 0) {
|
| + const int num_returned_bytes = std::min(buf_len, BufferedDataCount());
|
| + ReturnDataInBuffer(buffer, num_returned_bytes);
|
| + // Release our internal buffer once we're no longer using it.
|
| + if (buffering_disabled_ && BufferedDataCount() == 0) {
|
| + downstream_read_buffer_ = nullptr;
|
| + }
|
| + return num_returned_bytes;
|
| + } else {
|
| + DCHECK(!buffering_disabled_);
|
| + upstream_read_callback_ = callback;
|
| + upstream_read_request_size_ = buf_len;
|
| + upstream_read_buffer_ = buffer;
|
| + if (!read_issued_) {
|
| + // Issue a new downstream read for this request.
|
| + FreeReturnedBufferSpace();
|
| + InsureBufferCanHold(buf_len);
|
| + read_issued_ = true;
|
| + int result = downstream_read_cb_.Run(
|
| + downstream_read_buffer_.get(), buf_len,
|
| + base::Bind(&SocketPauseBuffer::ReadComplete, base::Unretained(this)));
|
| + if (result > 0) {
|
| + read_issued_ = false;
|
| + CreditIncomingData(result);
|
| + ReturnDataInBuffer(buffer, result);
|
| + ResetUpstreamState();
|
| + }
|
| + return result;
|
| + } else {
|
| + // We already have a downstream read pending, and now it's plumbed to
|
| + // invoke the callback argument.
|
| + return net::ERR_IO_PENDING;
|
| + }
|
| + }
|
| +}
|
| +
|
| +void SocketPauseBuffer::ReadComplete(int count) {
|
| + read_issued_ = false;
|
| + if (count > 0) {
|
| + CreditIncomingData(count);
|
| + }
|
| + if (upstream_read_callback_.is_null()) {
|
| + prior_error_code_ = std::min(0, count);
|
| + } else {
|
| + int num_returned_bytes = std::min(count, upstream_read_request_size_);
|
| + ReturnDataInBuffer(upstream_read_buffer_.get(), num_returned_bytes);
|
| + InvokeCallback(num_returned_bytes);
|
| + }
|
| +}
|
| +
|
| +BufferingStreamSocket::BufferingStreamSocket(
|
| + scoped_ptr<net::StreamSocket> socket)
|
| + : Passthrough<net::StreamSocket>(socket.Pass()),
|
| + pause_buffer_(base::Bind(&net::StreamSocket::Read,
|
| + base::Unretained(base_socket()))) {}
|
| +
|
| +BufferingStreamSocket::~BufferingStreamSocket() {}
|
| +
|
| +bool BufferingStreamSocket::Pause() {
|
| + return pause_buffer_.Pause();
|
| +}
|
| +
|
| +void BufferingStreamSocket::DisableBuffering() {
|
| + pause_buffer_.DisableBuffering();
|
| +}
|
| +
|
| +int BufferingStreamSocket::Read(net::IOBuffer* buffer,
|
| + int buf_len,
|
| + const net::CompletionCallback& callback) {
|
| + return pause_buffer_.Read(buffer, buf_len, callback);
|
| +}
|
| +
|
| +BufferingTCPClientSocket::BufferingTCPClientSocket(
|
| + scoped_ptr<net::TCPClientSocket> socket)
|
| + : Passthrough<net::TCPClientSocket>(socket.Pass()),
|
| + pause_buffer_(base::Bind(&net::TCPClientSocket::Read,
|
| + base::Unretained(base_socket()))) {}
|
| +
|
| +BufferingTCPClientSocket::~BufferingTCPClientSocket() {}
|
| +
|
| +bool BufferingTCPClientSocket::Pause() {
|
| + return pause_buffer_.Pause();
|
| +}
|
| +
|
| +void BufferingTCPClientSocket::DisableBuffering() {
|
| + pause_buffer_.DisableBuffering();
|
| +}
|
| +
|
| +int BufferingTCPClientSocket::Read(net::IOBuffer* buffer,
|
| + int buf_len,
|
| + const net::CompletionCallback& callback) {
|
| + return pause_buffer_.Read(buffer, buf_len, callback);
|
| +}
|
| +
|
| +bool BufferingTCPClientSocket::SetKeepAlive(bool enable, int delay) {
|
| + return base_socket()->SetKeepAlive(enable, delay);
|
| +}
|
| +
|
| +bool BufferingTCPClientSocket::SetNoDelay(bool no_delay) {
|
| + return base_socket()->SetNoDelay(no_delay);
|
| +}
|
| +
|
| TCPSocket::TCPSocket(const std::string& owner_extension_id)
|
| : Socket(owner_extension_id), socket_mode_(UNKNOWN) {}
|
|
|
| @@ -49,8 +274,9 @@ TCPSocket::TCPSocket(net::TCPClientSocket* tcp_client_socket,
|
| const std::string& owner_extension_id,
|
| bool is_connected)
|
| : Socket(owner_extension_id),
|
| - socket_(tcp_client_socket),
|
| socket_mode_(CLIENT) {
|
| + scoped_ptr<net::TCPClientSocket> p(tcp_client_socket);
|
| + socket_.reset(new BufferingTCPClientSocket(p.Pass()));
|
| this->is_connected_ = is_connected;
|
| }
|
|
|
| @@ -94,9 +320,11 @@ void TCPSocket::Connect(const net::AddressList& address,
|
| if (is_connected_)
|
| break;
|
|
|
| - socket_.reset(
|
| + scoped_ptr<net::TCPClientSocket> internal_sock(
|
| new net::TCPClientSocket(address, NULL, net::NetLog::Source()));
|
|
|
| + socket_.reset(new BufferingTCPClientSocket(internal_sock.Pass()));
|
| +
|
| connect_callback_ = callback;
|
| result = socket_->Connect(
|
| base::Bind(&TCPSocket::OnConnectComplete, base::Unretained(this)));
|
| @@ -316,7 +544,7 @@ void TCPSocket::Release() {
|
| ignore_result(socket_.release());
|
| }
|
|
|
| -net::TCPClientSocket* TCPSocket::ClientStream() {
|
| +BufferingTCPClientSocket* TCPSocket::ClientStream() {
|
| if (socket_mode_ != CLIENT || GetSocketType() != TYPE_TCP)
|
| return NULL;
|
| return socket_.get();
|
| @@ -340,8 +568,17 @@ ResumableTCPSocket::ResumableTCPSocket(net::TCPClientSocket* tcp_client_socket,
|
| buffer_size_(0),
|
| paused_(false) {}
|
|
|
| +ResumableTCPSocket::~ResumableTCPSocket() {}
|
| +
|
| bool ResumableTCPSocket::IsPersistent() const { return persistent(); }
|
|
|
| +void ResumableTCPSocket::ApiPauseComplete() {
|
| + if (!pause_callback_.is_null()) {
|
| + pause_callback_.Run();
|
| + pause_callback_.Reset();
|
| + }
|
| +}
|
| +
|
| ResumableTCPServerSocket::ResumableTCPServerSocket(
|
| const std::string& owner_extension_id)
|
| : TCPSocket(owner_extension_id), persistent_(false), paused_(false) {}
|
|
|