| Index: tools/android/forwarder2/forwarder.cc
|
| diff --git a/tools/android/forwarder2/forwarder.cc b/tools/android/forwarder2/forwarder.cc
|
| index df4c29cf9ff3ff01ef4ebca1209839900e920e08..2f25b9cb3ac223d0d12f72726e7ee6a1323918ee 100644
|
| --- a/tools/android/forwarder2/forwarder.cc
|
| +++ b/tools/android/forwarder2/forwarder.cc
|
| @@ -8,77 +8,208 @@
|
| #include "base/bind.h"
|
| #include "base/logging.h"
|
| #include "base/memory/ref_counted.h"
|
| +#include "base/message_loop/message_loop_proxy.h"
|
| #include "base/posix/eintr_wrapper.h"
|
| #include "base/single_thread_task_runner.h"
|
| +#include "base/threading/thread.h"
|
| #include "tools/android/forwarder2/socket.h"
|
|
|
| namespace forwarder2 {
|
| namespace {
|
|
|
| // Helper class to buffer reads and writes from one socket to another.
|
| +// Each implements a small buffer connected two one input socket, and
|
| +// one output socket.
|
| +//
|
| +// socket_from_ ---> [BufferedCopier] ---> socket_to_
|
| +//
|
| +// These objects are used in a pair to handle duplex traffic, as in:
|
| +//
|
| +// ------> [BufferedCopier_1] --->
|
| +// / \
|
| +// socket_1 * * socket_2
|
| +// \ /
|
| +// <------ [BufferedCopier_2] <----
|
| +//
|
| +// When a BufferedCopier is in the READING state (see below), it only listens
|
| +// to events on its input socket, and won't detect when its output socket
|
| +// disconnects. To work around this, its peer will call its Close() method
|
| +// when that happens.
|
| +
|
| class BufferedCopier {
|
| public:
|
| + // Possible states:
|
| + // READING - Empty buffer and Waiting for input.
|
| + // WRITING - Data in buffer, and waiting for output.
|
| + // CLOSING - Like WRITING, but do not try to read after that.
|
| + // CLOSED - Completely closed.
|
| + //
|
| + // State transitions are:
|
| + //
|
| + // T01: READING ---[receive data]---> WRITING
|
| + // T02: READING ---[error on input socket]---> CLOSED
|
| + // T03: READING ---[Close() call]---> CLOSED
|
| + //
|
| + // T04: WRITING ---[write partial data]---> WRITING
|
| + // T05: WRITING ---[write all data]----> READING
|
| + // T06: WRITING ---[error on output socket]----> CLOSED
|
| + // T07: WRITING ---[Close() call]---> CLOSING
|
| + //
|
| + // T08: CLOSING ---[write partial data]---> CLOSING
|
| + // T09: CLOSING ---[write all data]----> CLOSED
|
| + // T10: CLOSING ---[Close() call]---> CLOSING
|
| + // T11: CLOSING ---[error on output socket] ---> CLOSED
|
| + //
|
| + enum State {
|
| + STATE_READING = 0,
|
| + STATE_WRITING = 1,
|
| + STATE_CLOSING = 2,
|
| + STATE_CLOSED = 3,
|
| + };
|
| +
|
| // Does NOT own the pointers.
|
| - BufferedCopier(Socket* socket_from,
|
| - Socket* socket_to)
|
| + BufferedCopier(Socket* socket_from, Socket* socket_to)
|
| : socket_from_(socket_from),
|
| socket_to_(socket_to),
|
| bytes_read_(0),
|
| - write_offset_(0) {
|
| - }
|
| + write_offset_(0),
|
| + peer_(NULL),
|
| + state_(STATE_READING) {}
|
|
|
| - bool AddToReadSet(fd_set* read_fds) {
|
| - if (bytes_read_ == 0)
|
| - return socket_from_->AddFdToSet(read_fds);
|
| - return false;
|
| + // Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
|
| + void SetPeer(BufferedCopier* peer) {
|
| + DCHECK(!peer_);
|
| + peer_ = peer;
|
| }
|
|
|
| - bool AddToWriteSet(fd_set* write_fds) {
|
| - if (write_offset_ < bytes_read_)
|
| - return socket_to_->AddFdToSet(write_fds);
|
| - return false;
|
| + // Gently asks to close a buffer. Called either by the peer or the forwarder.
|
| + void Close() {
|
| + switch (state_) {
|
| + case STATE_READING:
|
| + state_ = STATE_CLOSED; // T03
|
| + break;
|
| + case STATE_WRITING:
|
| + state_ = STATE_CLOSING; // T07
|
| + break;
|
| + case STATE_CLOSING:
|
| + break; // T10
|
| + case STATE_CLOSED:
|
| + ;
|
| + }
|
| }
|
|
|
| - bool TryRead(const fd_set& read_fds) {
|
| - if (!socket_from_->IsFdInSet(read_fds))
|
| - return false;
|
| - if (bytes_read_ != 0) // Can't read.
|
| - return false;
|
| - int ret = socket_from_->Read(buffer_, kBufferSize);
|
| - if (ret > 0) {
|
| - bytes_read_ = ret;
|
| - return true;
|
| + // Call this before select(). This updates |read_fds|,
|
| + // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed.
|
| + void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
|
| + int fd;
|
| + switch (state_) {
|
| + case STATE_READING:
|
| + DCHECK(bytes_read_ == 0);
|
| + DCHECK(write_offset_ == 0);
|
| + fd = socket_from_->fd();
|
| + if (fd < 0) {
|
| + ForceClose(); // T02
|
| + return;
|
| + }
|
| + FD_SET(fd, read_fds);
|
| + break;
|
| +
|
| + case STATE_WRITING:
|
| + case STATE_CLOSING:
|
| + DCHECK(bytes_read_ > 0);
|
| + DCHECK(write_offset_ < bytes_read_);
|
| + fd = socket_to_->fd();
|
| + if (fd < 0) {
|
| + ForceClose(); // T06
|
| + return;
|
| + }
|
| + FD_SET(fd, write_fds);
|
| + break;
|
| +
|
| + case STATE_CLOSED:
|
| + return;
|
| }
|
| - return false;
|
| + *max_fd = std::max(*max_fd, fd);
|
| }
|
|
|
| - bool TryWrite(const fd_set& write_fds) {
|
| - if (!socket_to_->IsFdInSet(write_fds))
|
| - return false;
|
| - if (write_offset_ >= bytes_read_) // Nothing to write.
|
| - return false;
|
| - int ret = socket_to_->Write(buffer_ + write_offset_,
|
| - bytes_read_ - write_offset_);
|
| - if (ret > 0) {
|
| - write_offset_ += ret;
|
| - if (write_offset_ == bytes_read_) {
|
| + // Call this after a select() call to operate over the buffer.
|
| + void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) {
|
| + int fd, ret;
|
| + switch (state_) {
|
| + case STATE_READING:
|
| + fd = socket_from_->fd();
|
| + if (fd < 0) {
|
| + state_ = STATE_CLOSED; // T02
|
| + return;
|
| + }
|
| + if (!FD_ISSET(fd, &read_fds))
|
| + return;
|
| +
|
| + ret = socket_from_->NonBlockingRead(buffer_, kBufferSize);
|
| + if (ret <= 0) {
|
| + ForceClose(); // T02
|
| + return;
|
| + }
|
| + bytes_read_ = ret;
|
| + write_offset_ = 0;
|
| + state_ = STATE_WRITING; // T01
|
| + break;
|
| +
|
| + case STATE_WRITING:
|
| + case STATE_CLOSING:
|
| + fd = socket_to_->fd();
|
| + if (fd < 0) {
|
| + ForceClose(); // T06 + T11
|
| + return;
|
| + }
|
| + if (!FD_ISSET(fd, &write_fds))
|
| + return;
|
| +
|
| + ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_,
|
| + bytes_read_ - write_offset_);
|
| + if (ret <= 0) {
|
| + ForceClose(); // T06 + T11
|
| + return;
|
| + }
|
| +
|
| + write_offset_ += ret;
|
| + if (write_offset_ < bytes_read_)
|
| + return; // T08 + T04
|
| +
|
| write_offset_ = 0;
|
| bytes_read_ = 0;
|
| - }
|
| - return true;
|
| + if (state_ == STATE_CLOSING) {
|
| + ForceClose(); // T09
|
| + return;
|
| + }
|
| + state_ = STATE_READING; // T05
|
| + break;
|
| +
|
| + case STATE_CLOSED:
|
| + ;
|
| }
|
| - return false;
|
| }
|
|
|
| private:
|
| + // Internal method used to close the buffer and notify the peer, if any.
|
| + void ForceClose() {
|
| + if (peer_) {
|
| + peer_->Close();
|
| + peer_ = NULL;
|
| + }
|
| + state_ = STATE_CLOSED;
|
| + }
|
| +
|
| // Not owned.
|
| Socket* socket_from_;
|
| Socket* socket_to_;
|
|
|
| - // A big buffer to let our file-over-http bridge work more like real file.
|
| + // A big buffer to let the file-over-http bridge work more like real file.
|
| static const int kBufferSize = 1024 * 128;
|
| int bytes_read_;
|
| int write_offset_;
|
| + BufferedCopier* peer_;
|
| + State state_;
|
| char buffer_[kBufferSize];
|
|
|
| DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
|
| @@ -94,12 +225,13 @@ class BufferedCopier {
|
| // created it.
|
| class Forwarder {
|
| public:
|
| + // Create a new Forwarder instance. |socket1| and |socket2| are the two socket
|
| + // endpoints.
|
| Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2)
|
| : socket1_(socket1.Pass()),
|
| socket2_(socket2.Pass()),
|
| destructor_runner_(base::MessageLoopProxy::current()),
|
| - thread_("ForwarderThread") {
|
| - }
|
| + thread_("ForwarderThread") {}
|
|
|
| void Start() {
|
| thread_.Start();
|
| @@ -110,40 +242,39 @@ class Forwarder {
|
|
|
| private:
|
| void ThreadHandler() {
|
| - const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1;
|
| fd_set read_fds;
|
| fd_set write_fds;
|
|
|
| // Copy from socket1 to socket2
|
| BufferedCopier buffer1(socket1_.get(), socket2_.get());
|
| +
|
| // Copy from socket2 to socket1
|
| BufferedCopier buffer2(socket2_.get(), socket1_.get());
|
|
|
| - bool run = true;
|
| - while (run) {
|
| + buffer1.SetPeer(&buffer2);
|
| + buffer2.SetPeer(&buffer1);
|
| +
|
| + for (;;) {
|
| FD_ZERO(&read_fds);
|
| FD_ZERO(&write_fds);
|
|
|
| - buffer1.AddToReadSet(&read_fds);
|
| - buffer2.AddToReadSet(&read_fds);
|
| - buffer1.AddToWriteSet(&write_fds);
|
| - buffer2.AddToWriteSet(&write_fds);
|
| + int max_fd = -1;
|
| + buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
|
| + buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
|
|
|
| - if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) {
|
| + if (max_fd < 0) {
|
| + // Both buffers are closed. Exit immediately.
|
| + break;
|
| + }
|
| +
|
| + if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
|
| + 0) {
|
| PLOG(ERROR) << "select";
|
| break;
|
| }
|
| - // When a socket in the read set closes the connection, select() returns
|
| - // with that socket descriptor set as "ready to read". When we call
|
| - // TryRead() below, it will return false, but the while loop will continue
|
| - // to run until all the write operations are finished, to make sure the
|
| - // buffers are completely flushed out.
|
| -
|
| - // Keep running while we have some operation to do.
|
| - run = buffer1.TryRead(read_fds);
|
| - run = run || buffer2.TryRead(read_fds);
|
| - run = run || buffer1.TryWrite(write_fds);
|
| - run = run || buffer2.TryWrite(write_fds);
|
| +
|
| + buffer1.ProcessSelect(read_fds, write_fds);
|
| + buffer2.ProcessSelect(read_fds, write_fds);
|
| }
|
|
|
| // Note that the thread that |destruction_runner_| runs tasks on could be
|
| @@ -152,7 +283,7 @@ class Forwarder {
|
| socket1_.reset();
|
| socket2_.reset();
|
|
|
| - // Note that base::Thread must be destroyed on the thread it was created on.
|
| + // Ensure the object is destroyed on the thread that created it.
|
| destructor_runner_->DeleteSoon(FROM_HERE, this);
|
| }
|
|
|
|
|