Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(77)

Side by Side Diff: tools/android/forwarder2/forwarders_manager.cc

Issue 148113003: forwarder2: Make the Forwarder instances operate on a single thread. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "tools/android/forwarder2/forwarders_manager.h" 5 #include "tools/android/forwarder2/forwarders_manager.h"
6 6
7 #include <sys/select.h>
8 #include <unistd.h>
9
7 #include <algorithm> 10 #include <algorithm>
8 11
9 #include "base/basictypes.h" 12 #include "base/basictypes.h"
10 #include "base/bind.h" 13 #include "base/bind.h"
11 #include "base/location.h" 14 #include "base/location.h"
12 #include "base/logging.h" 15 #include "base/logging.h"
13 #include "base/message_loop/message_loop_proxy.h" 16 #include "base/message_loop/message_loop_proxy.h"
17 #include "base/posix/eintr_wrapper.h"
14 #include "tools/android/forwarder2/forwarder.h" 18 #include "tools/android/forwarder2/forwarder.h"
15 #include "tools/android/forwarder2/socket.h" 19 #include "tools/android/forwarder2/socket.h"
16 20
17 namespace forwarder2 { 21 namespace forwarder2 {
18 22
19 ForwardersManager::ForwardersManager() : delegate_(new Delegate()) {} 23 ForwardersManager::ForwardersManager() : thread_("ForwardersManagerThread") {
24 thread_.Start();
25 }
26
20 27
21 ForwardersManager::~ForwardersManager() { 28 ForwardersManager::~ForwardersManager() {
22 delegate_->Clear(); 29 deletion_notifier_.Notify();
23 } 30 }
24 31
25 void ForwardersManager::CreateAndStartNewForwarder(scoped_ptr<Socket> socket1, 32 void ForwardersManager::CreateAndStartNewForwarder(scoped_ptr<Socket> socket1,
26 scoped_ptr<Socket> socket2) { 33 scoped_ptr<Socket> socket2) {
27 delegate_->CreateAndStartNewForwarder(socket1.Pass(), socket2.Pass()); 34 // Note that the internal Forwarder vector is populated on the internal thread
35 // which is the only thread from which it's accessed.
36 thread_.message_loop_proxy()->PostTask(
37 FROM_HERE,
38 base::Bind(&ForwardersManager::CreateNewForwarderOnInternalThread,
39 base::Unretained(this), base::Passed(&socket1),
40 base::Passed(&socket2)));
41
42 // Guarantees that the CreateNewForwarderOnInternalThread callback posted to
43 // the internal thread gets executed immediately.
44 wakeup_notifier_.Notify();
45
46 WaitForEventsOnInternalThreadSoon();
28 } 47 }
29 48
30 ForwardersManager::Delegate::Delegate() {} 49 void ForwardersManager::CreateNewForwarderOnInternalThread(
31 50 scoped_ptr<Socket> socket1,
32 ForwardersManager::Delegate::~Delegate() { 51 scoped_ptr<Socket> socket2) {
33 // The forwarder instances should already have been deleted on their 52 DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread());
34 // construction thread. Deleting them here would be unsafe since we don't know 53 forwarders_.push_back(new Forwarder(socket1.Pass(), socket2.Pass()));
35 // which thread this destructor is called on. 54 wakeup_notifier_.Reset();
36 DCHECK(forwarders_.empty());
37 } 55 }
38 56
39 void ForwardersManager::Delegate::Clear() { 57 void ForwardersManager::WaitForEventsOnInternalThreadSoon() {
40 if (!forwarders_constructor_runner_) { 58 thread_.message_loop_proxy()->PostTask(
41 DCHECK(forwarders_.empty()); 59 FROM_HERE,
60 base::Bind(&ForwardersManager::WaitForEventsOnInternalThread,
61 base::Unretained(this)));
62 }
63
64 void ForwardersManager::WaitForEventsOnInternalThread() {
65 DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread());
66 if (forwarders_.empty())
67 return;
68
69 fd_set read_fds;
70 fd_set write_fds;
71
72 FD_ZERO(&read_fds);
73 FD_ZERO(&write_fds);
74
75 // Populate the file descriptor sets.
76 int max_fd = -1;
77 for (ScopedVector<Forwarder>::iterator it = forwarders_.begin();
78 it != forwarders_.end(); ++it) {
79 Forwarder* const forwarder = *it;
80 forwarder->RegisterFDs(&read_fds, &write_fds, &max_fd);
81 }
82
83 const int notifier_fds[] = {
84 wakeup_notifier_.receiver_fd(),
85 deletion_notifier_.receiver_fd(),
86 };
87
88 for (int i = 0; i < arraysize(notifier_fds); ++i) {
89 const int notifier_fd = notifier_fds[i];
90 DCHECK_GT(notifier_fd, -1);
91 FD_SET(notifier_fd, &read_fds);
92 max_fd = std::max(max_fd, notifier_fd);
93 }
94
95 const int ret = HANDLE_EINTR(
96 select(max_fd + 1, &read_fds, &write_fds, NULL, NULL));
97 if (ret < 0) {
98 PLOG(ERROR) << "select";
42 return; 99 return;
43 } 100 }
44 if (forwarders_constructor_runner_->RunsTasksOnCurrentThread()) { 101
45 ClearOnForwarderConstructorThread(); 102 if (FD_ISSET(wakeup_notifier_.receiver_fd(), &read_fds)) {
103 // Note that the events on FDs other than the wakeup notifier one, if any,
104 // will be processed upon the next select().
46 return; 105 return;
47 } 106 }
48 forwarders_constructor_runner_->PostTask(
49 FROM_HERE,
50 base::Bind(
51 &ForwardersManager::Delegate::ClearOnForwarderConstructorThread,
52 this));
53 }
54 107
55 void ForwardersManager::Delegate::CreateAndStartNewForwarder( 108 // Notify the Forwarder instances and remove the ones that are closed.
56 scoped_ptr<Socket> socket1, 109 const bool must_shutdown = FD_ISSET(
57 scoped_ptr<Socket> socket2) { 110 deletion_notifier_.receiver_fd(), &read_fds);
58 const scoped_refptr<base::SingleThreadTaskRunner> current_task_runner( 111
59 base::MessageLoopProxy::current()); 112 for (size_t i = 0; i < forwarders_.size(); ) {
60 DCHECK(current_task_runner); 113 Forwarder* const forwarder = forwarders_[i];
61 if (forwarders_constructor_runner_) { 114 forwarder->ProcessEvents(read_fds, write_fds);
62 DCHECK_EQ(current_task_runner, forwarders_constructor_runner_); 115
63 } else { 116 if (must_shutdown)
64 forwarders_constructor_runner_ = current_task_runner; 117 forwarder->Shutdown();
118
119 if (!forwarder->IsClosed()) {
120 ++i;
121 continue;
122 }
123
124 std::swap(forwarders_[i], forwarders_.back());
125 forwarders_.pop_back();
65 } 126 }
66 forwarders_.push_back(
67 new Forwarder(socket1.Pass(), socket2.Pass(),
68 &deletion_notifier_,
69 base::Bind(&ForwardersManager::Delegate::OnForwarderError,
70 this)));
71 forwarders_.back()->Start();
72 }
73 127
74 void ForwardersManager::Delegate::OnForwarderError( 128 WaitForEventsOnInternalThreadSoon();
75 scoped_ptr<Forwarder> forwarder) {
76 DCHECK(forwarders_constructor_runner_->RunsTasksOnCurrentThread());
77 const ScopedVector<Forwarder>::iterator it = std::find(
78 forwarders_.begin(), forwarders_.end(), forwarder.get());
79 DCHECK(it != forwarders_.end());
80 std::swap(*it, forwarders_.back());
81 forwarders_.pop_back();
82 ignore_result(forwarder.release()); // Deleted by the pop_back() above.
83 }
84
85 void ForwardersManager::Delegate::ClearOnForwarderConstructorThread() {
86 DCHECK(forwarders_constructor_runner_->RunsTasksOnCurrentThread());
87 deletion_notifier_.Notify();
88 forwarders_.clear();
89 } 129 }
90 130
91 } // namespace forwarder2 131 } // namespace forwarder2
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698