Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(53)

Unified Diff: mojo/services/network/tcp_connected_socket_impl.cc

Issue 657113002: Add basic TCP socket mojo implementation (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/services/network/tcp_connected_socket_impl.cc
diff --git a/mojo/services/network/tcp_connected_socket_impl.cc b/mojo/services/network/tcp_connected_socket_impl.cc
index 0c6f2ebd4874684fccdc263b363f6d8479beb105..ecd174c6e5bdbe2d71686549e96859f97d5ebfdd 100644
--- a/mojo/services/network/tcp_connected_socket_impl.cc
+++ b/mojo/services/network/tcp_connected_socket_impl.cc
@@ -4,6 +4,10 @@
#include "mojo/services/network/tcp_connected_socket_impl.h"
+#include "base/message_loop/message_loop.h"
+#include "mojo/services/network/net_adapters.h"
+#include "net/base/net_errors.h"
+
namespace mojo {
TCPConnectedSocketImpl::TCPConnectedSocketImpl(
@@ -12,10 +16,148 @@ TCPConnectedSocketImpl::TCPConnectedSocketImpl(
ScopedDataPipeProducerHandle receive_stream)
: socket_(socket.Pass()),
send_stream_(send_stream.Pass()),
- receive_stream_(receive_stream.Pass()) {
+ receive_stream_(receive_stream.Pass()),
+ weak_ptr_factory_(this) {
+ // Queue up async communication.
+ ReceiveMore();
+ SendMore();
}
TCPConnectedSocketImpl::~TCPConnectedSocketImpl() {
}
+void TCPConnectedSocketImpl::ReceiveMore() {
+ DCHECK(!pending_receive_.get());
+
+ uint32_t num_bytes;
+ MojoResult result = NetToMojoPendingBuffer::BeginWrite(
+ &receive_stream_, &pending_receive_, &num_bytes);
+ if (result == MOJO_RESULT_SHOULD_WAIT) {
+ // The pipe is full. We need to wait for it to have more space.
+ receive_handle_watcher_.Start(
+ receive_stream_.get(),
+ MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE,
+ base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady,
+ weak_ptr_factory_.GetWeakPtr()));
+ return;
+ } else if (result != MOJO_RESULT_OK) {
+ // The receive stream is in a bad state.
+ // TODO(darin): How should this be communicated to our client?
+ socket_->Close();
+ return;
+ }
+
+ // Mojo is ready for the receive.
+ CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes);
+ scoped_refptr<net::IOBuffer> buf(
+ new NetToMojoIOBuffer(pending_receive_.get()));
+ int read_result = socket_->Read(
+ buf.get(), static_cast<int>(num_bytes),
+ base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this),
+ false));
+ if (read_result == net::ERR_IO_PENDING) {
+ // Pending I/O, wait for result in DidReceive().
+ } else if (read_result >= 0) {
mmenke 2014/10/17 16:49:56 Early return is generally preferred over using "el
+ // Synchronous data ready.
+ DidReceive(true, read_result);
mmenke 2014/10/17 16:49:55 Can't we call this on the read_result < 0 case, to
+ } else {
+ // Some kind of error.
+ // TODO(brettw) notify caller of error.
mmenke 2014/10/17 16:49:56 Is there a reason not to have "pending_receive_ =
+ socket_->Close();
+ }
+}
+
+void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) {
+ // TODO(darin): Handle a bad |result| value.
+ ReceiveMore();
+}
+
+void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously,
+ int result) {
+ if (result < 0) {
mmenke 2014/10/17 16:49:56 A 0-byte read means the socket was closed.
+ // Error.
+ pending_receive_ = NULL; // Closes the pipe (owned by the pending write).
+ // TODO(brettw) notify the caller of an error?
+ socket_->Close();
+ return;
+ }
+
+ receive_stream_ = pending_receive_->Complete(result);
+ pending_receive_ = NULL;
+
+ // Schedule more reading.
+ if (completed_synchronously) {
mmenke 2014/10/17 16:49:56 optional: Just a matter of personal preference, b
+ // Don't recursively call ReceiveMore if this is a sync read.
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&TCPConnectedSocketImpl::ReceiveMore,
+ weak_ptr_factory_.GetWeakPtr()));
+ } else {
+ ReceiveMore();
+ }
+}
+
+void TCPConnectedSocketImpl::SendMore() {
+ uint32_t num_bytes = 0;
+ MojoResult result = MojoToNetPendingBuffer::BeginRead(
+ &send_stream_, &pending_send_, &num_bytes);
+ if (result == MOJO_RESULT_SHOULD_WAIT) {
+ // Data not ready, wait for it.
+ send_handle_watcher_.Start(
+ send_stream_.get(),
+ MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE,
+ base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady,
+ weak_ptr_factory_.GetWeakPtr()));
+ return;
+ } else if (result != MOJO_RESULT_OK) {
+ // TODO(brettw) notify caller of error.
+ socket_->Close();
+ return;
+ }
+
+ // Got a buffer from Mojo, give it to the socket. Note that the sockets may
+ // do partial writes.
+ scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get()));
+ int write_result = socket_->Write(
+ buf.get(), static_cast<int>(num_bytes),
+ base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this),
+ false));
+ if (write_result == net::ERR_IO_PENDING) {
+ // Pending I/O, wait for result in DidSend().
mmenke 2014/10/17 16:49:56 Again, early return is generally preferred.
+ } else if (write_result >= 0) {
+ // Synchronous data consumed.
+ DidSend(true, write_result);
mmenke 2014/10/17 16:49:56 Should handle error case here.
+ }
+}
+
+void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) {
+ // TODO(brettw): Handle a bad |result| value.
+ SendMore();
+}
+
+void TCPConnectedSocketImpl::DidSend(bool completed_synchronously,
+ int result) {
+ if (result < 0) {
+ // TODO(brettw) report error.
+ pending_send_ = NULL;
+ socket_->Close();
+ return;
+ }
+
+ // Take back ownership of the stream and free the IOBuffer.
+ send_stream_ = pending_send_->Complete(result);
+ pending_send_ = NULL;
+
+ // Schedule more writing.
+ if (completed_synchronously) {
+ // Don't recursively call SendMore if this is a sync read.
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&TCPConnectedSocketImpl::SendMore,
+ weak_ptr_factory_.GetWeakPtr()));
+ } else {
+ SendMore();
+ }
+}
+
} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698