Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(871)

Unified Diff: tools/android/forwarder2/forwarder.cc

Issue 137923004: Revert "Revert 235213 "android: forwader2: Simplify Forwarder implementa..."" (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Add DCHECK() in SetPeer() Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tools/android/forwarder2/forwarder.h ('k') | tools/android/forwarder2/socket.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tools/android/forwarder2/forwarder.cc
diff --git a/tools/android/forwarder2/forwarder.cc b/tools/android/forwarder2/forwarder.cc
index df4c29cf9ff3ff01ef4ebca1209839900e920e08..2f25b9cb3ac223d0d12f72726e7ee6a1323918ee 100644
--- a/tools/android/forwarder2/forwarder.cc
+++ b/tools/android/forwarder2/forwarder.cc
@@ -8,77 +8,208 @@
#include "base/bind.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
+#include "base/message_loop/message_loop_proxy.h"
#include "base/posix/eintr_wrapper.h"
#include "base/single_thread_task_runner.h"
+#include "base/threading/thread.h"
#include "tools/android/forwarder2/socket.h"
namespace forwarder2 {
namespace {
// Helper class to buffer reads and writes from one socket to another.
+// Each implements a small buffer connected two one input socket, and
+// one output socket.
+//
+// socket_from_ ---> [BufferedCopier] ---> socket_to_
+//
+// These objects are used in a pair to handle duplex traffic, as in:
+//
+// ------> [BufferedCopier_1] --->
+// / \
+// socket_1 * * socket_2
+// \ /
+// <------ [BufferedCopier_2] <----
+//
+// When a BufferedCopier is in the READING state (see below), it only listens
+// to events on its input socket, and won't detect when its output socket
+// disconnects. To work around this, its peer will call its Close() method
+// when that happens.
+
class BufferedCopier {
public:
+ // Possible states:
+ // READING - Empty buffer and Waiting for input.
+ // WRITING - Data in buffer, and waiting for output.
+ // CLOSING - Like WRITING, but do not try to read after that.
+ // CLOSED - Completely closed.
+ //
+ // State transitions are:
+ //
+ // T01: READING ---[receive data]---> WRITING
+ // T02: READING ---[error on input socket]---> CLOSED
+ // T03: READING ---[Close() call]---> CLOSED
+ //
+ // T04: WRITING ---[write partial data]---> WRITING
+ // T05: WRITING ---[write all data]----> READING
+ // T06: WRITING ---[error on output socket]----> CLOSED
+ // T07: WRITING ---[Close() call]---> CLOSING
+ //
+ // T08: CLOSING ---[write partial data]---> CLOSING
+ // T09: CLOSING ---[write all data]----> CLOSED
+ // T10: CLOSING ---[Close() call]---> CLOSING
+ // T11: CLOSING ---[error on output socket] ---> CLOSED
+ //
+ enum State {
+ STATE_READING = 0,
+ STATE_WRITING = 1,
+ STATE_CLOSING = 2,
+ STATE_CLOSED = 3,
+ };
+
// Does NOT own the pointers.
- BufferedCopier(Socket* socket_from,
- Socket* socket_to)
+ BufferedCopier(Socket* socket_from, Socket* socket_to)
: socket_from_(socket_from),
socket_to_(socket_to),
bytes_read_(0),
- write_offset_(0) {
- }
+ write_offset_(0),
+ peer_(NULL),
+ state_(STATE_READING) {}
- bool AddToReadSet(fd_set* read_fds) {
- if (bytes_read_ == 0)
- return socket_from_->AddFdToSet(read_fds);
- return false;
+ // Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
+ void SetPeer(BufferedCopier* peer) {
+ DCHECK(!peer_);
+ peer_ = peer;
}
- bool AddToWriteSet(fd_set* write_fds) {
- if (write_offset_ < bytes_read_)
- return socket_to_->AddFdToSet(write_fds);
- return false;
+ // Gently asks to close a buffer. Called either by the peer or the forwarder.
+ void Close() {
+ switch (state_) {
+ case STATE_READING:
+ state_ = STATE_CLOSED; // T03
+ break;
+ case STATE_WRITING:
+ state_ = STATE_CLOSING; // T07
+ break;
+ case STATE_CLOSING:
+ break; // T10
+ case STATE_CLOSED:
+ ;
+ }
}
- bool TryRead(const fd_set& read_fds) {
- if (!socket_from_->IsFdInSet(read_fds))
- return false;
- if (bytes_read_ != 0) // Can't read.
- return false;
- int ret = socket_from_->Read(buffer_, kBufferSize);
- if (ret > 0) {
- bytes_read_ = ret;
- return true;
+ // Call this before select(). This updates |read_fds|,
+ // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed.
+ void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
+ int fd;
+ switch (state_) {
+ case STATE_READING:
+ DCHECK(bytes_read_ == 0);
+ DCHECK(write_offset_ == 0);
+ fd = socket_from_->fd();
+ if (fd < 0) {
+ ForceClose(); // T02
+ return;
+ }
+ FD_SET(fd, read_fds);
+ break;
+
+ case STATE_WRITING:
+ case STATE_CLOSING:
+ DCHECK(bytes_read_ > 0);
+ DCHECK(write_offset_ < bytes_read_);
+ fd = socket_to_->fd();
+ if (fd < 0) {
+ ForceClose(); // T06
+ return;
+ }
+ FD_SET(fd, write_fds);
+ break;
+
+ case STATE_CLOSED:
+ return;
}
- return false;
+ *max_fd = std::max(*max_fd, fd);
}
- bool TryWrite(const fd_set& write_fds) {
- if (!socket_to_->IsFdInSet(write_fds))
- return false;
- if (write_offset_ >= bytes_read_) // Nothing to write.
- return false;
- int ret = socket_to_->Write(buffer_ + write_offset_,
- bytes_read_ - write_offset_);
- if (ret > 0) {
- write_offset_ += ret;
- if (write_offset_ == bytes_read_) {
+ // Call this after a select() call to operate over the buffer.
+ void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) {
+ int fd, ret;
+ switch (state_) {
+ case STATE_READING:
+ fd = socket_from_->fd();
+ if (fd < 0) {
+ state_ = STATE_CLOSED; // T02
+ return;
+ }
+ if (!FD_ISSET(fd, &read_fds))
+ return;
+
+ ret = socket_from_->NonBlockingRead(buffer_, kBufferSize);
+ if (ret <= 0) {
+ ForceClose(); // T02
+ return;
+ }
+ bytes_read_ = ret;
+ write_offset_ = 0;
+ state_ = STATE_WRITING; // T01
+ break;
+
+ case STATE_WRITING:
+ case STATE_CLOSING:
+ fd = socket_to_->fd();
+ if (fd < 0) {
+ ForceClose(); // T06 + T11
+ return;
+ }
+ if (!FD_ISSET(fd, &write_fds))
+ return;
+
+ ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_,
+ bytes_read_ - write_offset_);
+ if (ret <= 0) {
+ ForceClose(); // T06 + T11
+ return;
+ }
+
+ write_offset_ += ret;
+ if (write_offset_ < bytes_read_)
+ return; // T08 + T04
+
write_offset_ = 0;
bytes_read_ = 0;
- }
- return true;
+ if (state_ == STATE_CLOSING) {
+ ForceClose(); // T09
+ return;
+ }
+ state_ = STATE_READING; // T05
+ break;
+
+ case STATE_CLOSED:
+ ;
}
- return false;
}
private:
+ // Internal method used to close the buffer and notify the peer, if any.
+ void ForceClose() {
+ if (peer_) {
+ peer_->Close();
+ peer_ = NULL;
+ }
+ state_ = STATE_CLOSED;
+ }
+
// Not owned.
Socket* socket_from_;
Socket* socket_to_;
- // A big buffer to let our file-over-http bridge work more like real file.
+ // A big buffer to let the file-over-http bridge work more like real file.
static const int kBufferSize = 1024 * 128;
int bytes_read_;
int write_offset_;
+ BufferedCopier* peer_;
+ State state_;
char buffer_[kBufferSize];
DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
@@ -94,12 +225,13 @@ class BufferedCopier {
// created it.
class Forwarder {
public:
+ // Create a new Forwarder instance. |socket1| and |socket2| are the two socket
+ // endpoints.
Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2)
: socket1_(socket1.Pass()),
socket2_(socket2.Pass()),
destructor_runner_(base::MessageLoopProxy::current()),
- thread_("ForwarderThread") {
- }
+ thread_("ForwarderThread") {}
void Start() {
thread_.Start();
@@ -110,40 +242,39 @@ class Forwarder {
private:
void ThreadHandler() {
- const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1;
fd_set read_fds;
fd_set write_fds;
// Copy from socket1 to socket2
BufferedCopier buffer1(socket1_.get(), socket2_.get());
+
// Copy from socket2 to socket1
BufferedCopier buffer2(socket2_.get(), socket1_.get());
- bool run = true;
- while (run) {
+ buffer1.SetPeer(&buffer2);
+ buffer2.SetPeer(&buffer1);
+
+ for (;;) {
FD_ZERO(&read_fds);
FD_ZERO(&write_fds);
- buffer1.AddToReadSet(&read_fds);
- buffer2.AddToReadSet(&read_fds);
- buffer1.AddToWriteSet(&write_fds);
- buffer2.AddToWriteSet(&write_fds);
+ int max_fd = -1;
+ buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
+ buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
- if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) {
+ if (max_fd < 0) {
+ // Both buffers are closed. Exit immediately.
+ break;
+ }
+
+ if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
+ 0) {
PLOG(ERROR) << "select";
break;
}
- // When a socket in the read set closes the connection, select() returns
- // with that socket descriptor set as "ready to read". When we call
- // TryRead() below, it will return false, but the while loop will continue
- // to run until all the write operations are finished, to make sure the
- // buffers are completely flushed out.
-
- // Keep running while we have some operation to do.
- run = buffer1.TryRead(read_fds);
- run = run || buffer2.TryRead(read_fds);
- run = run || buffer1.TryWrite(write_fds);
- run = run || buffer2.TryWrite(write_fds);
+
+ buffer1.ProcessSelect(read_fds, write_fds);
+ buffer2.ProcessSelect(read_fds, write_fds);
}
// Note that the thread that |destruction_runner_| runs tasks on could be
@@ -152,7 +283,7 @@ class Forwarder {
socket1_.reset();
socket2_.reset();
- // Note that base::Thread must be destroyed on the thread it was created on.
+ // Ensure the object is destroyed on the thread that created it.
destructor_runner_->DeleteSoon(FROM_HERE, this);
}
« no previous file with comments | « tools/android/forwarder2/forwarder.h ('k') | tools/android/forwarder2/socket.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698