Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "tools/android/forwarder2/forwarder.h" | 5 #include "tools/android/forwarder2/forwarder.h" |
| 6 | 6 |
| 7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
| 8 #include "base/bind.h" | 8 #include "base/bind.h" |
| 9 #include "base/logging.h" | 9 #include "base/logging.h" |
| 10 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
| 11 #include "base/message_loop/message_loop_proxy.h" | |
| 11 #include "base/posix/eintr_wrapper.h" | 12 #include "base/posix/eintr_wrapper.h" |
| 12 #include "base/single_thread_task_runner.h" | 13 #include "base/single_thread_task_runner.h" |
| 14 #include "base/threading/thread.h" | |
| 13 #include "tools/android/forwarder2/socket.h" | 15 #include "tools/android/forwarder2/socket.h" |
| 14 | 16 |
| 15 namespace forwarder2 { | 17 namespace forwarder2 { |
| 16 namespace { | 18 namespace { |
| 17 | 19 |
| 18 // Helper class to buffer reads and writes from one socket to another. | 20 // Helper class to buffer reads and writes from one socket to another. |
| 21 // Each implements a small buffer connected two one input socket, and | |
| 22 // one output socket. | |
| 23 // | |
| 24 // socket_from_ ---> [BufferedCopier] ---> socket_to_ | |
| 25 // | |
| 26 // These objects are used in a pair to handle duplex traffic, as in: | |
| 27 // | |
| 28 // ------> [BufferedCopier_1] ---> | |
| 29 // / \ | |
| 30 // socket_1 * * socket_2 | |
| 31 // \ / | |
| 32 // <------ [BufferedCopier_2] <---- | |
| 33 // | |
| 34 // When a BufferedCopier is in the READING state (see below), it only listens | |
| 35 // to events on its input socket, and won't detect when its output socket | |
| 36 // disconnects. To work around this, its peer will call its Close() method | |
| 37 // when that happens. | |
| 38 | |
| 19 class BufferedCopier { | 39 class BufferedCopier { |
| 20 public: | 40 public: |
| 41 // Possible states: | |
| 42 // READING - Empty buffer and Waiting for input. | |
| 43 // WRITING - Data in buffer, and waiting for output. | |
| 44 // CLOSING - Like WRITING, but do not try to read after that. | |
| 45 // CLOSED - Completely closed. | |
| 46 // | |
| 47 // State transitions are: | |
|
bulach
2014/01/14 14:15:01
nice documentation, thanks!
if I understood this
| |
| 48 // | |
| 49 // T01: READING ---[receive data]---> WRITING | |
| 50 // T02: READING ---[error on input socket]---> CLOSED | |
| 51 // T03: READING ---[Close() call]---> CLOSED | |
| 52 // | |
| 53 // T04: WRITING ---[write partial data]---> WRITING | |
| 54 // T05: WRITING ---[write all data]----> READING | |
| 55 // T06: WRITING ---[error on output socket]----> CLOSED | |
| 56 // T07: WRITING ---[Close() call]---> CLOSING | |
| 57 // | |
| 58 // T08: CLOSING ---[write partial data]---> CLOSING | |
| 59 // T09: CLOSING ---[write all data]----> CLOSED | |
| 60 // T10: CLOSING ---[Close() call]---> CLOSING | |
| 61 // T11: CLOSING ---[error on output socket] ---> CLOSED | |
| 62 // | |
| 63 enum State { | |
| 64 STATE_READING = 0, | |
| 65 STATE_WRITING = 1, | |
| 66 STATE_CLOSING = 2, | |
| 67 STATE_CLOSED = 3, | |
| 68 }; | |
| 69 | |
| 21 // Does NOT own the pointers. | 70 // Does NOT own the pointers. |
| 22 BufferedCopier(Socket* socket_from, | 71 BufferedCopier(Socket* socket_from, Socket* socket_to) |
| 23 Socket* socket_to) | |
| 24 : socket_from_(socket_from), | 72 : socket_from_(socket_from), |
| 25 socket_to_(socket_to), | 73 socket_to_(socket_to), |
| 26 bytes_read_(0), | 74 bytes_read_(0), |
| 27 write_offset_(0) { | 75 write_offset_(0), |
| 28 } | 76 peer_(NULL), |
| 29 | 77 state_(STATE_READING) {} |
| 30 bool AddToReadSet(fd_set* read_fds) { | 78 |
| 31 if (bytes_read_ == 0) | 79 // Sets the 'peer_' field pointing to the other BufferedCopier in a pair. |
| 32 return socket_from_->AddFdToSet(read_fds); | 80 void SetPeer(BufferedCopier* peer) { peer_ = peer; } |
|
bulach
2014/01/14 14:15:01
nit: CHECK(!peer);
Philippe
2014/01/14 14:50:57
Yeah, good idea. I made it a DCHECK though if you
| |
| 33 return false; | 81 |
| 34 } | 82 // Gently asks to close a buffer. Called either by the peer or the forwarder. |
| 35 | 83 void Close() { |
| 36 bool AddToWriteSet(fd_set* write_fds) { | 84 switch (state_) { |
| 37 if (write_offset_ < bytes_read_) | 85 case STATE_READING: |
| 38 return socket_to_->AddFdToSet(write_fds); | 86 state_ = STATE_CLOSED; // T03 |
| 39 return false; | 87 break; |
| 40 } | 88 case STATE_WRITING: |
| 41 | 89 state_ = STATE_CLOSING; // T07 |
| 42 bool TryRead(const fd_set& read_fds) { | 90 break; |
| 43 if (!socket_from_->IsFdInSet(read_fds)) | 91 case STATE_CLOSING: |
| 44 return false; | 92 break; // T10 |
| 45 if (bytes_read_ != 0) // Can't read. | 93 case STATE_CLOSED: |
| 46 return false; | 94 ; |
| 47 int ret = socket_from_->Read(buffer_, kBufferSize); | 95 } |
| 48 if (ret > 0) { | 96 } |
| 49 bytes_read_ = ret; | 97 |
| 50 return true; | 98 // Call this before select(). This updates |read_fds|, |
| 51 } | 99 // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed. |
| 52 return false; | 100 void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) { |
| 53 } | 101 int fd; |
| 54 | 102 switch (state_) { |
| 55 bool TryWrite(const fd_set& write_fds) { | 103 case STATE_READING: |
| 56 if (!socket_to_->IsFdInSet(write_fds)) | 104 DCHECK(bytes_read_ == 0); |
| 57 return false; | 105 DCHECK(write_offset_ == 0); |
| 58 if (write_offset_ >= bytes_read_) // Nothing to write. | 106 fd = socket_from_->fd(); |
| 59 return false; | 107 if (fd < 0) { |
| 60 int ret = socket_to_->Write(buffer_ + write_offset_, | 108 ForceClose(); // T02 |
| 61 bytes_read_ - write_offset_); | 109 return; |
| 62 if (ret > 0) { | 110 } |
| 63 write_offset_ += ret; | 111 FD_SET(fd, read_fds); |
| 64 if (write_offset_ == bytes_read_) { | 112 break; |
| 113 | |
| 114 case STATE_WRITING: | |
| 115 case STATE_CLOSING: | |
| 116 DCHECK(bytes_read_ > 0); | |
| 117 DCHECK(write_offset_ < bytes_read_); | |
| 118 fd = socket_to_->fd(); | |
| 119 if (fd < 0) { | |
| 120 ForceClose(); // T06 | |
| 121 return; | |
| 122 } | |
| 123 FD_SET(fd, write_fds); | |
| 124 break; | |
| 125 | |
| 126 case STATE_CLOSED: | |
| 127 return; | |
| 128 } | |
| 129 *max_fd = std::max(*max_fd, fd); | |
| 130 } | |
| 131 | |
| 132 // Call this after a select() call to operate over the buffer. | |
| 133 void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) { | |
| 134 int fd, ret; | |
| 135 switch (state_) { | |
| 136 case STATE_READING: | |
| 137 fd = socket_from_->fd(); | |
| 138 if (fd < 0) { | |
| 139 state_ = STATE_CLOSED; // T02 | |
| 140 return; | |
| 141 } | |
| 142 if (!FD_ISSET(fd, &read_fds)) | |
| 143 return; | |
| 144 | |
| 145 ret = socket_from_->NonBlockingRead(buffer_, kBufferSize); | |
| 146 if (ret <= 0) { | |
| 147 ForceClose(); // T02 | |
| 148 return; | |
| 149 } | |
| 150 bytes_read_ = ret; | |
| 151 write_offset_ = 0; | |
| 152 state_ = STATE_WRITING; // T01 | |
| 153 break; | |
| 154 | |
| 155 case STATE_WRITING: | |
| 156 case STATE_CLOSING: | |
| 157 fd = socket_to_->fd(); | |
| 158 if (fd < 0) { | |
| 159 ForceClose(); // T06 + T11 | |
| 160 return; | |
| 161 } | |
| 162 if (!FD_ISSET(fd, &write_fds)) | |
| 163 return; | |
| 164 | |
| 165 ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_, | |
| 166 bytes_read_ - write_offset_); | |
| 167 if (ret <= 0) { | |
| 168 ForceClose(); // T06 + T11 | |
| 169 return; | |
| 170 } | |
| 171 | |
| 172 write_offset_ += ret; | |
| 173 if (write_offset_ < bytes_read_) | |
| 174 return; // T08 + T04 | |
| 175 | |
| 65 write_offset_ = 0; | 176 write_offset_ = 0; |
| 66 bytes_read_ = 0; | 177 bytes_read_ = 0; |
| 67 } | 178 if (state_ == STATE_CLOSING) { |
| 68 return true; | 179 ForceClose(); // T09 |
| 69 } | 180 return; |
| 70 return false; | 181 } |
| 182 state_ = STATE_READING; // T05 | |
| 183 break; | |
| 184 | |
| 185 case STATE_CLOSED: | |
| 186 ; | |
| 187 } | |
| 71 } | 188 } |
| 72 | 189 |
| 73 private: | 190 private: |
| 191 // Internal method used to close the buffer and notify the peer, if any. | |
| 192 void ForceClose() { | |
| 193 if (peer_) { | |
| 194 peer_->Close(); | |
| 195 peer_ = NULL; | |
| 196 } | |
| 197 state_ = STATE_CLOSED; | |
| 198 } | |
| 199 | |
| 74 // Not owned. | 200 // Not owned. |
| 75 Socket* socket_from_; | 201 Socket* socket_from_; |
| 76 Socket* socket_to_; | 202 Socket* socket_to_; |
| 77 | 203 |
| 78 // A big buffer to let our file-over-http bridge work more like real file. | 204 // A big buffer to let the file-over-http bridge work more like real file. |
| 79 static const int kBufferSize = 1024 * 128; | 205 static const int kBufferSize = 1024 * 128; |
| 80 int bytes_read_; | 206 int bytes_read_; |
| 81 int write_offset_; | 207 int write_offset_; |
| 208 BufferedCopier* peer_; | |
| 209 State state_; | |
| 82 char buffer_[kBufferSize]; | 210 char buffer_[kBufferSize]; |
| 83 | 211 |
| 84 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); | 212 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); |
| 85 }; | 213 }; |
| 86 | 214 |
| 87 // Internal class that wraps a helper thread to forward traffic between | 215 // Internal class that wraps a helper thread to forward traffic between |
| 88 // |socket1| and |socket2|. After creating a new instance, call its Start() | 216 // |socket1| and |socket2|. After creating a new instance, call its Start() |
| 89 // method to launch operations. Thread stops automatically if one of the socket | 217 // method to launch operations. Thread stops automatically if one of the socket |
| 90 // disconnects, but ensures that all buffered writes to the other, still alive, | 218 // disconnects, but ensures that all buffered writes to the other, still alive, |
| 91 // socket, are written first. When this happens, the instance will delete itself | 219 // socket, are written first. When this happens, the instance will delete itself |
| 92 // automatically. | 220 // automatically. |
| 93 // Note that the instance will always be destroyed on the same thread that | 221 // Note that the instance will always be destroyed on the same thread that |
| 94 // created it. | 222 // created it. |
| 95 class Forwarder { | 223 class Forwarder { |
| 96 public: | 224 public: |
| 225 // Create a new Forwarder instance. |socket1| and |socket2| are the two socket | |
| 226 // endpoints. | |
| 97 Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) | 227 Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) |
| 98 : socket1_(socket1.Pass()), | 228 : socket1_(socket1.Pass()), |
| 99 socket2_(socket2.Pass()), | 229 socket2_(socket2.Pass()), |
| 100 destructor_runner_(base::MessageLoopProxy::current()), | 230 destructor_runner_(base::MessageLoopProxy::current()), |
| 101 thread_("ForwarderThread") { | 231 thread_("ForwarderThread") {} |
| 102 } | |
| 103 | 232 |
| 104 void Start() { | 233 void Start() { |
| 105 thread_.Start(); | 234 thread_.Start(); |
| 106 thread_.message_loop_proxy()->PostTask( | 235 thread_.message_loop_proxy()->PostTask( |
| 107 FROM_HERE, | 236 FROM_HERE, |
| 108 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); | 237 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); |
| 109 } | 238 } |
| 110 | 239 |
| 111 private: | 240 private: |
| 112 void ThreadHandler() { | 241 void ThreadHandler() { |
| 113 const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1; | |
| 114 fd_set read_fds; | 242 fd_set read_fds; |
| 115 fd_set write_fds; | 243 fd_set write_fds; |
| 116 | 244 |
| 117 // Copy from socket1 to socket2 | 245 // Copy from socket1 to socket2 |
| 118 BufferedCopier buffer1(socket1_.get(), socket2_.get()); | 246 BufferedCopier buffer1(socket1_.get(), socket2_.get()); |
| 247 | |
| 119 // Copy from socket2 to socket1 | 248 // Copy from socket2 to socket1 |
| 120 BufferedCopier buffer2(socket2_.get(), socket1_.get()); | 249 BufferedCopier buffer2(socket2_.get(), socket1_.get()); |
| 121 | 250 |
| 122 bool run = true; | 251 buffer1.SetPeer(&buffer2); |
| 123 while (run) { | 252 buffer2.SetPeer(&buffer1); |
| 253 | |
| 254 for (;;) { | |
| 124 FD_ZERO(&read_fds); | 255 FD_ZERO(&read_fds); |
| 125 FD_ZERO(&write_fds); | 256 FD_ZERO(&write_fds); |
| 126 | 257 |
| 127 buffer1.AddToReadSet(&read_fds); | 258 int max_fd = -1; |
| 128 buffer2.AddToReadSet(&read_fds); | 259 buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd); |
| 129 buffer1.AddToWriteSet(&write_fds); | 260 buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd); |
| 130 buffer2.AddToWriteSet(&write_fds); | |
| 131 | 261 |
| 132 if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) { | 262 if (max_fd < 0) { |
| 263 // Both buffers are closed. Exit immediately. | |
| 264 break; | |
| 265 } | |
| 266 | |
| 267 if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <= | |
| 268 0) { | |
| 133 PLOG(ERROR) << "select"; | 269 PLOG(ERROR) << "select"; |
| 134 break; | 270 break; |
| 135 } | 271 } |
| 136 // When a socket in the read set closes the connection, select() returns | |
| 137 // with that socket descriptor set as "ready to read". When we call | |
| 138 // TryRead() below, it will return false, but the while loop will continue | |
| 139 // to run until all the write operations are finished, to make sure the | |
| 140 // buffers are completely flushed out. | |
| 141 | 272 |
| 142 // Keep running while we have some operation to do. | 273 buffer1.ProcessSelect(read_fds, write_fds); |
| 143 run = buffer1.TryRead(read_fds); | 274 buffer2.ProcessSelect(read_fds, write_fds); |
| 144 run = run || buffer2.TryRead(read_fds); | |
| 145 run = run || buffer1.TryWrite(write_fds); | |
| 146 run = run || buffer2.TryWrite(write_fds); | |
| 147 } | 275 } |
| 148 | 276 |
| 149 // Note that the thread that |destruction_runner_| runs tasks on could be | 277 // Note that the thread that |destruction_runner_| runs tasks on could be |
| 150 // temporarily blocked on I/O (e.g. select()) therefore it is safer to close | 278 // temporarily blocked on I/O (e.g. select()) therefore it is safer to close |
| 151 // the sockets now rather than relying on the destructor. | 279 // the sockets now rather than relying on the destructor. |
| 152 socket1_.reset(); | 280 socket1_.reset(); |
| 153 socket2_.reset(); | 281 socket2_.reset(); |
| 154 | 282 |
| 155 // Note that base::Thread must be destroyed on the thread it was created on. | 283 // Ensure the object is destroyed on the thread that created it. |
| 156 destructor_runner_->DeleteSoon(FROM_HERE, this); | 284 destructor_runner_->DeleteSoon(FROM_HERE, this); |
| 157 } | 285 } |
| 158 | 286 |
| 159 scoped_ptr<Socket> socket1_; | 287 scoped_ptr<Socket> socket1_; |
| 160 scoped_ptr<Socket> socket2_; | 288 scoped_ptr<Socket> socket2_; |
| 161 scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_; | 289 scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_; |
| 162 base::Thread thread_; | 290 base::Thread thread_; |
| 163 }; | 291 }; |
| 164 | 292 |
| 165 } // namespace | 293 } // namespace |
| 166 | 294 |
| 167 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) { | 295 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) { |
| 168 (new Forwarder(socket1.Pass(), socket2.Pass()))->Start(); | 296 (new Forwarder(socket1.Pass(), socket2.Pass()))->Start(); |
| 169 } | 297 } |
| 170 | 298 |
| 171 } // namespace forwarder2 | 299 } // namespace forwarder2 |
| OLD | NEW |