Index: jingle/glue/pseudotcp_adapter.cc |
diff --git a/jingle/glue/pseudotcp_adapter.cc b/jingle/glue/pseudotcp_adapter.cc |
index 6ea9fbd5bf19e53e1ad01ac4d689b3196562d525..4590dff5d8d1756fa4e07fc5e3afd9bfbf64734d 100644 |
--- a/jingle/glue/pseudotcp_adapter.cc |
+++ b/jingle/glue/pseudotcp_adapter.cc |
@@ -21,39 +21,106 @@ const uint16 kDefaultMtu = 1280; |
namespace jingle_glue { |
-PseudoTcpAdapter::PseudoTcpAdapter(net::Socket* socket) |
- : socket_(socket), |
- ALLOW_THIS_IN_INITIALIZER_LIST(pseudotcp_(this, 0)), |
- connect_callback_(NULL), |
+class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify, |
+ public base::RefCounted<Core> { |
+ public: |
+ Core(net::Socket* socket); |
+ virtual ~Core(); |
+ |
+ // Functions used to implement net::StreamSocket. |
+ int Read(net::IOBuffer* buffer, int buffer_size, |
+ net::CompletionCallback* callback); |
+ int Write(net::IOBuffer* buffer, int buffer_size, |
+ net::CompletionCallback* callback); |
+ int Connect(net::CompletionCallback* callback); |
+ void Disconnect(); |
+ bool IsConnected() const; |
+ |
+ // cricket::IPseudoTcpNotify interface. |
+ // These notifications are triggered from NotifyPacket. |
+ virtual void OnTcpOpen(cricket::PseudoTcp* tcp) OVERRIDE; |
+ virtual void OnTcpReadable(cricket::PseudoTcp* tcp) OVERRIDE; |
+ virtual void OnTcpWriteable(cricket::PseudoTcp* tcp) OVERRIDE; |
+ // This is triggered by NotifyClock or NotifyPacket. |
+ virtual void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) OVERRIDE; |
+ // This is triggered by NotifyClock, NotifyPacket, Recv and Send. |
+ virtual WriteResult TcpWritePacket(cricket::PseudoTcp* tcp, |
+ const char* buffer, size_t len) OVERRIDE; |
+ private: |
+ // These are invoked by the underlying Socket, and may trigger callbacks. |
+ // They hold a reference to |this| while running, to protect from deletion. |
+ void OnRead(int result); |
+ void OnWritten(int result); |
+ |
+ // These may trigger callbacks, so the holder must hold a reference on |
+ // the stack while calling them. |
+ void DoReadFromSocket(); |
+ void HandleReadResults(int result); |
+ void HandleTcpClock(); |
+ |
+ // This re-sets |timer| without triggering callbacks. |
+ void AdjustClock(); |
+ |
+ net::CompletionCallback* connect_callback_; |
+ net::CompletionCallback* read_callback_; |
+ net::CompletionCallback* write_callback_; |
+ |
+ cricket::PseudoTcp pseudo_tcp_; |
+ scoped_ptr<net::Socket> socket_; |
+ |
+ scoped_refptr<net::IOBuffer> read_buffer_; |
+ int read_buffer_size_; |
+ scoped_refptr<net::IOBuffer> write_buffer_; |
+ int write_buffer_size_; |
+ |
+ bool socket_write_pending_; |
+ scoped_refptr<net::IOBuffer> socket_read_buffer_; |
+ |
+ net::CompletionCallbackImpl<Core> socket_read_callback_; |
+ net::CompletionCallbackImpl<Core> socket_write_callback_; |
+ |
+ base::OneShotTimer<Core> timer_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(Core); |
+}; |
+ |
+ |
+PseudoTcpAdapter::Core::Core(net::Socket* socket) |
+ : connect_callback_(NULL), |
read_callback_(NULL), |
write_callback_(NULL), |
+ ALLOW_THIS_IN_INITIALIZER_LIST(pseudo_tcp_(this, 0)), |
+ socket_(socket), |
socket_write_pending_(false), |
ALLOW_THIS_IN_INITIALIZER_LIST( |
- socket_read_callback_(this, &PseudoTcpAdapter::OnRead)), |
+ socket_read_callback_(this, &PseudoTcpAdapter::Core::OnRead)), |
ALLOW_THIS_IN_INITIALIZER_LIST( |
- socket_write_callback_(this, &PseudoTcpAdapter::OnWritten)) { |
- pseudotcp_.NotifyMTU(kDefaultMtu); |
+ socket_write_callback_(this, &PseudoTcpAdapter::Core::OnWritten)) { |
+ // Doesn't trigger callbacks. |
+ pseudo_tcp_.NotifyMTU(kDefaultMtu); |
} |
-PseudoTcpAdapter::~PseudoTcpAdapter() { |
+PseudoTcpAdapter::Core::~Core() { |
} |
-int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size, |
- net::CompletionCallback* callback) { |
- DCHECK(CalledOnValidThread()); |
+int PseudoTcpAdapter::Core::Read(net::IOBuffer* buffer, int buffer_size, |
+ net::CompletionCallback* callback) { |
+ DCHECK(!read_callback_); |
- // Verify that there is no other pending read. |
- DCHECK(read_callback_ == NULL); |
+ // Reference the Core in case a callback deletes the adapter. |
+ scoped_refptr<Core> core(this); |
- PseudoTcp::TcpState state = pseudotcp_.State(); |
+ // TODO(wez): This is a hack for remoting. See JingleSession. |
+ PseudoTcp::TcpState state = pseudo_tcp_.State(); |
int result; |
if (state == PseudoTcp::TCP_SYN_SENT || |
state == PseudoTcp::TCP_SYN_RECEIVED) { |
result = net::ERR_IO_PENDING; |
+ |
} else { |
- result = pseudotcp_.Recv(buffer->data(), buffer_size); |
+ result = pseudo_tcp_.Recv(buffer->data(), buffer_size); |
if (result < 0) { |
- result = net::MapSystemError(pseudotcp_.GetError()); |
+ result = net::MapSystemError(pseudo_tcp_.GetError()); |
DCHECK(result < 0); |
} |
} |
@@ -69,22 +136,24 @@ int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size, |
return result; |
} |
-int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size, |
- net::CompletionCallback* callback) { |
- DCHECK(CalledOnValidThread()); |
+int PseudoTcpAdapter::Core::Write(net::IOBuffer* buffer, int buffer_size, |
+ net::CompletionCallback* callback) { |
+ DCHECK(!write_callback_); |
- // Verify that there is no other pending write. |
- DCHECK(write_callback_ == NULL); |
+ // Reference the Core in case a callback deletes the adapter. |
+ scoped_refptr<Core> core(this); |
- PseudoTcp::TcpState state = pseudotcp_.State(); |
+ // TODO(wez): This is a hack for remoting. See JingleSession. |
+ PseudoTcp::TcpState state = pseudo_tcp_.State(); |
int result; |
if (state == PseudoTcp::TCP_SYN_SENT || |
state == PseudoTcp::TCP_SYN_RECEIVED) { |
result = net::ERR_IO_PENDING; |
+ |
} else { |
- result = pseudotcp_.Send(buffer->data(), buffer_size); |
+ result = pseudo_tcp_.Send(buffer->data(), buffer_size); |
if (result < 0) { |
- result = net::MapSystemError(pseudotcp_.GetError()); |
+ result = net::MapSystemError(pseudo_tcp_.GetError()); |
DCHECK(result < 0); |
} |
} |
@@ -100,114 +169,68 @@ int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size, |
return result; |
} |
-bool PseudoTcpAdapter::SetReceiveBufferSize(int32 size) { |
- DCHECK(CalledOnValidThread()); |
- // TODO(sergeyu): Implement support for adjustable buffer size and |
- // used it here. |
- return false; |
-} |
- |
-bool PseudoTcpAdapter::SetSendBufferSize(int32 size) { |
- DCHECK(CalledOnValidThread()); |
- // TODO(sergeyu): Implement support for adjustable buffer size and |
- // used it here. |
- return false; |
-} |
- |
-int PseudoTcpAdapter::Connect(net::CompletionCallback* callback) { |
- DCHECK(CalledOnValidThread()); |
+int PseudoTcpAdapter::Core::Connect(net::CompletionCallback* callback) { |
+ DCHECK_EQ(pseudo_tcp_.State(), cricket::PseudoTcp::TCP_LISTEN); |
- // Start reading from the socket. |
- DoReadFromSocket(); |
+ // Reference the Core in case a callback deletes the adapter. |
+ scoped_refptr<Core> core(this); |
- int result = pseudotcp_.Connect(); |
+ // Start the connection attempt. |
+ int result = pseudo_tcp_.Connect(); |
if (result < 0) |
return net::ERR_FAILED; |
AdjustClock(); |
connect_callback_ = callback; |
- return net::ERR_IO_PENDING; |
-} |
- |
-void PseudoTcpAdapter::Disconnect() { |
- DCHECK(CalledOnValidThread()); |
- pseudotcp_.Close(false); |
-} |
- |
-bool PseudoTcpAdapter::IsConnected() const { |
- DCHECK(CalledOnValidThread()); |
- return pseudotcp_.State() == PseudoTcp::TCP_ESTABLISHED; |
-} |
- |
-bool PseudoTcpAdapter::IsConnectedAndIdle() const { |
- DCHECK(CalledOnValidThread()); |
- NOTIMPLEMENTED(); |
- return false; |
-} |
- |
-int PseudoTcpAdapter::GetPeerAddress(net::AddressList* address) const { |
- DCHECK(CalledOnValidThread()); |
- |
- // We actually don't know the peer address. Returning so the upper layers |
- // won't complain. |
- net::IPAddressNumber ip_address(4); |
- *address = net::AddressList::CreateFromIPAddress(ip_address, 0); |
- return net::OK; |
-} |
- |
-int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint* address) const { |
- DCHECK(CalledOnValidThread()); |
- NOTIMPLEMENTED(); |
- return net::ERR_FAILED; |
-} |
+ DoReadFromSocket(); |
-const net::BoundNetLog& PseudoTcpAdapter::NetLog() const { |
- DCHECK(CalledOnValidThread()); |
- return net_log_; |
+ return net::ERR_IO_PENDING; |
} |
-void PseudoTcpAdapter::SetSubresourceSpeculation() { |
- DCHECK(CalledOnValidThread()); |
- NOTIMPLEMENTED(); |
-} |
+void PseudoTcpAdapter::Core::Disconnect() { |
+ // Don't dispatch outstanding callbacks, as mandated by net::StreamSocket. |
+ read_callback_ = NULL; |
+ read_buffer_ = NULL; |
+ write_callback_ = NULL; |
+ write_buffer_ = NULL; |
+ connect_callback_ = NULL; |
-void PseudoTcpAdapter::SetOmniboxSpeculation() { |
- DCHECK(CalledOnValidThread()); |
- NOTIMPLEMENTED(); |
+ // TODO(wez): Connect should succeed if called after Disconnect, which |
+ // PseudoTcp doesn't support, so we need to teardown the internal PseudoTcp |
+ // and create a new one in Connect. |
+ // TODO(wez): Close sets a shutdown flag inside PseudoTcp but has no other |
+ // effect. This should be addressed in PseudoTcp, really. |
+ // In the meantime we can fake OnTcpClosed notification and tear down the |
+ // PseudoTcp. |
+ pseudo_tcp_.Close(true); |
} |
-bool PseudoTcpAdapter::WasEverUsed() const { |
- DCHECK(CalledOnValidThread()); |
- NOTIMPLEMENTED(); |
- return true; |
+bool PseudoTcpAdapter::Core::IsConnected() const { |
+ return pseudo_tcp_.State() == PseudoTcp::TCP_ESTABLISHED; |
} |
-bool PseudoTcpAdapter::UsingTCPFastOpen() const { |
- DCHECK(CalledOnValidThread()); |
- return false; |
-} |
+void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) { |
+ DCHECK(tcp == &pseudo_tcp_); |
-void PseudoTcpAdapter::OnTcpOpen(PseudoTcp* tcp) { |
- DCHECK(CalledOnValidThread()); |
- DCHECK(connect_callback_); |
- connect_callback_->Run(net::OK); |
- connect_callback_ = NULL; |
+ if (connect_callback_) { |
+ net::CompletionCallback* callback = connect_callback_; |
+ connect_callback_ = NULL; |
+ callback->Run(net::OK); |
+ } |
OnTcpReadable(tcp); |
OnTcpWriteable(tcp); |
} |
-void PseudoTcpAdapter::OnTcpReadable(PseudoTcp* tcp) { |
- DCHECK(CalledOnValidThread()); |
- |
- if (!read_buffer_) |
+void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp* tcp) { |
+ DCHECK_EQ(tcp, &pseudo_tcp_); |
+ if (!read_callback_) |
return; |
- // Try to send the data we have pending. |
- int result = pseudotcp_.Recv(read_buffer_->data(), read_buffer_size_); |
+ int result = pseudo_tcp_.Recv(read_buffer_->data(), read_buffer_size_); |
if (result < 0) { |
- result = net::MapSystemError(pseudotcp_.GetError()); |
+ result = net::MapSystemError(pseudo_tcp_.GetError()); |
DCHECK(result < 0); |
if (result == net::ERR_IO_PENDING) |
return; |
@@ -215,22 +238,20 @@ void PseudoTcpAdapter::OnTcpReadable(PseudoTcp* tcp) { |
AdjustClock(); |
- net::CompletionCallback* cb = read_callback_; |
+ net::CompletionCallback* callback = read_callback_; |
read_callback_ = NULL; |
read_buffer_ = NULL; |
- cb->Run(result); |
+ callback->Run(result); |
} |
-void PseudoTcpAdapter::OnTcpWriteable(PseudoTcp* tcp) { |
- DCHECK(CalledOnValidThread()); |
- |
- if (!write_buffer_) |
+void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp* tcp) { |
+ DCHECK_EQ(tcp, &pseudo_tcp_); |
+ if (!write_callback_) |
return; |
- // Try to send the data we have pending. |
- int result = pseudotcp_.Send(write_buffer_->data(), write_buffer_size_); |
+ int result = pseudo_tcp_.Send(write_buffer_->data(), write_buffer_size_); |
if (result < 0) { |
- result = net::MapSystemError(pseudotcp_.GetError()); |
+ result = net::MapSystemError(pseudo_tcp_.GetError()); |
DCHECK(result < 0); |
if (result == net::ERR_IO_PENDING) |
return; |
@@ -238,36 +259,39 @@ void PseudoTcpAdapter::OnTcpWriteable(PseudoTcp* tcp) { |
AdjustClock(); |
- net::CompletionCallback* cb = write_callback_; |
+ net::CompletionCallback* callback = write_callback_; |
write_callback_ = NULL; |
write_buffer_ = NULL; |
- cb->Run(result); |
+ callback->Run(result); |
} |
-void PseudoTcpAdapter::OnTcpClosed(PseudoTcp* tcp, uint32 error) { |
- DCHECK(CalledOnValidThread()); |
+void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp* tcp, uint32 error) { |
+ DCHECK_EQ(tcp, &pseudo_tcp_); |
if (connect_callback_) { |
- connect_callback_->Run(net::MapSystemError(error)); |
+ net::CompletionCallback* callback = connect_callback_; |
connect_callback_ = NULL; |
+ callback->Run(net::MapSystemError(error)); |
} |
if (read_callback_) { |
- read_callback_->Run(net::MapSystemError(error)); |
+ net::CompletionCallback* callback = read_callback_; |
read_callback_ = NULL; |
+ callback->Run(net::MapSystemError(error)); |
} |
if (write_callback_) { |
- write_callback_->Run(net::MapSystemError(error)); |
+ net::CompletionCallback* callback = write_callback_; |
write_callback_ = NULL; |
+ callback->Run(net::MapSystemError(error)); |
} |
} |
-cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::TcpWritePacket( |
+cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket( |
PseudoTcp* tcp, |
const char* buffer, |
size_t len) { |
- DCHECK(CalledOnValidThread()); |
+ DCHECK_EQ(tcp, &pseudo_tcp_); |
if (socket_write_pending_) |
return IPseudoTcpNotify::WR_SUCCESS; |
@@ -288,10 +312,9 @@ cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::TcpWritePacket( |
} |
} |
-void PseudoTcpAdapter::DoReadFromSocket() { |
- if (!socket_read_buffer_) { |
+void PseudoTcpAdapter::Core::DoReadFromSocket() { |
+ if (!socket_read_buffer_) |
socket_read_buffer_ = new net::IOBuffer(kReadBufferSize); |
- } |
while (true) { |
int result = socket_->Read(socket_read_buffer_, kReadBufferSize, |
@@ -303,41 +326,154 @@ void PseudoTcpAdapter::DoReadFromSocket() { |
} |
} |
-void PseudoTcpAdapter::HandleReadResults(int result) { |
+void PseudoTcpAdapter::Core::HandleReadResults(int result) { |
if (result <= 0) { |
LOG(ERROR) << "Read returned " << result; |
return; |
} |
- pseudotcp_.NotifyPacket(socket_read_buffer_->data(), result); |
+ // TODO(wez): Disconnect on failure of NotifyPacket? |
+ pseudo_tcp_.NotifyPacket(socket_read_buffer_->data(), result); |
AdjustClock(); |
} |
-void PseudoTcpAdapter::OnRead(int result) { |
+void PseudoTcpAdapter::Core::OnRead(int result) { |
+ // Reference the Core in case a callback deletes the adapter. |
+ scoped_refptr<Core> core(this); |
+ |
HandleReadResults(result); |
if (result >= 0) |
DoReadFromSocket(); |
} |
-void PseudoTcpAdapter::OnWritten(int result) { |
+void PseudoTcpAdapter::Core::OnWritten(int result) { |
+ // Reference the Core in case a callback deletes the adapter. |
+ scoped_refptr<Core> core(this); |
+ |
socket_write_pending_ = false; |
if (result < 0) { |
LOG(WARNING) << "Write failed. Error code: " << result; |
} |
} |
-void PseudoTcpAdapter::AdjustClock() { |
+void PseudoTcpAdapter::Core::AdjustClock() { |
long timeout = 0; |
- if (pseudotcp_.GetNextClock(PseudoTcp::Now(), timeout)) { |
+ if (pseudo_tcp_.GetNextClock(PseudoTcp::Now(), timeout)) { |
timer_.Stop(); |
timer_.Start(base::TimeDelta::FromMilliseconds(std::max(timeout, 0L)), this, |
- &PseudoTcpAdapter::HandleTcpClock); |
+ &PseudoTcpAdapter::Core::HandleTcpClock); |
} |
} |
-void PseudoTcpAdapter::HandleTcpClock() { |
- pseudotcp_.NotifyClock(PseudoTcp::Now()); |
+void PseudoTcpAdapter::Core::HandleTcpClock() { |
+ // Reference the Core in case a callback deletes the adapter. |
+ scoped_refptr<Core> core(this); |
+ |
+ pseudo_tcp_.NotifyClock(PseudoTcp::Now()); |
AdjustClock(); |
} |
+// Public interface implemention. |
+ |
+PseudoTcpAdapter::PseudoTcpAdapter(net::Socket* socket) |
+ : core_(new Core(socket)) { |
+} |
+ |
+PseudoTcpAdapter::~PseudoTcpAdapter() { |
+ Disconnect(); |
+} |
+ |
+int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size, |
+ net::CompletionCallback* callback) { |
+ DCHECK(CalledOnValidThread()); |
+ return core_->Read(buffer, buffer_size, callback); |
+} |
+ |
+int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size, |
+ net::CompletionCallback* callback) { |
+ DCHECK(CalledOnValidThread()); |
+ return core_->Write(buffer, buffer_size, callback); |
+} |
+ |
+bool PseudoTcpAdapter::SetReceiveBufferSize(int32 size) { |
+ DCHECK(CalledOnValidThread()); |
+ // TODO(sergeyu): Implement support for adjustable buffer size and |
+ // used it here. |
+ return false; |
+} |
+ |
+bool PseudoTcpAdapter::SetSendBufferSize(int32 size) { |
+ DCHECK(CalledOnValidThread()); |
+ // TODO(sergeyu): Implement support for adjustable buffer size and |
+ // used it here. |
+ return false; |
+} |
+ |
+int PseudoTcpAdapter::Connect(net::CompletionCallback* callback) { |
+ DCHECK(CalledOnValidThread()); |
+ |
+ // net::StreamSocket requires that Connect return OK if already connected. |
+ if (IsConnected()) |
+ return net::OK; |
+ |
+ return core_->Connect(callback); |
+} |
+ |
+void PseudoTcpAdapter::Disconnect() { |
+ DCHECK(CalledOnValidThread()); |
+ core_->Disconnect(); |
+} |
+ |
+bool PseudoTcpAdapter::IsConnected() const { |
+ return core_->IsConnected(); |
+} |
+ |
+bool PseudoTcpAdapter::IsConnectedAndIdle() const { |
+ DCHECK(CalledOnValidThread()); |
+ NOTIMPLEMENTED(); |
+ return false; |
+} |
+ |
+int PseudoTcpAdapter::GetPeerAddress(net::AddressList* address) const { |
+ DCHECK(CalledOnValidThread()); |
+ |
+ // We actually don't know the peer address. Returning so the upper layers |
+ // won't complain. |
+ net::IPAddressNumber ip_address(4); |
+ *address = net::AddressList::CreateFromIPAddress(ip_address, 0); |
+ return net::OK; |
+} |
+ |
+int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint* address) const { |
+ DCHECK(CalledOnValidThread()); |
+ NOTIMPLEMENTED(); |
+ return net::ERR_FAILED; |
+} |
+ |
+const net::BoundNetLog& PseudoTcpAdapter::NetLog() const { |
+ DCHECK(CalledOnValidThread()); |
+ return net_log_; |
+} |
+ |
+void PseudoTcpAdapter::SetSubresourceSpeculation() { |
+ DCHECK(CalledOnValidThread()); |
+ NOTIMPLEMENTED(); |
+} |
+ |
+void PseudoTcpAdapter::SetOmniboxSpeculation() { |
+ DCHECK(CalledOnValidThread()); |
+ NOTIMPLEMENTED(); |
+} |
+ |
+bool PseudoTcpAdapter::WasEverUsed() const { |
+ DCHECK(CalledOnValidThread()); |
+ NOTIMPLEMENTED(); |
+ return true; |
+} |
+ |
+bool PseudoTcpAdapter::UsingTCPFastOpen() const { |
+ DCHECK(CalledOnValidThread()); |
+ return false; |
+} |
+ |
} // namespace jingle_glue |