Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(130)

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 2754143005: Use WaitableEvents to wake up sync IPC waiting (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/cpp/bindings/BUILD.gn ('k') | mojo/public/cpp/bindings/lib/sync_event_watcher.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/macros.h" 13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h" 14 #include "base/memory/ptr_util.h"
15 #include "base/single_thread_task_runner.h" 15 #include "base/single_thread_task_runner.h"
16 #include "base/stl_util.h" 16 #include "base/stl_util.h"
17 #include "base/synchronization/waitable_event.h"
17 #include "base/threading/thread_task_runner_handle.h" 18 #include "base/threading/thread_task_runner_handle.h"
18 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" 19 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
19 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" 20 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
20 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" 21 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
21 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" 22 #include "mojo/public/cpp/bindings/sync_event_watcher.h"
22 23
23 namespace mojo { 24 namespace mojo {
24 namespace internal { 25 namespace internal {
25 26
26 // InterfaceEndpoint stores the information of an interface endpoint registered 27 // InterfaceEndpoint stores the information of an interface endpoint registered
27 // with the router. 28 // with the router.
28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to 29 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
29 // this object. 30 // this object.
30 class MultiplexRouter::InterfaceEndpoint 31 class MultiplexRouter::InterfaceEndpoint
31 : public base::RefCounted<InterfaceEndpoint>, 32 : public base::RefCounted<InterfaceEndpoint>,
32 public InterfaceEndpointController { 33 public InterfaceEndpointController {
33 public: 34 public:
34 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) 35 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
35 : router_(router), 36 : router_(router),
36 id_(id), 37 id_(id),
37 closed_(false), 38 closed_(false),
38 peer_closed_(false), 39 peer_closed_(false),
39 handle_created_(false), 40 handle_created_(false),
40 client_(nullptr), 41 client_(nullptr) {}
41 event_signalled_(false) {}
42 42
43 // --------------------------------------------------------------------------- 43 // ---------------------------------------------------------------------------
44 // The following public methods are safe to call from any threads without 44 // The following public methods are safe to call from any threads without
45 // locking. 45 // locking.
46 46
47 InterfaceId id() const { return id_; } 47 InterfaceId id() const { return id_; }
48 48
49 // --------------------------------------------------------------------------- 49 // ---------------------------------------------------------------------------
50 // The following public methods are called under the router's lock. 50 // The following public methods are called under the router's lock.
51 51
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
101 DCHECK(task_runner_->BelongsToCurrentThread()); 101 DCHECK(task_runner_->BelongsToCurrentThread());
102 DCHECK(!closed_); 102 DCHECK(!closed_);
103 103
104 task_runner_ = nullptr; 104 task_runner_ = nullptr;
105 client_ = nullptr; 105 client_ = nullptr;
106 sync_watcher_.reset(); 106 sync_watcher_.reset();
107 } 107 }
108 108
109 void SignalSyncMessageEvent() { 109 void SignalSyncMessageEvent() {
110 router_->AssertLockAcquired(); 110 router_->AssertLockAcquired();
111 if (event_signalled_) 111 if (sync_message_event_signaled_)
112 return; 112 return;
113 113 sync_message_event_signaled_ = true;
114 event_signalled_ = true; 114 if (sync_message_event_)
115 if (!sync_message_event_sender_.is_valid()) 115 sync_message_event_->Signal();
116 return;
117
118 MojoResult result =
119 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr,
120 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
121 DCHECK_EQ(MOJO_RESULT_OK, result);
122 } 116 }
123 117
124 void ResetSyncMessageSignal() { 118 void ResetSyncMessageSignal() {
125 router_->AssertLockAcquired(); 119 router_->AssertLockAcquired();
126 120 if (!sync_message_event_signaled_)
127 if (!event_signalled_)
128 return; 121 return;
129 122 sync_message_event_signaled_ = false;
130 event_signalled_ = false; 123 if (sync_message_event_)
131 if (!sync_message_event_receiver_.is_valid()) 124 sync_message_event_->Reset();
132 return;
133
134 MojoResult result =
135 ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr,
136 nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
137 DCHECK_EQ(MOJO_RESULT_OK, result);
138 } 125 }
139 126
140 // --------------------------------------------------------------------------- 127 // ---------------------------------------------------------------------------
141 // The following public methods (i.e., InterfaceEndpointController 128 // The following public methods (i.e., InterfaceEndpointController
142 // implementation) are called by the client on the same thread as the 129 // implementation) are called by the client on the same thread as the
143 // AttachClient() call. They are called outside of the router's lock. 130 // AttachClient() call. They are called outside of the router's lock.
144 131
145 bool SendMessage(Message* message) override { 132 bool SendMessage(Message* message) override {
146 DCHECK(task_runner_->BelongsToCurrentThread()); 133 DCHECK(task_runner_->BelongsToCurrentThread());
147 message->set_interface_id(id_); 134 message->set_interface_id(id_);
(...skipping 19 matching lines...) Expand all
167 154
168 ~InterfaceEndpoint() override { 155 ~InterfaceEndpoint() override {
169 router_->AssertLockAcquired(); 156 router_->AssertLockAcquired();
170 157
171 DCHECK(!client_); 158 DCHECK(!client_);
172 DCHECK(closed_); 159 DCHECK(closed_);
173 DCHECK(peer_closed_); 160 DCHECK(peer_closed_);
174 DCHECK(!sync_watcher_); 161 DCHECK(!sync_watcher_);
175 } 162 }
176 163
177 void OnHandleReady(MojoResult result) { 164 void OnSyncEventSignaled() {
178 DCHECK(task_runner_->BelongsToCurrentThread()); 165 DCHECK(task_runner_->BelongsToCurrentThread());
179 scoped_refptr<MultiplexRouter> router_protector(router_); 166 scoped_refptr<MultiplexRouter> router_protector(router_);
180 167
181 // Because we never close |sync_message_event_{sender,receiver}_| before
182 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
183 DCHECK_EQ(MOJO_RESULT_OK, result);
184
185 MayAutoLock locker(&router_->lock_); 168 MayAutoLock locker(&router_->lock_);
186 scoped_refptr<InterfaceEndpoint> self_protector(this); 169 scoped_refptr<InterfaceEndpoint> self_protector(this);
187 170
188 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); 171 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
189 172
190 if (!more_to_process) 173 if (!more_to_process)
191 ResetSyncMessageSignal(); 174 ResetSyncMessageSignal();
192 175
193 // Currently there are no queued sync messages and the peer has closed so 176 // Currently there are no queued sync messages and the peer has closed so
194 // there won't be incoming sync messages in the future. 177 // there won't be incoming sync messages in the future.
195 if (!more_to_process && peer_closed_) { 178 if (!more_to_process && peer_closed_) {
196 // If a SyncWatch() call (or multiple ones) of this interface endpoint is 179 // If a SyncWatch() call (or multiple ones) of this interface endpoint is
197 // on the call stack, resetting the sync watcher will allow it to exit 180 // on the call stack, resetting the sync watcher will allow it to exit
198 // when the call stack unwinds to that frame. 181 // when the call stack unwinds to that frame.
199 sync_watcher_.reset(); 182 sync_watcher_.reset();
200 } 183 }
201 } 184 }
202 185
203 void EnsureSyncWatcherExists() { 186 void EnsureSyncWatcherExists() {
204 DCHECK(task_runner_->BelongsToCurrentThread()); 187 DCHECK(task_runner_->BelongsToCurrentThread());
205 if (sync_watcher_) 188 if (sync_watcher_)
206 return; 189 return;
207 190
208 { 191 {
209 MayAutoLock locker(&router_->lock_); 192 MayAutoLock locker(&router_->lock_);
210 193 if (!sync_message_event_) {
211 if (!sync_message_event_sender_.is_valid()) { 194 sync_message_event_.emplace(
212 MojoResult result = 195 base::WaitableEvent::ResetPolicy::MANUAL,
213 CreateMessagePipe(nullptr, &sync_message_event_sender_, 196 base::WaitableEvent::InitialState::NOT_SIGNALED);
yzshen1 2017/03/24 16:41:47 If the flag |event_signaled_| is already set to tr
Ken Rockot(use gerrit already) 2017/03/24 16:43:55 Done
214 &sync_message_event_receiver_);
215 DCHECK_EQ(MOJO_RESULT_OK, result);
216
217 if (event_signalled_) {
218 // Reset the flag so that SignalSyncMessageEvent() will actually
219 // signal using the newly-created message pipe.
220 event_signalled_ = false;
221 SignalSyncMessageEvent();
222 }
223 } 197 }
224 } 198 }
225 199 sync_watcher_.reset(
226 sync_watcher_.reset(new SyncHandleWatcher( 200 new SyncEventWatcher(&sync_message_event_.value(),
227 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, 201 base::Bind(&InterfaceEndpoint::OnSyncEventSignaled,
228 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); 202 base::Unretained(this))));
229 } 203 }
230 204
231 // --------------------------------------------------------------------------- 205 // ---------------------------------------------------------------------------
232 // The following members are safe to access from any threads. 206 // The following members are safe to access from any threads.
233 207
234 MultiplexRouter* const router_; 208 MultiplexRouter* const router_;
235 const InterfaceId id_; 209 const InterfaceId id_;
236 210
237 // --------------------------------------------------------------------------- 211 // ---------------------------------------------------------------------------
238 // The following members are accessed under the router's lock. 212 // The following members are accessed under the router's lock.
239 213
240 // Whether the endpoint has been closed. 214 // Whether the endpoint has been closed.
241 bool closed_; 215 bool closed_;
242 // Whether the peer endpoint has been closed. 216 // Whether the peer endpoint has been closed.
243 bool peer_closed_; 217 bool peer_closed_;
244 218
245 // Whether there is already a ScopedInterfaceEndpointHandle created for this 219 // Whether there is already a ScopedInterfaceEndpointHandle created for this
246 // endpoint. 220 // endpoint.
247 bool handle_created_; 221 bool handle_created_;
248 222
249 base::Optional<DisconnectReason> disconnect_reason_; 223 base::Optional<DisconnectReason> disconnect_reason_;
250 224
251 // The task runner on which |client_|'s methods can be called. 225 // The task runner on which |client_|'s methods can be called.
252 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 226 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
253 // Not owned. It is null if no client is attached to this endpoint. 227 // Not owned. It is null if no client is attached to this endpoint.
254 InterfaceEndpointClient* client_; 228 InterfaceEndpointClient* client_;
255 229
256 // A message pipe used as an event to signal that sync messages are available. 230 // An event used to signal that sync messages are available. The event is
257 // The message pipe handles are initialized under the router's lock and remain 231 // initialized under the router's lock and remains unchanged afterwards. It
258 // unchanged afterwards. They may be accessed outside of the router's lock 232 // may be accessed outside of the router's lock later.
259 // later. 233 base::Optional<base::WaitableEvent> sync_message_event_;
260 ScopedMessagePipeHandle sync_message_event_sender_; 234 bool sync_message_event_signaled_ = false;
261 ScopedMessagePipeHandle sync_message_event_receiver_;
262 bool event_signalled_;
263 235
264 // --------------------------------------------------------------------------- 236 // ---------------------------------------------------------------------------
265 // The following members are only valid while a client is attached. They are 237 // The following members are only valid while a client is attached. They are
266 // used exclusively on the client's thread. They may be accessed outside of 238 // used exclusively on the client's thread. They may be accessed outside of
267 // the router's lock. 239 // the router's lock.
268 240
269 std::unique_ptr<SyncHandleWatcher> sync_watcher_; 241 std::unique_ptr<SyncEventWatcher> sync_watcher_;
270 242
271 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); 243 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
272 }; 244 };
273 245
274 // MessageWrapper objects are always destroyed under the router's lock. On 246 // MessageWrapper objects are always destroyed under the router's lock. On
275 // destruction, if the message it wrappers contains 247 // destruction, if the message it wrappers contains
276 // ScopedInterfaceEndpointHandles (which cannot be destructed under the 248 // ScopedInterfaceEndpointHandles (which cannot be destructed under the
277 // router's lock), the wrapper unlocks to clean them up. 249 // router's lock), the wrapper unlocks to clean them up.
278 class MultiplexRouter::MessageWrapper { 250 class MultiplexRouter::MessageWrapper {
279 public: 251 public:
(...skipping 697 matching lines...) Expand 10 before | Expand all | Expand 10 after
977 949
978 void MultiplexRouter::AssertLockAcquired() { 950 void MultiplexRouter::AssertLockAcquired() {
979 #if DCHECK_IS_ON() 951 #if DCHECK_IS_ON()
980 if (lock_) 952 if (lock_)
981 lock_->AssertAcquired(); 953 lock_->AssertAcquired();
982 #endif 954 #endif
983 } 955 }
984 956
985 } // namespace internal 957 } // namespace internal
986 } // namespace mojo 958 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/BUILD.gn ('k') | mojo/public/cpp/bindings/lib/sync_event_watcher.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698