Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(9)

Side by Side Diff: tools/android/forwarder2/forwarder.cc

Issue 148113003: forwarder2: Make the Forwarder instances operate on a single thread. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « tools/android/forwarder2/forwarder.h ('k') | tools/android/forwarder2/forwarders_manager.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "tools/android/forwarder2/forwarder.h" 5 #include "tools/android/forwarder2/forwarder.h"
6 6
7 #include "base/basictypes.h" 7 #include "base/basictypes.h"
8 #include "base/bind.h"
9 #include "base/logging.h" 8 #include "base/logging.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/message_loop/message_loop_proxy.h"
12 #include "base/posix/eintr_wrapper.h" 9 #include "base/posix/eintr_wrapper.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/threading/thread.h"
15 #include "tools/android/forwarder2/pipe_notifier.h"
16 #include "tools/android/forwarder2/socket.h" 10 #include "tools/android/forwarder2/socket.h"
17 11
18 namespace forwarder2 { 12 namespace forwarder2 {
19 namespace {
20 13
21 // Helper class to buffer reads and writes from one socket to another. 14 // Helper class to buffer reads and writes from one socket to another.
22 // Each implements a small buffer connected two one input socket, and 15 // Each implements a small buffer connected two one input socket, and
23 // one output socket. 16 // one output socket.
24 // 17 //
25 // socket_from_ ---> [BufferedCopier] ---> socket_to_ 18 // socket_from_ ---> [BufferedCopier] ---> socket_to_
26 // 19 //
27 // These objects are used in a pair to handle duplex traffic, as in: 20 // These objects are used in a pair to handle duplex traffic, as in:
28 // 21 //
29 // ------> [BufferedCopier_1] ---> 22 // ------> [BufferedCopier_1] --->
30 // / \ 23 // / \
31 // socket_1 * * socket_2 24 // socket_1 * * socket_2
32 // \ / 25 // \ /
33 // <------ [BufferedCopier_2] <---- 26 // <------ [BufferedCopier_2] <----
34 // 27 //
35 // When a BufferedCopier is in the READING state (see below), it only listens 28 // When a BufferedCopier is in the READING state (see below), it only listens
36 // to events on its input socket, and won't detect when its output socket 29 // to events on its input socket, and won't detect when its output socket
37 // disconnects. To work around this, its peer will call its Close() method 30 // disconnects. To work around this, its peer will call its Close() method
38 // when that happens. 31 // when that happens.
39 32
40 class BufferedCopier { 33 class Forwarder::BufferedCopier {
41 public: 34 public:
42 // Possible states: 35 // Possible states:
43 // READING - Empty buffer and Waiting for input. 36 // READING - Empty buffer and Waiting for input.
44 // WRITING - Data in buffer, and waiting for output. 37 // WRITING - Data in buffer, and waiting for output.
45 // CLOSING - Like WRITING, but do not try to read after that. 38 // CLOSING - Like WRITING, but do not try to read after that.
46 // CLOSED - Completely closed. 39 // CLOSED - Completely closed.
47 // 40 //
48 // State transitions are: 41 // State transitions are:
49 // 42 //
50 // T01: READING ---[receive data]---> WRITING 43 // T01: READING ---[receive data]---> WRITING
(...skipping 25 matching lines...) Expand all
76 write_offset_(0), 69 write_offset_(0),
77 peer_(NULL), 70 peer_(NULL),
78 state_(STATE_READING) {} 71 state_(STATE_READING) {}
79 72
80 // Sets the 'peer_' field pointing to the other BufferedCopier in a pair. 73 // Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
81 void SetPeer(BufferedCopier* peer) { 74 void SetPeer(BufferedCopier* peer) {
82 DCHECK(!peer_); 75 DCHECK(!peer_);
83 peer_ = peer; 76 peer_ = peer;
84 } 77 }
85 78
79 bool is_closed() const { return state_ == STATE_CLOSED; }
80
86 // Gently asks to close a buffer. Called either by the peer or the forwarder. 81 // Gently asks to close a buffer. Called either by the peer or the forwarder.
87 void Close() { 82 void Close() {
88 switch (state_) { 83 switch (state_) {
89 case STATE_READING: 84 case STATE_READING:
90 state_ = STATE_CLOSED; // T03 85 state_ = STATE_CLOSED; // T03
91 break; 86 break;
92 case STATE_WRITING: 87 case STATE_WRITING:
93 state_ = STATE_CLOSING; // T07 88 state_ = STATE_CLOSING; // T07
94 break; 89 break;
95 case STATE_CLOSING: 90 case STATE_CLOSING:
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
139 switch (state_) { 134 switch (state_) {
140 case STATE_READING: 135 case STATE_READING:
141 fd = socket_from_->fd(); 136 fd = socket_from_->fd();
142 if (fd < 0) { 137 if (fd < 0) {
143 state_ = STATE_CLOSED; // T02 138 state_ = STATE_CLOSED; // T02
144 return; 139 return;
145 } 140 }
146 if (!FD_ISSET(fd, &read_fds)) 141 if (!FD_ISSET(fd, &read_fds))
147 return; 142 return;
148 143
149 ret = socket_from_->NonBlockingRead(buffer_, kBufferSize); 144 ret = socket_from_->NonBlockingRead(buffer_, sizeof(buffer_));
150 if (ret <= 0) { 145 if (ret <= 0) {
151 ForceClose(); // T02 146 ForceClose(); // T02
152 return; 147 return;
153 } 148 }
154 bytes_read_ = ret; 149 bytes_read_ = ret;
155 write_offset_ = 0; 150 write_offset_ = 0;
156 state_ = STATE_WRITING; // T01 151 state_ = STATE_WRITING; // T01
157 break; 152 break;
158 153
159 case STATE_WRITING: 154 case STATE_WRITING:
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
198 peer_->Close(); 193 peer_->Close();
199 peer_ = NULL; 194 peer_ = NULL;
200 } 195 }
201 state_ = STATE_CLOSED; 196 state_ = STATE_CLOSED;
202 } 197 }
203 198
204 // Not owned. 199 // Not owned.
205 Socket* socket_from_; 200 Socket* socket_from_;
206 Socket* socket_to_; 201 Socket* socket_to_;
207 202
208 // A big buffer to let the file-over-http bridge work more like real file.
209 static const int kBufferSize = 1024 * 128;
210 int bytes_read_; 203 int bytes_read_;
211 int write_offset_; 204 int write_offset_;
212 BufferedCopier* peer_; 205 BufferedCopier* peer_;
213 State state_; 206 State state_;
214 char buffer_[kBufferSize]; 207 char buffer_[32 * 1024];
qsr 2014/01/27 13:28:29 why getting rid of the constant?
Philippe 2014/01/27 13:41:36 I mainly wanted to get rid of the comment but I ad
215 208
216 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); 209 DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
217 }; 210 };
218 211
219 } // namespace
220
221 Forwarder::Forwarder(scoped_ptr<Socket> socket1, 212 Forwarder::Forwarder(scoped_ptr<Socket> socket1,
222 scoped_ptr<Socket> socket2, 213 scoped_ptr<Socket> socket2)
223 PipeNotifier* deletion_notifier, 214 : socket1_(socket1.Pass()),
224 const ErrorCallback& error_callback)
225 : self_deleter_helper_(this, error_callback),
226 deletion_notifier_(deletion_notifier),
227 socket1_(socket1.Pass()),
228 socket2_(socket2.Pass()), 215 socket2_(socket2.Pass()),
229 thread_("ForwarderThread") { 216 buffer1_(new BufferedCopier(socket1_.get(), socket2_.get())),
230 DCHECK(deletion_notifier_); 217 buffer2_(new BufferedCopier(socket2_.get(), socket1_.get())) {
218 buffer1_->SetPeer(buffer2_.get());
219 buffer2_->SetPeer(buffer1_.get());
231 } 220 }
232 221
233 Forwarder::~Forwarder() {} 222 Forwarder::~Forwarder() {
234 223 DCHECK(thread_checker_.CalledOnValidThread());
235 void Forwarder::Start() {
236 thread_.Start();
237 thread_.message_loop_proxy()->PostTask(
238 FROM_HERE,
239 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
240 } 224 }
241 225
242 void Forwarder::ThreadHandler() { 226 void Forwarder::RegisterFDs(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
243 fd_set read_fds; 227 DCHECK(thread_checker_.CalledOnValidThread());
244 fd_set write_fds; 228 buffer1_->PrepareSelect(read_fds, write_fds, max_fd);
229 buffer2_->PrepareSelect(read_fds, write_fds, max_fd);
230 }
245 231
246 // Copy from socket1 to socket2 232 void Forwarder::ProcessEvents(const fd_set& read_fds, const fd_set& write_fds) {
247 BufferedCopier buffer1(socket1_.get(), socket2_.get()); 233 DCHECK(thread_checker_.CalledOnValidThread());
248 // Copy from socket2 to socket1 234 buffer1_->ProcessSelect(read_fds, write_fds);
249 BufferedCopier buffer2(socket2_.get(), socket1_.get()); 235 buffer2_->ProcessSelect(read_fds, write_fds);
236 }
250 237
251 buffer1.SetPeer(&buffer2); 238 bool Forwarder::IsClosed() const {
252 buffer2.SetPeer(&buffer1); 239 DCHECK(thread_checker_.CalledOnValidThread());
240 return buffer1_->is_closed() && buffer2_->is_closed();
241 }
253 242
254 for (;;) { 243 void Forwarder::Shutdown() {
255 FD_ZERO(&read_fds); 244 DCHECK(thread_checker_.CalledOnValidThread());
256 FD_ZERO(&write_fds); 245 buffer1_->Close();
257 246 buffer2_->Close();
258 int max_fd = -1;
259 buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
260 buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
261
262 if (max_fd < 0) {
263 // Both buffers are closed. Exit immediately.
264 break;
265 }
266
267 const int deletion_fd = deletion_notifier_->receiver_fd();
268 if (deletion_fd >= 0) {
269 FD_SET(deletion_fd, &read_fds);
270 max_fd = std::max(max_fd, deletion_fd);
271 }
272
273 if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
274 0) {
275 PLOG(ERROR) << "select";
276 break;
277 }
278
279 buffer1.ProcessSelect(read_fds, write_fds);
280 buffer2.ProcessSelect(read_fds, write_fds);
281
282 if (deletion_fd >= 0 && FD_ISSET(deletion_fd, &read_fds)) {
283 buffer1.Close();
284 buffer2.Close();
285 }
286 }
287
288 // Note that the thread that the destructor will run on could be temporarily
289 // blocked on I/O (e.g. select()) therefore it is safer to close the sockets
290 // now rather than relying on the destructor.
291 socket1_.reset();
292 socket2_.reset();
293
294 self_deleter_helper_.MaybeSelfDeleteSoon();
295 } 247 }
296 248
297 } // namespace forwarder2 249 } // namespace forwarder2
OLDNEW
« no previous file with comments | « tools/android/forwarder2/forwarder.h ('k') | tools/android/forwarder2/forwarders_manager.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698