Chromium Code Reviews| Index: mojo/services/network/tcp_connected_socket_impl.cc |
| diff --git a/mojo/services/network/tcp_connected_socket_impl.cc b/mojo/services/network/tcp_connected_socket_impl.cc |
| index 0c6f2ebd4874684fccdc263b363f6d8479beb105..d4242a3786ff6630106429ff99675a444ef46730 100644 |
| --- a/mojo/services/network/tcp_connected_socket_impl.cc |
| +++ b/mojo/services/network/tcp_connected_socket_impl.cc |
| @@ -4,6 +4,10 @@ |
| #include "mojo/services/network/tcp_connected_socket_impl.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "mojo/services/network/net_adapters.h" |
| +#include "net/base/net_errors.h" |
| + |
| namespace mojo { |
| TCPConnectedSocketImpl::TCPConnectedSocketImpl( |
| @@ -12,10 +16,141 @@ TCPConnectedSocketImpl::TCPConnectedSocketImpl( |
| ScopedDataPipeProducerHandle receive_stream) |
| : socket_(socket.Pass()), |
| send_stream_(send_stream.Pass()), |
| - receive_stream_(receive_stream.Pass()) { |
| + receive_stream_(receive_stream.Pass()), |
| + weak_ptr_factory_(this) { |
| + // Queue up async communication. |
| + ReceiveMore(); |
| + SendMore(); |
| } |
| TCPConnectedSocketImpl::~TCPConnectedSocketImpl() { |
| } |
| +void TCPConnectedSocketImpl::ReceiveMore() { |
| + DCHECK(!pending_receive_.get()); |
| + |
| + uint32_t num_bytes; |
| + MojoResult result = NetToMojoPendingBuffer::BeginWrite( |
| + &receive_stream_, &pending_receive_, &num_bytes); |
| + if (result == MOJO_RESULT_SHOULD_WAIT) { |
| + // The pipe is full. We need to wait for it to have more space. |
| + receive_handle_watcher_.Start( |
| + receive_stream_.get(), |
| + MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE, |
| + base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady, |
| + weak_ptr_factory_.GetWeakPtr())); |
| + return; |
| + } else if (result != MOJO_RESULT_OK) { |
| + // The receive stream is in a bad state. |
| + // TODO(darin): How should this be communicated to our client? |
| + return; |
| + } |
| + |
| + // Mojo is ready for the receive. |
| + CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes); |
| + scoped_refptr<net::IOBuffer> buf( |
| + new NetToMojoIOBuffer(pending_receive_.get())); |
| + int read_result = socket_->Read( |
| + buf.get(), static_cast<int>(num_bytes), |
| + base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this), |
| + false)); |
| + if (read_result == net::ERR_IO_PENDING) { |
| + // Pending I/O, wait for result in DidReceive(). |
| + } else if (read_result >= 0) { |
| + // Synchronous data ready. |
| + DidReceive(true, read_result); |
| + } else { |
| + // Some kind of error. |
| + } |
| +} |
| + |
| +void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) { |
| + // TODO(darin): Handle a bad |result| value. |
| + ReceiveMore(); |
| +} |
| + |
| +void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously, |
| + int result) { |
| + if (result < 0) { |
| + // Error. |
| + pending_receive_ = NULL; // Closes the pipe (owned by the pending write). |
| + // TODO(brettw) notify the caller of an error? |
| + return; |
| + } |
| + |
| + receive_stream_ = pending_receive_->Complete(result); |
| + pending_receive_ = NULL; |
| + |
| + // Schedule more reading. |
| + if (completed_synchronously) { |
| + // Don't recursively call ReceiveMore if this is a sync read. |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&TCPConnectedSocketImpl::ReceiveMore, |
| + weak_ptr_factory_.GetWeakPtr())); |
| + } else { |
| + ReceiveMore(); |
| + } |
| +} |
| + |
| +void TCPConnectedSocketImpl::SendMore() { |
| + uint32_t num_bytes = 0; |
| + MojoResult result = MojoToNetPendingBuffer::BeginRead( |
| + &send_stream_, &pending_send_, &num_bytes); |
| + if (result == MOJO_RESULT_SHOULD_WAIT) { |
| + // Data not ready, wait for it. |
| + send_handle_watcher_.Start( |
| + send_stream_.get(), |
| + MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, |
| + base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady, |
| + weak_ptr_factory_.GetWeakPtr())); |
| + return; |
| + } else if (result != MOJO_RESULT_OK) { |
| + // TODO(brettw) notify caller of error. |
| + return; |
| + } |
| + |
| + // Got a buffer from Mojo, give it to the socket. Note that the sockets may |
| + // do partial writes. |
| + scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get())); |
| + int write_result = socket_->Write( |
| + buf.get(), static_cast<int>(num_bytes), |
| + base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this), |
| + false)); |
| + if (write_result == net::ERR_IO_PENDING) { |
| + // Pending I/O, wait for result in DidSend(). |
| + } else if (write_result >= 0) { |
| + // Synchronous data consumed. |
| + DidSend(true, write_result); |
| + } |
| +} |
| + |
| +void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) { |
| + // TODO(brettw): Handle a bad |result| value. |
| + SendMore(); |
| +} |
| + |
| +void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, |
| + int result) { |
| + if (result < 0) { |
| + // TODO(brettw) report error. |
|
yzshen1
2014/10/15 21:07:44
Do we want to close the pipe in this case? We don'
|
| + return; |
| + } |
| + |
| + // Take back ownership of the stream and free the IOBuffer. |
| + send_stream_ = pending_send_->Complete(result); |
| + pending_send_ = NULL; |
| + |
| + // Schedule more writing. |
| + if (completed_synchronously) { |
| + // Don't recursively call SendMore if this is a sync read. |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&TCPConnectedSocketImpl::SendMore, |
| + weak_ptr_factory_.GetWeakPtr())); |
| + } else { |
| + SendMore(); |
| + } |
| +} |
| + |
| } // namespace mojo |