| OLD | NEW | 
|---|
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be | 
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. | 
| 4 | 4 | 
| 5 #include "mojo/services/network/tcp_connected_socket_impl.h" | 5 #include "mojo/services/network/tcp_connected_socket_impl.h" | 
| 6 | 6 | 
| 7 #include "base/message_loop/message_loop.h" | 7 #include "base/message_loop/message_loop.h" | 
| 8 #include "mojo/services/network/net_adapters.h" | 8 #include "mojo/services/network/net_adapters.h" | 
| 9 #include "net/base/net_errors.h" | 9 #include "net/base/net_errors.h" | 
| 10 | 10 | 
| (...skipping 18 matching lines...) Expand all  Loading... | 
| 29 void TCPConnectedSocketImpl::ReceiveMore() { | 29 void TCPConnectedSocketImpl::ReceiveMore() { | 
| 30   DCHECK(!pending_receive_.get()); | 30   DCHECK(!pending_receive_.get()); | 
| 31 | 31 | 
| 32   uint32_t num_bytes; | 32   uint32_t num_bytes; | 
| 33   MojoResult result = NetToMojoPendingBuffer::BeginWrite( | 33   MojoResult result = NetToMojoPendingBuffer::BeginWrite( | 
| 34       &receive_stream_, &pending_receive_, &num_bytes); | 34       &receive_stream_, &pending_receive_, &num_bytes); | 
| 35   if (result == MOJO_RESULT_SHOULD_WAIT) { | 35   if (result == MOJO_RESULT_SHOULD_WAIT) { | 
| 36     // The pipe is full. We need to wait for it to have more space. | 36     // The pipe is full. We need to wait for it to have more space. | 
| 37     receive_handle_watcher_.Start( | 37     receive_handle_watcher_.Start( | 
| 38         receive_stream_.get(), | 38         receive_stream_.get(), | 
| 39         MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE, | 39         MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, | 
|  | 40         MOJO_DEADLINE_INDEFINITE, | 
| 40         base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady, | 41         base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady, | 
| 41                    weak_ptr_factory_.GetWeakPtr())); | 42                    weak_ptr_factory_.GetWeakPtr())); | 
| 42     return; | 43     return; | 
| 43   } | 44   } | 
| 44 | 45 | 
| 45   if (result == MOJO_RESULT_FAILED_PRECONDITION) { | 46   if (result == MOJO_RESULT_FAILED_PRECONDITION) { | 
| 46     // It's valid that the user of this class consumed the data they care about | 47     // It's valid that the user of this class consumed the data they care about | 
| 47     // and closed their data pipe handles after writing data. This class should | 48     // and closed their data pipe handles after writing data. This class should | 
| 48     // still write out all the data. | 49     // still write out all the data. | 
|  | 50     ShutdownReceive(); | 
|  | 51     // TODO(johnmccutchan): Notify socket direction is closed along with | 
|  | 52     // net_result and mojo_result. | 
| 49     return; | 53     return; | 
| 50   } | 54   } | 
| 51 | 55 | 
| 52   if (result != MOJO_RESULT_OK) { | 56   if (result != MOJO_RESULT_OK) { | 
| 53     // The receive stream is in a bad state. | 57     // The receive stream is in a bad state. | 
| 54     // TODO(darin): How should this be communicated to our client? | 58     ShutdownReceive(); | 
|  | 59     // TODO(johnmccutchan): Notify socket direction is closed along with | 
|  | 60     // net_result and mojo_result. | 
| 55     return; | 61     return; | 
| 56   } | 62   } | 
| 57 | 63 | 
| 58   // Mojo is ready for the receive. | 64   // Mojo is ready for the receive. | 
| 59   CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes); | 65   CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes); | 
| 60   scoped_refptr<net::IOBuffer> buf( | 66   scoped_refptr<net::IOBuffer> buf( | 
| 61       new NetToMojoIOBuffer(pending_receive_.get())); | 67       new NetToMojoIOBuffer(pending_receive_.get())); | 
| 62   int read_result = socket_->Read( | 68   int read_result = socket_->Read( | 
| 63       buf.get(), static_cast<int>(num_bytes), | 69       buf.get(), static_cast<int>(num_bytes), | 
| 64       base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this), | 70       base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this), | 
| 65                  false)); | 71                  false)); | 
| 66   if (read_result == net::ERR_IO_PENDING) { | 72   if (read_result == net::ERR_IO_PENDING) { | 
| 67     // Pending I/O, wait for result in DidReceive(). | 73     // Pending I/O, wait for result in DidReceive(). | 
| 68   } else if (read_result >= 0) { | 74   } else if (read_result > 0) { | 
| 69     // Synchronous data ready. | 75     // Synchronous data ready. | 
| 70     DidReceive(true, read_result); | 76     DidReceive(true, read_result); | 
| 71   } else { | 77   } else { | 
| 72     // Some kind of error. | 78     // read_result == 0 indicates EOF. | 
| 73     // TODO(brettw) notify caller of error. | 79     // read_result < 0 indicates error. | 
|  | 80     ShutdownReceive(); | 
|  | 81     // TODO(johnmccutchan): Notify socket direction is closed along with | 
|  | 82     // net_result and mojo_result. | 
| 74   } | 83   } | 
| 75 } | 84 } | 
| 76 | 85 | 
| 77 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) { | 86 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) { | 
| 78   // TODO(darin): Handle a bad |result| value. | 87   if (result != MOJO_RESULT_OK) { | 
|  | 88     ShutdownReceive(); | 
|  | 89     // TODO(johnmccutchan): Notify socket direction is closed along with | 
|  | 90     // net_result and mojo_result. | 
|  | 91     return; | 
|  | 92   } | 
| 79   ReceiveMore(); | 93   ReceiveMore(); | 
| 80 } | 94 } | 
| 81 | 95 | 
| 82 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously, | 96 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously, | 
| 83                                         int result) { | 97                                         int result) { | 
| 84   if (result < 0) { | 98   if (result < 0) { | 
| 85     // Error. | 99     // Error. | 
| 86     pending_receive_ = NULL;  // Closes the pipe (owned by the pending write). | 100     ShutdownReceive(); | 
| 87     // TODO(brettw) notify the caller of an error? | 101     // TODO(johnmccutchan): Notify socket direction is closed along with | 
|  | 102     // net_result and mojo_result. | 
| 88     return; | 103     return; | 
| 89   } | 104   } | 
| 90 | 105 | 
| 91   receive_stream_ = pending_receive_->Complete(result); | 106   receive_stream_ = pending_receive_->Complete(result); | 
| 92   pending_receive_ = NULL; | 107   pending_receive_ = nullptr; | 
| 93 | 108 | 
| 94   // Schedule more reading. | 109   // Schedule more reading. | 
| 95   if (completed_synchronously) { | 110   if (completed_synchronously) { | 
| 96     // Don't recursively call ReceiveMore if this is a sync read. | 111     // Don't recursively call ReceiveMore if this is a sync read. | 
| 97     base::MessageLoop::current()->PostTask( | 112     base::MessageLoop::current()->PostTask( | 
| 98         FROM_HERE, | 113         FROM_HERE, | 
| 99         base::Bind(&TCPConnectedSocketImpl::ReceiveMore, | 114         base::Bind(&TCPConnectedSocketImpl::ReceiveMore, | 
| 100                    weak_ptr_factory_.GetWeakPtr())); | 115                    weak_ptr_factory_.GetWeakPtr())); | 
| 101   } else { | 116   } else { | 
| 102     ReceiveMore(); | 117     ReceiveMore(); | 
| 103   } | 118   } | 
| 104 } | 119 } | 
| 105 | 120 | 
|  | 121 void TCPConnectedSocketImpl::ShutdownReceive() { | 
|  | 122   pending_receive_ = nullptr; | 
|  | 123   receive_stream_.reset(); | 
|  | 124 } | 
|  | 125 | 
| 106 void TCPConnectedSocketImpl::SendMore() { | 126 void TCPConnectedSocketImpl::SendMore() { | 
| 107   uint32_t num_bytes = 0; | 127   uint32_t num_bytes = 0; | 
| 108   MojoResult result = MojoToNetPendingBuffer::BeginRead( | 128   MojoResult result = MojoToNetPendingBuffer::BeginRead( | 
| 109       &send_stream_, &pending_send_, &num_bytes); | 129       &send_stream_, &pending_send_, &num_bytes); | 
| 110   if (result == MOJO_RESULT_SHOULD_WAIT) { | 130   if (result == MOJO_RESULT_SHOULD_WAIT) { | 
| 111     // Data not ready, wait for it. | 131     // Data not ready, wait for it. | 
| 112     send_handle_watcher_.Start( | 132     send_handle_watcher_.Start( | 
| 113         send_stream_.get(), | 133         send_stream_.get(), | 
| 114         MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, | 134         MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, | 
|  | 135         MOJO_DEADLINE_INDEFINITE, | 
| 115         base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady, | 136         base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady, | 
| 116                    weak_ptr_factory_.GetWeakPtr())); | 137                    weak_ptr_factory_.GetWeakPtr())); | 
| 117     return; | 138     return; | 
| 118   } else if (result != MOJO_RESULT_OK) { | 139   } else if (result != MOJO_RESULT_OK) { | 
| 119     // TODO(brettw) notify caller of error. | 140     ShutdownSend(); | 
|  | 141     // TODO(johnmccutchan): Notify socket direction is closed along with | 
|  | 142     // net_result and mojo_result. | 
| 120     return; | 143     return; | 
| 121   } | 144   } | 
| 122 | 145 | 
| 123   // Got a buffer from Mojo, give it to the socket. Note that the sockets may | 146   // Got a buffer from Mojo, give it to the socket. Note that the sockets may | 
| 124   // do partial writes. | 147   // do partial writes. | 
| 125   scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get())); | 148   scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get())); | 
| 126   int write_result = socket_->Write( | 149   int write_result = socket_->Write( | 
| 127       buf.get(), static_cast<int>(num_bytes), | 150       buf.get(), static_cast<int>(num_bytes), | 
| 128       base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this), | 151       base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this), | 
| 129                  false)); | 152                  false)); | 
| 130   if (write_result == net::ERR_IO_PENDING) { | 153   if (write_result == net::ERR_IO_PENDING) { | 
| 131     // Pending I/O, wait for result in DidSend(). | 154     // Pending I/O, wait for result in DidSend(). | 
| 132   } else if (write_result >= 0) { | 155   } else if (write_result >= 0) { | 
| 133     // Synchronous data consumed. | 156     // Synchronous data consumed. | 
| 134     DidSend(true, write_result); | 157     DidSend(true, write_result); | 
|  | 158   } else { | 
|  | 159     // write_result < 0 indicates error. | 
|  | 160     ShutdownSend(); | 
|  | 161     // TODO(johnmccutchan): Notify socket direction is closed along with | 
|  | 162     // net_result and mojo_result. | 
| 135   } | 163   } | 
| 136 } | 164 } | 
| 137 | 165 | 
| 138 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) { | 166 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) { | 
| 139   // TODO(brettw): Handle a bad |result| value. | 167   if (result != MOJO_RESULT_OK) { | 
|  | 168     ShutdownSend(); | 
|  | 169     // TODO(johnmccutchan): Notify socket direction is closed along with | 
|  | 170     // net_result and mojo_result. | 
|  | 171     return; | 
|  | 172   } | 
| 140   SendMore(); | 173   SendMore(); | 
| 141 } | 174 } | 
| 142 | 175 | 
| 143 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, | 176 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, | 
| 144                                      int result) { | 177                                      int result) { | 
| 145   if (result < 0) { | 178   if (result < 0) { | 
| 146     // TODO(brettw) report error. | 179     ShutdownSend(); | 
| 147     pending_send_ = NULL; | 180     // TODO(johnmccutchan): Notify socket direction is closed along with | 
|  | 181     // net_result and mojo_result. | 
| 148     return; | 182     return; | 
| 149   } | 183   } | 
| 150 | 184 | 
| 151   // Take back ownership of the stream and free the IOBuffer. | 185   // Take back ownership of the stream and free the IOBuffer. | 
| 152   send_stream_ = pending_send_->Complete(result); | 186   send_stream_ = pending_send_->Complete(result); | 
| 153   pending_send_ = NULL; | 187   pending_send_ = nullptr; | 
| 154 | 188 | 
| 155   // Schedule more writing. | 189   // Schedule more writing. | 
| 156   if (completed_synchronously) { | 190   if (completed_synchronously) { | 
| 157     // Don't recursively call SendMore if this is a sync read. | 191     // Don't recursively call SendMore if this is a sync read. | 
| 158     base::MessageLoop::current()->PostTask( | 192     base::MessageLoop::current()->PostTask( | 
| 159         FROM_HERE, | 193         FROM_HERE, | 
| 160         base::Bind(&TCPConnectedSocketImpl::SendMore, | 194         base::Bind(&TCPConnectedSocketImpl::SendMore, | 
| 161                    weak_ptr_factory_.GetWeakPtr())); | 195                    weak_ptr_factory_.GetWeakPtr())); | 
| 162   } else { | 196   } else { | 
| 163     SendMore(); | 197     SendMore(); | 
| 164   } | 198   } | 
| 165 } | 199 } | 
| 166 | 200 | 
|  | 201 void TCPConnectedSocketImpl::ShutdownSend() { | 
|  | 202   pending_send_ = nullptr; | 
|  | 203   send_stream_.reset(); | 
|  | 204 } | 
|  | 205 | 
| 167 }  // namespace mojo | 206 }  // namespace mojo | 
| OLD | NEW | 
|---|