Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(65)

Side by Side Diff: tools/android/forwarder2/forwarder.cc

Issue 137923004: Revert "Revert 235213 "android: forwader2: Simplify Forwarder implementa..."" (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Add DCHECK() in SetPeer() Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « tools/android/forwarder2/forwarder.h ('k') | tools/android/forwarder2/socket.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "tools/android/forwarder2/forwarder.h" 5 #include "tools/android/forwarder2/forwarder.h"
6 6
7 #include "base/basictypes.h" 7 #include "base/basictypes.h"
8 #include "base/bind.h" 8 #include "base/bind.h"
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/memory/ref_counted.h" 10 #include "base/memory/ref_counted.h"
11 #include "base/message_loop/message_loop_proxy.h"
11 #include "base/posix/eintr_wrapper.h" 12 #include "base/posix/eintr_wrapper.h"
12 #include "base/single_thread_task_runner.h" 13 #include "base/single_thread_task_runner.h"
14 #include "base/threading/thread.h"
13 #include "tools/android/forwarder2/socket.h" 15 #include "tools/android/forwarder2/socket.h"
14 16
15 namespace forwarder2 { 17 namespace forwarder2 {
16 namespace { 18 namespace {
17 19
18 // Helper class to buffer reads and writes from one socket to another. 20 // Helper class to buffer reads and writes from one socket to another.
21 // Each implements a small buffer connected two one input socket, and
22 // one output socket.
23 //
24 // socket_from_ ---> [BufferedCopier] ---> socket_to_
25 //
26 // These objects are used in a pair to handle duplex traffic, as in:
27 //
28 // ------> [BufferedCopier_1] --->
29 // / \
30 // socket_1 * * socket_2
31 // \ /
32 // <------ [BufferedCopier_2] <----
33 //
34 // When a BufferedCopier is in the READING state (see below), it only listens
35 // to events on its input socket, and won't detect when its output socket
36 // disconnects. To work around this, its peer will call its Close() method
37 // when that happens.
38
19 class BufferedCopier { 39 class BufferedCopier {
20 public: 40 public:
41 // Possible states:
42 // READING - Empty buffer and Waiting for input.
43 // WRITING - Data in buffer, and waiting for output.
44 // CLOSING - Like WRITING, but do not try to read after that.
45 // CLOSED - Completely closed.
46 //
47 // State transitions are:
48 //
49 // T01: READING ---[receive data]---> WRITING
50 // T02: READING ---[error on input socket]---> CLOSED
51 // T03: READING ---[Close() call]---> CLOSED
52 //
53 // T04: WRITING ---[write partial data]---> WRITING
54 // T05: WRITING ---[write all data]----> READING
55 // T06: WRITING ---[error on output socket]----> CLOSED
56 // T07: WRITING ---[Close() call]---> CLOSING
57 //
58 // T08: CLOSING ---[write partial data]---> CLOSING
59 // T09: CLOSING ---[write all data]----> CLOSED
60 // T10: CLOSING ---[Close() call]---> CLOSING
61 // T11: CLOSING ---[error on output socket] ---> CLOSED
62 //
63 enum State {
64 STATE_READING = 0,
65 STATE_WRITING = 1,
66 STATE_CLOSING = 2,
67 STATE_CLOSED = 3,
68 };
69
21 // Does NOT own the pointers. 70 // Does NOT own the pointers.
22 BufferedCopier(Socket* socket_from, 71 BufferedCopier(Socket* socket_from, Socket* socket_to)
23 Socket* socket_to)
24 : socket_from_(socket_from), 72 : socket_from_(socket_from),
25 socket_to_(socket_to), 73 socket_to_(socket_to),
26 bytes_read_(0), 74 bytes_read_(0),
27 write_offset_(0) { 75 write_offset_(0),
28 } 76 peer_(NULL),
29 77 state_(STATE_READING) {}
30 bool AddToReadSet(fd_set* read_fds) { 78
31 if (bytes_read_ == 0) 79 // Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
32 return socket_from_->AddFdToSet(read_fds); 80 void SetPeer(BufferedCopier* peer) {
33 return false; 81 DCHECK(!peer_);
34 } 82 peer_ = peer;
35 83 }
36 bool AddToWriteSet(fd_set* write_fds) { 84
37 if (write_offset_ < bytes_read_) 85 // Gently asks to close a buffer. Called either by the peer or the forwarder.
38 return socket_to_->AddFdToSet(write_fds); 86 void Close() {
39 return false; 87 switch (state_) {
40 } 88 case STATE_READING:
41 89 state_ = STATE_CLOSED; // T03
42 bool TryRead(const fd_set& read_fds) { 90 break;
43 if (!socket_from_->IsFdInSet(read_fds)) 91 case STATE_WRITING:
44 return false; 92 state_ = STATE_CLOSING; // T07
45 if (bytes_read_ != 0) // Can't read. 93 break;
46 return false; 94 case STATE_CLOSING:
47 int ret = socket_from_->Read(buffer_, kBufferSize); 95 break; // T10
48 if (ret > 0) { 96 case STATE_CLOSED:
49 bytes_read_ = ret; 97 ;
50 return true; 98 }
51 } 99 }
52 return false; 100
53 } 101 // Call this before select(). This updates |read_fds|,
54 102 // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed.
55 bool TryWrite(const fd_set& write_fds) { 103 void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
56 if (!socket_to_->IsFdInSet(write_fds)) 104 int fd;
57 return false; 105 switch (state_) {
58 if (write_offset_ >= bytes_read_) // Nothing to write. 106 case STATE_READING:
59 return false; 107 DCHECK(bytes_read_ == 0);
60 int ret = socket_to_->Write(buffer_ + write_offset_, 108 DCHECK(write_offset_ == 0);
61 bytes_read_ - write_offset_); 109 fd = socket_from_->fd();
62 if (ret > 0) { 110 if (fd < 0) {
63 write_offset_ += ret; 111 ForceClose(); // T02
64 if (write_offset_ == bytes_read_) { 112 return;
113 }
114 FD_SET(fd, read_fds);
115 break;
116
117 case STATE_WRITING:
118 case STATE_CLOSING:
119 DCHECK(bytes_read_ > 0);
120 DCHECK(write_offset_ < bytes_read_);
121 fd = socket_to_->fd();
122 if (fd < 0) {
123 ForceClose(); // T06
124 return;
125 }
126 FD_SET(fd, write_fds);
127 break;
128
129 case STATE_CLOSED:
130 return;
131 }
132 *max_fd = std::max(*max_fd, fd);
133 }
134
135 // Call this after a select() call to operate over the buffer.
136 void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) {
137 int fd, ret;
138 switch (state_) {
139 case STATE_READING:
140 fd = socket_from_->fd();
141 if (fd < 0) {
142 state_ = STATE_CLOSED; // T02
143 return;
144 }
145 if (!FD_ISSET(fd, &read_fds))
146 return;
147
148 ret = socket_from_->NonBlockingRead(buffer_, kBufferSize);
149 if (ret <= 0) {
150 ForceClose(); // T02
151 return;
152 }
153 bytes_read_ = ret;
154 write_offset_ = 0;
155 state_ = STATE_WRITING; // T01
156 break;
157
158 case STATE_WRITING:
159 case STATE_CLOSING:
160 fd = socket_to_->fd();
161 if (fd < 0) {
162 ForceClose(); // T06 + T11
163 return;
164 }
165 if (!FD_ISSET(fd, &write_fds))
166 return;
167
168 ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_,
169 bytes_read_ - write_offset_);
170 if (ret <= 0) {
171 ForceClose(); // T06 + T11
172 return;
173 }
174
175 write_offset_ += ret;
176 if (write_offset_ < bytes_read_)
177 return; // T08 + T04
178
65 write_offset_ = 0; 179 write_offset_ = 0;
66 bytes_read_ = 0; 180 bytes_read_ = 0;
67 } 181 if (state_ == STATE_CLOSING) {
68 return true; 182 ForceClose(); // T09
69 } 183 return;
70 return false; 184 }
185 state_ = STATE_READING; // T05
186 break;
187
188 case STATE_CLOSED:
189 ;
190 }
71 } 191 }
72 192
73 private: 193 private:
194 // Internal method used to close the buffer and notify the peer, if any.
195 void ForceClose() {
196 if (peer_) {
197 peer_->Close();
198 peer_ = NULL;
199 }
200 state_ = STATE_CLOSED;
201 }
202
74 // Not owned. 203 // Not owned.
75 Socket* socket_from_; 204 Socket* socket_from_;
76 Socket* socket_to_; 205 Socket* socket_to_;
77 206
78 // A big buffer to let our file-over-http bridge work more like real file. 207 // A big buffer to let the file-over-http bridge work more like real file.
79 static const int kBufferSize = 1024 * 128; 208 static const int kBufferSize = 1024 * 128;
80 int bytes_read_; 209 int bytes_read_;
81 int write_offset_; 210 int write_offset_;
211 BufferedCopier* peer_;
212 State state_;
82 char buffer_[kBufferSize]; 213 char buffer_[kBufferSize];
83 214
84 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); 215 DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
85 }; 216 };
86 217
87 // Internal class that wraps a helper thread to forward traffic between 218 // Internal class that wraps a helper thread to forward traffic between
88 // |socket1| and |socket2|. After creating a new instance, call its Start() 219 // |socket1| and |socket2|. After creating a new instance, call its Start()
89 // method to launch operations. Thread stops automatically if one of the socket 220 // method to launch operations. Thread stops automatically if one of the socket
90 // disconnects, but ensures that all buffered writes to the other, still alive, 221 // disconnects, but ensures that all buffered writes to the other, still alive,
91 // socket, are written first. When this happens, the instance will delete itself 222 // socket, are written first. When this happens, the instance will delete itself
92 // automatically. 223 // automatically.
93 // Note that the instance will always be destroyed on the same thread that 224 // Note that the instance will always be destroyed on the same thread that
94 // created it. 225 // created it.
95 class Forwarder { 226 class Forwarder {
96 public: 227 public:
228 // Create a new Forwarder instance. |socket1| and |socket2| are the two socket
229 // endpoints.
97 Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) 230 Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2)
98 : socket1_(socket1.Pass()), 231 : socket1_(socket1.Pass()),
99 socket2_(socket2.Pass()), 232 socket2_(socket2.Pass()),
100 destructor_runner_(base::MessageLoopProxy::current()), 233 destructor_runner_(base::MessageLoopProxy::current()),
101 thread_("ForwarderThread") { 234 thread_("ForwarderThread") {}
102 }
103 235
104 void Start() { 236 void Start() {
105 thread_.Start(); 237 thread_.Start();
106 thread_.message_loop_proxy()->PostTask( 238 thread_.message_loop_proxy()->PostTask(
107 FROM_HERE, 239 FROM_HERE,
108 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); 240 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
109 } 241 }
110 242
111 private: 243 private:
112 void ThreadHandler() { 244 void ThreadHandler() {
113 const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1;
114 fd_set read_fds; 245 fd_set read_fds;
115 fd_set write_fds; 246 fd_set write_fds;
116 247
117 // Copy from socket1 to socket2 248 // Copy from socket1 to socket2
118 BufferedCopier buffer1(socket1_.get(), socket2_.get()); 249 BufferedCopier buffer1(socket1_.get(), socket2_.get());
250
119 // Copy from socket2 to socket1 251 // Copy from socket2 to socket1
120 BufferedCopier buffer2(socket2_.get(), socket1_.get()); 252 BufferedCopier buffer2(socket2_.get(), socket1_.get());
121 253
122 bool run = true; 254 buffer1.SetPeer(&buffer2);
123 while (run) { 255 buffer2.SetPeer(&buffer1);
256
257 for (;;) {
124 FD_ZERO(&read_fds); 258 FD_ZERO(&read_fds);
125 FD_ZERO(&write_fds); 259 FD_ZERO(&write_fds);
126 260
127 buffer1.AddToReadSet(&read_fds); 261 int max_fd = -1;
128 buffer2.AddToReadSet(&read_fds); 262 buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
129 buffer1.AddToWriteSet(&write_fds); 263 buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
130 buffer2.AddToWriteSet(&write_fds);
131 264
132 if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) { 265 if (max_fd < 0) {
266 // Both buffers are closed. Exit immediately.
267 break;
268 }
269
270 if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
271 0) {
133 PLOG(ERROR) << "select"; 272 PLOG(ERROR) << "select";
134 break; 273 break;
135 } 274 }
136 // When a socket in the read set closes the connection, select() returns
137 // with that socket descriptor set as "ready to read". When we call
138 // TryRead() below, it will return false, but the while loop will continue
139 // to run until all the write operations are finished, to make sure the
140 // buffers are completely flushed out.
141 275
142 // Keep running while we have some operation to do. 276 buffer1.ProcessSelect(read_fds, write_fds);
143 run = buffer1.TryRead(read_fds); 277 buffer2.ProcessSelect(read_fds, write_fds);
144 run = run || buffer2.TryRead(read_fds);
145 run = run || buffer1.TryWrite(write_fds);
146 run = run || buffer2.TryWrite(write_fds);
147 } 278 }
148 279
149 // Note that the thread that |destruction_runner_| runs tasks on could be 280 // Note that the thread that |destruction_runner_| runs tasks on could be
150 // temporarily blocked on I/O (e.g. select()) therefore it is safer to close 281 // temporarily blocked on I/O (e.g. select()) therefore it is safer to close
151 // the sockets now rather than relying on the destructor. 282 // the sockets now rather than relying on the destructor.
152 socket1_.reset(); 283 socket1_.reset();
153 socket2_.reset(); 284 socket2_.reset();
154 285
155 // Note that base::Thread must be destroyed on the thread it was created on. 286 // Ensure the object is destroyed on the thread that created it.
156 destructor_runner_->DeleteSoon(FROM_HERE, this); 287 destructor_runner_->DeleteSoon(FROM_HERE, this);
157 } 288 }
158 289
159 scoped_ptr<Socket> socket1_; 290 scoped_ptr<Socket> socket1_;
160 scoped_ptr<Socket> socket2_; 291 scoped_ptr<Socket> socket2_;
161 scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_; 292 scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_;
162 base::Thread thread_; 293 base::Thread thread_;
163 }; 294 };
164 295
165 } // namespace 296 } // namespace
166 297
167 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) { 298 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) {
168 (new Forwarder(socket1.Pass(), socket2.Pass()))->Start(); 299 (new Forwarder(socket1.Pass(), socket2.Pass()))->Start();
169 } 300 }
170 301
171 } // namespace forwarder2 302 } // namespace forwarder2
OLDNEW
« no previous file with comments | « tools/android/forwarder2/forwarder.h ('k') | tools/android/forwarder2/socket.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698