Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(637)

Side by Side Diff: mojo/services/network/tcp_connected_socket_impl.cc

Issue 1015733002: Close data pipes when TCP network connection goes down. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« AUTHORS ('K') | « mojo/services/network/tcp_connected_socket_impl.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/services/network/tcp_connected_socket_impl.h" 5 #include "mojo/services/network/tcp_connected_socket_impl.h"
6 6
7 #include "base/message_loop/message_loop.h" 7 #include "base/message_loop/message_loop.h"
8 #include "mojo/services/network/net_adapters.h" 8 #include "mojo/services/network/net_adapters.h"
9 #include "net/base/net_errors.h" 9 #include "net/base/net_errors.h"
10 10
(...skipping 18 matching lines...) Expand all
29 void TCPConnectedSocketImpl::ReceiveMore() { 29 void TCPConnectedSocketImpl::ReceiveMore() {
30 DCHECK(!pending_receive_.get()); 30 DCHECK(!pending_receive_.get());
31 31
32 uint32_t num_bytes; 32 uint32_t num_bytes;
33 MojoResult result = NetToMojoPendingBuffer::BeginWrite( 33 MojoResult result = NetToMojoPendingBuffer::BeginWrite(
34 &receive_stream_, &pending_receive_, &num_bytes); 34 &receive_stream_, &pending_receive_, &num_bytes);
35 if (result == MOJO_RESULT_SHOULD_WAIT) { 35 if (result == MOJO_RESULT_SHOULD_WAIT) {
36 // The pipe is full. We need to wait for it to have more space. 36 // The pipe is full. We need to wait for it to have more space.
37 receive_handle_watcher_.Start( 37 receive_handle_watcher_.Start(
38 receive_stream_.get(), 38 receive_stream_.get(),
39 MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE, 39 MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
40 MOJO_DEADLINE_INDEFINITE,
40 base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady, 41 base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady,
41 weak_ptr_factory_.GetWeakPtr())); 42 weak_ptr_factory_.GetWeakPtr()));
42 return; 43 return;
43 } 44 }
44 45
45 if (result == MOJO_RESULT_FAILED_PRECONDITION) { 46 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
46 // It's valid that the user of this class consumed the data they care about 47 // It's valid that the user of this class consumed the data they care about
47 // and closed their data pipe handles after writing data. This class should 48 // and closed their data pipe handles after writing data. This class should
48 // still write out all the data. 49 // still write out all the data.
50 ShutdownReceive();
51 NotifyClose(RECEIVE, 0, result);
49 return; 52 return;
50 } 53 }
51 54
52 if (result != MOJO_RESULT_OK) { 55 if (result != MOJO_RESULT_OK) {
53 // The receive stream is in a bad state. 56 // The receive stream is in a bad state.
54 // TODO(darin): How should this be communicated to our client? 57 ShutdownReceive();
58 NotifyClose(RECEIVE, 0, result);
55 return; 59 return;
56 } 60 }
57 61
58 // Mojo is ready for the receive. 62 // Mojo is ready for the receive.
59 CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes); 63 CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes);
60 scoped_refptr<net::IOBuffer> buf( 64 scoped_refptr<net::IOBuffer> buf(
61 new NetToMojoIOBuffer(pending_receive_.get())); 65 new NetToMojoIOBuffer(pending_receive_.get()));
62 int read_result = socket_->Read( 66 int read_result = socket_->Read(
63 buf.get(), static_cast<int>(num_bytes), 67 buf.get(), static_cast<int>(num_bytes),
64 base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this), 68 base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this),
65 false)); 69 false));
66 if (read_result == net::ERR_IO_PENDING) { 70 if (read_result == net::ERR_IO_PENDING) {
67 // Pending I/O, wait for result in DidReceive(). 71 // Pending I/O, wait for result in DidReceive().
68 } else if (read_result >= 0) { 72 } else if (read_result > 0) {
69 // Synchronous data ready. 73 // Synchronous data ready.
70 DidReceive(true, read_result); 74 DidReceive(true, read_result);
71 } else { 75 } else {
72 // Some kind of error. 76 // read_result == 0 indicates EOF.
73 // TODO(brettw) notify caller of error. 77 // read_result < 0 indicates error.
78 ShutdownReceive();
79 NotifyClose(RECEIVE, read_result, MOJO_RESULT_OK);
74 } 80 }
75 } 81 }
76 82
77 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) { 83 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) {
78 // TODO(darin): Handle a bad |result| value. 84 if (result != MOJO_RESULT_OK) {
85 ShutdownReceive();
86 NotifyClose(RECEIVE, 0, result);
87 return;
88 }
79 ReceiveMore(); 89 ReceiveMore();
80 } 90 }
81 91
82 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously, 92 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously,
83 int result) { 93 int result) {
84 if (result < 0) { 94 if (result < 0) {
85 // Error. 95 // Error.
86 pending_receive_ = NULL; // Closes the pipe (owned by the pending write). 96 ShutdownReceive();
87 // TODO(brettw) notify the caller of an error? 97 NotifyClose(RECEIVE, result, MOJO_RESULT_OK);
88 return; 98 return;
89 } 99 }
90 100
91 receive_stream_ = pending_receive_->Complete(result); 101 receive_stream_ = pending_receive_->Complete(result);
92 pending_receive_ = NULL; 102 pending_receive_ = NULL;
93 103
94 // Schedule more reading. 104 // Schedule more reading.
95 if (completed_synchronously) { 105 if (completed_synchronously) {
96 // Don't recursively call ReceiveMore if this is a sync read. 106 // Don't recursively call ReceiveMore if this is a sync read.
97 base::MessageLoop::current()->PostTask( 107 base::MessageLoop::current()->PostTask(
98 FROM_HERE, 108 FROM_HERE,
99 base::Bind(&TCPConnectedSocketImpl::ReceiveMore, 109 base::Bind(&TCPConnectedSocketImpl::ReceiveMore,
100 weak_ptr_factory_.GetWeakPtr())); 110 weak_ptr_factory_.GetWeakPtr()));
101 } else { 111 } else {
102 ReceiveMore(); 112 ReceiveMore();
103 } 113 }
104 } 114 }
105 115
116 void TCPConnectedSocketImpl::ShutdownReceive() {
117 pending_receive_ = NULL;
118 receive_stream_.reset();
119 }
120
106 void TCPConnectedSocketImpl::SendMore() { 121 void TCPConnectedSocketImpl::SendMore() {
107 uint32_t num_bytes = 0; 122 uint32_t num_bytes = 0;
108 MojoResult result = MojoToNetPendingBuffer::BeginRead( 123 MojoResult result = MojoToNetPendingBuffer::BeginRead(
109 &send_stream_, &pending_send_, &num_bytes); 124 &send_stream_, &pending_send_, &num_bytes);
110 if (result == MOJO_RESULT_SHOULD_WAIT) { 125 if (result == MOJO_RESULT_SHOULD_WAIT) {
111 // Data not ready, wait for it. 126 // Data not ready, wait for it.
112 send_handle_watcher_.Start( 127 send_handle_watcher_.Start(
113 send_stream_.get(), 128 send_stream_.get(),
114 MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, 129 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
130 MOJO_DEADLINE_INDEFINITE,
115 base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady, 131 base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady,
116 weak_ptr_factory_.GetWeakPtr())); 132 weak_ptr_factory_.GetWeakPtr()));
117 return; 133 return;
118 } else if (result != MOJO_RESULT_OK) { 134 } else if (result != MOJO_RESULT_OK) {
119 // TODO(brettw) notify caller of error. 135 ShutdownSend();
136 NotifyClose(SEND, 0, result);
120 return; 137 return;
121 } 138 }
122 139
123 // Got a buffer from Mojo, give it to the socket. Note that the sockets may 140 // Got a buffer from Mojo, give it to the socket. Note that the sockets may
124 // do partial writes. 141 // do partial writes.
125 scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get())); 142 scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get()));
126 int write_result = socket_->Write( 143 int write_result = socket_->Write(
127 buf.get(), static_cast<int>(num_bytes), 144 buf.get(), static_cast<int>(num_bytes),
128 base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this), 145 base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this),
129 false)); 146 false));
130 if (write_result == net::ERR_IO_PENDING) { 147 if (write_result == net::ERR_IO_PENDING) {
131 // Pending I/O, wait for result in DidSend(). 148 // Pending I/O, wait for result in DidSend().
132 } else if (write_result >= 0) { 149 } else if (write_result >= 0) {
133 // Synchronous data consumed. 150 // Synchronous data consumed.
134 DidSend(true, write_result); 151 DidSend(true, write_result);
152 } else {
153 // write_result < 0 indicates error.
154 ShutdownSend();
155 NotifyClose(SEND, write_result, MOJO_RESULT_OK);
135 } 156 }
136 } 157 }
137 158
138 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) { 159 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) {
139 // TODO(brettw): Handle a bad |result| value. 160 if (result != MOJO_RESULT_OK) {
161 ShutdownSend();
162 NotifyClose(SEND, 0, result);
163 return;
164 }
140 SendMore(); 165 SendMore();
141 } 166 }
142 167
143 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, 168 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously,
144 int result) { 169 int result) {
145 if (result < 0) { 170 if (result < 0) {
146 // TODO(brettw) report error. 171 ShutdownSend();
147 pending_send_ = NULL; 172 NotifyClose(SEND, result, MOJO_RESULT_OK);
148 return; 173 return;
149 } 174 }
150 175
151 // Take back ownership of the stream and free the IOBuffer. 176 // Take back ownership of the stream and free the IOBuffer.
152 send_stream_ = pending_send_->Complete(result); 177 send_stream_ = pending_send_->Complete(result);
153 pending_send_ = NULL; 178 pending_send_ = NULL;
154 179
155 // Schedule more writing. 180 // Schedule more writing.
156 if (completed_synchronously) { 181 if (completed_synchronously) {
157 // Don't recursively call SendMore if this is a sync read. 182 // Don't recursively call SendMore if this is a sync read.
158 base::MessageLoop::current()->PostTask( 183 base::MessageLoop::current()->PostTask(
159 FROM_HERE, 184 FROM_HERE,
160 base::Bind(&TCPConnectedSocketImpl::SendMore, 185 base::Bind(&TCPConnectedSocketImpl::SendMore,
161 weak_ptr_factory_.GetWeakPtr())); 186 weak_ptr_factory_.GetWeakPtr()));
162 } else { 187 } else {
163 SendMore(); 188 SendMore();
164 } 189 }
165 } 190 }
166 191
192 void TCPConnectedSocketImpl::ShutdownSend() {
193 pending_send_ = NULL;
194 send_stream_.reset();
195 }
196
197 void TCPConnectedSocketImpl::NotifyClose(Direction direction,
198 int net_result,
199 MojoResult mojo_result) {
200 // TODO(johnmccutchan): Notify socket direction is closed along with
201 // net_result and mojo_result.
202 }
203
167 } // namespace mojo 204 } // namespace mojo
OLDNEW
« AUTHORS ('K') | « mojo/services/network/tcp_connected_socket_impl.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698