 Chromium Code Reviews
 Chromium Code Reviews Issue 1015733002:
  Close data pipes when TCP network connection goes down.  (Closed) 
  Base URL: https://chromium.googlesource.com/chromium/src.git@master
    
  
    Issue 1015733002:
  Close data pipes when TCP network connection goes down.  (Closed) 
  Base URL: https://chromium.googlesource.com/chromium/src.git@master| OLD | NEW | 
|---|---|
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be | 
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. | 
| 4 | 4 | 
| 5 #include "mojo/services/network/tcp_connected_socket_impl.h" | 5 #include "mojo/services/network/tcp_connected_socket_impl.h" | 
| 6 | 6 | 
| 7 #include "base/message_loop/message_loop.h" | 7 #include "base/message_loop/message_loop.h" | 
| 8 #include "mojo/services/network/net_adapters.h" | 8 #include "mojo/services/network/net_adapters.h" | 
| 9 #include "net/base/net_errors.h" | 9 #include "net/base/net_errors.h" | 
| 10 | 10 | 
| (...skipping 18 matching lines...) Expand all Loading... | |
| 29 void TCPConnectedSocketImpl::ReceiveMore() { | 29 void TCPConnectedSocketImpl::ReceiveMore() { | 
| 30 DCHECK(!pending_receive_.get()); | 30 DCHECK(!pending_receive_.get()); | 
| 31 | 31 | 
| 32 uint32_t num_bytes; | 32 uint32_t num_bytes; | 
| 33 MojoResult result = NetToMojoPendingBuffer::BeginWrite( | 33 MojoResult result = NetToMojoPendingBuffer::BeginWrite( | 
| 34 &receive_stream_, &pending_receive_, &num_bytes); | 34 &receive_stream_, &pending_receive_, &num_bytes); | 
| 35 if (result == MOJO_RESULT_SHOULD_WAIT) { | 35 if (result == MOJO_RESULT_SHOULD_WAIT) { | 
| 36 // The pipe is full. We need to wait for it to have more space. | 36 // The pipe is full. We need to wait for it to have more space. | 
| 37 receive_handle_watcher_.Start( | 37 receive_handle_watcher_.Start( | 
| 38 receive_stream_.get(), | 38 receive_stream_.get(), | 
| 39 MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE, | 39 MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, | 
| 40 MOJO_DEADLINE_INDEFINITE, | |
| 40 base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady, | 41 base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady, | 
| 41 weak_ptr_factory_.GetWeakPtr())); | 42 weak_ptr_factory_.GetWeakPtr())); | 
| 42 return; | 43 return; | 
| 43 } | 44 } | 
| 44 | 45 | 
| 45 if (result == MOJO_RESULT_FAILED_PRECONDITION) { | 46 if (result == MOJO_RESULT_FAILED_PRECONDITION) { | 
| 46 // It's valid that the user of this class consumed the data they care about | 47 // It's valid that the user of this class consumed the data they care about | 
| 47 // and closed their data pipe handles after writing data. This class should | 48 // and closed their data pipe handles after writing data. This class should | 
| 48 // still write out all the data. | 49 // still write out all the data. | 
| 50 ShutdownReceive(); | |
| 51 // TODO(johnmccutchan): Notify socket direction is closed along with | |
| 52 // net_result and mojo_result. | |
| 49 return; | 53 return; | 
| 50 } | 54 } | 
| 51 | 55 | 
| 52 if (result != MOJO_RESULT_OK) { | 56 if (result != MOJO_RESULT_OK) { | 
| 53 // The receive stream is in a bad state. | 57 // The receive stream is in a bad state. | 
| 54 // TODO(darin): How should this be communicated to our client? | 58 ShutdownReceive(); | 
| 59 // TODO(johnmccutchan): Notify socket direction is closed along with | |
| 60 // net_result and mojo_result. | |
| 55 return; | 61 return; | 
| 56 } | 62 } | 
| 57 | 63 | 
| 58 // Mojo is ready for the receive. | 64 // Mojo is ready for the receive. | 
| 59 CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes); | 65 CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes); | 
| 60 scoped_refptr<net::IOBuffer> buf( | 66 scoped_refptr<net::IOBuffer> buf( | 
| 61 new NetToMojoIOBuffer(pending_receive_.get())); | 67 new NetToMojoIOBuffer(pending_receive_.get())); | 
| 62 int read_result = socket_->Read( | 68 int read_result = socket_->Read( | 
| 63 buf.get(), static_cast<int>(num_bytes), | 69 buf.get(), static_cast<int>(num_bytes), | 
| 64 base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this), | 70 base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this), | 
| 65 false)); | 71 false)); | 
| 66 if (read_result == net::ERR_IO_PENDING) { | 72 if (read_result == net::ERR_IO_PENDING) { | 
| 67 // Pending I/O, wait for result in DidReceive(). | 73 // Pending I/O, wait for result in DidReceive(). | 
| 68 } else if (read_result >= 0) { | 74 } else if (read_result > 0) { | 
| 69 // Synchronous data ready. | 75 // Synchronous data ready. | 
| 70 DidReceive(true, read_result); | 76 DidReceive(true, read_result); | 
| 71 } else { | 77 } else { | 
| 72 // Some kind of error. | 78 // read_result == 0 indicates EOF. | 
| 73 // TODO(brettw) notify caller of error. | 79 // read_result < 0 indicates error. | 
| 80 ShutdownReceive(); | |
| 81 // TODO(johnmccutchan): Notify socket direction is closed along with | |
| 82 // net_result and mojo_result. | |
| 74 } | 83 } | 
| 75 } | 84 } | 
| 76 | 85 | 
| 77 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) { | 86 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) { | 
| 78 // TODO(darin): Handle a bad |result| value. | 87 if (result != MOJO_RESULT_OK) { | 
| 88 ShutdownReceive(); | |
| 89 // TODO(johnmccutchan): Notify socket direction is closed along with | |
| 90 // net_result and mojo_result. | |
| 91 return; | |
| 92 } | |
| 79 ReceiveMore(); | 93 ReceiveMore(); | 
| 80 } | 94 } | 
| 81 | 95 | 
| 82 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously, | 96 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously, | 
| 83 int result) { | 97 int result) { | 
| 84 if (result < 0) { | 98 if (result < 0) { | 
| 85 // Error. | 99 // Error. | 
| 86 pending_receive_ = NULL; // Closes the pipe (owned by the pending write). | 100 ShutdownReceive(); | 
| 87 // TODO(brettw) notify the caller of an error? | 101 // TODO(johnmccutchan): Notify socket direction is closed along with | 
| 102 // net_result and mojo_result. | |
| 88 return; | 103 return; | 
| 89 } | 104 } | 
| 90 | 105 | 
| 91 receive_stream_ = pending_receive_->Complete(result); | 106 receive_stream_ = pending_receive_->Complete(result); | 
| 92 pending_receive_ = NULL; | 107 pending_receive_ = NULL; | 
| 93 | 108 | 
| 94 // Schedule more reading. | 109 // Schedule more reading. | 
| 95 if (completed_synchronously) { | 110 if (completed_synchronously) { | 
| 96 // Don't recursively call ReceiveMore if this is a sync read. | 111 // Don't recursively call ReceiveMore if this is a sync read. | 
| 97 base::MessageLoop::current()->PostTask( | 112 base::MessageLoop::current()->PostTask( | 
| 98 FROM_HERE, | 113 FROM_HERE, | 
| 99 base::Bind(&TCPConnectedSocketImpl::ReceiveMore, | 114 base::Bind(&TCPConnectedSocketImpl::ReceiveMore, | 
| 100 weak_ptr_factory_.GetWeakPtr())); | 115 weak_ptr_factory_.GetWeakPtr())); | 
| 101 } else { | 116 } else { | 
| 102 ReceiveMore(); | 117 ReceiveMore(); | 
| 103 } | 118 } | 
| 104 } | 119 } | 
| 105 | 120 | 
| 121 void TCPConnectedSocketImpl::ShutdownReceive() { | |
| 122 pending_receive_ = NULL; | |
| 
jamesr
2015/03/19 21:54:23
nullptr. only use NULL in code that has to compile
 
Cutch
2015/03/19 22:03:32
Fixed here and elsewhere.
 
Cutch
2015/03/19 22:03:32
Done.
 | |
| 123 receive_stream_.reset(); | |
| 124 } | |
| 125 | |
| 106 void TCPConnectedSocketImpl::SendMore() { | 126 void TCPConnectedSocketImpl::SendMore() { | 
| 107 uint32_t num_bytes = 0; | 127 uint32_t num_bytes = 0; | 
| 108 MojoResult result = MojoToNetPendingBuffer::BeginRead( | 128 MojoResult result = MojoToNetPendingBuffer::BeginRead( | 
| 109 &send_stream_, &pending_send_, &num_bytes); | 129 &send_stream_, &pending_send_, &num_bytes); | 
| 110 if (result == MOJO_RESULT_SHOULD_WAIT) { | 130 if (result == MOJO_RESULT_SHOULD_WAIT) { | 
| 111 // Data not ready, wait for it. | 131 // Data not ready, wait for it. | 
| 112 send_handle_watcher_.Start( | 132 send_handle_watcher_.Start( | 
| 113 send_stream_.get(), | 133 send_stream_.get(), | 
| 114 MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, | 134 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, | 
| 135 MOJO_DEADLINE_INDEFINITE, | |
| 115 base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady, | 136 base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady, | 
| 116 weak_ptr_factory_.GetWeakPtr())); | 137 weak_ptr_factory_.GetWeakPtr())); | 
| 117 return; | 138 return; | 
| 118 } else if (result != MOJO_RESULT_OK) { | 139 } else if (result != MOJO_RESULT_OK) { | 
| 119 // TODO(brettw) notify caller of error. | 140 ShutdownSend(); | 
| 141 // TODO(johnmccutchan): Notify socket direction is closed along with | |
| 142 // net_result and mojo_result. | |
| 120 return; | 143 return; | 
| 121 } | 144 } | 
| 122 | 145 | 
| 123 // Got a buffer from Mojo, give it to the socket. Note that the sockets may | 146 // Got a buffer from Mojo, give it to the socket. Note that the sockets may | 
| 124 // do partial writes. | 147 // do partial writes. | 
| 125 scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get())); | 148 scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get())); | 
| 126 int write_result = socket_->Write( | 149 int write_result = socket_->Write( | 
| 127 buf.get(), static_cast<int>(num_bytes), | 150 buf.get(), static_cast<int>(num_bytes), | 
| 128 base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this), | 151 base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this), | 
| 129 false)); | 152 false)); | 
| 130 if (write_result == net::ERR_IO_PENDING) { | 153 if (write_result == net::ERR_IO_PENDING) { | 
| 131 // Pending I/O, wait for result in DidSend(). | 154 // Pending I/O, wait for result in DidSend(). | 
| 132 } else if (write_result >= 0) { | 155 } else if (write_result >= 0) { | 
| 133 // Synchronous data consumed. | 156 // Synchronous data consumed. | 
| 134 DidSend(true, write_result); | 157 DidSend(true, write_result); | 
| 158 } else { | |
| 159 // write_result < 0 indicates error. | |
| 160 ShutdownSend(); | |
| 161 // TODO(johnmccutchan): Notify socket direction is closed along with | |
| 162 // net_result and mojo_result. | |
| 135 } | 163 } | 
| 136 } | 164 } | 
| 137 | 165 | 
| 138 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) { | 166 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) { | 
| 139 // TODO(brettw): Handle a bad |result| value. | 167 if (result != MOJO_RESULT_OK) { | 
| 168 ShutdownSend(); | |
| 169 // TODO(johnmccutchan): Notify socket direction is closed along with | |
| 170 // net_result and mojo_result. | |
| 171 return; | |
| 172 } | |
| 140 SendMore(); | 173 SendMore(); | 
| 141 } | 174 } | 
| 142 | 175 | 
| 143 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, | 176 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, | 
| 144 int result) { | 177 int result) { | 
| 145 if (result < 0) { | 178 if (result < 0) { | 
| 146 // TODO(brettw) report error. | 179 ShutdownSend(); | 
| 147 pending_send_ = NULL; | 180 // TODO(johnmccutchan): Notify socket direction is closed along with | 
| 181 // net_result and mojo_result. | |
| 148 return; | 182 return; | 
| 149 } | 183 } | 
| 150 | 184 | 
| 151 // Take back ownership of the stream and free the IOBuffer. | 185 // Take back ownership of the stream and free the IOBuffer. | 
| 152 send_stream_ = pending_send_->Complete(result); | 186 send_stream_ = pending_send_->Complete(result); | 
| 153 pending_send_ = NULL; | 187 pending_send_ = NULL; | 
| 154 | 188 | 
| 155 // Schedule more writing. | 189 // Schedule more writing. | 
| 156 if (completed_synchronously) { | 190 if (completed_synchronously) { | 
| 157 // Don't recursively call SendMore if this is a sync read. | 191 // Don't recursively call SendMore if this is a sync read. | 
| 158 base::MessageLoop::current()->PostTask( | 192 base::MessageLoop::current()->PostTask( | 
| 159 FROM_HERE, | 193 FROM_HERE, | 
| 160 base::Bind(&TCPConnectedSocketImpl::SendMore, | 194 base::Bind(&TCPConnectedSocketImpl::SendMore, | 
| 161 weak_ptr_factory_.GetWeakPtr())); | 195 weak_ptr_factory_.GetWeakPtr())); | 
| 162 } else { | 196 } else { | 
| 163 SendMore(); | 197 SendMore(); | 
| 164 } | 198 } | 
| 165 } | 199 } | 
| 166 | 200 | 
| 201 void TCPConnectedSocketImpl::ShutdownSend() { | |
| 202 pending_send_ = NULL; | |
| 
jamesr
2015/03/19 21:54:23
nullptr
 | |
| 203 send_stream_.reset(); | |
| 204 } | |
| 205 | |
| 167 } // namespace mojo | 206 } // namespace mojo | 
| OLD | NEW |