Index: extensions/browser/api/socket/tcp_socket.h |
diff --git a/extensions/browser/api/socket/tcp_socket.h b/extensions/browser/api/socket/tcp_socket.h |
index 40dd5fd7fd9163ce1e1de75937c689e7b2e61445..4216d467c0dc9e7943cc7a7d04449555ee4ba96a 100644 |
--- a/extensions/browser/api/socket/tcp_socket.h |
+++ b/extensions/browser/api/socket/tcp_socket.h |
@@ -7,6 +7,7 @@ |
#include <string> |
+#include "extensions/browser/api/socket/passthrough.h" |
#include "extensions/browser/api/socket/socket.h" |
// This looks like it should be forward-declarable, but it does some tricky |
@@ -19,6 +20,180 @@ class Socket; |
} |
namespace extensions { |
+typedef base::Callback<void(void)> PauseCompletionCallback; |
Ken Rockot(use gerrit already)
2015/12/15 17:17:49
nit: please use a type alias instead of typedef no
|
+ |
+// A wrapper around an existing TCPClientSocket with an internal read buffer |
+// to support immediate pausing. If this socket is paused while it still has |
+// a pending Read() on its wrapped socket, this socket will buffer the data |
+// returned. Further invocations of BufferingStreamSocket::Read() will |
+// first return all data from the internal buffer, before causing Read()s on |
+// the underlying TCP socket. Other than the buffering Read() behavior, |
+// BufferingStreamSocket acts identically to the net::TCPClientSocket that |
+// it wraps. |
+class SocketPauseBuffer { |
Ken Rockot(use gerrit already)
2015/12/15 17:17:49
This class should be in its own header and cc file
|
+ public: |
+ typedef base::Callback< |
Ken Rockot(use gerrit already)
2015/12/15 17:17:49
nit: type alias
|
+ int(net::IOBuffer*, int, const net::CompletionCallback&)> |
+ DownstreamReadCallback; |
+ |
+ // Pass in the Read() method of the underlying socket that needs pausing |
+ // added to it. |
+ explicit SocketPauseBuffer( |
+ const DownstreamReadCallback& downstream_read_callback); |
+ ~SocketPauseBuffer(); |
+ |
+ // Return from any pending Read() immediately, invoking the callback. |
+ // BufferingStreamSocket will then buffer data received from any pending |
+ // Read() that it may have invoked on the underlying socket. Subsequent |
+ // Read() calls will have their callbacks posted immediately with data from |
+ // the buffer until it is drained. When there is no buffered data, Read() |
+ // acts exactly as net::TCPClientSocket::Read() until there is another |
+ // invocation to Pause(). Returns true unless DisableBuffering() has been |
+ // called. |
+ bool Pause(); |
+ |
+ // Turn into a pure pass-through object, with no internal buffer and no |
+ // ability to Pause() successfully. Any pending Read() invocation is |
+ // unaffected, preserving its internal buffer until that Read() completes. |
+ void DisableBuffering(); |
+ |
+ // This Read() can return early when pausing via Pause(). It will return |
+ // (via callback) ERR_ABORTED. Without interference from Pause(), Read() |
+ // acts identically to net::TCPClientSocket::Read(). |
+ int Read(net::IOBuffer* buffer, |
+ int buf_len, |
+ const net::CompletionCallback& callback); |
+ |
+ private: |
+ // Callback passed to downstream socket's Read(). |
+ void ReadComplete(int count); |
+ |
+ // Buffer state printer. |
+ std::string StatusDescription(); |
+ |
+ // 'Buffered' in the diagram below. |
+ int BufferedDataCount() const; |
+ |
+ // Records that |num_incoming_bytes| has been added to the buffered amount. |
+ void CreditIncomingData(int num_incoming_bytes); |
+ |
+ // Copies data from downstream_read_buffer_ to |dest|, and marks that data |
+ // as having been returned upstream. |
+ void ReturnDataInBuffer(net::IOBuffer* dest, int num_outgoing_bytes); |
+ |
+ // Shift the buffered data left, freeing buffer space of data that's |
+ // already been given back upstream. |
+ void FreeReturnedBufferSpace(); |
+ |
+ // Expands buffer if there isn't |num_bytes| of open space left. |
+ void InsureBufferCanHold(int num_bytes); |
Ken Rockot(use gerrit already)
2015/12/15 17:17:49
nit: s/Insure/Ensure/
|
+ |
+ // Flush upstream state, usually after having returned a value without |
+ // invoking the callback. |
+ void ResetUpstreamState(); |
+ |
+ // Will also reset upstream state, and re-entrantly invoke the callback. |
+ void InvokeCallback(int result); |
+ |
+ // Whether a downstream read has been issued with an expected invocation of |
+ // ReadComplete(). |
+ bool read_issued_; |
+ |
+ // Terminology: clients of BufferingStreamSocket are 'upstream', and |
+ // BufferingStreamSocket is a client of downstream_socket_. |
Ken Rockot(use gerrit already)
2015/12/15 17:17:49
downstream_socket_ doesn't exist
|
+ DownstreamReadCallback downstream_read_cb_; |
+ // scoped_ptr<net::StreamSocket> downstream_socket_; |
Ken Rockot(use gerrit already)
2015/12/15 17:17:49
Remove this?
|
+ |
+ // A GrowableIOBuffer that BufferingStreamSocket::Read passes |
+ // downstream. The offset state is used to store where in the buffer has |
+ // already been stored, and where data from the next pending read will go. |
+ // |
+ // The buffer's state has this structure: |
+ // 0 offset() |
+ // +------------------+----------+--------------------------------------+ |
+ // | already returned | buffered | open space for other downstream reads| |
+ // +------------------+----------+--------------------------------------+ |
+ // ^ upstream_data_returned_ |
+ // |
+ // These are represented as integers, counting bytes of buffer. The amount |
+ // already returned is kept in upstream_data_returned_. The end of the |
+ // buffered region is stored in downstream_read_buffer_->offset(). The |
+ // remaining open space is the remaining |capacity| of |
+ // downstream_read_buffer_. The buffer is compacted (e.g., already |
+ // returned data flushed) before every downstream Read(). |
+ scoped_refptr<net::GrowableIOBuffer> downstream_read_buffer_; |
+ |
+ // The amount of data in downstream_read_buffer_ that's already been |
+ // returned upstream. |
+ int upstream_data_returned_; |
+ int upstream_read_buffer_offset_; |
+ // A saved error code when a downstream read returns one, and there is no |
+ // callback to pass it back to. |
+ int prior_error_code_; |
+ |
+ // The callback, IOBuffer, and requested read amount passed into |
+ // BufferingStreamSocket::Read. |
+ net::CompletionCallback upstream_read_callback_; |
+ scoped_refptr<net::IOBuffer> upstream_read_buffer_; |
+ int upstream_read_request_size_; |
+ bool buffering_disabled_; |
+}; |
+ |
+// Passthrough to a StreamSocket, but integrates a SocketPauseBuffer for |
+// reading. |
+class BufferingStreamSocket : public Passthrough<net::StreamSocket> { |
Ken Rockot(use gerrit already)
2015/12/15 17:17:48
This class should have its own header and cc
|
+ public: |
+ explicit BufferingStreamSocket(scoped_ptr<net::StreamSocket> socket); |
+ ~BufferingStreamSocket() override; |
+ |
+ bool Pause(); |
+ void DisableBuffering(); |
+ |
+ // This Read() can return early when pausing via Pause(). It will return |
+ // (via callback) ERR_ABORTED. Without interference from Pause(), Read() |
+ // acts identically to net::TCPClientSocket::Read(). |
+ int Read(net::IOBuffer* buffer, |
+ int buf_len, |
+ const net::CompletionCallback& callback) override; |
+ |
+ private: |
+ SocketPauseBuffer pause_buffer_; |
+}; |
+ |
+template <> |
+class DefaultInitializer<net::TCPClientSocket> : public net::TCPClientSocket { |
+ public: |
+ DefaultInitializer() |
+ : net::TCPClientSocket(net::AddressList(), |
+ nullptr, |
+ net::NetLog::Source()) {} |
+}; |
+ |
+// Passthrough to a TCPClientSocket, but integrates a SocketPauseBuffer for |
+// reading. |
+class BufferingTCPClientSocket : public Passthrough<net::TCPClientSocket> { |
Ken Rockot(use gerrit already)
2015/12/15 17:17:49
This class should have its own header and cc
|
+ public: |
+ explicit BufferingTCPClientSocket(scoped_ptr<net::TCPClientSocket> socket); |
+ ~BufferingTCPClientSocket() override; |
+ |
+ bool Pause(); |
+ void DisableBuffering(); |
+ |
+ // This Read() can return early when pausing via Pause(). It will return |
+ // (via callback) ERR_ABORTED. Without interference from Pause(), Read() |
+ // acts identically to net::TCPClientSocket::Read(). |
+ int Read(net::IOBuffer* buffer, |
+ int buf_len, |
+ const net::CompletionCallback& callback) override; |
+ |
+ // Additional pass-through APIs for TCPClientSocket (vs Passthrough<>'s |
+ // implementation of just StreamSocket. |
+ bool SetKeepAlive(bool enable, int delay) override; |
+ bool SetNoDelay(bool no_delay) override; |
+ |
+ private: |
+ SocketPauseBuffer pause_buffer_; |
+}; |
class TCPSocket : public Socket { |
public: |
@@ -69,7 +244,7 @@ class TCPSocket : public Socket { |
// Returns NULL if GetSocketType() isn't TYPE_TCP or if the connection |
// wasn't set up via Connect() (vs Listen()/Accept()). |
- net::TCPClientSocket* ClientStream(); |
+ virtual BufferingTCPClientSocket* ClientStream(); |
// Whether a Read() has been issued, that hasn't come back yet. |
bool HasPendingRead() const; |
@@ -88,7 +263,7 @@ class TCPSocket : public Socket { |
TCPSocket(net::TCPServerSocket* tcp_server_socket, |
const std::string& owner_extension_id); |
- scoped_ptr<net::TCPClientSocket> socket_; |
+ scoped_ptr<BufferingTCPClientSocket> socket_; |
scoped_ptr<net::TCPServerSocket> server_socket_; |
enum SocketMode { UNKNOWN = 0, CLIENT, SERVER, }; |
@@ -112,6 +287,8 @@ class ResumableTCPSocket : public TCPSocket { |
const std::string& owner_extension_id, |
bool is_connected); |
+ ~ResumableTCPSocket() override; |
+ |
// Overriden from ApiResource |
bool IsPersistent() const override; |
@@ -127,6 +304,15 @@ class ResumableTCPSocket : public TCPSocket { |
bool paused() const { return paused_; } |
void set_paused(bool paused) { paused_ = paused; } |
+ void set_api_pause_complete_callback( |
+ const PauseCompletionCallback& callback) { |
+ pause_callback_ = callback; |
+ } |
+ |
+ // Tell this socket that it's pause has actually finished, and it can |
+ // invoke pause callback. |
+ void ApiPauseComplete(); |
Ken Rockot(use gerrit already)
2015/12/15 17:17:49
nit: How about NotifyPauseComplete?
|
+ |
private: |
friend class ApiResourceManager<ResumableTCPSocket>; |
static const char* service_name() { return "ResumableTCPSocketManager"; } |
@@ -141,6 +327,8 @@ class ResumableTCPSocket : public TCPSocket { |
// Flag indicating whether a connected socket blocks its peer from sending |
// more data - see sockets_tcp.idl. |
bool paused_; |
+ // An optional callback to save and invoke from ResumableTCPServerSocket. |
+ PauseCompletionCallback pause_callback_; |
}; |
// TCP Socket instances from the "sockets.tcpServer" namespace. These are |