Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: tools/android/forwarder2/forwarder.cc

Issue 137923004: Revert "Revert 235213 "android: forwader2: Simplify Forwarder implementa..."" (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix original CL by removing usage of PipeNotifier Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « tools/android/forwarder2/forwarder.h ('k') | tools/android/forwarder2/socket.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "tools/android/forwarder2/forwarder.h" 5 #include "tools/android/forwarder2/forwarder.h"
6 6
7 #include "base/basictypes.h" 7 #include "base/basictypes.h"
8 #include "base/bind.h" 8 #include "base/bind.h"
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/memory/ref_counted.h" 10 #include "base/memory/ref_counted.h"
11 #include "base/message_loop/message_loop_proxy.h"
11 #include "base/posix/eintr_wrapper.h" 12 #include "base/posix/eintr_wrapper.h"
12 #include "base/single_thread_task_runner.h" 13 #include "base/single_thread_task_runner.h"
14 #include "base/threading/thread.h"
13 #include "tools/android/forwarder2/socket.h" 15 #include "tools/android/forwarder2/socket.h"
14 16
15 namespace forwarder2 { 17 namespace forwarder2 {
16 namespace { 18 namespace {
17 19
18 // Helper class to buffer reads and writes from one socket to another. 20 // Helper class to buffer reads and writes from one socket to another.
21 // Each implements a small buffer connected two one input socket, and
22 // one output socket.
23 //
24 // socket_from_ ---> [BufferedCopier] ---> socket_to_
25 //
26 // These objects are used in a pair to handle duplex traffic, as in:
27 //
28 // ------> [BufferedCopier_1] --->
29 // / \
30 // socket_1 * * socket_2
31 // \ /
32 // <------ [BufferedCopier_2] <----
33 //
34 // When a BufferedCopier is in the READING state (see below), it only listens
35 // to events on its input socket, and won't detect when its output socket
36 // disconnects. To work around this, its peer will call its Close() method
37 // when that happens.
38
19 class BufferedCopier { 39 class BufferedCopier {
20 public: 40 public:
41 // Possible states:
42 // READING - Empty buffer and Waiting for input.
43 // WRITING - Data in buffer, and waiting for output.
44 // CLOSING - Like WRITING, but do not try to read after that.
45 // CLOSED - Completely closed.
46 //
47 // State transitions are:
bulach 2014/01/14 14:15:01 nice documentation, thanks! if I understood this
48 //
49 // T01: READING ---[receive data]---> WRITING
50 // T02: READING ---[error on input socket]---> CLOSED
51 // T03: READING ---[Close() call]---> CLOSED
52 //
53 // T04: WRITING ---[write partial data]---> WRITING
54 // T05: WRITING ---[write all data]----> READING
55 // T06: WRITING ---[error on output socket]----> CLOSED
56 // T07: WRITING ---[Close() call]---> CLOSING
57 //
58 // T08: CLOSING ---[write partial data]---> CLOSING
59 // T09: CLOSING ---[write all data]----> CLOSED
60 // T10: CLOSING ---[Close() call]---> CLOSING
61 // T11: CLOSING ---[error on output socket] ---> CLOSED
62 //
63 enum State {
64 STATE_READING = 0,
65 STATE_WRITING = 1,
66 STATE_CLOSING = 2,
67 STATE_CLOSED = 3,
68 };
69
21 // Does NOT own the pointers. 70 // Does NOT own the pointers.
22 BufferedCopier(Socket* socket_from, 71 BufferedCopier(Socket* socket_from, Socket* socket_to)
23 Socket* socket_to)
24 : socket_from_(socket_from), 72 : socket_from_(socket_from),
25 socket_to_(socket_to), 73 socket_to_(socket_to),
26 bytes_read_(0), 74 bytes_read_(0),
27 write_offset_(0) { 75 write_offset_(0),
28 } 76 peer_(NULL),
29 77 state_(STATE_READING) {}
30 bool AddToReadSet(fd_set* read_fds) { 78
31 if (bytes_read_ == 0) 79 // Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
32 return socket_from_->AddFdToSet(read_fds); 80 void SetPeer(BufferedCopier* peer) { peer_ = peer; }
bulach 2014/01/14 14:15:01 nit: CHECK(!peer);
Philippe 2014/01/14 14:50:57 Yeah, good idea. I made it a DCHECK though if you
33 return false; 81
34 } 82 // Gently asks to close a buffer. Called either by the peer or the forwarder.
35 83 void Close() {
36 bool AddToWriteSet(fd_set* write_fds) { 84 switch (state_) {
37 if (write_offset_ < bytes_read_) 85 case STATE_READING:
38 return socket_to_->AddFdToSet(write_fds); 86 state_ = STATE_CLOSED; // T03
39 return false; 87 break;
40 } 88 case STATE_WRITING:
41 89 state_ = STATE_CLOSING; // T07
42 bool TryRead(const fd_set& read_fds) { 90 break;
43 if (!socket_from_->IsFdInSet(read_fds)) 91 case STATE_CLOSING:
44 return false; 92 break; // T10
45 if (bytes_read_ != 0) // Can't read. 93 case STATE_CLOSED:
46 return false; 94 ;
47 int ret = socket_from_->Read(buffer_, kBufferSize); 95 }
48 if (ret > 0) { 96 }
49 bytes_read_ = ret; 97
50 return true; 98 // Call this before select(). This updates |read_fds|,
51 } 99 // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed.
52 return false; 100 void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
53 } 101 int fd;
54 102 switch (state_) {
55 bool TryWrite(const fd_set& write_fds) { 103 case STATE_READING:
56 if (!socket_to_->IsFdInSet(write_fds)) 104 DCHECK(bytes_read_ == 0);
57 return false; 105 DCHECK(write_offset_ == 0);
58 if (write_offset_ >= bytes_read_) // Nothing to write. 106 fd = socket_from_->fd();
59 return false; 107 if (fd < 0) {
60 int ret = socket_to_->Write(buffer_ + write_offset_, 108 ForceClose(); // T02
61 bytes_read_ - write_offset_); 109 return;
62 if (ret > 0) { 110 }
63 write_offset_ += ret; 111 FD_SET(fd, read_fds);
64 if (write_offset_ == bytes_read_) { 112 break;
113
114 case STATE_WRITING:
115 case STATE_CLOSING:
116 DCHECK(bytes_read_ > 0);
117 DCHECK(write_offset_ < bytes_read_);
118 fd = socket_to_->fd();
119 if (fd < 0) {
120 ForceClose(); // T06
121 return;
122 }
123 FD_SET(fd, write_fds);
124 break;
125
126 case STATE_CLOSED:
127 return;
128 }
129 *max_fd = std::max(*max_fd, fd);
130 }
131
132 // Call this after a select() call to operate over the buffer.
133 void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) {
134 int fd, ret;
135 switch (state_) {
136 case STATE_READING:
137 fd = socket_from_->fd();
138 if (fd < 0) {
139 state_ = STATE_CLOSED; // T02
140 return;
141 }
142 if (!FD_ISSET(fd, &read_fds))
143 return;
144
145 ret = socket_from_->NonBlockingRead(buffer_, kBufferSize);
146 if (ret <= 0) {
147 ForceClose(); // T02
148 return;
149 }
150 bytes_read_ = ret;
151 write_offset_ = 0;
152 state_ = STATE_WRITING; // T01
153 break;
154
155 case STATE_WRITING:
156 case STATE_CLOSING:
157 fd = socket_to_->fd();
158 if (fd < 0) {
159 ForceClose(); // T06 + T11
160 return;
161 }
162 if (!FD_ISSET(fd, &write_fds))
163 return;
164
165 ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_,
166 bytes_read_ - write_offset_);
167 if (ret <= 0) {
168 ForceClose(); // T06 + T11
169 return;
170 }
171
172 write_offset_ += ret;
173 if (write_offset_ < bytes_read_)
174 return; // T08 + T04
175
65 write_offset_ = 0; 176 write_offset_ = 0;
66 bytes_read_ = 0; 177 bytes_read_ = 0;
67 } 178 if (state_ == STATE_CLOSING) {
68 return true; 179 ForceClose(); // T09
69 } 180 return;
70 return false; 181 }
182 state_ = STATE_READING; // T05
183 break;
184
185 case STATE_CLOSED:
186 ;
187 }
71 } 188 }
72 189
73 private: 190 private:
191 // Internal method used to close the buffer and notify the peer, if any.
192 void ForceClose() {
193 if (peer_) {
194 peer_->Close();
195 peer_ = NULL;
196 }
197 state_ = STATE_CLOSED;
198 }
199
74 // Not owned. 200 // Not owned.
75 Socket* socket_from_; 201 Socket* socket_from_;
76 Socket* socket_to_; 202 Socket* socket_to_;
77 203
78 // A big buffer to let our file-over-http bridge work more like real file. 204 // A big buffer to let the file-over-http bridge work more like real file.
79 static const int kBufferSize = 1024 * 128; 205 static const int kBufferSize = 1024 * 128;
80 int bytes_read_; 206 int bytes_read_;
81 int write_offset_; 207 int write_offset_;
208 BufferedCopier* peer_;
209 State state_;
82 char buffer_[kBufferSize]; 210 char buffer_[kBufferSize];
83 211
84 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); 212 DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
85 }; 213 };
86 214
87 // Internal class that wraps a helper thread to forward traffic between 215 // Internal class that wraps a helper thread to forward traffic between
88 // |socket1| and |socket2|. After creating a new instance, call its Start() 216 // |socket1| and |socket2|. After creating a new instance, call its Start()
89 // method to launch operations. Thread stops automatically if one of the socket 217 // method to launch operations. Thread stops automatically if one of the socket
90 // disconnects, but ensures that all buffered writes to the other, still alive, 218 // disconnects, but ensures that all buffered writes to the other, still alive,
91 // socket, are written first. When this happens, the instance will delete itself 219 // socket, are written first. When this happens, the instance will delete itself
92 // automatically. 220 // automatically.
93 // Note that the instance will always be destroyed on the same thread that 221 // Note that the instance will always be destroyed on the same thread that
94 // created it. 222 // created it.
95 class Forwarder { 223 class Forwarder {
96 public: 224 public:
225 // Create a new Forwarder instance. |socket1| and |socket2| are the two socket
226 // endpoints.
97 Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) 227 Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2)
98 : socket1_(socket1.Pass()), 228 : socket1_(socket1.Pass()),
99 socket2_(socket2.Pass()), 229 socket2_(socket2.Pass()),
100 destructor_runner_(base::MessageLoopProxy::current()), 230 destructor_runner_(base::MessageLoopProxy::current()),
101 thread_("ForwarderThread") { 231 thread_("ForwarderThread") {}
102 }
103 232
104 void Start() { 233 void Start() {
105 thread_.Start(); 234 thread_.Start();
106 thread_.message_loop_proxy()->PostTask( 235 thread_.message_loop_proxy()->PostTask(
107 FROM_HERE, 236 FROM_HERE,
108 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); 237 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
109 } 238 }
110 239
111 private: 240 private:
112 void ThreadHandler() { 241 void ThreadHandler() {
113 const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1;
114 fd_set read_fds; 242 fd_set read_fds;
115 fd_set write_fds; 243 fd_set write_fds;
116 244
117 // Copy from socket1 to socket2 245 // Copy from socket1 to socket2
118 BufferedCopier buffer1(socket1_.get(), socket2_.get()); 246 BufferedCopier buffer1(socket1_.get(), socket2_.get());
247
119 // Copy from socket2 to socket1 248 // Copy from socket2 to socket1
120 BufferedCopier buffer2(socket2_.get(), socket1_.get()); 249 BufferedCopier buffer2(socket2_.get(), socket1_.get());
121 250
122 bool run = true; 251 buffer1.SetPeer(&buffer2);
123 while (run) { 252 buffer2.SetPeer(&buffer1);
253
254 for (;;) {
124 FD_ZERO(&read_fds); 255 FD_ZERO(&read_fds);
125 FD_ZERO(&write_fds); 256 FD_ZERO(&write_fds);
126 257
127 buffer1.AddToReadSet(&read_fds); 258 int max_fd = -1;
128 buffer2.AddToReadSet(&read_fds); 259 buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
129 buffer1.AddToWriteSet(&write_fds); 260 buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
130 buffer2.AddToWriteSet(&write_fds);
131 261
132 if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) { 262 if (max_fd < 0) {
263 // Both buffers are closed. Exit immediately.
264 break;
265 }
266
267 if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
268 0) {
133 PLOG(ERROR) << "select"; 269 PLOG(ERROR) << "select";
134 break; 270 break;
135 } 271 }
136 // When a socket in the read set closes the connection, select() returns
137 // with that socket descriptor set as "ready to read". When we call
138 // TryRead() below, it will return false, but the while loop will continue
139 // to run until all the write operations are finished, to make sure the
140 // buffers are completely flushed out.
141 272
142 // Keep running while we have some operation to do. 273 buffer1.ProcessSelect(read_fds, write_fds);
143 run = buffer1.TryRead(read_fds); 274 buffer2.ProcessSelect(read_fds, write_fds);
144 run = run || buffer2.TryRead(read_fds);
145 run = run || buffer1.TryWrite(write_fds);
146 run = run || buffer2.TryWrite(write_fds);
147 } 275 }
148 276
149 // Note that the thread that |destruction_runner_| runs tasks on could be 277 // Note that the thread that |destruction_runner_| runs tasks on could be
150 // temporarily blocked on I/O (e.g. select()) therefore it is safer to close 278 // temporarily blocked on I/O (e.g. select()) therefore it is safer to close
151 // the sockets now rather than relying on the destructor. 279 // the sockets now rather than relying on the destructor.
152 socket1_.reset(); 280 socket1_.reset();
153 socket2_.reset(); 281 socket2_.reset();
154 282
155 // Note that base::Thread must be destroyed on the thread it was created on. 283 // Ensure the object is destroyed on the thread that created it.
156 destructor_runner_->DeleteSoon(FROM_HERE, this); 284 destructor_runner_->DeleteSoon(FROM_HERE, this);
157 } 285 }
158 286
159 scoped_ptr<Socket> socket1_; 287 scoped_ptr<Socket> socket1_;
160 scoped_ptr<Socket> socket2_; 288 scoped_ptr<Socket> socket2_;
161 scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_; 289 scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_;
162 base::Thread thread_; 290 base::Thread thread_;
163 }; 291 };
164 292
165 } // namespace 293 } // namespace
166 294
167 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) { 295 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) {
168 (new Forwarder(socket1.Pass(), socket2.Pass()))->Start(); 296 (new Forwarder(socket1.Pass(), socket2.Pass()))->Start();
169 } 297 }
170 298
171 } // namespace forwarder2 299 } // namespace forwarder2
OLDNEW
« no previous file with comments | « tools/android/forwarder2/forwarder.h ('k') | tools/android/forwarder2/socket.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698