Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(124)

Side by Side Diff: ipc/ipc_sync_message_filter.cc

Issue 2033243003: Use Mojo pipes to signal sync IPC events (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_message_filter.h" 5 #include "ipc/ipc_sync_message_filter.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/memory/ptr_util.h"
11 #include "base/memory/ref_counted.h"
12 #include "base/message_loop/message_loop.h"
10 #include "base/single_thread_task_runner.h" 13 #include "base/single_thread_task_runner.h"
11 #include "base/synchronization/waitable_event.h" 14 #include "base/synchronization/waitable_event.h"
12 #include "base/threading/thread_task_runner_handle.h" 15 #include "base/threading/thread_task_runner_handle.h"
13 #include "ipc/ipc_channel.h" 16 #include "ipc/ipc_channel.h"
14 #include "ipc/ipc_sync_message.h" 17 #include "ipc/ipc_sync_message.h"
18 #include "ipc/mojo_event.h"
19 #include "mojo/public/cpp/bindings/sync_handle_registry.h"
15 20
16 namespace IPC { 21 namespace IPC {
17 22
23 namespace {
24
25 // A generic callback used when watching handles synchronously. Sets |*signal|
26 // to true. Also sets |*error| to true in case of an error.
27 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
28 *signal = true;
29 *error = result != MOJO_RESULT_OK;
30 }
31
32 } // namespace
33
18 bool SyncMessageFilter::Send(Message* message) { 34 bool SyncMessageFilter::Send(Message* message) {
19 if (!message->is_sync()) { 35 if (!message->is_sync()) {
20 { 36 {
21 base::AutoLock auto_lock(lock_); 37 base::AutoLock auto_lock(lock_);
22 if (!io_task_runner_.get()) { 38 if (!io_task_runner_.get()) {
23 pending_messages_.push_back(message); 39 pending_messages_.emplace_back(base::WrapUnique(message));
24 return true; 40 return true;
25 } 41 }
26 } 42 }
27 io_task_runner_->PostTask( 43 io_task_runner_->PostTask(
28 FROM_HERE, 44 FROM_HERE,
29 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); 45 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message));
30 return true; 46 return true;
31 } 47 }
32 48
33 base::WaitableEvent done_event( 49 MojoEvent done_event;
34 base::WaitableEvent::ResetPolicy::MANUAL,
35 base::WaitableEvent::InitialState::NOT_SIGNALED);
36 PendingSyncMsg pending_message( 50 PendingSyncMsg pending_message(
37 SyncMessage::GetMessageId(*message), 51 SyncMessage::GetMessageId(*message),
38 static_cast<SyncMessage*>(message)->GetReplyDeserializer(), 52 static_cast<SyncMessage*>(message)->GetReplyDeserializer(),
39 &done_event); 53 &done_event);
40 54
41 { 55 {
42 base::AutoLock auto_lock(lock_); 56 base::AutoLock auto_lock(lock_);
43 // Can't use this class on the main thread or else it can lead to deadlocks. 57 // Can't use this class on the main thread or else it can lead to deadlocks.
44 // Also by definition, can't use this on IO thread since we're blocking it. 58 // Also by definition, can't use this on IO thread since we're blocking it.
45 if (base::ThreadTaskRunnerHandle::IsSet()) { 59 if (base::ThreadTaskRunnerHandle::IsSet()) {
46 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_); 60 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_);
47 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_); 61 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_);
48 } 62 }
49 pending_sync_messages_.insert(&pending_message); 63 pending_sync_messages_.insert(&pending_message);
50 64
51 if (io_task_runner_.get()) { 65 if (io_task_runner_.get()) {
52 io_task_runner_->PostTask( 66 io_task_runner_->PostTask(
53 FROM_HERE, 67 FROM_HERE,
54 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); 68 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message));
55 } else { 69 } else {
56 pending_messages_.push_back(message); 70 pending_messages_.emplace_back(base::WrapUnique(message));
57 } 71 }
58 } 72 }
59 73
60 base::WaitableEvent* events[2] = { shutdown_event_, &done_event }; 74 bool done = false;
61 if (base::WaitableEvent::WaitMany(events, 2) == 1) { 75 bool shutdown = false;
76 bool error = false;
77 scoped_refptr<mojo::SyncHandleRegistry> registry =
78 mojo::SyncHandleRegistry::current();
79 registry->RegisterHandle(shutdown_mojo_event_.GetHandle(),
80 MOJO_HANDLE_SIGNAL_READABLE,
81 base::Bind(&OnSyncHandleReady, &shutdown, &error));
82 registry->RegisterHandle(done_event.GetHandle(),
83 MOJO_HANDLE_SIGNAL_READABLE,
84 base::Bind(&OnSyncHandleReady, &done, &error));
85
86 const bool* stop_flags[] = { &done, &shutdown };
87 bool result = registry->WatchAllHandles(stop_flags, 2);
88 DCHECK(result);
89 DCHECK(!error);
90
91 if (done) {
62 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 92 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
63 "SyncMessageFilter::Send", &done_event); 93 "SyncMessageFilter::Send", &done_event);
64 } 94 }
95 registry->UnregisterHandle(shutdown_mojo_event_.GetHandle());
96 registry->UnregisterHandle(done_event.GetHandle());
65 97
66 { 98 {
67 base::AutoLock auto_lock(lock_); 99 base::AutoLock auto_lock(lock_);
68 delete pending_message.deserializer; 100 delete pending_message.deserializer;
69 pending_sync_messages_.erase(&pending_message); 101 pending_sync_messages_.erase(&pending_message);
70 } 102 }
71 103
72 return pending_message.send_result; 104 return pending_message.send_result;
73 } 105 }
74 106
75 void SyncMessageFilter::OnFilterAdded(Sender* sender) { 107 void SyncMessageFilter::OnFilterAdded(Sender* sender) {
76 std::vector<Message*> pending_messages; 108 std::vector<std::unique_ptr<Message>> pending_messages;
77 { 109 {
78 base::AutoLock auto_lock(lock_); 110 base::AutoLock auto_lock(lock_);
79 sender_ = sender; 111 sender_ = sender;
80 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); 112 io_task_runner_ = base::ThreadTaskRunnerHandle::Get();
81 pending_messages_.release(&pending_messages); 113 shutdown_watcher_.StartWatching(
114 shutdown_event_,
115 base::Bind(&SyncMessageFilter::OnShutdownEventSignaled, this));
116 std::swap(pending_messages_, pending_messages);
82 } 117 }
83 for (auto* msg : pending_messages) 118 for (auto& msg : pending_messages)
84 SendOnIOThread(msg); 119 SendOnIOThread(msg.release());
85 } 120 }
86 121
87 void SyncMessageFilter::OnChannelError() { 122 void SyncMessageFilter::OnChannelError() {
88 base::AutoLock auto_lock(lock_); 123 base::AutoLock auto_lock(lock_);
89 sender_ = NULL; 124 sender_ = NULL;
125 shutdown_watcher_.StopWatching();
90 SignalAllEvents(); 126 SignalAllEvents();
91 } 127 }
92 128
93 void SyncMessageFilter::OnChannelClosing() { 129 void SyncMessageFilter::OnChannelClosing() {
94 base::AutoLock auto_lock(lock_); 130 base::AutoLock auto_lock(lock_);
95 sender_ = NULL; 131 sender_ = NULL;
132 shutdown_watcher_.StopWatching();
96 SignalAllEvents(); 133 SignalAllEvents();
97 } 134 }
98 135
99 bool SyncMessageFilter::OnMessageReceived(const Message& message) { 136 bool SyncMessageFilter::OnMessageReceived(const Message& message) {
100 base::AutoLock auto_lock(lock_); 137 base::AutoLock auto_lock(lock_);
101 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); 138 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
102 iter != pending_sync_messages_.end(); ++iter) { 139 iter != pending_sync_messages_.end(); ++iter) {
103 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) { 140 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) {
104 if (!message.is_reply_error()) { 141 if (!message.is_reply_error()) {
105 (*iter)->send_result = 142 (*iter)->send_result =
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
160 lock_.AssertAcquired(); 197 lock_.AssertAcquired();
161 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); 198 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
162 iter != pending_sync_messages_.end(); ++iter) { 199 iter != pending_sync_messages_.end(); ++iter) {
163 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 200 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
164 "SyncMessageFilter::SignalAllEvents", 201 "SyncMessageFilter::SignalAllEvents",
165 (*iter)->done_event); 202 (*iter)->done_event);
166 (*iter)->done_event->Signal(); 203 (*iter)->done_event->Signal();
167 } 204 }
168 } 205 }
169 206
207 void SyncMessageFilter::OnShutdownEventSignaled(base::WaitableEvent* event) {
208 DCHECK_EQ(event, shutdown_event_);
209 shutdown_mojo_event_.Signal();
210 }
211
170 } // namespace IPC 212 } // namespace IPC
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698