Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(304)

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 1823683006: Mojo C++ bindings: sync call support for associated interfaces and master interfaces (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | mojo/public/cpp/bindings/lib/router.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/macros.h" 12 #include "base/macros.h"
13 #include "base/message_loop/message_loop.h" 13 #include "base/message_loop/message_loop.h"
14 #include "base/single_thread_task_runner.h" 14 #include "base/single_thread_task_runner.h"
15 #include "base/stl_util.h" 15 #include "base/stl_util.h"
16 #include "mojo/public/cpp/bindings/associated_group.h" 16 #include "mojo/public/cpp/bindings/associated_group.h"
17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" 17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h"
18 #include "mojo/public/cpp/bindings/lib/interface_endpoint_controller.h"
19 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h"
18 20
19 namespace mojo { 21 namespace mojo {
20 namespace internal { 22 namespace internal {
21 23
22 // InterfaceEndpoint stores the information of an interface endpoint registered 24 // InterfaceEndpoint stores the information of an interface endpoint registered
23 // with the router. Always accessed under the router's lock. 25 // with the router.
24 // No one other than the router's |endpoints_| and |tasks_| should hold refs to 26 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
25 // this object. 27 // this object.
26 class MultiplexRouter::InterfaceEndpoint 28 class MultiplexRouter::InterfaceEndpoint
27 : public base::RefCounted<InterfaceEndpoint> { 29 : public base::RefCounted<InterfaceEndpoint>,
30 public InterfaceEndpointController {
28 public: 31 public:
29 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) 32 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
30 : router_lock_(&router->lock_), 33 : router_(router),
31 id_(id), 34 id_(id),
32 closed_(false), 35 closed_(false),
33 peer_closed_(false), 36 peer_closed_(false),
34 client_(nullptr) { 37 client_(nullptr),
35 router_lock_->AssertAcquired(); 38 event_signalled_(false) {}
36 } 39
40 // ---------------------------------------------------------------------------
41 // The following public methods are safe to call from any threads without
42 // locking.
37 43
38 InterfaceId id() const { return id_; } 44 InterfaceId id() const { return id_; }
39 45
46 // ---------------------------------------------------------------------------
47 // The following public methods are called under the router's lock.
48
40 bool closed() const { return closed_; } 49 bool closed() const { return closed_; }
41 void set_closed() { 50 void set_closed() {
42 router_lock_->AssertAcquired(); 51 router_->lock_.AssertAcquired();
43 closed_ = true; 52 closed_ = true;
44 } 53 }
45 54
46 bool peer_closed() const { return peer_closed_; } 55 bool peer_closed() const { return peer_closed_; }
47 void set_peer_closed() { 56 void set_peer_closed() {
48 router_lock_->AssertAcquired(); 57 router_->lock_.AssertAcquired();
49 peer_closed_ = true; 58 peer_closed_ = true;
50 } 59 }
51 60
52 base::SingleThreadTaskRunner* task_runner() const { 61 base::SingleThreadTaskRunner* task_runner() const {
53 return task_runner_.get(); 62 return task_runner_.get();
54 } 63 }
55 void set_task_runner(
56 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
57 router_lock_->AssertAcquired();
58 task_runner_ = std::move(task_runner);
59 }
60 64
61 InterfaceEndpointClient* client() const { return client_; } 65 InterfaceEndpointClient* client() const { return client_; }
62 void set_client(InterfaceEndpointClient* client) { 66
63 router_lock_->AssertAcquired(); 67 void AttachClient(InterfaceEndpointClient* client) {
68 router_->lock_.AssertAcquired();
69 DCHECK(!client_);
70 DCHECK(!closed_);
71
72 task_runner_ = base::MessageLoop::current()->task_runner();
64 client_ = client; 73 client_ = client;
65 } 74 }
66 75
76 // This method must be called on the same thread as the corresponding
77 // AttachClient() call.
78 void DetachClient() {
79 router_->lock_.AssertAcquired();
80 DCHECK(client_);
81 DCHECK(task_runner_->BelongsToCurrentThread());
82 DCHECK(!closed_);
83
84 task_runner_ = nullptr;
85 client_ = nullptr;
86 sync_watcher_.reset();
87 }
88
89 void SignalSyncMessageEvent() {
90 router_->lock_.AssertAcquired();
91 if (event_signalled_)
92 return;
93
94 EnsureEventMessagePipeExists();
95 event_signalled_ = true;
96 MojoResult result =
97 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr,
98 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
99 DCHECK_EQ(MOJO_RESULT_OK, result);
100 }
101
102 // ---------------------------------------------------------------------------
103 // The following public methods (i.e., InterfaceEndpointController
104 // implementation) are called by the client on the same thread as the
105 // AttachClient() call. They are called outside of the router's lock.
106
107 bool SendMessage(Message* message) override {
108 DCHECK(task_runner_->BelongsToCurrentThread());
109 message->set_interface_id(id_);
110 return router_->connector_.Accept(message);
111 }
112
113 void AllowWokenUpBySyncWatchOnSameThread() override {
114 DCHECK(task_runner_->BelongsToCurrentThread());
115
116 EnsureSyncWatcherExists();
117 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
118 }
119
120 bool SyncWatch(const bool* should_stop) override {
121 DCHECK(task_runner_->BelongsToCurrentThread());
122
123 EnsureSyncWatcherExists();
124 return sync_watcher_->SyncWatch(should_stop);
125 }
126
67 private: 127 private:
68 friend class base::RefCounted<InterfaceEndpoint>; 128 friend class base::RefCounted<InterfaceEndpoint>;
69 129
70 ~InterfaceEndpoint() { 130 ~InterfaceEndpoint() override {
71 router_lock_->AssertAcquired(); 131 router_->lock_.AssertAcquired();
72 132
73 DCHECK(!client_); 133 DCHECK(!client_);
74 DCHECK(closed_); 134 DCHECK(closed_);
75 DCHECK(peer_closed_); 135 DCHECK(peer_closed_);
76 } 136 DCHECK(!sync_watcher_);
77 137 }
78 base::Lock* const router_lock_; 138
139 void OnHandleReady(MojoResult result) {
140 DCHECK(task_runner_->BelongsToCurrentThread());
141 scoped_refptr<InterfaceEndpoint> self_protector(this);
142 scoped_refptr<MultiplexRouter> router_protector(router_);
143
144 // Because we never close |sync_message_event_{sender,receiver}_| before
145 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
146 DCHECK_EQ(MOJO_RESULT_OK, result);
147 bool reset_sync_watcher = false;
148 {
149 base::AutoLock locker(router_->lock_);
150
151 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
152
153 if (!more_to_process)
154 ResetSyncMessageSignal();
155
156 // Currently there are no queued sync messages and the peer has closed so
157 // there won't be incoming sync messages in the future.
158 reset_sync_watcher = !more_to_process && peer_closed_;
159 }
160 if (reset_sync_watcher) {
161 // If a SyncWatch() call (or multiple ones) of this interface endpoint is
162 // on the call stack, resetting the sync watcher will allow it to exit
163 // when the call stack unwinds to that frame.
164 sync_watcher_.reset();
165 }
166 }
167
168 void EnsureSyncWatcherExists() {
169 DCHECK(task_runner_->BelongsToCurrentThread());
170 if (sync_watcher_)
171 return;
172
173 {
174 base::AutoLock locker(router_->lock_);
175 EnsureEventMessagePipeExists();
176
177 auto iter = router_->sync_message_tasks_.find(id_);
178 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty())
179 SignalSyncMessageEvent();
180 }
181
182 sync_watcher_.reset(new SyncHandleWatcher(
183 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
184 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
185 }
186
187 void EnsureEventMessagePipeExists() {
188 router_->lock_.AssertAcquired();
189
190 if (sync_message_event_receiver_.is_valid())
191 return;
192
193 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_,
194 &sync_message_event_receiver_);
195 DCHECK_EQ(MOJO_RESULT_OK, result);
196 }
197
198 void ResetSyncMessageSignal() {
199 router_->lock_.AssertAcquired();
200
201 if (!event_signalled_)
202 return;
203
204 DCHECK(sync_message_event_receiver_.is_valid());
205 MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(),
206 nullptr, nullptr, nullptr, nullptr,
207 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
208 DCHECK_EQ(MOJO_RESULT_OK, result);
209 event_signalled_ = false;
210 }
211
212 // ---------------------------------------------------------------------------
213 // The following members are safe to access from any threads.
214
215 MultiplexRouter* const router_;
79 const InterfaceId id_; 216 const InterfaceId id_;
80 217
218 // ---------------------------------------------------------------------------
219 // The following members are accessed under the router's lock.
220
81 // Whether the endpoint has been closed. 221 // Whether the endpoint has been closed.
82 bool closed_; 222 bool closed_;
83 // Whether the peer endpoint has been closed. 223 // Whether the peer endpoint has been closed.
84 bool peer_closed_; 224 bool peer_closed_;
85 225
86 // The task runner on which |client_| can be accessed. 226 // The task runner on which |client_|'s methods can be called.
87 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 227 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
88 // Not owned. It is null if no client is attached to this endpoint. 228 // Not owned. It is null if no client is attached to this endpoint.
89 InterfaceEndpointClient* client_; 229 InterfaceEndpointClient* client_;
90 230
231 // A message pipe used as an event to signal that sync messages are available.
232 // The message pipe handles are initialized under the router's lock and remain
233 // unchanged afterwards. They may be accessed outside of the router's lock
234 // later.
235 ScopedMessagePipeHandle sync_message_event_sender_;
236 ScopedMessagePipeHandle sync_message_event_receiver_;
237 bool event_signalled_;
238
239 // ---------------------------------------------------------------------------
240 // The following members are only valid while a client is attached. They are
241 // used exclusively on the client's thread. They may be accessed outside of
242 // the router's lock.
243
244 scoped_ptr<SyncHandleWatcher> sync_watcher_;
245
91 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); 246 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
92 }; 247 };
93 248
94 struct MultiplexRouter::Task { 249 struct MultiplexRouter::Task {
95 public: 250 public:
96 // Doesn't take ownership of |message| but takes its contents. 251 // Doesn't take ownership of |message| but takes its contents.
97 static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) { 252 static scoped_ptr<Task> CreateMessageTask(Message* message) {
98 Task* task = new Task(); 253 Task* task = new Task(MESSAGE);
99 task->message.reset(new Message); 254 task->message.reset(new Message);
100 message->MoveTo(task->message.get()); 255 message->MoveTo(task->message.get());
101 return make_scoped_ptr(task); 256 return make_scoped_ptr(task);
102 } 257 }
103 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { 258 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) {
104 Task* task = new Task(); 259 Task* task = new Task(NOTIFY_ERROR);
105 task->endpoint_to_notify = endpoint; 260 task->endpoint_to_notify = endpoint;
106 return make_scoped_ptr(task); 261 return make_scoped_ptr(task);
107 } 262 }
108 263
109 ~Task() {} 264 ~Task() {}
110 265
111 bool IsIncomingMessageTask() const { return !!message; } 266 bool IsMessageTask() const { return type == MESSAGE; }
112 bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } 267 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
113 268
114 scoped_ptr<Message> message; 269 scoped_ptr<Message> message;
115 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; 270 scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
116 271
272 enum Type { MESSAGE, NOTIFY_ERROR };
273 Type type;
274
117 private: 275 private:
118 Task() {} 276 explicit Task(Type in_type) : type(in_type) {}
119 }; 277 };
120 278
121 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, 279 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit,
122 ScopedMessagePipeHandle message_pipe) 280 ScopedMessagePipeHandle message_pipe)
123 : RefCountedDeleteOnMessageLoop( 281 : RefCountedDeleteOnMessageLoop(
124 base::MessageLoop::current()->task_runner()), 282 base::MessageLoop::current()->task_runner()),
125 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), 283 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
126 header_validator_(this), 284 header_validator_(this),
127 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), 285 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND),
128 encountered_error_(false),
129 control_message_handler_(this), 286 control_message_handler_(this),
130 control_message_proxy_(&connector_), 287 control_message_proxy_(&connector_),
131 next_interface_id_value_(1), 288 next_interface_id_value_(1),
132 posted_to_process_tasks_(false), 289 posted_to_process_tasks_(false),
290 encountered_error_(false),
133 testing_mode_(false) { 291 testing_mode_(false) {
292 // Always participate in sync handle watching, because even if it doesn't
293 // expect sync requests during sync handle watching, it may still need to
294 // dispatch messages to associated endpoints on a different thread.
295 connector_.AllowWokenUpBySyncWatchOnSameThread();
134 connector_.set_incoming_receiver(&header_validator_); 296 connector_.set_incoming_receiver(&header_validator_);
135 connector_.set_connection_error_handler( 297 connector_.set_connection_error_handler(
136 [this]() { OnPipeConnectionError(); }); 298 [this]() { OnPipeConnectionError(); });
137 } 299 }
138 300
139 MultiplexRouter::~MultiplexRouter() { 301 MultiplexRouter::~MultiplexRouter() {
140 base::AutoLock locker(lock_); 302 base::AutoLock locker(lock_);
141 303
304 sync_message_tasks_.clear();
142 tasks_.clear(); 305 tasks_.clear();
143 306
144 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 307 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
145 InterfaceEndpoint* endpoint = iter->second.get(); 308 InterfaceEndpoint* endpoint = iter->second.get();
146 // Increment the iterator before calling UpdateEndpointStateMayRemove() 309 // Increment the iterator before calling UpdateEndpointStateMayRemove()
147 // because it may remove the corresponding value from the map. 310 // because it may remove the corresponding value from the map.
148 ++iter; 311 ++iter;
149 312
150 DCHECK(endpoint->closed()); 313 DCHECK(endpoint->closed());
151 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 314 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
214 377
215 DCHECK(ContainsKey(endpoints_, id)); 378 DCHECK(ContainsKey(endpoints_, id));
216 InterfaceEndpoint* endpoint = endpoints_[id].get(); 379 InterfaceEndpoint* endpoint = endpoints_[id].get();
217 DCHECK(!endpoint->client()); 380 DCHECK(!endpoint->client());
218 DCHECK(!endpoint->closed()); 381 DCHECK(!endpoint->closed());
219 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 382 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
220 383
221 if (!IsMasterInterfaceId(id)) 384 if (!IsMasterInterfaceId(id))
222 control_message_proxy_.NotifyPeerEndpointClosed(id); 385 control_message_proxy_.NotifyPeerEndpointClosed(id);
223 386
224 ProcessTasks(true); 387 ProcessTasks(NO_DIRECT_CLIENT_CALLS);
225 } 388 }
226 389
227 void MultiplexRouter::AttachEndpointClient( 390 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
228 const ScopedInterfaceEndpointHandle& handle, 391 const ScopedInterfaceEndpointHandle& handle,
229 InterfaceEndpointClient* client) { 392 InterfaceEndpointClient* client) {
230 const InterfaceId id = handle.id(); 393 const InterfaceId id = handle.id();
231 394
232 DCHECK(IsValidInterfaceId(id)); 395 DCHECK(IsValidInterfaceId(id));
233 DCHECK(client); 396 DCHECK(client);
234 397
235 base::AutoLock locker(lock_); 398 base::AutoLock locker(lock_);
236 DCHECK(ContainsKey(endpoints_, id)); 399 DCHECK(ContainsKey(endpoints_, id));
237 400
238 InterfaceEndpoint* endpoint = endpoints_[id].get(); 401 InterfaceEndpoint* endpoint = endpoints_[id].get();
239 DCHECK(!endpoint->client()); 402 endpoint->AttachClient(client);
240 DCHECK(!endpoint->closed());
241
242 endpoint->set_task_runner(base::MessageLoop::current()->task_runner());
243 endpoint->set_client(client);
244 403
245 if (endpoint->peer_closed()) 404 if (endpoint->peer_closed())
246 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 405 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
247 ProcessTasks(true); 406 ProcessTasks(NO_DIRECT_CLIENT_CALLS);
407
408 return endpoint;
248 } 409 }
249 410
250 void MultiplexRouter::DetachEndpointClient( 411 void MultiplexRouter::DetachEndpointClient(
251 const ScopedInterfaceEndpointHandle& handle) { 412 const ScopedInterfaceEndpointHandle& handle) {
252 const InterfaceId id = handle.id(); 413 const InterfaceId id = handle.id();
253 414
254 DCHECK(IsValidInterfaceId(id)); 415 DCHECK(IsValidInterfaceId(id));
255 416
256 base::AutoLock locker(lock_); 417 base::AutoLock locker(lock_);
257 DCHECK(ContainsKey(endpoints_, id)); 418 DCHECK(ContainsKey(endpoints_, id));
258 419
259 InterfaceEndpoint* endpoint = endpoints_[id].get(); 420 InterfaceEndpoint* endpoint = endpoints_[id].get();
260 DCHECK(endpoint->client()); 421 endpoint->DetachClient();
261 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
262 DCHECK(!endpoint->closed());
263
264 endpoint->set_task_runner(nullptr);
265 endpoint->set_client(nullptr);
266 }
267
268 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle,
269 Message* message) {
270 message->set_interface_id(handle.id());
271 return connector_.Accept(message);
272 } 422 }
273 423
274 void MultiplexRouter::RaiseError() { 424 void MultiplexRouter::RaiseError() {
275 if (task_runner_->BelongsToCurrentThread()) { 425 if (task_runner_->BelongsToCurrentThread()) {
276 connector_.RaiseError(); 426 connector_.RaiseError();
277 } else { 427 } else {
278 task_runner_->PostTask(FROM_HERE, 428 task_runner_->PostTask(FROM_HERE,
279 base::Bind(&MultiplexRouter::RaiseError, this)); 429 base::Bind(&MultiplexRouter::RaiseError, this));
280 } 430 }
281 } 431 }
282 432
283 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { 433 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() {
284 scoped_ptr<AssociatedGroup> group(new AssociatedGroup); 434 scoped_ptr<AssociatedGroup> group(new AssociatedGroup);
285 group->router_ = this; 435 group->router_ = this;
286 return group; 436 return group;
287 } 437 }
288 438
289 // static 439 // static
290 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { 440 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) {
291 return associated_group->router_.get(); 441 return associated_group->router_.get();
292 } 442 }
293 443
444 void MultiplexRouter::CloseMessagePipe() {
445 DCHECK(thread_checker_.CalledOnValidThread());
446 connector_.CloseMessagePipe();
447 // CloseMessagePipe() above won't trigger connection error handler.
448 // Explicitly call OnPipeConnectionError() so that associated endpoints will
449 // get notified.
450 OnPipeConnectionError();
451 }
452
294 bool MultiplexRouter::HasAssociatedEndpoints() const { 453 bool MultiplexRouter::HasAssociatedEndpoints() const {
295 DCHECK(thread_checker_.CalledOnValidThread()); 454 DCHECK(thread_checker_.CalledOnValidThread());
296 base::AutoLock locker(lock_); 455 base::AutoLock locker(lock_);
297 456
298 if (endpoints_.size() > 1) 457 if (endpoints_.size() > 1)
299 return true; 458 return true;
300 if (endpoints_.size() == 0) 459 if (endpoints_.size() == 0)
301 return false; 460 return false;
302 461
303 return !ContainsKey(endpoints_, kMasterInterfaceId); 462 return !ContainsKey(endpoints_, kMasterInterfaceId);
304 } 463 }
305 464
306 void MultiplexRouter::EnableTestingMode() { 465 void MultiplexRouter::EnableTestingMode() {
307 DCHECK(thread_checker_.CalledOnValidThread()); 466 DCHECK(thread_checker_.CalledOnValidThread());
308 base::AutoLock locker(lock_); 467 base::AutoLock locker(lock_);
309 468
310 testing_mode_ = true; 469 testing_mode_ = true;
311 connector_.set_enforce_errors_from_incoming_receiver(false); 470 connector_.set_enforce_errors_from_incoming_receiver(false);
312 } 471 }
313 472
314 bool MultiplexRouter::Accept(Message* message) { 473 bool MultiplexRouter::Accept(Message* message) {
315 DCHECK(thread_checker_.CalledOnValidThread()); 474 DCHECK(thread_checker_.CalledOnValidThread());
316 475
317 scoped_refptr<MultiplexRouter> protector(this); 476 scoped_refptr<MultiplexRouter> protector(this);
318 base::AutoLock locker(lock_); 477 base::AutoLock locker(lock_);
319 478
320 bool processed = tasks_.empty() && ProcessIncomingMessage(message, false); 479 ClientCallBehavior client_call_behavior =
480 connector_.during_sync_handle_watcher_callback()
481 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
482 : ALLOW_DIRECT_CLIENT_CALLS;
483
484 bool processed =
485 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior);
321 486
322 if (!processed) { 487 if (!processed) {
323 // Either the task queue is not empty or we cannot process the message 488 // Either the task queue is not empty or we cannot process the message
324 // directly. In both cases, there is no need to call ProcessTasks(). 489 // directly. In both cases, there is no need to call ProcessTasks().
325 tasks_.push_back(Task::CreateIncomingMessageTask(message)); 490 tasks_.push_back(Task::CreateMessageTask(message));
491 Task* task = tasks_.back().get();
492
493 if (task->message->has_flag(kMessageIsSync)) {
494 InterfaceId id = task->message->interface_id();
495 sync_message_tasks_[id].push_back(task);
496 auto iter = endpoints_.find(id);
497 if (iter != endpoints_.end())
498 iter->second->SignalSyncMessageEvent();
499 }
326 } else if (!tasks_.empty()) { 500 } else if (!tasks_.empty()) {
327 // Processing the message may result in new tasks (for error notification) 501 // Processing the message may result in new tasks (for error notification)
328 // being added to the queue. In this case, we have to attempt to process the 502 // being added to the queue. In this case, we have to attempt to process the
329 // tasks. 503 // tasks.
330 ProcessTasks(false); 504 ProcessTasks(client_call_behavior);
331 } 505 }
332 506
333 // Always return true. If we see errors during message processing, we will 507 // Always return true. If we see errors during message processing, we will
334 // explicitly call Connector::RaiseError() to disconnect the message pipe. 508 // explicitly call Connector::RaiseError() to disconnect the message pipe.
335 return true; 509 return true;
336 } 510 }
337 511
338 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { 512 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
339 lock_.AssertAcquired(); 513 lock_.AssertAcquired();
340 514
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
381 // Increment the iterator before calling UpdateEndpointStateMayRemove() 555 // Increment the iterator before calling UpdateEndpointStateMayRemove()
382 // because it may remove the corresponding value from the map. 556 // because it may remove the corresponding value from the map.
383 ++iter; 557 ++iter;
384 558
385 if (endpoint->client()) 559 if (endpoint->client())
386 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 560 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
387 561
388 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 562 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
389 } 563 }
390 564
391 ProcessTasks(false); 565 ProcessTasks(connector_.during_sync_handle_watcher_callback()
566 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
567 : ALLOW_DIRECT_CLIENT_CALLS);
392 } 568 }
393 569
394 void MultiplexRouter::ProcessTasks(bool force_async) { 570 void MultiplexRouter::ProcessTasks(ClientCallBehavior client_call_behavior) {
395 lock_.AssertAcquired(); 571 lock_.AssertAcquired();
396 572
397 if (posted_to_process_tasks_) 573 if (posted_to_process_tasks_)
398 return; 574 return;
399 575
400 while (!tasks_.empty()) { 576 while (!tasks_.empty()) {
401 scoped_ptr<Task> task(std::move(tasks_.front())); 577 scoped_ptr<Task> task(std::move(tasks_.front()));
402 tasks_.pop_front(); 578 tasks_.pop_front();
403 579
580 InterfaceId id = kInvalidInterfaceId;
581 bool sync_message = task->IsMessageTask() && task->message &&
582 task->message->has_flag(kMessageIsSync);
583 if (sync_message) {
584 InterfaceId id = task->message->interface_id();
585 auto& sync_message_queue = sync_message_tasks_[id];
586 DCHECK_EQ(task.get(), sync_message_queue.front());
587 sync_message_queue.pop_front();
588 }
589
404 bool processed = 590 bool processed =
405 task->IsNotifyErrorTask() 591 task->IsNotifyErrorTask()
406 ? ProcessNotifyErrorTask(task.get(), force_async) 592 ? ProcessNotifyErrorTask(task.get(), client_call_behavior)
407 : ProcessIncomingMessage(task->message.get(), force_async); 593 : ProcessIncomingMessage(task->message.get(), client_call_behavior);
408 594
409 if (!processed) { 595 if (!processed) {
410 tasks_.push_front(std::move(task)); 596 tasks_.push_front(std::move(task));
597 if (sync_message) {
598 auto& sync_message_queue = sync_message_tasks_[id];
599 sync_message_queue.push_front(task.get());
600 }
411 break; 601 break;
602 } else {
603 if (sync_message) {
604 auto iter = sync_message_tasks_.find(id);
605 if (iter != sync_message_tasks_.end() && iter->second.empty())
606 sync_message_tasks_.erase(iter);
607 }
412 } 608 }
413 } 609 }
414 } 610 }
415 611
416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { 612 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
613 lock_.AssertAcquired();
614
615 auto iter = sync_message_tasks_.find(id);
616 if (iter == sync_message_tasks_.end())
617 return false;
618
619 MultiplexRouter::Task* task = iter->second.front();
620 iter->second.pop_front();
621
622 DCHECK(task->IsMessageTask());
623 scoped_ptr<Message> message(std::move(task->message));
624
625 // Note: after this call, |task| and |iter| may be invalidated.
626 bool processed = ProcessIncomingMessage(
627 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES);
628 DCHECK(processed);
629
630 iter = sync_message_tasks_.find(id);
631 return iter != sync_message_tasks_.end() && !iter->second.empty();
632 }
633
634 bool MultiplexRouter::ProcessNotifyErrorTask(
635 Task* task,
636 ClientCallBehavior client_call_behavior) {
417 lock_.AssertAcquired(); 637 lock_.AssertAcquired();
418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); 638 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
419 if (!endpoint->client()) 639 if (!endpoint->client())
420 return true; 640 return true;
421 641
422 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { 642 if (!endpoint->task_runner()->BelongsToCurrentThread() ||
643 client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS) {
423 MaybePostToProcessTasks(endpoint->task_runner()); 644 MaybePostToProcessTasks(endpoint->task_runner());
424 return false; 645 return false;
425 } 646 }
426 647
427 InterfaceEndpointClient* client = endpoint->client(); 648 InterfaceEndpointClient* client = endpoint->client();
428 { 649 {
429 // We must unlock before calling into |client| because it may call this 650 // We must unlock before calling into |client| because it may call this
430 // object within NotifyError(). Holding the lock will lead to deadlock. 651 // object within NotifyError(). Holding the lock will lead to deadlock.
431 // 652 //
432 // It is safe to call into |client| without the lock. Because |client| is 653 // It is safe to call into |client| without the lock. Because |client| is
433 // always accessed on the same thread, including DetachEndpointClient(). 654 // always accessed on the same thread, including DetachEndpointClient().
434 base::AutoUnlock unlocker(lock_); 655 base::AutoUnlock unlocker(lock_);
435 client->NotifyError(); 656 client->NotifyError();
436 } 657 }
437 return true; 658 return true;
438 } 659 }
439 660
440 bool MultiplexRouter::ProcessIncomingMessage(Message* message, 661 bool MultiplexRouter::ProcessIncomingMessage(
441 bool force_async) { 662 Message* message,
663 ClientCallBehavior client_call_behavior) {
442 lock_.AssertAcquired(); 664 lock_.AssertAcquired();
665
666 if (!message) {
667 // This is a sync message and has been processed during sync handle
668 // watching.
669 return true;
670 }
671
443 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { 672 if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
444 if (!control_message_handler_.Accept(message)) 673 if (!control_message_handler_.Accept(message))
445 RaiseErrorInNonTestingMode(); 674 RaiseErrorInNonTestingMode();
446 return true; 675 return true;
447 } 676 }
448 677
449 InterfaceId id = message->interface_id(); 678 InterfaceId id = message->interface_id();
450 DCHECK(IsValidInterfaceId(id)); 679 DCHECK(IsValidInterfaceId(id));
451 680
452 bool inserted = false; 681 bool inserted = false;
(...skipping 14 matching lines...) Expand all
467 696
468 if (endpoint->closed()) 697 if (endpoint->closed())
469 return true; 698 return true;
470 699
471 if (!endpoint->client()) { 700 if (!endpoint->client()) {
472 // We need to wait until a client is attached in order to dispatch further 701 // We need to wait until a client is attached in order to dispatch further
473 // messages. 702 // messages.
474 return false; 703 return false;
475 } 704 }
476 705
477 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { 706 bool can_direct_call =
707 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS) ||
708 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES &&
709 message->has_flag(kMessageIsSync));
710
711 if (!endpoint->task_runner()->BelongsToCurrentThread() || !can_direct_call) {
478 MaybePostToProcessTasks(endpoint->task_runner()); 712 MaybePostToProcessTasks(endpoint->task_runner());
479 return false; 713 return false;
480 } 714 }
481 715
482 InterfaceEndpointClient* client = endpoint->client(); 716 InterfaceEndpointClient* client = endpoint->client();
483 bool result = false; 717 bool result = false;
484 { 718 {
485 // We must unlock before calling into |client| because it may call this 719 // We must unlock before calling into |client| because it may call this
486 // object within HandleIncomingMessage(). Holding the lock will lead to 720 // object within HandleIncomingMessage(). Holding the lock will lead to
487 // deadlock. 721 // deadlock.
(...skipping 18 matching lines...) Expand all
506 posted_to_process_tasks_ = true; 740 posted_to_process_tasks_ = true;
507 task_runner->PostTask( 741 task_runner->PostTask(
508 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); 742 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
509 } 743 }
510 744
511 void MultiplexRouter::LockAndCallProcessTasks() { 745 void MultiplexRouter::LockAndCallProcessTasks() {
512 // There is no need to hold a ref to this class in this case because this is 746 // There is no need to hold a ref to this class in this case because this is
513 // always called using base::Bind(), which holds a ref. 747 // always called using base::Bind(), which holds a ref.
514 base::AutoLock locker(lock_); 748 base::AutoLock locker(lock_);
515 posted_to_process_tasks_ = false; 749 posted_to_process_tasks_ = false;
516 ProcessTasks(false); 750 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS);
517 } 751 }
518 752
519 void MultiplexRouter::UpdateEndpointStateMayRemove( 753 void MultiplexRouter::UpdateEndpointStateMayRemove(
520 InterfaceEndpoint* endpoint, 754 InterfaceEndpoint* endpoint,
521 EndpointStateUpdateType type) { 755 EndpointStateUpdateType type) {
522 switch (type) { 756 switch (type) {
523 case ENDPOINT_CLOSED: 757 case ENDPOINT_CLOSED:
524 endpoint->set_closed(); 758 endpoint->set_closed();
525 break; 759 break;
526 case PEER_ENDPOINT_CLOSED: 760 case PEER_ENDPOINT_CLOSED:
527 endpoint->set_peer_closed(); 761 endpoint->set_peer_closed();
762 // If the interface endpoint is performing a sync watch, this makes sure
763 // it is notified and eventually exits the sync watch.
764 endpoint->SignalSyncMessageEvent();
528 break; 765 break;
529 } 766 }
530 if (endpoint->closed() && endpoint->peer_closed()) 767 if (endpoint->closed() && endpoint->peer_closed())
531 endpoints_.erase(endpoint->id()); 768 endpoints_.erase(endpoint->id());
532 } 769 }
533 770
534 void MultiplexRouter::RaiseErrorInNonTestingMode() { 771 void MultiplexRouter::RaiseErrorInNonTestingMode() {
535 lock_.AssertAcquired(); 772 lock_.AssertAcquired();
536 if (!testing_mode_) 773 if (!testing_mode_)
537 RaiseError(); 774 RaiseError();
(...skipping 16 matching lines...) Expand all
554 *inserted = true; 791 *inserted = true;
555 } else { 792 } else {
556 endpoint = iter->second.get(); 793 endpoint = iter->second.get();
557 } 794 }
558 795
559 return endpoint; 796 return endpoint;
560 } 797 }
561 798
562 } // namespace internal 799 } // namespace internal
563 } // namespace mojo 800 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | mojo/public/cpp/bindings/lib/router.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698