| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "tools/android/forwarder2/forwarder.h" | 5 #include "tools/android/forwarder2/forwarder.h" |
| 6 | 6 |
| 7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
| 8 #include "base/bind.h" | |
| 9 #include "base/logging.h" | 8 #include "base/logging.h" |
| 10 #include "base/memory/ref_counted.h" | |
| 11 #include "base/message_loop/message_loop_proxy.h" | |
| 12 #include "base/posix/eintr_wrapper.h" | 9 #include "base/posix/eintr_wrapper.h" |
| 13 #include "base/single_thread_task_runner.h" | |
| 14 #include "base/threading/thread.h" | |
| 15 #include "tools/android/forwarder2/pipe_notifier.h" | |
| 16 #include "tools/android/forwarder2/socket.h" | 10 #include "tools/android/forwarder2/socket.h" |
| 17 | 11 |
| 18 namespace forwarder2 { | 12 namespace forwarder2 { |
| 19 namespace { | 13 namespace { |
| 20 | 14 |
| 15 const int kBufferSize = 32 * 1024; |
| 16 |
| 17 } // namespace |
| 18 |
| 19 |
| 21 // Helper class to buffer reads and writes from one socket to another. | 20 // Helper class to buffer reads and writes from one socket to another. |
| 22 // Each implements a small buffer connected two one input socket, and | 21 // Each implements a small buffer connected two one input socket, and |
| 23 // one output socket. | 22 // one output socket. |
| 24 // | 23 // |
| 25 // socket_from_ ---> [BufferedCopier] ---> socket_to_ | 24 // socket_from_ ---> [BufferedCopier] ---> socket_to_ |
| 26 // | 25 // |
| 27 // These objects are used in a pair to handle duplex traffic, as in: | 26 // These objects are used in a pair to handle duplex traffic, as in: |
| 28 // | 27 // |
| 29 // ------> [BufferedCopier_1] ---> | 28 // ------> [BufferedCopier_1] ---> |
| 30 // / \ | 29 // / \ |
| 31 // socket_1 * * socket_2 | 30 // socket_1 * * socket_2 |
| 32 // \ / | 31 // \ / |
| 33 // <------ [BufferedCopier_2] <---- | 32 // <------ [BufferedCopier_2] <---- |
| 34 // | 33 // |
| 35 // When a BufferedCopier is in the READING state (see below), it only listens | 34 // When a BufferedCopier is in the READING state (see below), it only listens |
| 36 // to events on its input socket, and won't detect when its output socket | 35 // to events on its input socket, and won't detect when its output socket |
| 37 // disconnects. To work around this, its peer will call its Close() method | 36 // disconnects. To work around this, its peer will call its Close() method |
| 38 // when that happens. | 37 // when that happens. |
| 39 | 38 |
| 40 class BufferedCopier { | 39 class Forwarder::BufferedCopier { |
| 41 public: | 40 public: |
| 42 // Possible states: | 41 // Possible states: |
| 43 // READING - Empty buffer and Waiting for input. | 42 // READING - Empty buffer and Waiting for input. |
| 44 // WRITING - Data in buffer, and waiting for output. | 43 // WRITING - Data in buffer, and waiting for output. |
| 45 // CLOSING - Like WRITING, but do not try to read after that. | 44 // CLOSING - Like WRITING, but do not try to read after that. |
| 46 // CLOSED - Completely closed. | 45 // CLOSED - Completely closed. |
| 47 // | 46 // |
| 48 // State transitions are: | 47 // State transitions are: |
| 49 // | 48 // |
| 50 // T01: READING ---[receive data]---> WRITING | 49 // T01: READING ---[receive data]---> WRITING |
| (...skipping 25 matching lines...) Expand all Loading... |
| 76 write_offset_(0), | 75 write_offset_(0), |
| 77 peer_(NULL), | 76 peer_(NULL), |
| 78 state_(STATE_READING) {} | 77 state_(STATE_READING) {} |
| 79 | 78 |
| 80 // Sets the 'peer_' field pointing to the other BufferedCopier in a pair. | 79 // Sets the 'peer_' field pointing to the other BufferedCopier in a pair. |
| 81 void SetPeer(BufferedCopier* peer) { | 80 void SetPeer(BufferedCopier* peer) { |
| 82 DCHECK(!peer_); | 81 DCHECK(!peer_); |
| 83 peer_ = peer; | 82 peer_ = peer; |
| 84 } | 83 } |
| 85 | 84 |
| 85 bool is_closed() const { return state_ == STATE_CLOSED; } |
| 86 |
| 86 // Gently asks to close a buffer. Called either by the peer or the forwarder. | 87 // Gently asks to close a buffer. Called either by the peer or the forwarder. |
| 87 void Close() { | 88 void Close() { |
| 88 switch (state_) { | 89 switch (state_) { |
| 89 case STATE_READING: | 90 case STATE_READING: |
| 90 state_ = STATE_CLOSED; // T03 | 91 state_ = STATE_CLOSED; // T03 |
| 91 break; | 92 break; |
| 92 case STATE_WRITING: | 93 case STATE_WRITING: |
| 93 state_ = STATE_CLOSING; // T07 | 94 state_ = STATE_CLOSING; // T07 |
| 94 break; | 95 break; |
| 95 case STATE_CLOSING: | 96 case STATE_CLOSING: |
| (...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 198 peer_->Close(); | 199 peer_->Close(); |
| 199 peer_ = NULL; | 200 peer_ = NULL; |
| 200 } | 201 } |
| 201 state_ = STATE_CLOSED; | 202 state_ = STATE_CLOSED; |
| 202 } | 203 } |
| 203 | 204 |
| 204 // Not owned. | 205 // Not owned. |
| 205 Socket* socket_from_; | 206 Socket* socket_from_; |
| 206 Socket* socket_to_; | 207 Socket* socket_to_; |
| 207 | 208 |
| 208 // A big buffer to let the file-over-http bridge work more like real file. | |
| 209 static const int kBufferSize = 1024 * 128; | |
| 210 int bytes_read_; | 209 int bytes_read_; |
| 211 int write_offset_; | 210 int write_offset_; |
| 212 BufferedCopier* peer_; | 211 BufferedCopier* peer_; |
| 213 State state_; | 212 State state_; |
| 214 char buffer_[kBufferSize]; | 213 char buffer_[kBufferSize]; |
| 215 | 214 |
| 216 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); | 215 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); |
| 217 }; | 216 }; |
| 218 | 217 |
| 219 } // namespace | |
| 220 | |
| 221 Forwarder::Forwarder(scoped_ptr<Socket> socket1, | 218 Forwarder::Forwarder(scoped_ptr<Socket> socket1, |
| 222 scoped_ptr<Socket> socket2, | 219 scoped_ptr<Socket> socket2) |
| 223 PipeNotifier* deletion_notifier, | 220 : socket1_(socket1.Pass()), |
| 224 const ErrorCallback& error_callback) | |
| 225 : self_deleter_helper_(this, error_callback), | |
| 226 deletion_notifier_(deletion_notifier), | |
| 227 socket1_(socket1.Pass()), | |
| 228 socket2_(socket2.Pass()), | 221 socket2_(socket2.Pass()), |
| 229 thread_("ForwarderThread") { | 222 buffer1_(new BufferedCopier(socket1_.get(), socket2_.get())), |
| 230 DCHECK(deletion_notifier_); | 223 buffer2_(new BufferedCopier(socket2_.get(), socket1_.get())) { |
| 224 buffer1_->SetPeer(buffer2_.get()); |
| 225 buffer2_->SetPeer(buffer1_.get()); |
| 231 } | 226 } |
| 232 | 227 |
| 233 Forwarder::~Forwarder() {} | 228 Forwarder::~Forwarder() { |
| 234 | 229 DCHECK(thread_checker_.CalledOnValidThread()); |
| 235 void Forwarder::Start() { | |
| 236 thread_.Start(); | |
| 237 thread_.message_loop_proxy()->PostTask( | |
| 238 FROM_HERE, | |
| 239 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); | |
| 240 } | 230 } |
| 241 | 231 |
| 242 void Forwarder::ThreadHandler() { | 232 void Forwarder::RegisterFDs(fd_set* read_fds, fd_set* write_fds, int* max_fd) { |
| 243 fd_set read_fds; | 233 DCHECK(thread_checker_.CalledOnValidThread()); |
| 244 fd_set write_fds; | 234 buffer1_->PrepareSelect(read_fds, write_fds, max_fd); |
| 235 buffer2_->PrepareSelect(read_fds, write_fds, max_fd); |
| 236 } |
| 245 | 237 |
| 246 // Copy from socket1 to socket2 | 238 void Forwarder::ProcessEvents(const fd_set& read_fds, const fd_set& write_fds) { |
| 247 BufferedCopier buffer1(socket1_.get(), socket2_.get()); | 239 DCHECK(thread_checker_.CalledOnValidThread()); |
| 248 // Copy from socket2 to socket1 | 240 buffer1_->ProcessSelect(read_fds, write_fds); |
| 249 BufferedCopier buffer2(socket2_.get(), socket1_.get()); | 241 buffer2_->ProcessSelect(read_fds, write_fds); |
| 242 } |
| 250 | 243 |
| 251 buffer1.SetPeer(&buffer2); | 244 bool Forwarder::IsClosed() const { |
| 252 buffer2.SetPeer(&buffer1); | 245 DCHECK(thread_checker_.CalledOnValidThread()); |
| 246 return buffer1_->is_closed() && buffer2_->is_closed(); |
| 247 } |
| 253 | 248 |
| 254 for (;;) { | 249 void Forwarder::Shutdown() { |
| 255 FD_ZERO(&read_fds); | 250 DCHECK(thread_checker_.CalledOnValidThread()); |
| 256 FD_ZERO(&write_fds); | 251 buffer1_->Close(); |
| 257 | 252 buffer2_->Close(); |
| 258 int max_fd = -1; | |
| 259 buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd); | |
| 260 buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd); | |
| 261 | |
| 262 if (max_fd < 0) { | |
| 263 // Both buffers are closed. Exit immediately. | |
| 264 break; | |
| 265 } | |
| 266 | |
| 267 const int deletion_fd = deletion_notifier_->receiver_fd(); | |
| 268 if (deletion_fd >= 0) { | |
| 269 FD_SET(deletion_fd, &read_fds); | |
| 270 max_fd = std::max(max_fd, deletion_fd); | |
| 271 } | |
| 272 | |
| 273 if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <= | |
| 274 0) { | |
| 275 PLOG(ERROR) << "select"; | |
| 276 break; | |
| 277 } | |
| 278 | |
| 279 buffer1.ProcessSelect(read_fds, write_fds); | |
| 280 buffer2.ProcessSelect(read_fds, write_fds); | |
| 281 | |
| 282 if (deletion_fd >= 0 && FD_ISSET(deletion_fd, &read_fds)) { | |
| 283 buffer1.Close(); | |
| 284 buffer2.Close(); | |
| 285 } | |
| 286 } | |
| 287 | |
| 288 // Note that the thread that the destructor will run on could be temporarily | |
| 289 // blocked on I/O (e.g. select()) therefore it is safer to close the sockets | |
| 290 // now rather than relying on the destructor. | |
| 291 socket1_.reset(); | |
| 292 socket2_.reset(); | |
| 293 | |
| 294 self_deleter_helper_.MaybeSelfDeleteSoon(); | |
| 295 } | 253 } |
| 296 | 254 |
| 297 } // namespace forwarder2 | 255 } // namespace forwarder2 |
| OLD | NEW |