Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(214)

Side by Side Diff: ipc/ipc_sync_message_filter.cc

Issue 2097103002: Revert Mojo-based SyncChannel waiting again (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_sync_message_filter.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_message_filter.h" 5 #include "ipc/ipc_sync_message_filter.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/macros.h"
11 #include "base/memory/ptr_util.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/single_thread_task_runner.h" 10 #include "base/single_thread_task_runner.h"
15 #include "base/synchronization/waitable_event.h" 11 #include "base/synchronization/waitable_event.h"
16 #include "base/threading/thread_task_runner_handle.h" 12 #include "base/threading/thread_task_runner_handle.h"
17 #include "ipc/ipc_channel.h" 13 #include "ipc/ipc_channel.h"
18 #include "ipc/ipc_sync_message.h" 14 #include "ipc/ipc_sync_message.h"
19 #include "ipc/mojo_event.h"
20 #include "mojo/public/cpp/bindings/sync_handle_registry.h"
21 15
22 namespace IPC { 16 namespace IPC {
23 17
24 namespace {
25
26 // A generic callback used when watching handles synchronously. Sets |*signal|
27 // to true. Also sets |*error| to true in case of an error.
28 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
29 *signal = true;
30 *error = result != MOJO_RESULT_OK;
31 }
32
33 } // namespace
34
35 // A helper class created by SyncMessageFilter to watch the lifetime of the IO
36 // MessageLoop. This holds a weak ref to the SyncMessageFilter and notifies it
37 // on its own thread if the SyncMessageFilter is still alive at the time of
38 // IO MessageLoop destruction.
39 class SyncMessageFilter::IOMessageLoopObserver
40 : public base::MessageLoop::DestructionObserver,
41 public base::RefCountedThreadSafe<IOMessageLoopObserver> {
42 public:
43 IOMessageLoopObserver(
44 base::WeakPtr<SyncMessageFilter> weak_filter,
45 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner)
46 : weak_filter_(weak_filter), filter_task_runner_(filter_task_runner) {}
47
48 void StartOnIOThread() {
49 DCHECK(!watching_);
50 watching_ = true;
51 io_task_runner_ = base::ThreadTaskRunnerHandle::Get();
52 base::MessageLoop::current()->AddDestructionObserver(this);
53 }
54
55 void Stop() {
56 if (!io_task_runner_)
57 return;
58
59 if (io_task_runner_->BelongsToCurrentThread()) {
60 StopOnIOThread();
61 } else {
62 io_task_runner_->PostTask(
63 FROM_HERE, base::Bind(&IOMessageLoopObserver::StopOnIOThread, this));
64 }
65 }
66
67 private:
68 void StopOnIOThread() {
69 DCHECK(io_task_runner_->BelongsToCurrentThread());
70 if (!watching_)
71 return;
72 watching_ = false;
73 base::MessageLoop::current()->RemoveDestructionObserver(this);
74 }
75
76 // base::MessageLoop::DestructionObserver:
77 void WillDestroyCurrentMessageLoop() override {
78 DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread());
79 DCHECK(watching_);
80 StopOnIOThread();
81 filter_task_runner_->PostTask(
82 FROM_HERE,
83 base::Bind(&SyncMessageFilter::OnIOMessageLoopDestroyed, weak_filter_));
84 }
85
86 friend class base::RefCountedThreadSafe<IOMessageLoopObserver>;
87
88 ~IOMessageLoopObserver() override {}
89
90 bool watching_ = false;
91 base::WeakPtr<SyncMessageFilter> weak_filter_;
92 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner_;
93 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_;
94
95 DISALLOW_COPY_AND_ASSIGN(IOMessageLoopObserver);
96 };
97
98 bool SyncMessageFilter::Send(Message* message) { 18 bool SyncMessageFilter::Send(Message* message) {
99 if (!message->is_sync()) { 19 if (!message->is_sync()) {
100 { 20 {
101 base::AutoLock auto_lock(lock_); 21 base::AutoLock auto_lock(lock_);
102 if (sender_ && is_channel_send_thread_safe_) { 22 if (sender_ && is_channel_send_thread_safe_) {
103 sender_->Send(message); 23 sender_->Send(message);
104 return true; 24 return true;
105 } else if (!io_task_runner_.get()) { 25 } else if (!io_task_runner_.get()) {
106 pending_messages_.emplace_back(base::WrapUnique(message)); 26 pending_messages_.push_back(message);
107 return true; 27 return true;
108 } 28 }
109 } 29 }
110 io_task_runner_->PostTask( 30 io_task_runner_->PostTask(
111 FROM_HERE, 31 FROM_HERE,
112 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); 32 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message));
113 return true; 33 return true;
114 } 34 }
115 35
116 MojoEvent done_event; 36 base::WaitableEvent done_event(
37 base::WaitableEvent::ResetPolicy::MANUAL,
38 base::WaitableEvent::InitialState::NOT_SIGNALED);
117 PendingSyncMsg pending_message( 39 PendingSyncMsg pending_message(
118 SyncMessage::GetMessageId(*message), 40 SyncMessage::GetMessageId(*message),
119 static_cast<SyncMessage*>(message)->GetReplyDeserializer(), 41 static_cast<SyncMessage*>(message)->GetReplyDeserializer(),
120 &done_event); 42 &done_event);
121 43
122 { 44 {
123 base::AutoLock auto_lock(lock_); 45 base::AutoLock auto_lock(lock_);
124 // Can't use this class on the main thread or else it can lead to deadlocks. 46 // Can't use this class on the main thread or else it can lead to deadlocks.
125 // Also by definition, can't use this on IO thread since we're blocking it. 47 // Also by definition, can't use this on IO thread since we're blocking it.
126 if (base::ThreadTaskRunnerHandle::IsSet()) { 48 if (base::ThreadTaskRunnerHandle::IsSet()) {
127 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_); 49 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_);
128 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_); 50 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_);
129 } 51 }
130 pending_sync_messages_.insert(&pending_message); 52 pending_sync_messages_.insert(&pending_message);
131 53
132 if (io_task_runner_.get()) { 54 if (io_task_runner_.get()) {
133 io_task_runner_->PostTask( 55 io_task_runner_->PostTask(
134 FROM_HERE, 56 FROM_HERE,
135 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); 57 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message));
136 } else { 58 } else {
137 pending_messages_.emplace_back(base::WrapUnique(message)); 59 pending_messages_.push_back(message);
138 } 60 }
139 } 61 }
140 62
141 bool done = false; 63 base::WaitableEvent* events[2] = { shutdown_event_, &done_event };
142 bool shutdown = false; 64 if (base::WaitableEvent::WaitMany(events, 2) == 1) {
143 bool error = false;
144 scoped_refptr<mojo::SyncHandleRegistry> registry =
145 mojo::SyncHandleRegistry::current();
146 registry->RegisterHandle(shutdown_mojo_event_.GetHandle(),
147 MOJO_HANDLE_SIGNAL_READABLE,
148 base::Bind(&OnSyncHandleReady, &shutdown, &error));
149 registry->RegisterHandle(done_event.GetHandle(),
150 MOJO_HANDLE_SIGNAL_READABLE,
151 base::Bind(&OnSyncHandleReady, &done, &error));
152
153 const bool* stop_flags[] = { &done, &shutdown };
154 registry->WatchAllHandles(stop_flags, 2);
155 DCHECK(!error);
156
157 if (done) {
158 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 65 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
159 "SyncMessageFilter::Send", &done_event); 66 "SyncMessageFilter::Send", &done_event);
160 } 67 }
161 registry->UnregisterHandle(shutdown_mojo_event_.GetHandle());
162 registry->UnregisterHandle(done_event.GetHandle());
163 68
164 { 69 {
165 base::AutoLock auto_lock(lock_); 70 base::AutoLock auto_lock(lock_);
166 delete pending_message.deserializer; 71 delete pending_message.deserializer;
167 pending_sync_messages_.erase(&pending_message); 72 pending_sync_messages_.erase(&pending_message);
168 } 73 }
169 74
170 return pending_message.send_result; 75 return pending_message.send_result;
171 } 76 }
172 77
173 void SyncMessageFilter::OnFilterAdded(Sender* sender) { 78 void SyncMessageFilter::OnFilterAdded(Sender* sender) {
174 std::vector<std::unique_ptr<Message>> pending_messages; 79 std::vector<Message*> pending_messages;
175 { 80 {
176 base::AutoLock auto_lock(lock_); 81 base::AutoLock auto_lock(lock_);
177 sender_ = sender; 82 sender_ = sender;
178 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); 83 io_task_runner_ = base::ThreadTaskRunnerHandle::Get();
179 shutdown_watcher_.StartWatching( 84 pending_messages_.release(&pending_messages);
180 shutdown_event_,
181 base::Bind(&SyncMessageFilter::OnShutdownEventSignaled, this));
182 io_message_loop_observer_->StartOnIOThread();
183 std::swap(pending_messages_, pending_messages);
184 } 85 }
185 for (auto& msg : pending_messages) 86 for (auto* msg : pending_messages)
186 SendOnIOThread(msg.release()); 87 SendOnIOThread(msg);
187 } 88 }
188 89
189 void SyncMessageFilter::OnChannelError() { 90 void SyncMessageFilter::OnChannelError() {
190 base::AutoLock auto_lock(lock_); 91 base::AutoLock auto_lock(lock_);
191 sender_ = NULL; 92 sender_ = NULL;
192 shutdown_watcher_.StopWatching();
193 SignalAllEvents(); 93 SignalAllEvents();
194 } 94 }
195 95
196 void SyncMessageFilter::OnChannelClosing() { 96 void SyncMessageFilter::OnChannelClosing() {
197 base::AutoLock auto_lock(lock_); 97 base::AutoLock auto_lock(lock_);
198 sender_ = NULL; 98 sender_ = NULL;
199 shutdown_watcher_.StopWatching();
200 SignalAllEvents(); 99 SignalAllEvents();
201 } 100 }
202 101
203 bool SyncMessageFilter::OnMessageReceived(const Message& message) { 102 bool SyncMessageFilter::OnMessageReceived(const Message& message) {
204 base::AutoLock auto_lock(lock_); 103 base::AutoLock auto_lock(lock_);
205 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); 104 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
206 iter != pending_sync_messages_.end(); ++iter) { 105 iter != pending_sync_messages_.end(); ++iter) {
207 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) { 106 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) {
208 if (!message.is_reply_error()) { 107 if (!message.is_reply_error()) {
209 (*iter)->send_result = 108 (*iter)->send_result =
210 (*iter)->deserializer->SerializeOutputParameters(message); 109 (*iter)->deserializer->SerializeOutputParameters(message);
211 } 110 }
212 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 111 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
213 "SyncMessageFilter::OnMessageReceived", 112 "SyncMessageFilter::OnMessageReceived",
214 (*iter)->done_event); 113 (*iter)->done_event);
215 (*iter)->done_event->Signal(); 114 (*iter)->done_event->Signal();
216 return true; 115 return true;
217 } 116 }
218 } 117 }
219 118
220 return false; 119 return false;
221 } 120 }
222 121
223 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event, 122 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event,
224 bool is_channel_send_thread_safe) 123 bool is_channel_send_thread_safe)
225 : sender_(NULL), 124 : sender_(NULL),
226 is_channel_send_thread_safe_(is_channel_send_thread_safe), 125 is_channel_send_thread_safe_(is_channel_send_thread_safe),
227 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 126 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
228 shutdown_event_(shutdown_event), 127 shutdown_event_(shutdown_event) {
229 weak_factory_(this) {
230 io_message_loop_observer_ = new IOMessageLoopObserver(
231 weak_factory_.GetWeakPtr(), listener_task_runner_);
232 } 128 }
233 129
234 SyncMessageFilter::~SyncMessageFilter() { 130 SyncMessageFilter::~SyncMessageFilter() {
235 io_message_loop_observer_->Stop();
236 } 131 }
237 132
238 void SyncMessageFilter::SendOnIOThread(Message* message) { 133 void SyncMessageFilter::SendOnIOThread(Message* message) {
239 if (sender_) { 134 if (sender_) {
240 sender_->Send(message); 135 sender_->Send(message);
241 return; 136 return;
242 } 137 }
243 138
244 if (message->is_sync()) { 139 if (message->is_sync()) {
245 // We don't know which thread sent it, but it doesn't matter, just signal 140 // We don't know which thread sent it, but it doesn't matter, just signal
246 // them all. 141 // them all.
247 base::AutoLock auto_lock(lock_); 142 base::AutoLock auto_lock(lock_);
248 SignalAllEvents(); 143 SignalAllEvents();
249 } 144 }
250 145
251 delete message; 146 delete message;
252 } 147 }
253 148
254 void SyncMessageFilter::SignalAllEvents() { 149 void SyncMessageFilter::SignalAllEvents() {
255 lock_.AssertAcquired(); 150 lock_.AssertAcquired();
256 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); 151 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
257 iter != pending_sync_messages_.end(); ++iter) { 152 iter != pending_sync_messages_.end(); ++iter) {
258 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 153 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
259 "SyncMessageFilter::SignalAllEvents", 154 "SyncMessageFilter::SignalAllEvents",
260 (*iter)->done_event); 155 (*iter)->done_event);
261 (*iter)->done_event->Signal(); 156 (*iter)->done_event->Signal();
262 } 157 }
263 } 158 }
264 159
265 void SyncMessageFilter::OnShutdownEventSignaled(base::WaitableEvent* event) {
266 DCHECK_EQ(event, shutdown_event_);
267 shutdown_mojo_event_.Signal();
268 }
269
270 void SyncMessageFilter::OnIOMessageLoopDestroyed() {
271 // Since we use an async WaitableEventWatcher to watch the shutdown event
272 // from the IO thread, we can't forward the shutdown signal after the IO
273 // message loop is destroyed. Since that destruction indicates shutdown
274 // anyway, we manually signal the shutdown event in this case.
275 shutdown_mojo_event_.Signal();
276 }
277
278 } // namespace IPC 160 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_sync_message_filter.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698