| OLD | NEW | 
|    1 // Copyright 2014 The Chromium Authors. All rights reserved. |    1 // Copyright 2014 The Chromium Authors. All rights reserved. | 
|    2 // Use of this source code is governed by a BSD-style license that can be |    2 // Use of this source code is governed by a BSD-style license that can be | 
|    3 // found in the LICENSE file. |    3 // found in the LICENSE file. | 
|    4  |    4  | 
|    5 #include "mojo/services/network/tcp_connected_socket_impl.h" |    5 #include "mojo/services/network/tcp_connected_socket_impl.h" | 
|    6  |    6  | 
|    7 #include "base/message_loop/message_loop.h" |    7 #include "base/message_loop/message_loop.h" | 
|    8 #include "mojo/services/network/net_adapters.h" |    8 #include "mojo/services/network/net_adapters.h" | 
|    9 #include "net/base/net_errors.h" |    9 #include "net/base/net_errors.h" | 
|   10  |   10  | 
| (...skipping 22 matching lines...) Expand all  Loading... | 
|   33   MojoResult result = NetToMojoPendingBuffer::BeginWrite( |   33   MojoResult result = NetToMojoPendingBuffer::BeginWrite( | 
|   34       &receive_stream_, &pending_receive_, &num_bytes); |   34       &receive_stream_, &pending_receive_, &num_bytes); | 
|   35   if (result == MOJO_RESULT_SHOULD_WAIT) { |   35   if (result == MOJO_RESULT_SHOULD_WAIT) { | 
|   36     // The pipe is full. We need to wait for it to have more space. |   36     // The pipe is full. We need to wait for it to have more space. | 
|   37     receive_handle_watcher_.Start( |   37     receive_handle_watcher_.Start( | 
|   38         receive_stream_.get(), |   38         receive_stream_.get(), | 
|   39         MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE, |   39         MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE, | 
|   40         base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady, |   40         base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady, | 
|   41                    weak_ptr_factory_.GetWeakPtr())); |   41                    weak_ptr_factory_.GetWeakPtr())); | 
|   42     return; |   42     return; | 
|   43   } else if (result != MOJO_RESULT_OK) { |   43   } | 
|   44     // The receive stream is in a bad state. |   44  | 
|   45     // TODO(darin): How should this be communicated to our client? |   45   if (result == MOJO_RESULT_FAILED_PRECONDITION) { | 
|   46     socket_->Close(); |   46     // It's valid that the user of this class consumed the data they care about | 
 |   47     // and closed their data pipe handles after writing data. This class should | 
 |   48     // still write out all the data. | 
|   47     return; |   49     return; | 
|   48   } |   50   } | 
|   49  |   51  | 
 |   52   if (result != MOJO_RESULT_OK) { | 
 |   53     // The receive stream is in a bad state. | 
 |   54     // TODO(darin): How should this be communicated to our client? | 
 |   55     return; | 
 |   56   } | 
 |   57  | 
