| Index: mojo/services/network/tcp_connected_socket_impl.cc
|
| diff --git a/mojo/services/network/tcp_connected_socket_impl.cc b/mojo/services/network/tcp_connected_socket_impl.cc
|
| index 0c6f2ebd4874684fccdc263b363f6d8479beb105..ecd174c6e5bdbe2d71686549e96859f97d5ebfdd 100644
|
| --- a/mojo/services/network/tcp_connected_socket_impl.cc
|
| +++ b/mojo/services/network/tcp_connected_socket_impl.cc
|
| @@ -4,6 +4,10 @@
|
|
|
| #include "mojo/services/network/tcp_connected_socket_impl.h"
|
|
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "mojo/services/network/net_adapters.h"
|
| +#include "net/base/net_errors.h"
|
| +
|
| namespace mojo {
|
|
|
| TCPConnectedSocketImpl::TCPConnectedSocketImpl(
|
| @@ -12,10 +16,148 @@ TCPConnectedSocketImpl::TCPConnectedSocketImpl(
|
| ScopedDataPipeProducerHandle receive_stream)
|
| : socket_(socket.Pass()),
|
| send_stream_(send_stream.Pass()),
|
| - receive_stream_(receive_stream.Pass()) {
|
| + receive_stream_(receive_stream.Pass()),
|
| + weak_ptr_factory_(this) {
|
| + // Queue up async communication.
|
| + ReceiveMore();
|
| + SendMore();
|
| }
|
|
|
| TCPConnectedSocketImpl::~TCPConnectedSocketImpl() {
|
| }
|
|
|
| +void TCPConnectedSocketImpl::ReceiveMore() {
|
| + DCHECK(!pending_receive_.get());
|
| +
|
| + uint32_t num_bytes;
|
| + MojoResult result = NetToMojoPendingBuffer::BeginWrite(
|
| + &receive_stream_, &pending_receive_, &num_bytes);
|
| + if (result == MOJO_RESULT_SHOULD_WAIT) {
|
| + // The pipe is full. We need to wait for it to have more space.
|
| + receive_handle_watcher_.Start(
|
| + receive_stream_.get(),
|
| + MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE,
|
| + base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady,
|
| + weak_ptr_factory_.GetWeakPtr()));
|
| + return;
|
| + } else if (result != MOJO_RESULT_OK) {
|
| + // The receive stream is in a bad state.
|
| + // TODO(darin): How should this be communicated to our client?
|
| + socket_->Close();
|
| + return;
|
| + }
|
| +
|
| + // Mojo is ready for the receive.
|
| + CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes);
|
| + scoped_refptr<net::IOBuffer> buf(
|
| + new NetToMojoIOBuffer(pending_receive_.get()));
|
| + int read_result = socket_->Read(
|
| + buf.get(), static_cast<int>(num_bytes),
|
| + base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this),
|
| + false));
|
| + if (read_result == net::ERR_IO_PENDING) {
|
| + // Pending I/O, wait for result in DidReceive().
|
| + } else if (read_result >= 0) {
|
| + // Synchronous data ready.
|
| + DidReceive(true, read_result);
|
| + } else {
|
| + // Some kind of error.
|
| + // TODO(brettw) notify caller of error.
|
| + socket_->Close();
|
| + }
|
| +}
|
| +
|
| +void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) {
|
| + // TODO(darin): Handle a bad |result| value.
|
| + ReceiveMore();
|
| +}
|
| +
|
| +void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously,
|
| + int result) {
|
| + if (result < 0) {
|
| + // Error.
|
| + pending_receive_ = NULL; // Closes the pipe (owned by the pending write).
|
| + // TODO(brettw) notify the caller of an error?
|
| + socket_->Close();
|
| + return;
|
| + }
|
| +
|
| + receive_stream_ = pending_receive_->Complete(result);
|
| + pending_receive_ = NULL;
|
| +
|
| + // Schedule more reading.
|
| + if (completed_synchronously) {
|
| + // Don't recursively call ReceiveMore if this is a sync read.
|
| + base::MessageLoop::current()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&TCPConnectedSocketImpl::ReceiveMore,
|
| + weak_ptr_factory_.GetWeakPtr()));
|
| + } else {
|
| + ReceiveMore();
|
| + }
|
| +}
|
| +
|
| +void TCPConnectedSocketImpl::SendMore() {
|
| + uint32_t num_bytes = 0;
|
| + MojoResult result = MojoToNetPendingBuffer::BeginRead(
|
| + &send_stream_, &pending_send_, &num_bytes);
|
| + if (result == MOJO_RESULT_SHOULD_WAIT) {
|
| + // Data not ready, wait for it.
|
| + send_handle_watcher_.Start(
|
| + send_stream_.get(),
|
| + MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE,
|
| + base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady,
|
| + weak_ptr_factory_.GetWeakPtr()));
|
| + return;
|
| + } else if (result != MOJO_RESULT_OK) {
|
| + // TODO(brettw) notify caller of error.
|
| + socket_->Close();
|
| + return;
|
| + }
|
| +
|
| + // Got a buffer from Mojo, give it to the socket. Note that the sockets may
|
| + // do partial writes.
|
| + scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get()));
|
| + int write_result = socket_->Write(
|
| + buf.get(), static_cast<int>(num_bytes),
|
| + base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this),
|
| + false));
|
| + if (write_result == net::ERR_IO_PENDING) {
|
| + // Pending I/O, wait for result in DidSend().
|
| + } else if (write_result >= 0) {
|
| + // Synchronous data consumed.
|
| + DidSend(true, write_result);
|
| + }
|
| +}
|
| +
|
| +void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) {
|
| + // TODO(brettw): Handle a bad |result| value.
|
| + SendMore();
|
| +}
|
| +
|
| +void TCPConnectedSocketImpl::DidSend(bool completed_synchronously,
|
| + int result) {
|
| + if (result < 0) {
|
| + // TODO(brettw) report error.
|
| + pending_send_ = NULL;
|
| + socket_->Close();
|
| + return;
|
| + }
|
| +
|
| + // Take back ownership of the stream and free the IOBuffer.
|
| + send_stream_ = pending_send_->Complete(result);
|
| + pending_send_ = NULL;
|
| +
|
| + // Schedule more writing.
|
| + if (completed_synchronously) {
|
| + // Don't recursively call SendMore if this is a sync read.
|
| + base::MessageLoop::current()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&TCPConnectedSocketImpl::SendMore,
|
| + weak_ptr_factory_.GetWeakPtr()));
|
| + } else {
|
| + SendMore();
|
| + }
|
| +}
|
| +
|
| } // namespace mojo
|
|
|