Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(71)

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 1823683006: Mojo C++ bindings: sync call support for associated interfaces and master interfaces (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/macros.h" 12 #include "base/macros.h"
13 #include "base/message_loop/message_loop.h" 13 #include "base/message_loop/message_loop.h"
14 #include "base/single_thread_task_runner.h" 14 #include "base/single_thread_task_runner.h"
15 #include "base/stl_util.h" 15 #include "base/stl_util.h"
16 #include "mojo/public/cpp/bindings/associated_group.h" 16 #include "mojo/public/cpp/bindings/associated_group.h"
17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" 17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h"
18 #include "mojo/public/cpp/bindings/lib/interface_endpoint_controller.h"
19 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h"
18 20
19 namespace mojo { 21 namespace mojo {
20 namespace internal { 22 namespace internal {
21 23
22 // InterfaceEndpoint stores the information of an interface endpoint registered 24 // InterfaceEndpoint stores the information of an interface endpoint registered
23 // with the router. Always accessed under the router's lock. 25 // with the router.
24 // No one other than the router's |endpoints_| and |tasks_| should hold refs to 26 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
25 // this object. 27 // this object.
26 class MultiplexRouter::InterfaceEndpoint 28 class MultiplexRouter::InterfaceEndpoint
27 : public base::RefCounted<InterfaceEndpoint> { 29 : public base::RefCounted<InterfaceEndpoint>,
30 public InterfaceEndpointController {
28 public: 31 public:
29 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) 32 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
30 : router_lock_(&router->lock_), 33 : router_(router),
31 id_(id), 34 id_(id),
32 closed_(false), 35 closed_(false),
33 peer_closed_(false), 36 peer_closed_(false),
34 client_(nullptr) { 37 client_(nullptr),
35 router_lock_->AssertAcquired(); 38 event_signalled_(false) {}
36 } 39
40 // ---------------------------------------------------------------------------
41 // The following public methods are safe to call from any threads without
42 // locking.
37 43
38 InterfaceId id() const { return id_; } 44 InterfaceId id() const { return id_; }
39 45
46 // ---------------------------------------------------------------------------
47 // The following public methods are called under the router's lock.
48
40 bool closed() const { return closed_; } 49 bool closed() const { return closed_; }
41 void set_closed() { 50 void set_closed() {
42 router_lock_->AssertAcquired(); 51 router_->lock_.AssertAcquired();
43 closed_ = true; 52 closed_ = true;
44 } 53 }
45 54
46 bool peer_closed() const { return peer_closed_; } 55 bool peer_closed() const { return peer_closed_; }
47 void set_peer_closed() { 56 void set_peer_closed() {
48 router_lock_->AssertAcquired(); 57 router_->lock_.AssertAcquired();
49 peer_closed_ = true; 58 peer_closed_ = true;
50 } 59 }
51 60
52 base::SingleThreadTaskRunner* task_runner() const { 61 base::SingleThreadTaskRunner* task_runner() const {
53 return task_runner_.get(); 62 return task_runner_.get();
54 } 63 }
55 void set_task_runner(
56 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
57 router_lock_->AssertAcquired();
58 task_runner_ = std::move(task_runner);
59 }
60 64
61 InterfaceEndpointClient* client() const { return client_; } 65 InterfaceEndpointClient* client() const { return client_; }
62 void set_client(InterfaceEndpointClient* client) { 66
63 router_lock_->AssertAcquired(); 67 void AttachClient(InterfaceEndpointClient* client) {
68 router_->lock_.AssertAcquired();
69 DCHECK(!client_);
70 DCHECK(!closed_);
71
72 task_runner_ = base::MessageLoop::current()->task_runner();
64 client_ = client; 73 client_ = client;
65 } 74 }
66 75
76 // It should be called on the same thread as the corresponding AttachClient()
Ken Rockot(use gerrit already) 2016/03/29 06:36:49 nit: should = must? Also perhaps "This" or "This m
yzshen1 2016/03/29 16:19:01 Done. English is hard. :)
77 // call.
78 void DetachClient() {
79 router_->lock_.AssertAcquired();
80 DCHECK(client_);
81 DCHECK(task_runner_->BelongsToCurrentThread());
82 DCHECK(!closed_);
83
84 task_runner_ = nullptr;
85 client_ = nullptr;
86 sync_watcher_.reset();
87 }
88
89 void SignalSyncMessageEvent() {
90 router_->lock_.AssertAcquired();
91 if (event_signalled_)
92 return;
93
94 EnsureEventMessagePipeExists();
95 event_signalled_ = true;
96 char dummy_message = '\0';
97 MojoResult result =
98 WriteMessageRaw(sync_message_event_sender_.get(), &dummy_message, 1,
Ken Rockot(use gerrit already) 2016/03/29 06:36:49 nit: It is perfectly legal to write a zero-length
yzshen1 2016/03/29 16:19:01 Done.
99 nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
100 DCHECK_EQ(MOJO_RESULT_OK, result);
101 }
102
103 // ---------------------------------------------------------------------------
104 // The following public methods (i.e., InterfaceEndpointController
105 // implementation) are called by the client on the same thread as the
106 // AttachClient() call. They are called outside of the router's lock.
107
108 bool SendMessage(Message* message) override {
109 DCHECK(task_runner_->BelongsToCurrentThread());
110 message->set_interface_id(id_);
111 return router_->connector_.Accept(message);
112 }
113
114 void AllowWokenUpBySyncWatchOnSameThread() override {
115 DCHECK(task_runner_->BelongsToCurrentThread());
116
117 EnsureSyncWatcherExists();
118 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
119 }
120
121 bool SyncWatch(const bool* should_stop) override {
122 DCHECK(task_runner_->BelongsToCurrentThread());
123
124 EnsureSyncWatcherExists();
125 return sync_watcher_->SyncWatch(should_stop);
126 }
127
67 private: 128 private:
68 friend class base::RefCounted<InterfaceEndpoint>; 129 friend class base::RefCounted<InterfaceEndpoint>;
69 130
70 ~InterfaceEndpoint() { 131 ~InterfaceEndpoint() override {
71 router_lock_->AssertAcquired(); 132 router_->lock_.AssertAcquired();
72 133
73 DCHECK(!client_); 134 DCHECK(!client_);
74 DCHECK(closed_); 135 DCHECK(closed_);
75 DCHECK(peer_closed_); 136 DCHECK(peer_closed_);
76 } 137 DCHECK(!sync_watcher_);
77 138 }
78 base::Lock* const router_lock_; 139
140 void OnHandleReady(MojoResult result) {
141 DCHECK(task_runner_->BelongsToCurrentThread());
142 scoped_refptr<InterfaceEndpoint> self_protector(this);
143 scoped_refptr<MultiplexRouter> router_protector(router_);
144
145 // Because we never close |sync_message_event_{sender,receiver}_| before
146 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
147 DCHECK_EQ(MOJO_RESULT_OK, result);
148 bool reset_sync_watcher = false;
149 {
150 base::AutoLock locker(router_->lock_);
151
152 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
153
154 if (!more_to_process)
155 ResetSyncMessageSignal();
156
157 // Currently there are no queued sync messages and the peer has closed so
158 // there won't be incoming sync messages in the future.
159 reset_sync_watcher = !more_to_process && peer_closed_;
160 }
161 if (reset_sync_watcher) {
162 // If a SyncWatch() call (or multiple ones) of this interface endpoint is
163 // on the call stack, resetting the sync watcher will allow it to exit
164 // when the call stack unwinds to that frame.
165 sync_watcher_.reset();
166 }
167 }
168
169 void EnsureSyncWatcherExists() {
170 DCHECK(task_runner_->BelongsToCurrentThread());
171 if (sync_watcher_)
172 return;
173
174 {
175 base::AutoLock locker(router_->lock_);
176 EnsureEventMessagePipeExists();
177
178 auto iter = router_->sync_message_tasks_.find(id_);
179 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty())
180 SignalSyncMessageEvent();
181 }
182
183 sync_watcher_.reset(new SyncHandleWatcher(
184 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
185 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
186 }
187
188 void EnsureEventMessagePipeExists() {
189 router_->lock_.AssertAcquired();
190
191 if (sync_message_event_receiver_.is_valid())
192 return;
193
194 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_,
195 &sync_message_event_receiver_);
196 DCHECK_EQ(MOJO_RESULT_OK, result);
197 }
198
199 void ResetSyncMessageSignal() {
200 router_->lock_.AssertAcquired();
201
202 if (!event_signalled_)
203 return;
204
205 DCHECK(sync_message_event_receiver_.is_valid());
206 char dummy_message = 0;
207 uint32_t size = 1;
208 MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(),
Ken Rockot(use gerrit already) 2016/03/29 06:36:49 nit: It's fine to pass nullptr for the read buffer
yzshen1 2016/03/29 16:19:01 Done.
209 &dummy_message, &size, nullptr, nullptr,
210 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
211 DCHECK_EQ(MOJO_RESULT_OK, result);
212 event_signalled_ = false;
213 }
214
215 // ---------------------------------------------------------------------------
216 // The following members are safe to access from any threads.
217
218 MultiplexRouter* const router_;
79 const InterfaceId id_; 219 const InterfaceId id_;
80 220
221 // ---------------------------------------------------------------------------
222 // The following members are accessed under the router's lock.
223
81 // Whether the endpoint has been closed. 224 // Whether the endpoint has been closed.
82 bool closed_; 225 bool closed_;
83 // Whether the peer endpoint has been closed. 226 // Whether the peer endpoint has been closed.
84 bool peer_closed_; 227 bool peer_closed_;
85 228
86 // The task runner on which |client_| can be accessed. 229 // The task runner on which |client_|'s methods can be called.
87 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 230 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
88 // Not owned. It is null if no client is attached to this endpoint. 231 // Not owned. It is null if no client is attached to this endpoint.
89 InterfaceEndpointClient* client_; 232 InterfaceEndpointClient* client_;
90 233
234 // A message pipe used as an event to signal that sync messages are available.
235 // The message pipe handles are initialized under the router's lock and remain
236 // unchanged afterwards. They may be accessed outside of the router's lock
237 // later.
238 ScopedMessagePipeHandle sync_message_event_sender_;
239 ScopedMessagePipeHandle sync_message_event_receiver_;
240 bool event_signalled_;
241
242 // ---------------------------------------------------------------------------
243 // The following members are only valid while a client is attached. They are
244 // used exclusively on the client's thread. They may be accessed outside of
245 // the router's lock.
246
247 scoped_ptr<SyncHandleWatcher> sync_watcher_;
248
91 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); 249 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
92 }; 250 };
93 251
94 struct MultiplexRouter::Task { 252 struct MultiplexRouter::Task {
95 public: 253 public:
96 // Doesn't take ownership of |message| but takes its contents. 254 // Doesn't take ownership of |message| but takes its contents.
97 static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) { 255 static scoped_ptr<Task> CreateMessageTask(Message* message) {
98 Task* task = new Task(); 256 Task* task = new Task(MESSAGE);
99 task->message.reset(new Message); 257 task->message.reset(new Message);
100 message->MoveTo(task->message.get()); 258 message->MoveTo(task->message.get());
101 return make_scoped_ptr(task); 259 return make_scoped_ptr(task);
102 } 260 }
103 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { 261 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) {
104 Task* task = new Task(); 262 Task* task = new Task(NOTIFY_ERROR);
105 task->endpoint_to_notify = endpoint; 263 task->endpoint_to_notify = endpoint;
106 return make_scoped_ptr(task); 264 return make_scoped_ptr(task);
107 } 265 }
108 266
109 ~Task() {} 267 ~Task() {}
110 268
111 bool IsIncomingMessageTask() const { return !!message; } 269 bool IsMessageTask() const { return type == MESSAGE; }
112 bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } 270 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
113 271
114 scoped_ptr<Message> message; 272 scoped_ptr<Message> message;
115 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; 273 scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
116 274
275 enum Type { MESSAGE, NOTIFY_ERROR };
276 Type type;
277
117 private: 278 private:
118 Task() {} 279 explicit Task(Type in_type) : type(in_type) {}
119 }; 280 };
120 281
121 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, 282 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit,
122 ScopedMessagePipeHandle message_pipe) 283 ScopedMessagePipeHandle message_pipe)
123 : RefCountedDeleteOnMessageLoop( 284 : RefCountedDeleteOnMessageLoop(
124 base::MessageLoop::current()->task_runner()), 285 base::MessageLoop::current()->task_runner()),
125 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), 286 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
126 header_validator_(this), 287 header_validator_(this),
127 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), 288 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND),
128 encountered_error_(false),
129 control_message_handler_(this), 289 control_message_handler_(this),
130 control_message_proxy_(&connector_), 290 control_message_proxy_(&connector_),
131 next_interface_id_value_(1), 291 next_interface_id_value_(1),
132 posted_to_process_tasks_(false), 292 posted_to_process_tasks_(false),
293 encountered_error_(false),
133 testing_mode_(false) { 294 testing_mode_(false) {
295 // Always participate in sync handle watching, because even if it doesn't
296 // expect sync requests during sync handle watching, it may still need to
297 // dispatch messages to associated endpoints on a different thread.
298 connector_.AllowWokenUpBySyncWatchOnSameThread();
134 connector_.set_incoming_receiver(&header_validator_); 299 connector_.set_incoming_receiver(&header_validator_);
135 connector_.set_connection_error_handler( 300 connector_.set_connection_error_handler(
136 [this]() { OnPipeConnectionError(); }); 301 [this]() { OnPipeConnectionError(); });
137 } 302 }
138 303
139 MultiplexRouter::~MultiplexRouter() { 304 MultiplexRouter::~MultiplexRouter() {
140 base::AutoLock locker(lock_); 305 base::AutoLock locker(lock_);
141 306
307 sync_message_tasks_.clear();
142 tasks_.clear(); 308 tasks_.clear();
143 309
144 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 310 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
145 InterfaceEndpoint* endpoint = iter->second.get(); 311 InterfaceEndpoint* endpoint = iter->second.get();
146 // Increment the iterator before calling UpdateEndpointStateMayRemove() 312 // Increment the iterator before calling UpdateEndpointStateMayRemove()
147 // because it may remove the corresponding value from the map. 313 // because it may remove the corresponding value from the map.
148 ++iter; 314 ++iter;
149 315
150 DCHECK(endpoint->closed()); 316 DCHECK(endpoint->closed());
151 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 317 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
214 380
215 DCHECK(ContainsKey(endpoints_, id)); 381 DCHECK(ContainsKey(endpoints_, id));
216 InterfaceEndpoint* endpoint = endpoints_[id].get(); 382 InterfaceEndpoint* endpoint = endpoints_[id].get();
217 DCHECK(!endpoint->client()); 383 DCHECK(!endpoint->client());
218 DCHECK(!endpoint->closed()); 384 DCHECK(!endpoint->closed());
219 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 385 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
220 386
221 if (!IsMasterInterfaceId(id)) 387 if (!IsMasterInterfaceId(id))
222 control_message_proxy_.NotifyPeerEndpointClosed(id); 388 control_message_proxy_.NotifyPeerEndpointClosed(id);
223 389
224 ProcessTasks(true); 390 ProcessTasks(NO_DIRECT_CLIENT_CALLS);
225 } 391 }
226 392
227 void MultiplexRouter::AttachEndpointClient( 393 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
228 const ScopedInterfaceEndpointHandle& handle, 394 const ScopedInterfaceEndpointHandle& handle,
229 InterfaceEndpointClient* client) { 395 InterfaceEndpointClient* client) {
230 const InterfaceId id = handle.id(); 396 const InterfaceId id = handle.id();
231 397
232 DCHECK(IsValidInterfaceId(id)); 398 DCHECK(IsValidInterfaceId(id));
233 DCHECK(client); 399 DCHECK(client);
234 400
235 base::AutoLock locker(lock_); 401 base::AutoLock locker(lock_);
236 DCHECK(ContainsKey(endpoints_, id)); 402 DCHECK(ContainsKey(endpoints_, id));
237 403
238 InterfaceEndpoint* endpoint = endpoints_[id].get(); 404 InterfaceEndpoint* endpoint = endpoints_[id].get();
239 DCHECK(!endpoint->client()); 405 endpoint->AttachClient(client);
240 DCHECK(!endpoint->closed());
241
242 endpoint->set_task_runner(base::MessageLoop::current()->task_runner());
243 endpoint->set_client(client);
244 406
245 if (endpoint->peer_closed()) 407 if (endpoint->peer_closed())
246 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 408 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
247 ProcessTasks(true); 409 ProcessTasks(NO_DIRECT_CLIENT_CALLS);
410
411 return endpoint;
248 } 412 }
249 413
250 void MultiplexRouter::DetachEndpointClient( 414 void MultiplexRouter::DetachEndpointClient(
251 const ScopedInterfaceEndpointHandle& handle) { 415 const ScopedInterfaceEndpointHandle& handle) {
252 const InterfaceId id = handle.id(); 416 const InterfaceId id = handle.id();
253 417
254 DCHECK(IsValidInterfaceId(id)); 418 DCHECK(IsValidInterfaceId(id));
255 419
256 base::AutoLock locker(lock_); 420 base::AutoLock locker(lock_);
257 DCHECK(ContainsKey(endpoints_, id)); 421 DCHECK(ContainsKey(endpoints_, id));
258 422
259 InterfaceEndpoint* endpoint = endpoints_[id].get(); 423 InterfaceEndpoint* endpoint = endpoints_[id].get();
260 DCHECK(endpoint->client()); 424 endpoint->DetachClient();
261 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
262 DCHECK(!endpoint->closed());
263
264 endpoint->set_task_runner(nullptr);
265 endpoint->set_client(nullptr);
266 }
267
268 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle,
269 Message* message) {
270 message->set_interface_id(handle.id());
271 return connector_.Accept(message);
272 } 425 }
273 426
274 void MultiplexRouter::RaiseError() { 427 void MultiplexRouter::RaiseError() {
275 if (task_runner_->BelongsToCurrentThread()) { 428 if (task_runner_->BelongsToCurrentThread()) {
276 connector_.RaiseError(); 429 connector_.RaiseError();
277 } else { 430 } else {
278 task_runner_->PostTask(FROM_HERE, 431 task_runner_->PostTask(FROM_HERE,
279 base::Bind(&MultiplexRouter::RaiseError, this)); 432 base::Bind(&MultiplexRouter::RaiseError, this));
280 } 433 }
281 } 434 }
282 435
283 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { 436 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() {
284 scoped_ptr<AssociatedGroup> group(new AssociatedGroup); 437 scoped_ptr<AssociatedGroup> group(new AssociatedGroup);
285 group->router_ = this; 438 group->router_ = this;
286 return group; 439 return group;
287 } 440 }
288 441
289 // static 442 // static
290 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { 443 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) {
291 return associated_group->router_.get(); 444 return associated_group->router_.get();
292 } 445 }
293 446
447 void MultiplexRouter::CloseMessagePipe() {
448 DCHECK(thread_checker_.CalledOnValidThread());
449 connector_.CloseMessagePipe();
450 // CloseMessagePipe() above won't trigger connection error handler.
451 // Explicitly call OnPipeConnectionError() so that associated endpoints will
452 // get notified.
453 OnPipeConnectionError();
454 }
455
294 bool MultiplexRouter::HasAssociatedEndpoints() const { 456 bool MultiplexRouter::HasAssociatedEndpoints() const {
295 DCHECK(thread_checker_.CalledOnValidThread()); 457 DCHECK(thread_checker_.CalledOnValidThread());
296 base::AutoLock locker(lock_); 458 base::AutoLock locker(lock_);
297 459
298 if (endpoints_.size() > 1) 460 if (endpoints_.size() > 1)
299 return true; 461 return true;
300 if (endpoints_.size() == 0) 462 if (endpoints_.size() == 0)
301 return false; 463 return false;
302 464
303 return !ContainsKey(endpoints_, kMasterInterfaceId); 465 return !ContainsKey(endpoints_, kMasterInterfaceId);
304 } 466 }
305 467
306 void MultiplexRouter::EnableTestingMode() { 468 void MultiplexRouter::EnableTestingMode() {
307 DCHECK(thread_checker_.CalledOnValidThread()); 469 DCHECK(thread_checker_.CalledOnValidThread());
308 base::AutoLock locker(lock_); 470 base::AutoLock locker(lock_);
309 471
310 testing_mode_ = true; 472 testing_mode_ = true;
311 connector_.set_enforce_errors_from_incoming_receiver(false); 473 connector_.set_enforce_errors_from_incoming_receiver(false);
312 } 474 }
313 475
314 bool MultiplexRouter::Accept(Message* message) { 476 bool MultiplexRouter::Accept(Message* message) {
315 DCHECK(thread_checker_.CalledOnValidThread()); 477 DCHECK(thread_checker_.CalledOnValidThread());
316 478
317 scoped_refptr<MultiplexRouter> protector(this); 479 scoped_refptr<MultiplexRouter> protector(this);
318 base::AutoLock locker(lock_); 480 base::AutoLock locker(lock_);
319 481
320 bool processed = tasks_.empty() && ProcessIncomingMessage(message, false); 482 ClientCallBehavior client_call_behavior =
483 connector_.during_sync_handle_watcher_callback()
484 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
485 : ALLOW_DIRECT_CLIENT_CALLS;
486
487 bool processed =
488 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior);
321 489
322 if (!processed) { 490 if (!processed) {
323 // Either the task queue is not empty or we cannot process the message 491 // Either the task queue is not empty or we cannot process the message
324 // directly. In both cases, there is no need to call ProcessTasks(). 492 // directly. In both cases, there is no need to call ProcessTasks().
325 tasks_.push_back(Task::CreateIncomingMessageTask(message)); 493 tasks_.push_back(Task::CreateMessageTask(message));
494 Task* task = tasks_.back().get();
495
496 if (task->message->has_flag(kMessageIsSync)) {
497 InterfaceId id = task->message->interface_id();
498 sync_message_tasks_[id].push_back(task);
499 auto iter = endpoints_.find(id);
500 if (iter != endpoints_.end())
501 iter->second->SignalSyncMessageEvent();
502 }
326 } else if (!tasks_.empty()) { 503 } else if (!tasks_.empty()) {
327 // Processing the message may result in new tasks (for error notification) 504 // Processing the message may result in new tasks (for error notification)
328 // being added to the queue. In this case, we have to attempt to process the 505 // being added to the queue. In this case, we have to attempt to process the
329 // tasks. 506 // tasks.
330 ProcessTasks(false); 507 ProcessTasks(client_call_behavior);
331 } 508 }
332 509
333 // Always return true. If we see errors during message processing, we will 510 // Always return true. If we see errors during message processing, we will
334 // explicitly call Connector::RaiseError() to disconnect the message pipe. 511 // explicitly call Connector::RaiseError() to disconnect the message pipe.
335 return true; 512 return true;
336 } 513 }
337 514
338 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { 515 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
339 lock_.AssertAcquired(); 516 lock_.AssertAcquired();
340 517
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
381 // Increment the iterator before calling UpdateEndpointStateMayRemove() 558 // Increment the iterator before calling UpdateEndpointStateMayRemove()
382 // because it may remove the corresponding value from the map. 559 // because it may remove the corresponding value from the map.
383 ++iter; 560 ++iter;
384 561
385 if (endpoint->client()) 562 if (endpoint->client())
386 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 563 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
387 564
388 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 565 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
389 } 566 }
390 567
391 ProcessTasks(false); 568 ProcessTasks(connector_.during_sync_handle_watcher_callback()
569 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
570 : ALLOW_DIRECT_CLIENT_CALLS);
392 } 571 }
393 572
394 void MultiplexRouter::ProcessTasks(bool force_async) { 573 void MultiplexRouter::ProcessTasks(ClientCallBehavior client_call_behavior) {
395 lock_.AssertAcquired(); 574 lock_.AssertAcquired();
396 575
397 if (posted_to_process_tasks_) 576 if (posted_to_process_tasks_)
398 return; 577 return;
399 578
400 while (!tasks_.empty()) { 579 while (!tasks_.empty()) {
401 scoped_ptr<Task> task(std::move(tasks_.front())); 580 scoped_ptr<Task> task(std::move(tasks_.front()));
402 tasks_.pop_front(); 581 tasks_.pop_front();
403 582
583 InterfaceId id = kInvalidInterfaceId;
584 bool sync_message = task->IsMessageTask() && task->message &&
585 task->message->has_flag(kMessageIsSync);
586 if (sync_message) {
587 InterfaceId id = task->message->interface_id();
588 auto& sync_message_queue = sync_message_tasks_[id];
589 DCHECK_EQ(task.get(), sync_message_queue.front());
590 sync_message_queue.pop_front();
591 }
592
404 bool processed = 593 bool processed =
405 task->IsNotifyErrorTask() 594 task->IsNotifyErrorTask()
406 ? ProcessNotifyErrorTask(task.get(), force_async) 595 ? ProcessNotifyErrorTask(task.get(), client_call_behavior)
407 : ProcessIncomingMessage(task->message.get(), force_async); 596 : ProcessIncomingMessage(task->message.get(), client_call_behavior);
408 597
409 if (!processed) { 598 if (!processed) {
410 tasks_.push_front(std::move(task)); 599 tasks_.push_front(std::move(task));
600 if (sync_message) {
601 auto& sync_message_queue = sync_message_tasks_[id];
602 sync_message_queue.push_front(task.get());
603 }
411 break; 604 break;
605 } else {
606 if (sync_message) {
607 auto iter = sync_message_tasks_.find(id);
608 if (iter != sync_message_tasks_.end() && iter->second.empty())
609 sync_message_tasks_.erase(iter);
610 }
412 } 611 }
413 } 612 }
414 } 613 }
415 614
416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { 615 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
616 lock_.AssertAcquired();
617
618 auto iter = sync_message_tasks_.find(id);
619 if (iter == sync_message_tasks_.end())
620 return false;
621
622 MultiplexRouter::Task* task = iter->second.front();
623 iter->second.pop_front();
624
625 DCHECK(task->IsMessageTask());
626 scoped_ptr<Message> message(std::move(task->message));
627
628 // Note: after this call, |task| and |iter| may be invalidated.
629 bool processed = ProcessIncomingMessage(
630 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES);
631 DCHECK(processed);
632
633 iter = sync_message_tasks_.find(id);
634 return iter != sync_message_tasks_.end() && !iter->second.empty();
635 }
636
637 bool MultiplexRouter::ProcessNotifyErrorTask(
638 Task* task,
639 ClientCallBehavior client_call_behavior) {
417 lock_.AssertAcquired(); 640 lock_.AssertAcquired();
418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); 641 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
419 if (!endpoint->client()) 642 if (!endpoint->client())
420 return true; 643 return true;
421 644
422 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { 645 if (!endpoint->task_runner()->BelongsToCurrentThread() ||
646 client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS) {
423 MaybePostToProcessTasks(endpoint->task_runner()); 647 MaybePostToProcessTasks(endpoint->task_runner());
424 return false; 648 return false;
425 } 649 }
426 650
427 InterfaceEndpointClient* client = endpoint->client(); 651 InterfaceEndpointClient* client = endpoint->client();
428 { 652 {
429 // We must unlock before calling into |client| because it may call this 653 // We must unlock before calling into |client| because it may call this
430 // object within NotifyError(). Holding the lock will lead to deadlock. 654 // object within NotifyError(). Holding the lock will lead to deadlock.
431 // 655 //
432 // It is safe to call into |client| without the lock. Because |client| is 656 // It is safe to call into |client| without the lock. Because |client| is
433 // always accessed on the same thread, including DetachEndpointClient(). 657 // always accessed on the same thread, including DetachEndpointClient().
434 base::AutoUnlock unlocker(lock_); 658 base::AutoUnlock unlocker(lock_);
435 client->NotifyError(); 659 client->NotifyError();
436 } 660 }
437 return true; 661 return true;
438 } 662 }
439 663
440 bool MultiplexRouter::ProcessIncomingMessage(Message* message, 664 bool MultiplexRouter::ProcessIncomingMessage(
441 bool force_async) { 665 Message* message,
666 ClientCallBehavior client_call_behavior) {
442 lock_.AssertAcquired(); 667 lock_.AssertAcquired();
668
669 if (!message) {
670 // This is a sync message and has been processed during sync handle
671 // watching.
672 return true;
673 }
674
443 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { 675 if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
444 if (!control_message_handler_.Accept(message)) 676 if (!control_message_handler_.Accept(message))
445 RaiseErrorInNonTestingMode(); 677 RaiseErrorInNonTestingMode();
446 return true; 678 return true;
447 } 679 }
448 680
449 InterfaceId id = message->interface_id(); 681 InterfaceId id = message->interface_id();
450 DCHECK(IsValidInterfaceId(id)); 682 DCHECK(IsValidInterfaceId(id));
451 683
452 bool inserted = false; 684 bool inserted = false;
(...skipping 14 matching lines...) Expand all
467 699
468 if (endpoint->closed()) 700 if (endpoint->closed())
469 return true; 701 return true;
470 702
471 if (!endpoint->client()) { 703 if (!endpoint->client()) {
472 // We need to wait until a client is attached in order to dispatch further 704 // We need to wait until a client is attached in order to dispatch further
473 // messages. 705 // messages.
474 return false; 706 return false;
475 } 707 }
476 708
477 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { 709 bool can_direct_call =
710 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS) ||
711 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES &&
712 message->has_flag(kMessageIsSync));
713
714 if (!endpoint->task_runner()->BelongsToCurrentThread() || !can_direct_call) {
478 MaybePostToProcessTasks(endpoint->task_runner()); 715 MaybePostToProcessTasks(endpoint->task_runner());
479 return false; 716 return false;
480 } 717 }
481 718
482 InterfaceEndpointClient* client = endpoint->client(); 719 InterfaceEndpointClient* client = endpoint->client();
483 bool result = false; 720 bool result = false;
484 { 721 {
485 // We must unlock before calling into |client| because it may call this 722 // We must unlock before calling into |client| because it may call this
486 // object within HandleIncomingMessage(). Holding the lock will lead to 723 // object within HandleIncomingMessage(). Holding the lock will lead to
487 // deadlock. 724 // deadlock.
(...skipping 18 matching lines...) Expand all
506 posted_to_process_tasks_ = true; 743 posted_to_process_tasks_ = true;
507 task_runner->PostTask( 744 task_runner->PostTask(
508 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); 745 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
509 } 746 }
510 747
511 void MultiplexRouter::LockAndCallProcessTasks() { 748 void MultiplexRouter::LockAndCallProcessTasks() {
512 // There is no need to hold a ref to this class in this case because this is 749 // There is no need to hold a ref to this class in this case because this is
513 // always called using base::Bind(), which holds a ref. 750 // always called using base::Bind(), which holds a ref.
514 base::AutoLock locker(lock_); 751 base::AutoLock locker(lock_);
515 posted_to_process_tasks_ = false; 752 posted_to_process_tasks_ = false;
516 ProcessTasks(false); 753 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS);
517 } 754 }
518 755
519 void MultiplexRouter::UpdateEndpointStateMayRemove( 756 void MultiplexRouter::UpdateEndpointStateMayRemove(
520 InterfaceEndpoint* endpoint, 757 InterfaceEndpoint* endpoint,
521 EndpointStateUpdateType type) { 758 EndpointStateUpdateType type) {
522 switch (type) { 759 switch (type) {
523 case ENDPOINT_CLOSED: 760 case ENDPOINT_CLOSED:
524 endpoint->set_closed(); 761 endpoint->set_closed();
525 break; 762 break;
526 case PEER_ENDPOINT_CLOSED: 763 case PEER_ENDPOINT_CLOSED:
527 endpoint->set_peer_closed(); 764 endpoint->set_peer_closed();
765 // If the interface endpoint is performing a sync watch, this makes sure
766 // it is notified and eventually exits the sync watch.
767 endpoint->SignalSyncMessageEvent();
528 break; 768 break;
529 } 769 }
530 if (endpoint->closed() && endpoint->peer_closed()) 770 if (endpoint->closed() && endpoint->peer_closed())
531 endpoints_.erase(endpoint->id()); 771 endpoints_.erase(endpoint->id());
532 } 772 }
533 773
534 void MultiplexRouter::RaiseErrorInNonTestingMode() { 774 void MultiplexRouter::RaiseErrorInNonTestingMode() {
535 lock_.AssertAcquired(); 775 lock_.AssertAcquired();
536 if (!testing_mode_) 776 if (!testing_mode_)
537 RaiseError(); 777 RaiseError();
(...skipping 16 matching lines...) Expand all
554 *inserted = true; 794 *inserted = true;
555 } else { 795 } else {
556 endpoint = iter->second.get(); 796 endpoint = iter->second.get();
557 } 797 }
558 798
559 return endpoint; 799 return endpoint;
560 } 800 }
561 801
562 } // namespace internal 802 } // namespace internal
563 } // namespace mojo 803 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698