Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(192)

Side by Side Diff: mojo/services/network/tcp_connected_socket_impl.cc

Issue 1015733002: Close data pipes when TCP network connection goes down. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/services/network/tcp_connected_socket_impl.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/services/network/tcp_connected_socket_impl.h" 5 #include "mojo/services/network/tcp_connected_socket_impl.h"
6 6
7 #include "base/message_loop/message_loop.h" 7 #include "base/message_loop/message_loop.h"
8 #include "mojo/services/network/net_adapters.h" 8 #include "mojo/services/network/net_adapters.h"
9 #include "net/base/net_errors.h" 9 #include "net/base/net_errors.h"
10 10
(...skipping 18 matching lines...) Expand all
29 void TCPConnectedSocketImpl::ReceiveMore() { 29 void TCPConnectedSocketImpl::ReceiveMore() {
30 DCHECK(!pending_receive_.get()); 30 DCHECK(!pending_receive_.get());
31 31
32 uint32_t num_bytes; 32 uint32_t num_bytes;
33 MojoResult result = NetToMojoPendingBuffer::BeginWrite( 33 MojoResult result = NetToMojoPendingBuffer::BeginWrite(
34 &receive_stream_, &pending_receive_, &num_bytes); 34 &receive_stream_, &pending_receive_, &num_bytes);
35 if (result == MOJO_RESULT_SHOULD_WAIT) { 35 if (result == MOJO_RESULT_SHOULD_WAIT) {
36 // The pipe is full. We need to wait for it to have more space. 36 // The pipe is full. We need to wait for it to have more space.
37 receive_handle_watcher_.Start( 37 receive_handle_watcher_.Start(
38 receive_stream_.get(), 38 receive_stream_.get(),
39 MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE, 39 MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
40 MOJO_DEADLINE_INDEFINITE,
40 base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady, 41 base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady,
41 weak_ptr_factory_.GetWeakPtr())); 42 weak_ptr_factory_.GetWeakPtr()));
42 return; 43 return;
43 } 44 }
44 45
45 if (result == MOJO_RESULT_FAILED_PRECONDITION) { 46 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
46 // It's valid that the user of this class consumed the data they care about 47 // It's valid that the user of this class consumed the data they care about
47 // and closed their data pipe handles after writing data. This class should 48 // and closed their data pipe handles after writing data. This class should
48 // still write out all the data. 49 // still write out all the data.
50 ShutdownReceive();
51 // TODO(johnmccutchan): Notify socket direction is closed along with
52 // net_result and mojo_result.
49 return; 53 return;
50 } 54 }
51 55
52 if (result != MOJO_RESULT_OK) { 56 if (result != MOJO_RESULT_OK) {
53 // The receive stream is in a bad state. 57 // The receive stream is in a bad state.
54 // TODO(darin): How should this be communicated to our client? 58 ShutdownReceive();
59 // TODO(johnmccutchan): Notify socket direction is closed along with
60 // net_result and mojo_result.
55 return; 61 return;
56 } 62 }
57 63
58 // Mojo is ready for the receive. 64 // Mojo is ready for the receive.
59 CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes); 65 CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes);
60 scoped_refptr<net::IOBuffer> buf( 66 scoped_refptr<net::IOBuffer> buf(
61 new NetToMojoIOBuffer(pending_receive_.get())); 67 new NetToMojoIOBuffer(pending_receive_.get()));
62 int read_result = socket_->Read( 68 int read_result = socket_->Read(
63 buf.get(), static_cast<int>(num_bytes), 69 buf.get(), static_cast<int>(num_bytes),
64 base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this), 70 base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this),
65 false)); 71 false));
66 if (read_result == net::ERR_IO_PENDING) { 72 if (read_result == net::ERR_IO_PENDING) {
67 // Pending I/O, wait for result in DidReceive(). 73 // Pending I/O, wait for result in DidReceive().
68 } else if (read_result >= 0) { 74 } else if (read_result > 0) {
69 // Synchronous data ready. 75 // Synchronous data ready.
70 DidReceive(true, read_result); 76 DidReceive(true, read_result);
71 } else { 77 } else {
72 // Some kind of error. 78 // read_result == 0 indicates EOF.
73 // TODO(brettw) notify caller of error. 79 // read_result < 0 indicates error.
80 ShutdownReceive();
81 // TODO(johnmccutchan): Notify socket direction is closed along with
82 // net_result and mojo_result.
74 } 83 }
75 } 84 }
76 85
77 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) { 86 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) {
78 // TODO(darin): Handle a bad |result| value. 87 if (result != MOJO_RESULT_OK) {
88 ShutdownReceive();
89 // TODO(johnmccutchan): Notify socket direction is closed along with
90 // net_result and mojo_result.
91 return;
92 }
79 ReceiveMore(); 93 ReceiveMore();
80 } 94 }
81 95
82 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously, 96 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously,
83 int result) { 97 int result) {
84 if (result < 0) { 98 if (result < 0) {
85 // Error. 99 // Error.
86 pending_receive_ = NULL; // Closes the pipe (owned by the pending write). 100 ShutdownReceive();
87 // TODO(brettw) notify the caller of an error? 101 // TODO(johnmccutchan): Notify socket direction is closed along with
102 // net_result and mojo_result.
88 return; 103 return;
89 } 104 }
90 105
91 receive_stream_ = pending_receive_->Complete(result); 106 receive_stream_ = pending_receive_->Complete(result);
92 pending_receive_ = NULL; 107 pending_receive_ = NULL;
93 108
94 // Schedule more reading. 109 // Schedule more reading.
95 if (completed_synchronously) { 110 if (completed_synchronously) {
96 // Don't recursively call ReceiveMore if this is a sync read. 111 // Don't recursively call ReceiveMore if this is a sync read.
97 base::MessageLoop::current()->PostTask( 112 base::MessageLoop::current()->PostTask(
98 FROM_HERE, 113 FROM_HERE,
99 base::Bind(&TCPConnectedSocketImpl::ReceiveMore, 114 base::Bind(&TCPConnectedSocketImpl::ReceiveMore,
100 weak_ptr_factory_.GetWeakPtr())); 115 weak_ptr_factory_.GetWeakPtr()));
101 } else { 116 } else {
102 ReceiveMore(); 117 ReceiveMore();
103 } 118 }
104 } 119 }
105 120
121 void TCPConnectedSocketImpl::ShutdownReceive() {
122 pending_receive_ = NULL;
jamesr 2015/03/19 21:54:23 nullptr. only use NULL in code that has to compile
Cutch 2015/03/19 22:03:32 Fixed here and elsewhere.
Cutch 2015/03/19 22:03:32 Done.
123 receive_stream_.reset();
124 }
125
106 void TCPConnectedSocketImpl::SendMore() { 126 void TCPConnectedSocketImpl::SendMore() {
107 uint32_t num_bytes = 0; 127 uint32_t num_bytes = 0;
108 MojoResult result = MojoToNetPendingBuffer::BeginRead( 128 MojoResult result = MojoToNetPendingBuffer::BeginRead(
109 &send_stream_, &pending_send_, &num_bytes); 129 &send_stream_, &pending_send_, &num_bytes);
110 if (result == MOJO_RESULT_SHOULD_WAIT) { 130 if (result == MOJO_RESULT_SHOULD_WAIT) {
111 // Data not ready, wait for it. 131 // Data not ready, wait for it.
112 send_handle_watcher_.Start( 132 send_handle_watcher_.Start(
113 send_stream_.get(), 133 send_stream_.get(),
114 MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, 134 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
135 MOJO_DEADLINE_INDEFINITE,
115 base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady, 136 base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady,
116 weak_ptr_factory_.GetWeakPtr())); 137 weak_ptr_factory_.GetWeakPtr()));
117 return; 138 return;
118 } else if (result != MOJO_RESULT_OK) { 139 } else if (result != MOJO_RESULT_OK) {
119 // TODO(brettw) notify caller of error. 140 ShutdownSend();
141 // TODO(johnmccutchan): Notify socket direction is closed along with
142 // net_result and mojo_result.
120 return; 143 return;
121 } 144 }
122 145
123 // Got a buffer from Mojo, give it to the socket. Note that the sockets may 146 // Got a buffer from Mojo, give it to the socket. Note that the sockets may
124 // do partial writes. 147 // do partial writes.
125 scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get())); 148 scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get()));
126 int write_result = socket_->Write( 149 int write_result = socket_->Write(
127 buf.get(), static_cast<int>(num_bytes), 150 buf.get(), static_cast<int>(num_bytes),
128 base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this), 151 base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this),
129 false)); 152 false));
130 if (write_result == net::ERR_IO_PENDING) { 153 if (write_result == net::ERR_IO_PENDING) {
131 // Pending I/O, wait for result in DidSend(). 154 // Pending I/O, wait for result in DidSend().
132 } else if (write_result >= 0) { 155 } else if (write_result >= 0) {
133 // Synchronous data consumed. 156 // Synchronous data consumed.
134 DidSend(true, write_result); 157 DidSend(true, write_result);
158 } else {
159 // write_result < 0 indicates error.
160 ShutdownSend();
161 // TODO(johnmccutchan): Notify socket direction is closed along with
162 // net_result and mojo_result.
135 } 163 }
136 } 164 }
137 165
138 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) { 166 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) {
139 // TODO(brettw): Handle a bad |result| value. 167 if (result != MOJO_RESULT_OK) {
168 ShutdownSend();
169 // TODO(johnmccutchan): Notify socket direction is closed along with
170 // net_result and mojo_result.
171 return;
172 }
140 SendMore(); 173 SendMore();
141 } 174 }
142 175
143 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, 176 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously,
144 int result) { 177 int result) {
145 if (result < 0) { 178 if (result < 0) {
146 // TODO(brettw) report error. 179 ShutdownSend();
147 pending_send_ = NULL; 180 // TODO(johnmccutchan): Notify socket direction is closed along with
181 // net_result and mojo_result.
148 return; 182 return;
149 } 183 }
150 184
151 // Take back ownership of the stream and free the IOBuffer. 185 // Take back ownership of the stream and free the IOBuffer.
152 send_stream_ = pending_send_->Complete(result); 186 send_stream_ = pending_send_->Complete(result);
153 pending_send_ = NULL; 187 pending_send_ = NULL;
154 188
155 // Schedule more writing. 189 // Schedule more writing.
156 if (completed_synchronously) { 190 if (completed_synchronously) {
157 // Don't recursively call SendMore if this is a sync read. 191 // Don't recursively call SendMore if this is a sync read.
158 base::MessageLoop::current()->PostTask( 192 base::MessageLoop::current()->PostTask(
159 FROM_HERE, 193 FROM_HERE,
160 base::Bind(&TCPConnectedSocketImpl::SendMore, 194 base::Bind(&TCPConnectedSocketImpl::SendMore,
161 weak_ptr_factory_.GetWeakPtr())); 195 weak_ptr_factory_.GetWeakPtr()));
162 } else { 196 } else {
163 SendMore(); 197 SendMore();
164 } 198 }
165 } 199 }
166 200
201 void TCPConnectedSocketImpl::ShutdownSend() {
202 pending_send_ = NULL;
jamesr 2015/03/19 21:54:23 nullptr
203 send_stream_.reset();
204 }
205
167 } // namespace mojo 206 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/services/network/tcp_connected_socket_impl.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698