Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(26)

Side by Side Diff: mojo/services/network/tcp_connected_socket_impl.cc

Issue 1873463003: Remove mojo network service. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/services/network/tcp_connected_socket_impl.h"
6
7 #include <stdint.h>
8
9 #include <utility>
10
11 #include "base/message_loop/message_loop.h"
12 #include "mojo/services/network/net_adapters.h"
13 #include "net/base/net_errors.h"
14
15 namespace mojo {
16
17 TCPConnectedSocketImpl::TCPConnectedSocketImpl(
18 scoped_ptr<net::TCPSocket> socket,
19 ScopedDataPipeConsumerHandle send_stream,
20 ScopedDataPipeProducerHandle receive_stream,
21 InterfaceRequest<TCPConnectedSocket> request,
22 scoped_ptr<mojo::MessageLoopRef> app_refcount)
23 : socket_(std::move(socket)),
24 send_stream_(std::move(send_stream)),
25 receive_stream_(std::move(receive_stream)),
26 binding_(this, std::move(request)),
27 app_refcount_(std::move(app_refcount)),
28 weak_ptr_factory_(this) {
29 // Queue up async communication.
30 binding_.set_connection_error_handler([this]() { OnConnectionError(); });
31 ListenForReceivePeerClosed();
32 ListenForSendPeerClosed();
33 ReceiveMore();
34 SendMore();
35 }
36
37 TCPConnectedSocketImpl::~TCPConnectedSocketImpl() {
38 }
39
40 void TCPConnectedSocketImpl::OnConnectionError() {
41 binding_.Close();
42 DeleteIfNeeded();
43 }
44
45 void TCPConnectedSocketImpl::ReceiveMore() {
46 DCHECK(!pending_receive_.get());
47
48 uint32_t num_bytes;
49 MojoResult result = NetToMojoPendingBuffer::BeginWrite(
50 &receive_stream_, &pending_receive_, &num_bytes);
51 if (result == MOJO_RESULT_SHOULD_WAIT) {
52 // The pipe is full. We need to wait for it to have more space.
53 receive_handle_watcher_.Start(
54 receive_stream_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
55 MOJO_DEADLINE_INDEFINITE,
56 base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady,
57 weak_ptr_factory_.GetWeakPtr()));
58 return;
59 }
60
61 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
62 // It's valid that the user of this class consumed the data they care about
63 // and closed their data pipe handles after writing data. This class should
64 // still write out all the data.
65 ShutdownReceive();
66 // TODO(johnmccutchan): Notify socket direction is closed along with
67 // net_result and mojo_result.
68 return;
69 }
70
71 if (result != MOJO_RESULT_OK) {
72 // The receive stream is in a bad state.
73 ShutdownReceive();
74 // TODO(johnmccutchan): Notify socket direction is closed along with
75 // net_result and mojo_result.
76 return;
77 }
78
79 // Mojo is ready for the receive.
80 CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes);
81 scoped_refptr<net::IOBuffer> buf(
82 new NetToMojoIOBuffer(pending_receive_.get()));
83 int read_result =
84 socket_->Read(buf.get(), static_cast<int>(num_bytes),
85 base::Bind(&TCPConnectedSocketImpl::DidReceive,
86 weak_ptr_factory_.GetWeakPtr(), false));
87 if (read_result == net::ERR_IO_PENDING) {
88 // Pending I/O, wait for result in DidReceive().
89 } else if (read_result > 0) {
90 // Synchronous data ready.
91 DidReceive(true, read_result);
92 } else {
93 // read_result == 0 indicates EOF.
94 // read_result < 0 indicates error.
95 ShutdownReceive();
96 // TODO(johnmccutchan): Notify socket direction is closed along with
97 // net_result and mojo_result.
98 }
99 }
100
101 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) {
102 if (result != MOJO_RESULT_OK) {
103 ShutdownReceive();
104 // TODO(johnmccutchan): Notify socket direction is closed along with
105 // net_result and mojo_result.
106 return;
107 }
108 ListenForReceivePeerClosed();
109 ReceiveMore();
110 }
111
112 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously,
113 int result) {
114 if (!pending_receive_)
115 return;
116
117 if (result < 0) {
118 // Error.
119 ShutdownReceive();
120 // TODO(johnmccutchan): Notify socket direction is closed along with
121 // net_result and mojo_result.
122 return;
123 }
124
125 receive_stream_ = pending_receive_->Complete(result);
126 pending_receive_ = nullptr;
127
128 // Schedule more reading.
129 if (completed_synchronously) {
130 // Don't recursively call ReceiveMore if this is a sync read.
131 base::MessageLoop::current()->PostTask(
132 FROM_HERE, base::Bind(&TCPConnectedSocketImpl::ReceiveMore,
133 weak_ptr_factory_.GetWeakPtr()));
134 } else {
135 ReceiveMore();
136 }
137 }
138
139 void TCPConnectedSocketImpl::ShutdownReceive() {
140 receive_handle_watcher_.Stop();
141 pending_receive_ = nullptr;
142 receive_stream_.reset();
143 DeleteIfNeeded();
144 }
145
146 void TCPConnectedSocketImpl::ListenForReceivePeerClosed() {
147 receive_handle_watcher_.Start(
148 receive_stream_.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
149 MOJO_DEADLINE_INDEFINITE,
150 base::Bind(&TCPConnectedSocketImpl::OnReceiveDataPipeClosed,
151 weak_ptr_factory_.GetWeakPtr()));
152 }
153
154 void TCPConnectedSocketImpl::OnReceiveDataPipeClosed(MojoResult result) {
155 ShutdownReceive();
156 }
157
158 void TCPConnectedSocketImpl::SendMore() {
159 uint32_t num_bytes = 0;
160 MojoResult result = MojoToNetPendingBuffer::BeginRead(
161 &send_stream_, &pending_send_, &num_bytes);
162 if (result == MOJO_RESULT_SHOULD_WAIT) {
163 // Data not ready, wait for it.
164 send_handle_watcher_.Start(
165 send_stream_.get(), MOJO_HANDLE_SIGNAL_READABLE,
166 MOJO_DEADLINE_INDEFINITE,
167 base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady,
168 weak_ptr_factory_.GetWeakPtr()));
169 return;
170 } else if (result != MOJO_RESULT_OK) {
171 ShutdownSend();
172 // TODO(johnmccutchan): Notify socket direction is closed along with
173 // net_result and mojo_result.
174 return;
175 }
176
177 // Got a buffer from Mojo, give it to the socket. Note that the sockets may
178 // do partial writes.
179 scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get()));
180 int write_result =
181 socket_->Write(buf.get(), static_cast<int>(num_bytes),
182 base::Bind(&TCPConnectedSocketImpl::DidSend,
183 weak_ptr_factory_.GetWeakPtr(), false));
184 if (write_result == net::ERR_IO_PENDING) {
185 // Pending I/O, wait for result in DidSend().
186 } else if (write_result >= 0) {
187 // Synchronous data consumed.
188 DidSend(true, write_result);
189 } else {
190 // write_result < 0 indicates error.
191 ShutdownSend();
192 // TODO(johnmccutchan): Notify socket direction is closed along with
193 // net_result and mojo_result.
194 }
195 }
196
197 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) {
198 if (result != MOJO_RESULT_OK) {
199 ShutdownSend();
200 // TODO(johnmccutchan): Notify socket direction is closed along with
201 // net_result and mojo_result.
202 return;
203 }
204 ListenForSendPeerClosed();
205 SendMore();
206 }
207
208 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, int result) {
209 if (!pending_send_)
210 return;
211
212 if (result < 0) {
213 ShutdownSend();
214 // TODO(johnmccutchan): Notify socket direction is closed along with
215 // net_result and mojo_result.
216 return;
217 }
218
219 // Take back ownership of the stream and free the IOBuffer.
220 send_stream_ = pending_send_->Complete(result);
221 pending_send_ = nullptr;
222
223 // Schedule more writing.
224 if (completed_synchronously) {
225 // Don't recursively call SendMore if this is a sync read.
226 base::MessageLoop::current()->PostTask(
227 FROM_HERE, base::Bind(&TCPConnectedSocketImpl::SendMore,
228 weak_ptr_factory_.GetWeakPtr()));
229 } else {
230 SendMore();
231 }
232 }
233
234 void TCPConnectedSocketImpl::ShutdownSend() {
235 send_handle_watcher_.Stop();
236 pending_send_ = nullptr;
237 send_stream_.reset();
238 DeleteIfNeeded();
239 }
240
241 void TCPConnectedSocketImpl::ListenForSendPeerClosed() {
242 send_handle_watcher_.Start(
243 send_stream_.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
244 MOJO_DEADLINE_INDEFINITE,
245 base::Bind(&TCPConnectedSocketImpl::OnSendDataPipeClosed,
246 weak_ptr_factory_.GetWeakPtr()));
247 }
248
249 void TCPConnectedSocketImpl::OnSendDataPipeClosed(MojoResult result) {
250 ShutdownSend();
251 }
252
253 void TCPConnectedSocketImpl::DeleteIfNeeded() {
254 bool has_send = pending_send_ || send_stream_.is_valid();
255 bool has_receive = pending_receive_ || receive_stream_.is_valid();
256 if (!binding_.is_bound() && !has_send && !has_receive)
257 delete this;
258 }
259
260 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/services/network/tcp_connected_socket_impl.h ('k') | mojo/services/network/tcp_server_socket_impl.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698