Index: mojo/services/network/tcp_connected_socket_impl.cc |
diff --git a/mojo/services/network/tcp_connected_socket_impl.cc b/mojo/services/network/tcp_connected_socket_impl.cc |
index 0c6f2ebd4874684fccdc263b363f6d8479beb105..ecd174c6e5bdbe2d71686549e96859f97d5ebfdd 100644 |
--- a/mojo/services/network/tcp_connected_socket_impl.cc |
+++ b/mojo/services/network/tcp_connected_socket_impl.cc |
@@ -4,6 +4,10 @@ |
#include "mojo/services/network/tcp_connected_socket_impl.h" |
+#include "base/message_loop/message_loop.h" |
+#include "mojo/services/network/net_adapters.h" |
+#include "net/base/net_errors.h" |
+ |
namespace mojo { |
TCPConnectedSocketImpl::TCPConnectedSocketImpl( |
@@ -12,10 +16,148 @@ TCPConnectedSocketImpl::TCPConnectedSocketImpl( |
ScopedDataPipeProducerHandle receive_stream) |
: socket_(socket.Pass()), |
send_stream_(send_stream.Pass()), |
- receive_stream_(receive_stream.Pass()) { |
+ receive_stream_(receive_stream.Pass()), |
+ weak_ptr_factory_(this) { |
+ // Queue up async communication. |
+ ReceiveMore(); |
+ SendMore(); |
} |
TCPConnectedSocketImpl::~TCPConnectedSocketImpl() { |
} |
+void TCPConnectedSocketImpl::ReceiveMore() { |
+ DCHECK(!pending_receive_.get()); |
+ |
+ uint32_t num_bytes; |
+ MojoResult result = NetToMojoPendingBuffer::BeginWrite( |
+ &receive_stream_, &pending_receive_, &num_bytes); |
+ if (result == MOJO_RESULT_SHOULD_WAIT) { |
+ // The pipe is full. We need to wait for it to have more space. |
+ receive_handle_watcher_.Start( |
+ receive_stream_.get(), |
+ MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE, |
+ base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ return; |
+ } else if (result != MOJO_RESULT_OK) { |
+ // The receive stream is in a bad state. |
+ // TODO(darin): How should this be communicated to our client? |
+ socket_->Close(); |
+ return; |
+ } |
+ |
+ // Mojo is ready for the receive. |
+ CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes); |
+ scoped_refptr<net::IOBuffer> buf( |
+ new NetToMojoIOBuffer(pending_receive_.get())); |
+ int read_result = socket_->Read( |
+ buf.get(), static_cast<int>(num_bytes), |
+ base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this), |
+ false)); |
+ if (read_result == net::ERR_IO_PENDING) { |
+ // Pending I/O, wait for result in DidReceive(). |
+ } else if (read_result >= 0) { |
mmenke
2014/10/17 16:49:56
Early return is generally preferred over using "el
|
+ // Synchronous data ready. |
+ DidReceive(true, read_result); |
mmenke
2014/10/17 16:49:55
Can't we call this on the read_result < 0 case, to
|
+ } else { |
+ // Some kind of error. |
+ // TODO(brettw) notify caller of error. |
mmenke
2014/10/17 16:49:56
Is there a reason not to have "pending_receive_ =
|
+ socket_->Close(); |
+ } |
+} |
+ |
+void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) { |
+ // TODO(darin): Handle a bad |result| value. |
+ ReceiveMore(); |
+} |
+ |
+void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously, |
+ int result) { |
+ if (result < 0) { |
mmenke
2014/10/17 16:49:56
A 0-byte read means the socket was closed.
|
+ // Error. |
+ pending_receive_ = NULL; // Closes the pipe (owned by the pending write). |
+ // TODO(brettw) notify the caller of an error? |
+ socket_->Close(); |
+ return; |
+ } |
+ |
+ receive_stream_ = pending_receive_->Complete(result); |
+ pending_receive_ = NULL; |
+ |
+ // Schedule more reading. |
+ if (completed_synchronously) { |
mmenke
2014/10/17 16:49:56
optional: Just a matter of personal preference, b
|
+ // Don't recursively call ReceiveMore if this is a sync read. |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&TCPConnectedSocketImpl::ReceiveMore, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ } else { |
+ ReceiveMore(); |
+ } |
+} |
+ |
+void TCPConnectedSocketImpl::SendMore() { |
+ uint32_t num_bytes = 0; |
+ MojoResult result = MojoToNetPendingBuffer::BeginRead( |
+ &send_stream_, &pending_send_, &num_bytes); |
+ if (result == MOJO_RESULT_SHOULD_WAIT) { |
+ // Data not ready, wait for it. |
+ send_handle_watcher_.Start( |
+ send_stream_.get(), |
+ MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, |
+ base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ return; |
+ } else if (result != MOJO_RESULT_OK) { |
+ // TODO(brettw) notify caller of error. |
+ socket_->Close(); |
+ return; |
+ } |
+ |
+ // Got a buffer from Mojo, give it to the socket. Note that the sockets may |
+ // do partial writes. |
+ scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get())); |
+ int write_result = socket_->Write( |
+ buf.get(), static_cast<int>(num_bytes), |
+ base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this), |
+ false)); |
+ if (write_result == net::ERR_IO_PENDING) { |
+ // Pending I/O, wait for result in DidSend(). |
mmenke
2014/10/17 16:49:56
Again, early return is generally preferred.
|
+ } else if (write_result >= 0) { |
+ // Synchronous data consumed. |
+ DidSend(true, write_result); |
mmenke
2014/10/17 16:49:56
Should handle error case here.
|
+ } |
+} |
+ |
+void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) { |
+ // TODO(brettw): Handle a bad |result| value. |
+ SendMore(); |
+} |
+ |
+void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, |
+ int result) { |
+ if (result < 0) { |
+ // TODO(brettw) report error. |
+ pending_send_ = NULL; |
+ socket_->Close(); |
+ return; |
+ } |
+ |
+ // Take back ownership of the stream and free the IOBuffer. |
+ send_stream_ = pending_send_->Complete(result); |
+ pending_send_ = NULL; |
+ |
+ // Schedule more writing. |
+ if (completed_synchronously) { |
+ // Don't recursively call SendMore if this is a sync read. |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&TCPConnectedSocketImpl::SendMore, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ } else { |
+ SendMore(); |
+ } |
+} |
+ |
} // namespace mojo |