OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "tools/android/forwarder2/forwarder.h" | 5 #include "tools/android/forwarder2/forwarder.h" |
6 | 6 |
7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
8 #include "base/bind.h" | 8 #include "base/bind.h" |
9 #include "base/logging.h" | 9 #include "base/logging.h" |
10 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
11 #include "base/message_loop/message_loop_proxy.h" | |
11 #include "base/posix/eintr_wrapper.h" | 12 #include "base/posix/eintr_wrapper.h" |
12 #include "base/single_thread_task_runner.h" | 13 #include "base/single_thread_task_runner.h" |
14 #include "base/threading/thread.h" | |
13 #include "tools/android/forwarder2/socket.h" | 15 #include "tools/android/forwarder2/socket.h" |
14 | 16 |
15 namespace forwarder2 { | 17 namespace forwarder2 { |
16 namespace { | 18 namespace { |
17 | 19 |
18 // Helper class to buffer reads and writes from one socket to another. | 20 // Helper class to buffer reads and writes from one socket to another. |
21 // Each implements a small buffer connected two one input socket, and | |
22 // one output socket. | |
23 // | |
24 // socket_from_ ---> [BufferedCopier] ---> socket_to_ | |
25 // | |
26 // These objects are used in a pair to handle duplex traffic, as in: | |
27 // | |
28 // ------> [BufferedCopier_1] ---> | |
29 // / \ | |
30 // socket_1 * * socket_2 | |
31 // \ / | |
32 // <------ [BufferedCopier_2] <---- | |
33 // | |
34 // When a BufferedCopier is in the READING state (see below), it only listens | |
35 // to events on its input socket, and won't detect when its output socket | |
36 // disconnects. To work around this, its peer will call its Close() method | |
37 // when that happens. | |
38 | |
19 class BufferedCopier { | 39 class BufferedCopier { |
20 public: | 40 public: |
41 // Possible states: | |
42 // READING - Empty buffer and Waiting for input. | |
43 // WRITING - Data in buffer, and waiting for output. | |
44 // CLOSING - Like WRITING, but do not try to read after that. | |
45 // CLOSED - Completely closed. | |
46 // | |
47 // State transitions are: | |
bulach
2014/01/14 14:15:01
nice documentation, thanks!
if I understood this
| |
48 // | |
49 // T01: READING ---[receive data]---> WRITING | |
50 // T02: READING ---[error on input socket]---> CLOSED | |
51 // T03: READING ---[Close() call]---> CLOSED | |
52 // | |
53 // T04: WRITING ---[write partial data]---> WRITING | |
54 // T05: WRITING ---[write all data]----> READING | |
55 // T06: WRITING ---[error on output socket]----> CLOSED | |
56 // T07: WRITING ---[Close() call]---> CLOSING | |
57 // | |
58 // T08: CLOSING ---[write partial data]---> CLOSING | |
59 // T09: CLOSING ---[write all data]----> CLOSED | |
60 // T10: CLOSING ---[Close() call]---> CLOSING | |
61 // T11: CLOSING ---[error on output socket] ---> CLOSED | |
62 // | |
63 enum State { | |
64 STATE_READING = 0, | |
65 STATE_WRITING = 1, | |
66 STATE_CLOSING = 2, | |
67 STATE_CLOSED = 3, | |
68 }; | |
69 | |
21 // Does NOT own the pointers. | 70 // Does NOT own the pointers. |
22 BufferedCopier(Socket* socket_from, | 71 BufferedCopier(Socket* socket_from, Socket* socket_to) |
23 Socket* socket_to) | |
24 : socket_from_(socket_from), | 72 : socket_from_(socket_from), |
25 socket_to_(socket_to), | 73 socket_to_(socket_to), |
26 bytes_read_(0), | 74 bytes_read_(0), |
27 write_offset_(0) { | 75 write_offset_(0), |
28 } | 76 peer_(NULL), |
29 | 77 state_(STATE_READING) {} |
30 bool AddToReadSet(fd_set* read_fds) { | 78 |
31 if (bytes_read_ == 0) | 79 // Sets the 'peer_' field pointing to the other BufferedCopier in a pair. |
32 return socket_from_->AddFdToSet(read_fds); | 80 void SetPeer(BufferedCopier* peer) { peer_ = peer; } |
bulach
2014/01/14 14:15:01
nit: CHECK(!peer);
Philippe
2014/01/14 14:50:57
Yeah, good idea. I made it a DCHECK though if you
| |
33 return false; | 81 |
34 } | 82 // Gently asks to close a buffer. Called either by the peer or the forwarder. |
35 | 83 void Close() { |
36 bool AddToWriteSet(fd_set* write_fds) { | 84 switch (state_) { |
37 if (write_offset_ < bytes_read_) | 85 case STATE_READING: |
38 return socket_to_->AddFdToSet(write_fds); | 86 state_ = STATE_CLOSED; // T03 |
39 return false; | 87 break; |
40 } | 88 case STATE_WRITING: |
41 | 89 state_ = STATE_CLOSING; // T07 |
42 bool TryRead(const fd_set& read_fds) { | 90 break; |
43 if (!socket_from_->IsFdInSet(read_fds)) | 91 case STATE_CLOSING: |
44 return false; | 92 break; // T10 |
45 if (bytes_read_ != 0) // Can't read. | 93 case STATE_CLOSED: |
46 return false; | 94 ; |
47 int ret = socket_from_->Read(buffer_, kBufferSize); | 95 } |
48 if (ret > 0) { | 96 } |
49 bytes_read_ = ret; | 97 |
50 return true; | 98 // Call this before select(). This updates |read_fds|, |
51 } | 99 // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed. |
52 return false; | 100 void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) { |
53 } | 101 int fd; |
54 | 102 switch (state_) { |
55 bool TryWrite(const fd_set& write_fds) { | 103 case STATE_READING: |
56 if (!socket_to_->IsFdInSet(write_fds)) | 104 DCHECK(bytes_read_ == 0); |
57 return false; | 105 DCHECK(write_offset_ == 0); |
58 if (write_offset_ >= bytes_read_) // Nothing to write. | 106 fd = socket_from_->fd(); |
59 return false; | 107 if (fd < 0) { |
60 int ret = socket_to_->Write(buffer_ + write_offset_, | 108 ForceClose(); // T02 |
61 bytes_read_ - write_offset_); | 109 return; |
62 if (ret > 0) { | 110 } |
63 write_offset_ += ret; | 111 FD_SET(fd, read_fds); |
64 if (write_offset_ == bytes_read_) { | 112 break; |
113 | |
114 case STATE_WRITING: | |
115 case STATE_CLOSING: | |
116 DCHECK(bytes_read_ > 0); | |
117 DCHECK(write_offset_ < bytes_read_); | |
118 fd = socket_to_->fd(); | |
119 if (fd < 0) { | |
120 ForceClose(); // T06 | |
121 return; | |
122 } | |
123 FD_SET(fd, write_fds); | |
124 break; | |
125 | |
126 case STATE_CLOSED: | |
127 return; | |
128 } | |
129 *max_fd = std::max(*max_fd, fd); | |
130 } | |
131 | |
132 // Call this after a select() call to operate over the buffer. | |
133 void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) { | |
134 int fd, ret; | |
135 switch (state_) { | |
136 case STATE_READING: | |
137 fd = socket_from_->fd(); | |
138 if (fd < 0) { | |
139 state_ = STATE_CLOSED; // T02 | |
140 return; | |
141 } | |
142 if (!FD_ISSET(fd, &read_fds)) | |
143 return; | |
144 | |
145 ret = socket_from_->NonBlockingRead(buffer_, kBufferSize); | |
146 if (ret <= 0) { | |
147 ForceClose(); // T02 | |
148 return; | |
149 } | |
150 bytes_read_ = ret; | |
151 write_offset_ = 0; | |
152 state_ = STATE_WRITING; // T01 | |
153 break; | |
154 | |
155 case STATE_WRITING: | |
156 case STATE_CLOSING: | |
157 fd = socket_to_->fd(); | |
158 if (fd < 0) { | |
159 ForceClose(); // T06 + T11 | |
160 return; | |
161 } | |
162 if (!FD_ISSET(fd, &write_fds)) | |
163 return; | |
164 | |
165 ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_, | |
166 bytes_read_ - write_offset_); | |
167 if (ret <= 0) { | |
168 ForceClose(); // T06 + T11 | |
169 return; | |
170 } | |
171 | |
172 write_offset_ += ret; | |
173 if (write_offset_ < bytes_read_) | |
174 return; // T08 + T04 | |
175 | |
65 write_offset_ = 0; | 176 write_offset_ = 0; |
66 bytes_read_ = 0; | 177 bytes_read_ = 0; |
67 } | 178 if (state_ == STATE_CLOSING) { |
68 return true; | 179 ForceClose(); // T09 |
69 } | 180 return; |
70 return false; | 181 } |
182 state_ = STATE_READING; // T05 | |
183 break; | |
184 | |
185 case STATE_CLOSED: | |
186 ; | |
187 } | |
71 } | 188 } |
72 | 189 |
73 private: | 190 private: |
191 // Internal method used to close the buffer and notify the peer, if any. | |
192 void ForceClose() { | |
193 if (peer_) { | |
194 peer_->Close(); | |
195 peer_ = NULL; | |
196 } | |
197 state_ = STATE_CLOSED; | |
198 } | |
199 | |
74 // Not owned. | 200 // Not owned. |
75 Socket* socket_from_; | 201 Socket* socket_from_; |
76 Socket* socket_to_; | 202 Socket* socket_to_; |
77 | 203 |
78 // A big buffer to let our file-over-http bridge work more like real file. | 204 // A big buffer to let the file-over-http bridge work more like real file. |
79 static const int kBufferSize = 1024 * 128; | 205 static const int kBufferSize = 1024 * 128; |
80 int bytes_read_; | 206 int bytes_read_; |
81 int write_offset_; | 207 int write_offset_; |
208 BufferedCopier* peer_; | |
209 State state_; | |
82 char buffer_[kBufferSize]; | 210 char buffer_[kBufferSize]; |
83 | 211 |
84 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); | 212 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); |
85 }; | 213 }; |
86 | 214 |
87 // Internal class that wraps a helper thread to forward traffic between | 215 // Internal class that wraps a helper thread to forward traffic between |
88 // |socket1| and |socket2|. After creating a new instance, call its Start() | 216 // |socket1| and |socket2|. After creating a new instance, call its Start() |
89 // method to launch operations. Thread stops automatically if one of the socket | 217 // method to launch operations. Thread stops automatically if one of the socket |
90 // disconnects, but ensures that all buffered writes to the other, still alive, | 218 // disconnects, but ensures that all buffered writes to the other, still alive, |
91 // socket, are written first. When this happens, the instance will delete itself | 219 // socket, are written first. When this happens, the instance will delete itself |
92 // automatically. | 220 // automatically. |
93 // Note that the instance will always be destroyed on the same thread that | 221 // Note that the instance will always be destroyed on the same thread that |
94 // created it. | 222 // created it. |
95 class Forwarder { | 223 class Forwarder { |
96 public: | 224 public: |
225 // Create a new Forwarder instance. |socket1| and |socket2| are the two socket | |
226 // endpoints. | |
97 Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) | 227 Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) |
98 : socket1_(socket1.Pass()), | 228 : socket1_(socket1.Pass()), |
99 socket2_(socket2.Pass()), | 229 socket2_(socket2.Pass()), |
100 destructor_runner_(base::MessageLoopProxy::current()), | 230 destructor_runner_(base::MessageLoopProxy::current()), |
101 thread_("ForwarderThread") { | 231 thread_("ForwarderThread") {} |
102 } | |
103 | 232 |
104 void Start() { | 233 void Start() { |
105 thread_.Start(); | 234 thread_.Start(); |
106 thread_.message_loop_proxy()->PostTask( | 235 thread_.message_loop_proxy()->PostTask( |
107 FROM_HERE, | 236 FROM_HERE, |
108 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); | 237 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); |
109 } | 238 } |
110 | 239 |
111 private: | 240 private: |
112 void ThreadHandler() { | 241 void ThreadHandler() { |
113 const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1; | |
114 fd_set read_fds; | 242 fd_set read_fds; |
115 fd_set write_fds; | 243 fd_set write_fds; |
116 | 244 |
117 // Copy from socket1 to socket2 | 245 // Copy from socket1 to socket2 |
118 BufferedCopier buffer1(socket1_.get(), socket2_.get()); | 246 BufferedCopier buffer1(socket1_.get(), socket2_.get()); |
247 | |
119 // Copy from socket2 to socket1 | 248 // Copy from socket2 to socket1 |
120 BufferedCopier buffer2(socket2_.get(), socket1_.get()); | 249 BufferedCopier buffer2(socket2_.get(), socket1_.get()); |
121 | 250 |
122 bool run = true; | 251 buffer1.SetPeer(&buffer2); |
123 while (run) { | 252 buffer2.SetPeer(&buffer1); |
253 | |
254 for (;;) { | |
124 FD_ZERO(&read_fds); | 255 FD_ZERO(&read_fds); |
125 FD_ZERO(&write_fds); | 256 FD_ZERO(&write_fds); |
126 | 257 |
127 buffer1.AddToReadSet(&read_fds); | 258 int max_fd = -1; |
128 buffer2.AddToReadSet(&read_fds); | 259 buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd); |
129 buffer1.AddToWriteSet(&write_fds); | 260 buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd); |
130 buffer2.AddToWriteSet(&write_fds); | |
131 | 261 |
132 if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) { | 262 if (max_fd < 0) { |
263 // Both buffers are closed. Exit immediately. | |
264 break; | |
265 } | |
266 | |
267 if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <= | |
268 0) { | |
133 PLOG(ERROR) << "select"; | 269 PLOG(ERROR) << "select"; |
134 break; | 270 break; |
135 } | 271 } |
136 // When a socket in the read set closes the connection, select() returns | |
137 // with that socket descriptor set as "ready to read". When we call | |
138 // TryRead() below, it will return false, but the while loop will continue | |
139 // to run until all the write operations are finished, to make sure the | |
140 // buffers are completely flushed out. | |
141 | 272 |
142 // Keep running while we have some operation to do. | 273 buffer1.ProcessSelect(read_fds, write_fds); |
143 run = buffer1.TryRead(read_fds); | 274 buffer2.ProcessSelect(read_fds, write_fds); |
144 run = run || buffer2.TryRead(read_fds); | |
145 run = run || buffer1.TryWrite(write_fds); | |
146 run = run || buffer2.TryWrite(write_fds); | |
147 } | 275 } |
148 | 276 |
149 // Note that the thread that |destruction_runner_| runs tasks on could be | 277 // Note that the thread that |destruction_runner_| runs tasks on could be |
150 // temporarily blocked on I/O (e.g. select()) therefore it is safer to close | 278 // temporarily blocked on I/O (e.g. select()) therefore it is safer to close |
151 // the sockets now rather than relying on the destructor. | 279 // the sockets now rather than relying on the destructor. |
152 socket1_.reset(); | 280 socket1_.reset(); |
153 socket2_.reset(); | 281 socket2_.reset(); |
154 | 282 |
155 // Note that base::Thread must be destroyed on the thread it was created on. | 283 // Ensure the object is destroyed on the thread that created it. |
156 destructor_runner_->DeleteSoon(FROM_HERE, this); | 284 destructor_runner_->DeleteSoon(FROM_HERE, this); |
157 } | 285 } |
158 | 286 |
159 scoped_ptr<Socket> socket1_; | 287 scoped_ptr<Socket> socket1_; |
160 scoped_ptr<Socket> socket2_; | 288 scoped_ptr<Socket> socket2_; |
161 scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_; | 289 scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_; |
162 base::Thread thread_; | 290 base::Thread thread_; |
163 }; | 291 }; |
164 | 292 |
165 } // namespace | 293 } // namespace |
166 | 294 |
167 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) { | 295 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) { |
168 (new Forwarder(socket1.Pass(), socket2.Pass()))->Start(); | 296 (new Forwarder(socket1.Pass(), socket2.Pass()))->Start(); |
169 } | 297 } |
170 | 298 |
171 } // namespace forwarder2 | 299 } // namespace forwarder2 |
OLD | NEW |