| Index: tools/android/forwarder2/forwarders_manager.cc
|
| diff --git a/tools/android/forwarder2/forwarders_manager.cc b/tools/android/forwarder2/forwarders_manager.cc
|
| index 1b3cd2c46ef1ecfc84449683afce233c585f702d..1795cb5cc01e94f19f5fcd61c33bec28cb14200e 100644
|
| --- a/tools/android/forwarder2/forwarders_manager.cc
|
| +++ b/tools/android/forwarder2/forwarders_manager.cc
|
| @@ -4,88 +4,129 @@
|
|
|
| #include "tools/android/forwarder2/forwarders_manager.h"
|
|
|
| +#include <sys/select.h>
|
| +#include <unistd.h>
|
| +
|
| #include <algorithm>
|
|
|
| #include "base/basictypes.h"
|
| #include "base/bind.h"
|
| +#include "base/callback_helpers.h"
|
| #include "base/location.h"
|
| #include "base/logging.h"
|
| #include "base/message_loop/message_loop_proxy.h"
|
| +#include "base/posix/eintr_wrapper.h"
|
| #include "tools/android/forwarder2/forwarder.h"
|
| #include "tools/android/forwarder2/socket.h"
|
|
|
| namespace forwarder2 {
|
|
|
| -ForwardersManager::ForwardersManager() : delegate_(new Delegate()) {}
|
| +ForwardersManager::ForwardersManager() : thread_("ForwardersManagerThread") {
|
| + thread_.Start();
|
| + WaitForEventsOnInternalThreadSoon();
|
| +}
|
| +
|
|
|
| ForwardersManager::~ForwardersManager() {
|
| - delegate_->Clear();
|
| + deletion_notifier_.Notify();
|
| }
|
|
|
| void ForwardersManager::CreateAndStartNewForwarder(scoped_ptr<Socket> socket1,
|
| scoped_ptr<Socket> socket2) {
|
| - delegate_->CreateAndStartNewForwarder(socket1.Pass(), socket2.Pass());
|
| + // Note that the internal Forwarder vector is populated on the internal thread
|
| + // which is the only thread from which it's accessed.
|
| + thread_.message_loop_proxy()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&ForwardersManager::CreateNewForwarderOnInternalThread,
|
| + base::Unretained(this), base::Passed(&socket1),
|
| + base::Passed(&socket2)));
|
| +
|
| + // Guarantees that the CreateNewForwarderOnInternalThread callback posted to
|
| + // the internal thread gets executed immediately.
|
| + wakeup_notifier_.Notify();
|
| }
|
|
|
| -ForwardersManager::Delegate::Delegate() {}
|
| +void ForwardersManager::CreateNewForwarderOnInternalThread(
|
| + scoped_ptr<Socket> socket1,
|
| + scoped_ptr<Socket> socket2) {
|
| + DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread());
|
| + forwarders_.push_back(new Forwarder(socket1.Pass(), socket2.Pass()));
|
| +}
|
|
|
| -ForwardersManager::Delegate::~Delegate() {
|
| - // The forwarder instances should already have been deleted on their
|
| - // construction thread. Deleting them here would be unsafe since we don't know
|
| - // which thread this destructor is called on.
|
| - DCHECK(forwarders_.empty());
|
| +void ForwardersManager::WaitForEventsOnInternalThreadSoon() {
|
| + thread_.message_loop_proxy()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&ForwardersManager::WaitForEventsOnInternalThread,
|
| + base::Unretained(this)));
|
| }
|
|
|
| -void ForwardersManager::Delegate::Clear() {
|
| - if (!forwarders_constructor_runner_) {
|
| - DCHECK(forwarders_.empty());
|
| - return;
|
| +void ForwardersManager::WaitForEventsOnInternalThread() {
|
| + DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread());
|
| + fd_set read_fds;
|
| + fd_set write_fds;
|
| +
|
| + FD_ZERO(&read_fds);
|
| + FD_ZERO(&write_fds);
|
| +
|
| + // Populate the file descriptor sets.
|
| + int max_fd = -1;
|
| + for (ScopedVector<Forwarder>::iterator it = forwarders_.begin();
|
| + it != forwarders_.end(); ++it) {
|
| + Forwarder* const forwarder = *it;
|
| + forwarder->RegisterFDs(&read_fds, &write_fds, &max_fd);
|
| + }
|
| +
|
| + const int notifier_fds[] = {
|
| + wakeup_notifier_.receiver_fd(),
|
| + deletion_notifier_.receiver_fd(),
|
| + };
|
| +
|
| + for (int i = 0; i < arraysize(notifier_fds); ++i) {
|
| + const int notifier_fd = notifier_fds[i];
|
| + DCHECK_GT(notifier_fd, -1);
|
| + FD_SET(notifier_fd, &read_fds);
|
| + max_fd = std::max(max_fd, notifier_fd);
|
| }
|
| - if (forwarders_constructor_runner_->RunsTasksOnCurrentThread()) {
|
| - ClearOnForwarderConstructorThread();
|
| +
|
| + const int ret = HANDLE_EINTR(
|
| + select(max_fd + 1, &read_fds, &write_fds, NULL, NULL));
|
| + if (ret < 0) {
|
| + PLOG(ERROR) << "select";
|
| return;
|
| }
|
| - forwarders_constructor_runner_->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(
|
| - &ForwardersManager::Delegate::ClearOnForwarderConstructorThread,
|
| - this));
|
| -}
|
|
|
| -void ForwardersManager::Delegate::CreateAndStartNewForwarder(
|
| - scoped_ptr<Socket> socket1,
|
| - scoped_ptr<Socket> socket2) {
|
| - const scoped_refptr<base::SingleThreadTaskRunner> current_task_runner(
|
| - base::MessageLoopProxy::current());
|
| - DCHECK(current_task_runner);
|
| - if (forwarders_constructor_runner_) {
|
| - DCHECK_EQ(current_task_runner, forwarders_constructor_runner_);
|
| - } else {
|
| - forwarders_constructor_runner_ = current_task_runner;
|
| + const bool must_shutdown = FD_ISSET(
|
| + deletion_notifier_.receiver_fd(), &read_fds);
|
| + if (must_shutdown && forwarders_.empty())
|
| + return;
|
| +
|
| + base::ScopedClosureRunner wait_for_events_soon(
|
| + base::Bind(&ForwardersManager::WaitForEventsOnInternalThreadSoon,
|
| + base::Unretained(this)));
|
| +
|
| + if (FD_ISSET(wakeup_notifier_.receiver_fd(), &read_fds)) {
|
| + // Note that the events on FDs other than the wakeup notifier one, if any,
|
| + // will be processed upon the next select().
|
| + wakeup_notifier_.Reset();
|
| + return;
|
| }
|
| - forwarders_.push_back(
|
| - new Forwarder(socket1.Pass(), socket2.Pass(),
|
| - &deletion_notifier_,
|
| - base::Bind(&ForwardersManager::Delegate::OnForwarderError,
|
| - this)));
|
| - forwarders_.back()->Start();
|
| -}
|
|
|
| -void ForwardersManager::Delegate::OnForwarderError(
|
| - scoped_ptr<Forwarder> forwarder) {
|
| - DCHECK(forwarders_constructor_runner_->RunsTasksOnCurrentThread());
|
| - const ScopedVector<Forwarder>::iterator it = std::find(
|
| - forwarders_.begin(), forwarders_.end(), forwarder.get());
|
| - DCHECK(it != forwarders_.end());
|
| - std::swap(*it, forwarders_.back());
|
| - forwarders_.pop_back();
|
| - ignore_result(forwarder.release()); // Deleted by the pop_back() above.
|
| -}
|
| + // Notify the Forwarder instances and remove the ones that are closed.
|
| + for (size_t i = 0; i < forwarders_.size(); ) {
|
| + Forwarder* const forwarder = forwarders_[i];
|
| + forwarder->ProcessEvents(read_fds, write_fds);
|
|
|
| -void ForwardersManager::Delegate::ClearOnForwarderConstructorThread() {
|
| - DCHECK(forwarders_constructor_runner_->RunsTasksOnCurrentThread());
|
| - deletion_notifier_.Notify();
|
| - forwarders_.clear();
|
| + if (must_shutdown)
|
| + forwarder->Shutdown();
|
| +
|
| + if (!forwarder->IsClosed()) {
|
| + ++i;
|
| + continue;
|
| + }
|
| +
|
| + std::swap(forwarders_[i], forwarders_.back());
|
| + forwarders_.pop_back();
|
| + }
|
| }
|
|
|
| } // namespace forwarder2
|
|
|