|   50   // Mojo is ready for the receive. |   58   // Mojo is ready for the receive. | 
|   51   CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes); |   59   CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes); | 
|   52   scoped_refptr<net::IOBuffer> buf( |   60   scoped_refptr<net::IOBuffer> buf( | 
|   53       new NetToMojoIOBuffer(pending_receive_.get())); |   61       new NetToMojoIOBuffer(pending_receive_.get())); | 
|   54   int read_result = socket_->Read( |   62   int read_result = socket_->Read( | 
|   55       buf.get(), static_cast<int>(num_bytes), |   63       buf.get(), static_cast<int>(num_bytes), | 
|   56       base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this), |   64       base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this), | 
|   57                  false)); |   65                  false)); | 
|   58   if (read_result == net::ERR_IO_PENDING) { |   66   if (read_result == net::ERR_IO_PENDING) { | 
|   59     // Pending I/O, wait for result in DidReceive(). |   67     // Pending I/O, wait for result in DidReceive(). | 
|   60   } else if (read_result >= 0) { |   68   } else if (read_result >= 0) { | 
|   61     // Synchronous data ready. |   69     // Synchronous data ready. | 
|   62     DidReceive(true, read_result); |   70     DidReceive(true, read_result); | 
|   63   } else { |   71   } else { | 
|   64     // Some kind of error. |   72     // Some kind of error. | 
|   65     // TODO(brettw) notify caller of error. |   73     // TODO(brettw) notify caller of error. | 
|   66     socket_->Close(); |  | 
|   67   } |   74   } | 
|   68 } |   75 } | 
|   69  |   76  | 
|   70 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) { |   77 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) { | 
|   71   // TODO(darin): Handle a bad |result| value. |   78   // TODO(darin): Handle a bad |result| value. | 
|   72   ReceiveMore(); |   79   ReceiveMore(); | 
|   73 } |   80 } | 
|   74  |   81  | 
|   75 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously, |   82 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously, | 
|   76                                         int result) { |   83                                         int result) { | 
|   77   if (result < 0) { |   84   if (result < 0) { | 
|   78     // Error. |   85     // Error. | 
|   79     pending_receive_ = NULL;  // Closes the pipe (owned by the pending write). |   86     pending_receive_ = NULL;  // Closes the pipe (owned by the pending write). | 
|   80     // TODO(brettw) notify the caller of an error? |   87     // TODO(brettw) notify the caller of an error? | 
|   81     socket_->Close(); |  | 
|   82     return; |   88     return; | 
|   83   } |   89   } | 
|   84  |   90  | 
|   85   receive_stream_ = pending_receive_->Complete(result); |   91   receive_stream_ = pending_receive_->Complete(result); | 
|   86   pending_receive_ = NULL; |   92   pending_receive_ = NULL; | 
|   87  |   93  | 
|   88   // Schedule more reading. |   94   // Schedule more reading. | 
|   89   if (completed_synchronously) { |   95   if (completed_synchronously) { | 
|   90     // Don't recursively call ReceiveMore if this is a sync read. |   96     // Don't recursively call ReceiveMore if this is a sync read. | 
|   91     base::MessageLoop::current()->PostTask( |   97     base::MessageLoop::current()->PostTask( | 
| (...skipping 12 matching lines...) Expand all  Loading... | 
|  104   if (result == MOJO_RESULT_SHOULD_WAIT) { |  110   if (result == MOJO_RESULT_SHOULD_WAIT) { | 
|  105     // Data not ready, wait for it. |  111     // Data not ready, wait for it. | 
|  106     send_handle_watcher_.Start( |  112     send_handle_watcher_.Start( | 
|  107         send_stream_.get(), |  113         send_stream_.get(), | 
|  108         MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, |  114         MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, | 
|  109         base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady, |  115         base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady, | 
|  110                    weak_ptr_factory_.GetWeakPtr())); |  116                    weak_ptr_factory_.GetWeakPtr())); | 
|  111     return; |  117     return; | 
|  112   } else if (result != MOJO_RESULT_OK) { |  118   } else if (result != MOJO_RESULT_OK) { | 
|  113     // TODO(brettw) notify caller of error. |  119     // TODO(brettw) notify caller of error. | 
|  114     socket_->Close(); |  | 
|  115     return; |  120     return; | 
|  116   } |  121   } | 
|  117  |  122  | 
|  118   // Got a buffer from Mojo, give it to the socket. Note that the sockets may |  123   // Got a buffer from Mojo, give it to the socket. Note that the sockets may | 
|  119   // do partial writes. |  124   // do partial writes. | 
|  120   scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get())); |  125   scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get())); | 
|  121   int write_result = socket_->Write( |  126   int write_result = socket_->Write( | 
|  122       buf.get(), static_cast<int>(num_bytes), |  127       buf.get(), static_cast<int>(num_bytes), | 
|  123       base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this), |  128       base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this), | 
|  124                  false)); |  129                  false)); | 
|  125   if (write_result == net::ERR_IO_PENDING) { |  130   if (write_result == net::ERR_IO_PENDING) { | 
|  126     // Pending I/O, wait for result in DidSend(). |  131     // Pending I/O, wait for result in DidSend(). | 
|  127   } else if (write_result >= 0) { |  132   } else if (write_result >= 0) { | 
|  128     // Synchronous data consumed. |  133     // Synchronous data consumed. | 
|  129     DidSend(true, write_result); |  134     DidSend(true, write_result); | 
|  130   } |  135   } | 
|  131 } |  136 } | 
|  132  |  137  | 
|  133 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) { |  138 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) { | 
|  134   // TODO(brettw): Handle a bad |result| value. |  139   // TODO(brettw): Handle a bad |result| value. | 
|  135   SendMore(); |  140   SendMore(); | 
|  136 } |  141 } | 
|  137  |  142  | 
|  138 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, |  143 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, | 
|  139                                      int result) { |  144                                      int result) { | 
|  140   if (result < 0) { |  145   if (result < 0) { | 
|  141     // TODO(brettw) report error. |  146     // TODO(brettw) report error. | 
|  142     pending_send_ = NULL; |  147     pending_send_ = NULL; | 
|  143     socket_->Close(); |  | 
|  144     return; |  148     return; | 
|  145   } |  149   } | 
|  146  |  150  | 
|  147   // Take back ownership of the stream and free the IOBuffer. |  151   // Take back ownership of the stream and free the IOBuffer. | 
|  148   send_stream_ = pending_send_->Complete(result); |  152   send_stream_ = pending_send_->Complete(result); | 
|  149   pending_send_ = NULL; |  153   pending_send_ = NULL; | 
|  150  |  154  | 
|  151   // Schedule more writing. |  155   // Schedule more writing. | 
|  152   if (completed_synchronously) { |  156   if (completed_synchronously) { | 
|  153     // Don't recursively call SendMore if this is a sync read. |  157     // Don't recursively call SendMore if this is a sync read. | 
|  154     base::MessageLoop::current()->PostTask( |  158     base::MessageLoop::current()->PostTask( | 
|  155         FROM_HERE, |  159         FROM_HERE, | 
|  156         base::Bind(&TCPConnectedSocketImpl::SendMore, |  160         base::Bind(&TCPConnectedSocketImpl::SendMore, | 
|  157                    weak_ptr_factory_.GetWeakPtr())); |  161                    weak_ptr_factory_.GetWeakPtr())); | 
|  158   } else { |  162   } else { | 
|  159     SendMore(); |  163     SendMore(); | 
|  160   } |  164   } | 
|  161 } |  165 } | 
|  162  |  166  | 
|  163 }  // namespace mojo |  167 }  // namespace mojo | 
| OLD | NEW |