Index: tools/android/forwarder2/forwarder.cc |
diff --git a/tools/android/forwarder2/forwarder.cc b/tools/android/forwarder2/forwarder.cc |
index df4c29cf9ff3ff01ef4ebca1209839900e920e08..2f25b9cb3ac223d0d12f72726e7ee6a1323918ee 100644 |
--- a/tools/android/forwarder2/forwarder.cc |
+++ b/tools/android/forwarder2/forwarder.cc |
@@ -8,77 +8,208 @@ |
#include "base/bind.h" |
#include "base/logging.h" |
#include "base/memory/ref_counted.h" |
+#include "base/message_loop/message_loop_proxy.h" |
#include "base/posix/eintr_wrapper.h" |
#include "base/single_thread_task_runner.h" |
+#include "base/threading/thread.h" |
#include "tools/android/forwarder2/socket.h" |
namespace forwarder2 { |
namespace { |
// Helper class to buffer reads and writes from one socket to another. |
+// Each implements a small buffer connected two one input socket, and |
+// one output socket. |
+// |
+// socket_from_ ---> [BufferedCopier] ---> socket_to_ |
+// |
+// These objects are used in a pair to handle duplex traffic, as in: |
+// |
+// ------> [BufferedCopier_1] ---> |
+// / \ |
+// socket_1 * * socket_2 |
+// \ / |
+// <------ [BufferedCopier_2] <---- |
+// |
+// When a BufferedCopier is in the READING state (see below), it only listens |
+// to events on its input socket, and won't detect when its output socket |
+// disconnects. To work around this, its peer will call its Close() method |
+// when that happens. |
+ |
class BufferedCopier { |
public: |
+ // Possible states: |
+ // READING - Empty buffer and Waiting for input. |
+ // WRITING - Data in buffer, and waiting for output. |
+ // CLOSING - Like WRITING, but do not try to read after that. |
+ // CLOSED - Completely closed. |
+ // |
+ // State transitions are: |
+ // |
+ // T01: READING ---[receive data]---> WRITING |
+ // T02: READING ---[error on input socket]---> CLOSED |
+ // T03: READING ---[Close() call]---> CLOSED |
+ // |
+ // T04: WRITING ---[write partial data]---> WRITING |
+ // T05: WRITING ---[write all data]----> READING |
+ // T06: WRITING ---[error on output socket]----> CLOSED |
+ // T07: WRITING ---[Close() call]---> CLOSING |
+ // |
+ // T08: CLOSING ---[write partial data]---> CLOSING |
+ // T09: CLOSING ---[write all data]----> CLOSED |
+ // T10: CLOSING ---[Close() call]---> CLOSING |
+ // T11: CLOSING ---[error on output socket] ---> CLOSED |
+ // |
+ enum State { |
+ STATE_READING = 0, |
+ STATE_WRITING = 1, |
+ STATE_CLOSING = 2, |
+ STATE_CLOSED = 3, |
+ }; |
+ |
// Does NOT own the pointers. |
- BufferedCopier(Socket* socket_from, |
- Socket* socket_to) |
+ BufferedCopier(Socket* socket_from, Socket* socket_to) |
: socket_from_(socket_from), |
socket_to_(socket_to), |
bytes_read_(0), |
- write_offset_(0) { |
- } |
+ write_offset_(0), |
+ peer_(NULL), |
+ state_(STATE_READING) {} |
- bool AddToReadSet(fd_set* read_fds) { |
- if (bytes_read_ == 0) |
- return socket_from_->AddFdToSet(read_fds); |
- return false; |
+ // Sets the 'peer_' field pointing to the other BufferedCopier in a pair. |
+ void SetPeer(BufferedCopier* peer) { |
+ DCHECK(!peer_); |
+ peer_ = peer; |
} |
- bool AddToWriteSet(fd_set* write_fds) { |
- if (write_offset_ < bytes_read_) |
- return socket_to_->AddFdToSet(write_fds); |
- return false; |
+ // Gently asks to close a buffer. Called either by the peer or the forwarder. |
+ void Close() { |
+ switch (state_) { |
+ case STATE_READING: |
+ state_ = STATE_CLOSED; // T03 |
+ break; |
+ case STATE_WRITING: |
+ state_ = STATE_CLOSING; // T07 |
+ break; |
+ case STATE_CLOSING: |
+ break; // T10 |
+ case STATE_CLOSED: |
+ ; |
+ } |
} |
- bool TryRead(const fd_set& read_fds) { |
- if (!socket_from_->IsFdInSet(read_fds)) |
- return false; |
- if (bytes_read_ != 0) // Can't read. |
- return false; |
- int ret = socket_from_->Read(buffer_, kBufferSize); |
- if (ret > 0) { |
- bytes_read_ = ret; |
- return true; |
+ // Call this before select(). This updates |read_fds|, |
+ // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed. |
+ void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) { |
+ int fd; |
+ switch (state_) { |
+ case STATE_READING: |
+ DCHECK(bytes_read_ == 0); |
+ DCHECK(write_offset_ == 0); |
+ fd = socket_from_->fd(); |
+ if (fd < 0) { |
+ ForceClose(); // T02 |
+ return; |
+ } |
+ FD_SET(fd, read_fds); |
+ break; |
+ |
+ case STATE_WRITING: |
+ case STATE_CLOSING: |
+ DCHECK(bytes_read_ > 0); |
+ DCHECK(write_offset_ < bytes_read_); |
+ fd = socket_to_->fd(); |
+ if (fd < 0) { |
+ ForceClose(); // T06 |
+ return; |
+ } |
+ FD_SET(fd, write_fds); |
+ break; |
+ |
+ case STATE_CLOSED: |
+ return; |
} |
- return false; |
+ *max_fd = std::max(*max_fd, fd); |
} |
- bool TryWrite(const fd_set& write_fds) { |
- if (!socket_to_->IsFdInSet(write_fds)) |
- return false; |
- if (write_offset_ >= bytes_read_) // Nothing to write. |
- return false; |
- int ret = socket_to_->Write(buffer_ + write_offset_, |
- bytes_read_ - write_offset_); |
- if (ret > 0) { |
- write_offset_ += ret; |
- if (write_offset_ == bytes_read_) { |
+ // Call this after a select() call to operate over the buffer. |
+ void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) { |
+ int fd, ret; |
+ switch (state_) { |
+ case STATE_READING: |
+ fd = socket_from_->fd(); |
+ if (fd < 0) { |
+ state_ = STATE_CLOSED; // T02 |
+ return; |
+ } |
+ if (!FD_ISSET(fd, &read_fds)) |
+ return; |
+ |
+ ret = socket_from_->NonBlockingRead(buffer_, kBufferSize); |
+ if (ret <= 0) { |
+ ForceClose(); // T02 |
+ return; |
+ } |
+ bytes_read_ = ret; |
+ write_offset_ = 0; |
+ state_ = STATE_WRITING; // T01 |
+ break; |
+ |
+ case STATE_WRITING: |
+ case STATE_CLOSING: |
+ fd = socket_to_->fd(); |
+ if (fd < 0) { |
+ ForceClose(); // T06 + T11 |
+ return; |
+ } |
+ if (!FD_ISSET(fd, &write_fds)) |
+ return; |
+ |
+ ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_, |
+ bytes_read_ - write_offset_); |
+ if (ret <= 0) { |
+ ForceClose(); // T06 + T11 |
+ return; |
+ } |
+ |
+ write_offset_ += ret; |
+ if (write_offset_ < bytes_read_) |
+ return; // T08 + T04 |
+ |
write_offset_ = 0; |
bytes_read_ = 0; |
- } |
- return true; |
+ if (state_ == STATE_CLOSING) { |
+ ForceClose(); // T09 |
+ return; |
+ } |
+ state_ = STATE_READING; // T05 |
+ break; |
+ |
+ case STATE_CLOSED: |
+ ; |
} |
- return false; |
} |
private: |
+ // Internal method used to close the buffer and notify the peer, if any. |
+ void ForceClose() { |
+ if (peer_) { |
+ peer_->Close(); |
+ peer_ = NULL; |
+ } |
+ state_ = STATE_CLOSED; |
+ } |
+ |
// Not owned. |
Socket* socket_from_; |
Socket* socket_to_; |
- // A big buffer to let our file-over-http bridge work more like real file. |
+ // A big buffer to let the file-over-http bridge work more like real file. |
static const int kBufferSize = 1024 * 128; |
int bytes_read_; |
int write_offset_; |
+ BufferedCopier* peer_; |
+ State state_; |
char buffer_[kBufferSize]; |
DISALLOW_COPY_AND_ASSIGN(BufferedCopier); |
@@ -94,12 +225,13 @@ class BufferedCopier { |
// created it. |
class Forwarder { |
public: |
+ // Create a new Forwarder instance. |socket1| and |socket2| are the two socket |
+ // endpoints. |
Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) |
: socket1_(socket1.Pass()), |
socket2_(socket2.Pass()), |
destructor_runner_(base::MessageLoopProxy::current()), |
- thread_("ForwarderThread") { |
- } |
+ thread_("ForwarderThread") {} |
void Start() { |
thread_.Start(); |
@@ -110,40 +242,39 @@ class Forwarder { |
private: |
void ThreadHandler() { |
- const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1; |
fd_set read_fds; |
fd_set write_fds; |
// Copy from socket1 to socket2 |
BufferedCopier buffer1(socket1_.get(), socket2_.get()); |
+ |
// Copy from socket2 to socket1 |
BufferedCopier buffer2(socket2_.get(), socket1_.get()); |
- bool run = true; |
- while (run) { |
+ buffer1.SetPeer(&buffer2); |
+ buffer2.SetPeer(&buffer1); |
+ |
+ for (;;) { |
FD_ZERO(&read_fds); |
FD_ZERO(&write_fds); |
- buffer1.AddToReadSet(&read_fds); |
- buffer2.AddToReadSet(&read_fds); |
- buffer1.AddToWriteSet(&write_fds); |
- buffer2.AddToWriteSet(&write_fds); |
+ int max_fd = -1; |
+ buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd); |
+ buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd); |
- if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) { |
+ if (max_fd < 0) { |
+ // Both buffers are closed. Exit immediately. |
+ break; |
+ } |
+ |
+ if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <= |
+ 0) { |
PLOG(ERROR) << "select"; |
break; |
} |
- // When a socket in the read set closes the connection, select() returns |
- // with that socket descriptor set as "ready to read". When we call |
- // TryRead() below, it will return false, but the while loop will continue |
- // to run until all the write operations are finished, to make sure the |
- // buffers are completely flushed out. |
- |
- // Keep running while we have some operation to do. |
- run = buffer1.TryRead(read_fds); |
- run = run || buffer2.TryRead(read_fds); |
- run = run || buffer1.TryWrite(write_fds); |
- run = run || buffer2.TryWrite(write_fds); |
+ |
+ buffer1.ProcessSelect(read_fds, write_fds); |
+ buffer2.ProcessSelect(read_fds, write_fds); |
} |
// Note that the thread that |destruction_runner_| runs tasks on could be |
@@ -152,7 +283,7 @@ class Forwarder { |
socket1_.reset(); |
socket2_.reset(); |
- // Note that base::Thread must be destroyed on the thread it was created on. |
+ // Ensure the object is destroyed on the thread that created it. |
destructor_runner_->DeleteSoon(FROM_HERE, this); |
} |