Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(26)

Side by Side Diff: ipc/ipc_sync_message_filter.cc

Issue 2101163002: Reland Mojo-based waiting for IPC::SyncChannel (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebase Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_sync_message_filter.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_message_filter.h" 5 #include "ipc/ipc_sync_message_filter.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/macros.h"
11 #include "base/memory/ptr_util.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/message_loop/message_loop.h"
10 #include "base/single_thread_task_runner.h" 14 #include "base/single_thread_task_runner.h"
11 #include "base/synchronization/waitable_event.h" 15 #include "base/synchronization/waitable_event.h"
12 #include "base/threading/thread_task_runner_handle.h" 16 #include "base/threading/thread_task_runner_handle.h"
13 #include "ipc/ipc_channel.h" 17 #include "ipc/ipc_channel.h"
14 #include "ipc/ipc_sync_message.h" 18 #include "ipc/ipc_sync_message.h"
19 #include "ipc/mojo_event.h"
20 #include "mojo/public/cpp/bindings/sync_handle_registry.h"
15 21
16 namespace IPC { 22 namespace IPC {
17 23
24 namespace {
25
26 // A generic callback used when watching handles synchronously. Sets |*signal|
27 // to true. Also sets |*error| to true in case of an error.
28 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
29 *signal = true;
30 *error = result != MOJO_RESULT_OK;
31 }
32
33 } // namespace
34
35 // A helper class created by SyncMessageFilter to watch the lifetime of the IO
36 // MessageLoop. This holds a weak ref to the SyncMessageFilter and notifies it
37 // on its own thread if the SyncMessageFilter is still alive at the time of
38 // IO MessageLoop destruction.
39 class SyncMessageFilter::IOMessageLoopObserver
40 : public base::MessageLoop::DestructionObserver,
41 public base::RefCountedThreadSafe<IOMessageLoopObserver> {
42 public:
43 IOMessageLoopObserver(
44 base::WeakPtr<SyncMessageFilter> weak_filter,
45 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner)
46 : weak_filter_(weak_filter), filter_task_runner_(filter_task_runner) {}
47
48 void StartOnIOThread() {
49 DCHECK(!watching_);
50 watching_ = true;
51 io_task_runner_ = base::ThreadTaskRunnerHandle::Get();
52 base::MessageLoop::current()->AddDestructionObserver(this);
53 }
54
55 void Stop() {
56 if (!io_task_runner_)
57 return;
58
59 if (io_task_runner_->BelongsToCurrentThread()) {
60 StopOnIOThread();
61 } else {
62 io_task_runner_->PostTask(
63 FROM_HERE, base::Bind(&IOMessageLoopObserver::StopOnIOThread, this));
64 }
65 }
66
67 private:
68 void StopOnIOThread() {
69 DCHECK(io_task_runner_->BelongsToCurrentThread());
70 if (!watching_)
71 return;
72 watching_ = false;
73 base::MessageLoop::current()->RemoveDestructionObserver(this);
74 }
75
76 // base::MessageLoop::DestructionObserver:
77 void WillDestroyCurrentMessageLoop() override {
78 DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread());
79 DCHECK(watching_);
80 StopOnIOThread();
81 filter_task_runner_->PostTask(
82 FROM_HERE,
83 base::Bind(&SyncMessageFilter::OnIOMessageLoopDestroyed, weak_filter_));
84 }
85
86 friend class base::RefCountedThreadSafe<IOMessageLoopObserver>;
87
88 ~IOMessageLoopObserver() override {}
89
90 bool watching_ = false;
91 base::WeakPtr<SyncMessageFilter> weak_filter_;
92 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner_;
93 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_;
94
95 DISALLOW_COPY_AND_ASSIGN(IOMessageLoopObserver);
96 };
97
18 bool SyncMessageFilter::Send(Message* message) { 98 bool SyncMessageFilter::Send(Message* message) {
19 if (!message->is_sync()) { 99 if (!message->is_sync()) {
20 { 100 {
21 base::AutoLock auto_lock(lock_); 101 base::AutoLock auto_lock(lock_);
22 if (sender_ && is_channel_send_thread_safe_) { 102 if (sender_ && is_channel_send_thread_safe_) {
23 sender_->Send(message); 103 sender_->Send(message);
24 return true; 104 return true;
25 } else if (!io_task_runner_.get()) { 105 } else if (!io_task_runner_.get()) {
26 pending_messages_.push_back(message); 106 pending_messages_.emplace_back(base::WrapUnique(message));
27 return true; 107 return true;
28 } 108 }
29 } 109 }
30 io_task_runner_->PostTask( 110 io_task_runner_->PostTask(
31 FROM_HERE, 111 FROM_HERE,
32 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); 112 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message));
33 return true; 113 return true;
34 } 114 }
35 115
36 base::WaitableEvent done_event( 116 MojoEvent done_event;
37 base::WaitableEvent::ResetPolicy::MANUAL,
38 base::WaitableEvent::InitialState::NOT_SIGNALED);
39 PendingSyncMsg pending_message( 117 PendingSyncMsg pending_message(
40 SyncMessage::GetMessageId(*message), 118 SyncMessage::GetMessageId(*message),
41 static_cast<SyncMessage*>(message)->GetReplyDeserializer(), 119 static_cast<SyncMessage*>(message)->GetReplyDeserializer(),
42 &done_event); 120 &done_event);
43 121
44 { 122 {
45 base::AutoLock auto_lock(lock_); 123 base::AutoLock auto_lock(lock_);
46 // Can't use this class on the main thread or else it can lead to deadlocks. 124 // Can't use this class on the main thread or else it can lead to deadlocks.
47 // Also by definition, can't use this on IO thread since we're blocking it. 125 // Also by definition, can't use this on IO thread since we're blocking it.
48 if (base::ThreadTaskRunnerHandle::IsSet()) { 126 if (base::ThreadTaskRunnerHandle::IsSet()) {
49 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_); 127 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_);
50 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_); 128 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_);
51 } 129 }
52 pending_sync_messages_.insert(&pending_message); 130 pending_sync_messages_.insert(&pending_message);
53 131
54 if (io_task_runner_.get()) { 132 if (io_task_runner_.get()) {
55 io_task_runner_->PostTask( 133 io_task_runner_->PostTask(
56 FROM_HERE, 134 FROM_HERE,
57 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); 135 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message));
58 } else { 136 } else {
59 pending_messages_.push_back(message); 137 pending_messages_.emplace_back(base::WrapUnique(message));
60 } 138 }
61 } 139 }
62 140
63 base::WaitableEvent* events[2] = { shutdown_event_, &done_event }; 141 bool done = false;
64 if (base::WaitableEvent::WaitMany(events, 2) == 1) { 142 bool shutdown = false;
143 bool error = false;
144 scoped_refptr<mojo::SyncHandleRegistry> registry =
145 mojo::SyncHandleRegistry::current();
146 registry->RegisterHandle(shutdown_mojo_event_.GetHandle(),
147 MOJO_HANDLE_SIGNAL_READABLE,
148 base::Bind(&OnSyncHandleReady, &shutdown, &error));
149 registry->RegisterHandle(done_event.GetHandle(),
150 MOJO_HANDLE_SIGNAL_READABLE,
151 base::Bind(&OnSyncHandleReady, &done, &error));
152
153 const bool* stop_flags[] = { &done, &shutdown };
154 registry->WatchAllHandles(stop_flags, 2);
155 DCHECK(!error);
156
157 if (done) {
65 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 158 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
66 "SyncMessageFilter::Send", &done_event); 159 "SyncMessageFilter::Send", &done_event);
67 } 160 }
161 registry->UnregisterHandle(shutdown_mojo_event_.GetHandle());
162 registry->UnregisterHandle(done_event.GetHandle());
68 163
69 { 164 {
70 base::AutoLock auto_lock(lock_); 165 base::AutoLock auto_lock(lock_);
71 delete pending_message.deserializer; 166 delete pending_message.deserializer;
72 pending_sync_messages_.erase(&pending_message); 167 pending_sync_messages_.erase(&pending_message);
73 } 168 }
74 169
75 return pending_message.send_result; 170 return pending_message.send_result;
76 } 171 }
77 172
78 void SyncMessageFilter::OnFilterAdded(Sender* sender) { 173 void SyncMessageFilter::OnFilterAdded(Sender* sender) {
79 std::vector<Message*> pending_messages; 174 std::vector<std::unique_ptr<Message>> pending_messages;
80 { 175 {
81 base::AutoLock auto_lock(lock_); 176 base::AutoLock auto_lock(lock_);
82 sender_ = sender; 177 sender_ = sender;
83 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); 178 io_task_runner_ = base::ThreadTaskRunnerHandle::Get();
84 pending_messages_.release(&pending_messages); 179 shutdown_watcher_.StartWatching(
180 shutdown_event_,
181 base::Bind(&SyncMessageFilter::OnShutdownEventSignaled, this));
182 io_message_loop_observer_->StartOnIOThread();
183 std::swap(pending_messages_, pending_messages);
85 } 184 }
86 for (auto* msg : pending_messages) 185 for (auto& msg : pending_messages)
87 SendOnIOThread(msg); 186 SendOnIOThread(msg.release());
88 } 187 }
89 188
90 void SyncMessageFilter::OnChannelError() { 189 void SyncMessageFilter::OnChannelError() {
91 base::AutoLock auto_lock(lock_); 190 base::AutoLock auto_lock(lock_);
92 sender_ = NULL; 191 sender_ = NULL;
192 shutdown_watcher_.StopWatching();
93 SignalAllEvents(); 193 SignalAllEvents();
94 } 194 }
95 195
96 void SyncMessageFilter::OnChannelClosing() { 196 void SyncMessageFilter::OnChannelClosing() {
97 base::AutoLock auto_lock(lock_); 197 base::AutoLock auto_lock(lock_);
98 sender_ = NULL; 198 sender_ = NULL;
199 shutdown_watcher_.StopWatching();
99 SignalAllEvents(); 200 SignalAllEvents();
100 } 201 }
101 202
102 bool SyncMessageFilter::OnMessageReceived(const Message& message) { 203 bool SyncMessageFilter::OnMessageReceived(const Message& message) {
103 base::AutoLock auto_lock(lock_); 204 base::AutoLock auto_lock(lock_);
104 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); 205 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
105 iter != pending_sync_messages_.end(); ++iter) { 206 iter != pending_sync_messages_.end(); ++iter) {
106 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) { 207 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) {
107 if (!message.is_reply_error()) { 208 if (!message.is_reply_error()) {
108 (*iter)->send_result = 209 (*iter)->send_result =
109 (*iter)->deserializer->SerializeOutputParameters(message); 210 (*iter)->deserializer->SerializeOutputParameters(message);
110 } 211 }
111 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 212 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
112 "SyncMessageFilter::OnMessageReceived", 213 "SyncMessageFilter::OnMessageReceived",
113 (*iter)->done_event); 214 (*iter)->done_event);
114 (*iter)->done_event->Signal(); 215 (*iter)->done_event->Signal();
115 return true; 216 return true;
116 } 217 }
117 } 218 }
118 219
119 return false; 220 return false;
120 } 221 }
121 222
122 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event, 223 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event,
123 bool is_channel_send_thread_safe) 224 bool is_channel_send_thread_safe)
124 : sender_(NULL), 225 : sender_(NULL),
125 is_channel_send_thread_safe_(is_channel_send_thread_safe), 226 is_channel_send_thread_safe_(is_channel_send_thread_safe),
126 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 227 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
127 shutdown_event_(shutdown_event) { 228 shutdown_event_(shutdown_event),
229 weak_factory_(this) {
230 io_message_loop_observer_ = new IOMessageLoopObserver(
231 weak_factory_.GetWeakPtr(), listener_task_runner_);
128 } 232 }
129 233
130 SyncMessageFilter::~SyncMessageFilter() { 234 SyncMessageFilter::~SyncMessageFilter() {
235 io_message_loop_observer_->Stop();
131 } 236 }
132 237
133 void SyncMessageFilter::SendOnIOThread(Message* message) { 238 void SyncMessageFilter::SendOnIOThread(Message* message) {
134 if (sender_) { 239 if (sender_) {
135 sender_->Send(message); 240 sender_->Send(message);
136 return; 241 return;
137 } 242 }
138 243
139 if (message->is_sync()) { 244 if (message->is_sync()) {
140 // We don't know which thread sent it, but it doesn't matter, just signal 245 // We don't know which thread sent it, but it doesn't matter, just signal
141 // them all. 246 // them all.
142 base::AutoLock auto_lock(lock_); 247 base::AutoLock auto_lock(lock_);
143 SignalAllEvents(); 248 SignalAllEvents();
144 } 249 }
145 250
146 delete message; 251 delete message;
147 } 252 }
148 253
149 void SyncMessageFilter::SignalAllEvents() { 254 void SyncMessageFilter::SignalAllEvents() {
150 lock_.AssertAcquired(); 255 lock_.AssertAcquired();
151 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); 256 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
152 iter != pending_sync_messages_.end(); ++iter) { 257 iter != pending_sync_messages_.end(); ++iter) {
153 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 258 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
154 "SyncMessageFilter::SignalAllEvents", 259 "SyncMessageFilter::SignalAllEvents",
155 (*iter)->done_event); 260 (*iter)->done_event);
156 (*iter)->done_event->Signal(); 261 (*iter)->done_event->Signal();
157 } 262 }
158 } 263 }
159 264
265 void SyncMessageFilter::OnShutdownEventSignaled(base::WaitableEvent* event) {
266 DCHECK_EQ(event, shutdown_event_);
267 shutdown_mojo_event_.Signal();
268 }
269
270 void SyncMessageFilter::OnIOMessageLoopDestroyed() {
271 // Since we use an async WaitableEventWatcher to watch the shutdown event
272 // from the IO thread, we can't forward the shutdown signal after the IO
273 // message loop is destroyed. Since that destruction indicates shutdown
274 // anyway, we manually signal the shutdown event in this case.
275 shutdown_mojo_event_.Signal();
276 }
277
160 } // namespace IPC 278 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_sync_message_filter.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698