Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(232)

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 1823683006: Mojo C++ bindings: sync call support for associated interfaces and master interfaces (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | mojo/public/cpp/bindings/lib/router.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/macros.h" 12 #include "base/macros.h"
13 #include "base/message_loop/message_loop.h" 13 #include "base/message_loop/message_loop.h"
14 #include "base/single_thread_task_runner.h" 14 #include "base/single_thread_task_runner.h"
15 #include "base/stl_util.h" 15 #include "base/stl_util.h"
16 #include "mojo/public/cpp/bindings/associated_group.h" 16 #include "mojo/public/cpp/bindings/associated_group.h"
17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" 17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h"
18 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h"
18 19
19 namespace mojo { 20 namespace mojo {
20 namespace internal { 21 namespace internal {
21 22
22 // InterfaceEndpoint stores the information of an interface endpoint registered 23 // InterfaceEndpoint stores the information of an interface endpoint registered
23 // with the router. Always accessed under the router's lock. 24 // with the router. Always accessed under the router's lock.
24 // No one other than the router's |endpoints_| and |tasks_| should hold refs to 25 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
25 // this object. 26 // this object.
27 // TODO(yzshen): update the comment about lock.
26 class MultiplexRouter::InterfaceEndpoint 28 class MultiplexRouter::InterfaceEndpoint
27 : public base::RefCounted<InterfaceEndpoint> { 29 : public base::RefCounted<InterfaceEndpoint>,
30 public InterfaceEndpointController {
28 public: 31 public:
29 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) 32 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
30 : router_lock_(&router->lock_), 33 : router_(router),
31 id_(id), 34 id_(id),
32 closed_(false), 35 closed_(false),
33 peer_closed_(false), 36 peer_closed_(false),
34 client_(nullptr) { 37 client_(nullptr),
35 router_lock_->AssertAcquired(); 38 event_signalled_(false) {
39 router_->lock_.AssertAcquired();
40
41 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_,
42 &sync_message_event_receiver_);
43 DCHECK_EQ(MOJO_RESULT_OK, result);
36 } 44 }
37 45
38 InterfaceId id() const { return id_; } 46 InterfaceId id() const { return id_; }
39 47
40 bool closed() const { return closed_; } 48 bool closed() const { return closed_; }
41 void set_closed() { 49 void set_closed() {
42 router_lock_->AssertAcquired(); 50 router_->lock_.AssertAcquired();
43 closed_ = true; 51 closed_ = true;
44 } 52 }
45 53
46 bool peer_closed() const { return peer_closed_; } 54 bool peer_closed() const { return peer_closed_; }
47 void set_peer_closed() { 55 void set_peer_closed() {
48 router_lock_->AssertAcquired(); 56 router_->lock_.AssertAcquired();
49 peer_closed_ = true; 57 peer_closed_ = true;
50 } 58 }
51 59
52 base::SingleThreadTaskRunner* task_runner() const { 60 base::SingleThreadTaskRunner* task_runner() const {
53 return task_runner_.get(); 61 return task_runner_.get();
54 } 62 }
55 void set_task_runner( 63 void set_task_runner(
56 scoped_refptr<base::SingleThreadTaskRunner> task_runner) { 64 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
57 router_lock_->AssertAcquired(); 65 router_->lock_.AssertAcquired();
58 task_runner_ = std::move(task_runner); 66 task_runner_ = std::move(task_runner);
59 } 67 }
60 68
61 InterfaceEndpointClient* client() const { return client_; } 69 InterfaceEndpointClient* client() const { return client_; }
62 void set_client(InterfaceEndpointClient* client) { 70 void set_client(InterfaceEndpointClient* client) {
63 router_lock_->AssertAcquired(); 71 router_->lock_.AssertAcquired();
64 client_ = client; 72 client_ = client;
65 } 73 }
66 74
75 void ClearSyncHandleWatcher() {
76 router_->lock_.AssertAcquired();
77 sync_watcher_.reset();
78 }
79
80 void SignalSyncMessageReceived() {
81 router_->lock_.AssertAcquired();
82
83 if (event_signalled_)
84 return;
85
86 event_signalled_ = true;
87 char dummy_message = '\0';
88 MojoResult result =
89 WriteMessageRaw(sync_message_event_sender_.get(), &dummy_message, 1,
90 nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
91 DCHECK_EQ(MOJO_RESULT_OK, result);
92 }
93
94 void ResetSyncMessageSignal() {
95 router_->lock_.AssertAcquired();
96
97 if (!event_signalled_)
98 return;
99
100 char dummy_message = 0;
101 uint32_t size = 1;
102 MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(),
103 &dummy_message, &size, nullptr, nullptr,
104 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
105 DCHECK_EQ(MOJO_RESULT_OK, result);
106 event_signalled_ = false;
107 }
108
109 bool SendMessage(Message* message) override {
110 DCHECK(task_runner_->BelongsToCurrentThread());
111 message->set_interface_id(id_);
112 return router_->connector_.Accept(message);
113 }
114
115 void AllowWokenUpBySyncWatchOnSameThread() override {
116 DCHECK(task_runner_->BelongsToCurrentThread());
117
118 {
119 base::AutoLock locker(router_->lock_);
120
121 auto iter = router_->sync_message_tasks_.find(id_);
122 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty())
123 SignalSyncMessageReceived();
124 }
125
126 EnsureSyncWatcherExists();
127 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
128 }
129
130 bool SyncWatch(const bool* should_stop) override {
131 DCHECK(task_runner_->BelongsToCurrentThread());
132
133 EnsureSyncWatcherExists();
134 return sync_watcher_->SyncWatch(should_stop);
135 }
136
67 private: 137 private:
68 friend class base::RefCounted<InterfaceEndpoint>; 138 friend class base::RefCounted<InterfaceEndpoint>;
69 139
70 ~InterfaceEndpoint() { 140 ~InterfaceEndpoint() override {
71 router_lock_->AssertAcquired(); 141 router_->lock_.AssertAcquired();
72 142
73 DCHECK(!client_); 143 DCHECK(!client_);
74 DCHECK(closed_); 144 DCHECK(closed_);
75 DCHECK(peer_closed_); 145 DCHECK(peer_closed_);
146 DCHECK(!sync_watcher_);
76 } 147 }
77 148
78 base::Lock* const router_lock_; 149 void OnHandleReady(MojoResult result);
150
151 void EnsureSyncWatcherExists() {
152 if (sync_watcher_)
153 return;
154
155 sync_watcher_.reset(new SyncHandleWatcher(
156 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
157 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
158 }
159
160 MultiplexRouter* const router_;
79 const InterfaceId id_; 161 const InterfaceId id_;
80 162
81 // Whether the endpoint has been closed. 163 // Whether the endpoint has been closed.
82 bool closed_; 164 bool closed_;
83 // Whether the peer endpoint has been closed. 165 // Whether the peer endpoint has been closed.
84 bool peer_closed_; 166 bool peer_closed_;
85 167
86 // The task runner on which |client_| can be accessed. 168 // The task runner on which |client_| can be accessed.
87 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 169 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
88 // Not owned. It is null if no client is attached to this endpoint. 170 // Not owned. It is null if no client is attached to this endpoint.
89 InterfaceEndpointClient* client_; 171 InterfaceEndpointClient* client_;
90 172
173 // TODO(yzshen): is it too expensive? Do I need to lazy init?
174 ScopedMessagePipeHandle sync_message_event_sender_;
175 ScopedMessagePipeHandle sync_message_event_receiver_;
176 bool event_signalled_;
177 scoped_ptr<SyncHandleWatcher> sync_watcher_;
178
179 // TODO(yzshen): The handling of sync watching is quite similar to what
180 // Connector does. Consider unifying them.
181 //
182 // If non-zero, |sync_message_event_receiver_| should be registered with
183 // SyncHandleRegistry.
184 size_t register_sync_handle_watch_count_;
185 scoped_refptr<base::RefCountedData<bool>> should_stop_sync_handle_watch_;
186
91 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); 187 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
92 }; 188 };
93 189
94 struct MultiplexRouter::Task { 190 struct MultiplexRouter::Task {
95 public: 191 public:
96 // Doesn't take ownership of |message| but takes its contents. 192 // Doesn't take ownership of |message| but takes its contents.
97 static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) { 193 static scoped_ptr<Task> CreateMessageTask(Message* message) {
98 Task* task = new Task(); 194 Task* task = new Task(MESSAGE);
99 task->message.reset(new Message); 195 task->message.reset(new Message);
100 message->MoveTo(task->message.get()); 196 message->MoveTo(task->message.get());
101 return make_scoped_ptr(task); 197 return make_scoped_ptr(task);
102 } 198 }
103 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { 199 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) {
104 Task* task = new Task(); 200 Task* task = new Task(NOTIFY_ERROR);
105 task->endpoint_to_notify = endpoint; 201 task->endpoint_to_notify = endpoint;
106 return make_scoped_ptr(task); 202 return make_scoped_ptr(task);
107 } 203 }
108 204
109 ~Task() {} 205 ~Task() {}
110 206
111 bool IsIncomingMessageTask() const { return !!message; } 207 bool IsMessageTask() const { return type == MESSAGE; }
112 bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } 208 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
113 209
114 scoped_ptr<Message> message; 210 scoped_ptr<Message> message;
115 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; 211 scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
116 212
213 enum Type { MESSAGE, NOTIFY_ERROR };
214
215 Type type;
216
117 private: 217 private:
118 Task() {} 218 explicit Task(Type in_type) : type(in_type) {}
119 }; 219 };
120 220
221 void MultiplexRouter::InterfaceEndpoint::OnHandleReady(MojoResult result) {
222 DCHECK(task_runner_->BelongsToCurrentThread());
223 scoped_refptr<InterfaceEndpoint> self_protector(this);
224 scoped_refptr<MultiplexRouter> router_protector(router_);
225
226 {
227 base::AutoLock locker(router_->lock_);
228 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
229
230 if (!more_to_process)
231 ResetSyncMessageSignal();
232
233 bool no_more_sync_messages = !more_to_process && peer_closed_;
234 bool sync_handle_watch_failed = result != MOJO_RESULT_OK;
235
236 if (no_more_sync_messages || sync_handle_watch_failed)
237 sync_watcher_.reset();
238 }
239 }
240
121 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, 241 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit,
122 ScopedMessagePipeHandle message_pipe) 242 ScopedMessagePipeHandle message_pipe)
123 : RefCountedDeleteOnMessageLoop( 243 : RefCountedDeleteOnMessageLoop(
124 base::MessageLoop::current()->task_runner()), 244 base::MessageLoop::current()->task_runner()),
125 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), 245 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
126 header_validator_(this), 246 header_validator_(this),
127 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), 247 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND),
128 encountered_error_(false), 248 encountered_error_(false),
129 control_message_handler_(this), 249 control_message_handler_(this),
130 control_message_proxy_(&connector_), 250 control_message_proxy_(&connector_),
131 next_interface_id_value_(1), 251 next_interface_id_value_(1),
132 posted_to_process_tasks_(false), 252 posted_to_process_tasks_(false),
133 testing_mode_(false) { 253 testing_mode_(false) {
254 // Always participate in sync handle watch, because it may want to dispatch
255 // messages to associated endpoints on a different thread; or it want to
256 // dispatch sync requests to the master binding or associated bindings on the
257 // same thread.
258 connector_.AllowWokenUpBySyncWatchOnSameThread();
134 connector_.set_incoming_receiver(&header_validator_); 259 connector_.set_incoming_receiver(&header_validator_);
135 connector_.set_connection_error_handler( 260 connector_.set_connection_error_handler(
136 [this]() { OnPipeConnectionError(); }); 261 [this]() { OnPipeConnectionError(); });
137 } 262 }
138 263
139 MultiplexRouter::~MultiplexRouter() { 264 MultiplexRouter::~MultiplexRouter() {
140 base::AutoLock locker(lock_); 265 base::AutoLock locker(lock_);
141 266
267 sync_message_tasks_.clear();
142 tasks_.clear(); 268 tasks_.clear();
143 269
144 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 270 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
145 InterfaceEndpoint* endpoint = iter->second.get(); 271 InterfaceEndpoint* endpoint = iter->second.get();
146 // Increment the iterator before calling UpdateEndpointStateMayRemove() 272 // Increment the iterator before calling UpdateEndpointStateMayRemove()
147 // because it may remove the corresponding value from the map. 273 // because it may remove the corresponding value from the map.
148 ++iter; 274 ++iter;
149 275
150 DCHECK(endpoint->closed()); 276 DCHECK(endpoint->closed());
151 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 277 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
214 340
215 DCHECK(ContainsKey(endpoints_, id)); 341 DCHECK(ContainsKey(endpoints_, id));
216 InterfaceEndpoint* endpoint = endpoints_[id].get(); 342 InterfaceEndpoint* endpoint = endpoints_[id].get();
217 DCHECK(!endpoint->client()); 343 DCHECK(!endpoint->client());
218 DCHECK(!endpoint->closed()); 344 DCHECK(!endpoint->closed());
219 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 345 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
220 346
221 if (!IsMasterInterfaceId(id)) 347 if (!IsMasterInterfaceId(id))
222 control_message_proxy_.NotifyPeerEndpointClosed(id); 348 control_message_proxy_.NotifyPeerEndpointClosed(id);
223 349
224 ProcessTasks(true); 350 ProcessTasks(NO_DIRECT_CLIENT_CALLS);
225 } 351 }
226 352
227 void MultiplexRouter::AttachEndpointClient( 353 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
228 const ScopedInterfaceEndpointHandle& handle, 354 const ScopedInterfaceEndpointHandle& handle,
229 InterfaceEndpointClient* client) { 355 InterfaceEndpointClient* client) {
230 const InterfaceId id = handle.id(); 356 const InterfaceId id = handle.id();
231 357
232 DCHECK(IsValidInterfaceId(id)); 358 DCHECK(IsValidInterfaceId(id));
233 DCHECK(client); 359 DCHECK(client);
234 360
235 base::AutoLock locker(lock_); 361 base::AutoLock locker(lock_);
236 DCHECK(ContainsKey(endpoints_, id)); 362 DCHECK(ContainsKey(endpoints_, id));
237 363
238 InterfaceEndpoint* endpoint = endpoints_[id].get(); 364 InterfaceEndpoint* endpoint = endpoints_[id].get();
239 DCHECK(!endpoint->client()); 365 DCHECK(!endpoint->client());
240 DCHECK(!endpoint->closed()); 366 DCHECK(!endpoint->closed());
241 367
242 endpoint->set_task_runner(base::MessageLoop::current()->task_runner()); 368 endpoint->set_task_runner(base::MessageLoop::current()->task_runner());
243 endpoint->set_client(client); 369 endpoint->set_client(client);
244 370
245 if (endpoint->peer_closed()) 371 if (endpoint->peer_closed())
246 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 372 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
247 ProcessTasks(true); 373 ProcessTasks(NO_DIRECT_CLIENT_CALLS);
374
375 return endpoint;
248 } 376 }
249 377
250 void MultiplexRouter::DetachEndpointClient( 378 void MultiplexRouter::DetachEndpointClient(
251 const ScopedInterfaceEndpointHandle& handle) { 379 const ScopedInterfaceEndpointHandle& handle) {
252 const InterfaceId id = handle.id(); 380 const InterfaceId id = handle.id();
253 381
254 DCHECK(IsValidInterfaceId(id)); 382 DCHECK(IsValidInterfaceId(id));
255 383
256 base::AutoLock locker(lock_); 384 base::AutoLock locker(lock_);
257 DCHECK(ContainsKey(endpoints_, id)); 385 DCHECK(ContainsKey(endpoints_, id));
258 386
259 InterfaceEndpoint* endpoint = endpoints_[id].get(); 387 InterfaceEndpoint* endpoint = endpoints_[id].get();
260 DCHECK(endpoint->client()); 388 DCHECK(endpoint->client());
261 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 389 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
262 DCHECK(!endpoint->closed()); 390 DCHECK(!endpoint->closed());
263 391
264 endpoint->set_task_runner(nullptr); 392 endpoint->set_task_runner(nullptr);
265 endpoint->set_client(nullptr); 393 endpoint->set_client(nullptr);
266 } 394 endpoint->ClearSyncHandleWatcher();
267
268 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle,
269 Message* message) {
270 message->set_interface_id(handle.id());
271 return connector_.Accept(message);
272 } 395 }
273 396
274 void MultiplexRouter::RaiseError() { 397 void MultiplexRouter::RaiseError() {
275 if (task_runner_->BelongsToCurrentThread()) { 398 if (task_runner_->BelongsToCurrentThread()) {
276 connector_.RaiseError(); 399 connector_.RaiseError();
277 } else { 400 } else {
278 task_runner_->PostTask(FROM_HERE, 401 task_runner_->PostTask(FROM_HERE,
279 base::Bind(&MultiplexRouter::RaiseError, this)); 402 base::Bind(&MultiplexRouter::RaiseError, this));
280 } 403 }
281 } 404 }
(...skipping 28 matching lines...) Expand all
310 testing_mode_ = true; 433 testing_mode_ = true;
311 connector_.set_enforce_errors_from_incoming_receiver(false); 434 connector_.set_enforce_errors_from_incoming_receiver(false);
312 } 435 }
313 436
314 bool MultiplexRouter::Accept(Message* message) { 437 bool MultiplexRouter::Accept(Message* message) {
315 DCHECK(thread_checker_.CalledOnValidThread()); 438 DCHECK(thread_checker_.CalledOnValidThread());
316 439
317 scoped_refptr<MultiplexRouter> protector(this); 440 scoped_refptr<MultiplexRouter> protector(this);
318 base::AutoLock locker(lock_); 441 base::AutoLock locker(lock_);
319 442
320 bool processed = tasks_.empty() && ProcessIncomingMessage(message, false); 443 ClientCallBehavior client_call_behavior =
444 connector_.during_sync_handle_watcher_callback()
445 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
446 : ALLOW_DIRECT_CLIENT_CALLS;
447
448 bool processed =
449 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior);
321 450
322 if (!processed) { 451 if (!processed) {
323 // Either the task queue is not empty or we cannot process the message 452 // Either the task queue is not empty or we cannot process the message
324 // directly. In both cases, there is no need to call ProcessTasks(). 453 // directly. In both cases, there is no need to call ProcessTasks().
325 tasks_.push_back(Task::CreateIncomingMessageTask(message)); 454 tasks_.push_back(Task::CreateMessageTask(message));
455 Task* task = tasks_.back().get();
456
457 if (task->message->has_flag(kMessageIsSync)) {
458 InterfaceId id = task->message->interface_id();
459 sync_message_tasks_[id].push_back(task);
460 auto iter = endpoints_.find(id);
461 if (iter != endpoints_.end())
462 iter->second->SignalSyncMessageReceived();
463 }
326 } else if (!tasks_.empty()) { 464 } else if (!tasks_.empty()) {
327 // Processing the message may result in new tasks (for error notification) 465 // Processing the message may result in new tasks (for error notification)
328 // being added to the queue. In this case, we have to attempt to process the 466 // being added to the queue. In this case, we have to attempt to process the
329 // tasks. 467 // tasks.
330 ProcessTasks(false); 468 ProcessTasks(client_call_behavior);
331 } 469 }
332 470
333 // Always return true. If we see errors during message processing, we will 471 // Always return true. If we see errors during message processing, we will
334 // explicitly call Connector::RaiseError() to disconnect the message pipe. 472 // explicitly call Connector::RaiseError() to disconnect the message pipe.
335 return true; 473 return true;
336 } 474 }
337 475
338 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { 476 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
339 lock_.AssertAcquired(); 477 lock_.AssertAcquired();
340 478
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
381 // Increment the iterator before calling UpdateEndpointStateMayRemove() 519 // Increment the iterator before calling UpdateEndpointStateMayRemove()
382 // because it may remove the corresponding value from the map. 520 // because it may remove the corresponding value from the map.
383 ++iter; 521 ++iter;
384 522
385 if (endpoint->client()) 523 if (endpoint->client())
386 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 524 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
387 525
388 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 526 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
389 } 527 }
390 528
391 ProcessTasks(false); 529 ProcessTasks(connector_.during_sync_handle_watcher_callback()
530 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
531 : ALLOW_DIRECT_CLIENT_CALLS);
392 } 532 }
393 533
394 void MultiplexRouter::ProcessTasks(bool force_async) { 534 void MultiplexRouter::ProcessTasks(ClientCallBehavior client_call_behavior) {
395 lock_.AssertAcquired(); 535 lock_.AssertAcquired();
396 536
397 if (posted_to_process_tasks_) 537 if (posted_to_process_tasks_)
398 return; 538 return;
399 539
400 while (!tasks_.empty()) { 540 while (!tasks_.empty()) {
401 scoped_ptr<Task> task(std::move(tasks_.front())); 541 scoped_ptr<Task> task(std::move(tasks_.front()));
402 tasks_.pop_front(); 542 tasks_.pop_front();
403 543
544 InterfaceId id = kInvalidInterfaceId;
545 if (task->IsMessageTask() && task->message &&
546 task->message->has_flag(kMessageIsSync)) {
547 InterfaceId id = task->message->interface_id();
548 auto& sync_message_queue = sync_message_tasks_[id];
549 DCHECK_EQ(task.get(), sync_message_queue.front());
550 sync_message_queue.pop_front();
551 }
552
404 bool processed = 553 bool processed =
405 task->IsNotifyErrorTask() 554 task->IsNotifyErrorTask()
406 ? ProcessNotifyErrorTask(task.get(), force_async) 555 ? ProcessNotifyErrorTask(task.get(), client_call_behavior)
407 : ProcessIncomingMessage(task->message.get(), force_async); 556 : ProcessIncomingMessage(task->message.get(), client_call_behavior);
408 557
409 if (!processed) { 558 if (!processed) {
410 tasks_.push_front(std::move(task)); 559 tasks_.push_front(std::move(task));
560 if (IsValidInterfaceId(id)) {
561 auto& sync_message_queue = sync_message_tasks_[id];
562 sync_message_queue.push_front(task.get());
563 }
411 break; 564 break;
565 } else {
566 if (IsValidInterfaceId(id)) {
567 auto iter = sync_message_tasks_.find(id);
568 if (iter != sync_message_tasks_.end() && iter->second.empty())
569 sync_message_tasks_.erase(iter);
570 }
412 } 571 }
413 } 572 }
414 } 573 }
415 574
416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { 575 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
576 lock_.AssertAcquired();
577
578 auto iter = sync_message_tasks_.find(id);
579 if (iter == sync_message_tasks_.end())
580 return false;
581
582 MultiplexRouter::Task* task = iter->second.front();
583 iter->second.pop_front();
584
585 DCHECK(task->IsMessageTask());
586 scoped_ptr<Message> message(std::move(task->message));
587
588 // Note: after this call, |task|, |tasks| and |iter| may be invalidated.
589 bool processed = ProcessIncomingMessage(
590 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES);
591 DCHECK(processed);
592
593 iter = sync_message_tasks_.find(id);
594 return iter != sync_message_tasks_.end() && !iter->second.empty();
595 }
596
597 bool MultiplexRouter::ProcessNotifyErrorTask(
598 Task* task,
599 ClientCallBehavior client_call_behavior) {
417 lock_.AssertAcquired(); 600 lock_.AssertAcquired();
418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); 601 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
419 if (!endpoint->client()) 602 if (!endpoint->client())
420 return true; 603 return true;
421 604
422 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { 605 if (!endpoint->task_runner()->BelongsToCurrentThread() ||
606 client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS) {
423 MaybePostToProcessTasks(endpoint->task_runner()); 607 MaybePostToProcessTasks(endpoint->task_runner());
424 return false; 608 return false;
425 } 609 }
426 610
427 InterfaceEndpointClient* client = endpoint->client(); 611 InterfaceEndpointClient* client = endpoint->client();
428 { 612 {
429 // We must unlock before calling into |client| because it may call this 613 // We must unlock before calling into |client| because it may call this
430 // object within NotifyError(). Holding the lock will lead to deadlock. 614 // object within NotifyError(). Holding the lock will lead to deadlock.
431 // 615 //
432 // It is safe to call into |client| without the lock. Because |client| is 616 // It is safe to call into |client| without the lock. Because |client| is
433 // always accessed on the same thread, including DetachEndpointClient(). 617 // always accessed on the same thread, including DetachEndpointClient().
434 base::AutoUnlock unlocker(lock_); 618 base::AutoUnlock unlocker(lock_);
435 client->NotifyError(); 619 client->NotifyError();
436 } 620 }
437 return true; 621 return true;
438 } 622 }
439 623
440 bool MultiplexRouter::ProcessIncomingMessage(Message* message, 624 bool MultiplexRouter::ProcessIncomingMessage(
441 bool force_async) { 625 Message* message,
626 ClientCallBehavior client_call_behavior) {
442 lock_.AssertAcquired(); 627 lock_.AssertAcquired();
628
629 if (!message) {
630 // This is a sync message and has been processed during sync handle
631 // watching.
632 return true;
633 }
634
443 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { 635 if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
444 if (!control_message_handler_.Accept(message)) 636 if (!control_message_handler_.Accept(message))
445 RaiseErrorInNonTestingMode(); 637 RaiseErrorInNonTestingMode();
446 return true; 638 return true;
447 } 639 }
448 640
449 InterfaceId id = message->interface_id(); 641 InterfaceId id = message->interface_id();
450 DCHECK(IsValidInterfaceId(id)); 642 DCHECK(IsValidInterfaceId(id));
451 643
452 bool inserted = false; 644 bool inserted = false;
453 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); 645 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
454 if (inserted) { 646 if (inserted) {
455 DCHECK(!IsMasterInterfaceId(id)); 647 DCHECK(!IsMasterInterfaceId(id));
456 648
457 // Currently, it is legitimate to receive messages for an endpoint 649 // Currently, it is legitimate to receive messages for an endpoint
458 // that is not registered. For example, the endpoint is transferred in 650 // that is not registered. For example, the endpoint is transferred in
459 // a message that is discarded. Once we add support to specify all 651 // a message that is discarded. Once we add support to specify all
460 // enclosing endpoints in message header, we should be able to remove 652 // enclosing endpoints in message header, we should be able to remove
461 // this. 653 // this.
462 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 654 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
463 655
464 control_message_proxy_.NotifyPeerEndpointClosed(id); 656 control_message_proxy_.NotifyPeerEndpointClosed(id);
465 return true; 657 return true;
466 } 658 }
467 659
468 if (endpoint->closed()) 660 if (endpoint->closed())
469 return true; 661 return true;
470 662
471 if (!endpoint->client()) { 663 if (!endpoint->client())
472 // We need to wait until a client is attached in order to dispatch further
473 // messages.
474 return false; 664 return false;
475 }
476 665
477 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { 666 bool can_direct_call =
667 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS) ||
668 (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES &&
669 message->has_flag(kMessageIsSync));
670
671 if (!endpoint->task_runner()->BelongsToCurrentThread() || !can_direct_call) {
478 MaybePostToProcessTasks(endpoint->task_runner()); 672 MaybePostToProcessTasks(endpoint->task_runner());
479 return false; 673 return false;
480 } 674 }
481 675
482 InterfaceEndpointClient* client = endpoint->client(); 676 InterfaceEndpointClient* client = endpoint->client();
483 bool result = false; 677 bool result = false;
484 { 678 {
485 // We must unlock before calling into |client| because it may call this 679 // We must unlock before calling into |client| because it may call this
486 // object within HandleIncomingMessage(). Holding the lock will lead to 680 // object within HandleIncomingMessage(). Holding the lock will lead to
487 // deadlock. 681 // deadlock.
(...skipping 18 matching lines...) Expand all
506 posted_to_process_tasks_ = true; 700 posted_to_process_tasks_ = true;
507 task_runner->PostTask( 701 task_runner->PostTask(
508 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); 702 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
509 } 703 }
510 704
511 void MultiplexRouter::LockAndCallProcessTasks() { 705 void MultiplexRouter::LockAndCallProcessTasks() {
512 // There is no need to hold a ref to this class in this case because this is 706 // There is no need to hold a ref to this class in this case because this is
513 // always called using base::Bind(), which holds a ref. 707 // always called using base::Bind(), which holds a ref.
514 base::AutoLock locker(lock_); 708 base::AutoLock locker(lock_);
515 posted_to_process_tasks_ = false; 709 posted_to_process_tasks_ = false;
516 ProcessTasks(false); 710 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS);
517 } 711 }
518 712
519 void MultiplexRouter::UpdateEndpointStateMayRemove( 713 void MultiplexRouter::UpdateEndpointStateMayRemove(
520 InterfaceEndpoint* endpoint, 714 InterfaceEndpoint* endpoint,
521 EndpointStateUpdateType type) { 715 EndpointStateUpdateType type) {
522 switch (type) { 716 switch (type) {
523 case ENDPOINT_CLOSED: 717 case ENDPOINT_CLOSED:
524 endpoint->set_closed(); 718 endpoint->set_closed();
525 break; 719 break;
526 case PEER_ENDPOINT_CLOSED: 720 case PEER_ENDPOINT_CLOSED:
527 endpoint->set_peer_closed(); 721 endpoint->set_peer_closed();
722 endpoint->SignalSyncMessageReceived();
528 break; 723 break;
529 } 724 }
530 if (endpoint->closed() && endpoint->peer_closed()) 725 if (endpoint->closed() && endpoint->peer_closed())
531 endpoints_.erase(endpoint->id()); 726 endpoints_.erase(endpoint->id());
532 } 727 }
533 728
534 void MultiplexRouter::RaiseErrorInNonTestingMode() { 729 void MultiplexRouter::RaiseErrorInNonTestingMode() {
535 lock_.AssertAcquired(); 730 lock_.AssertAcquired();
536 if (!testing_mode_) 731 if (!testing_mode_)
537 RaiseError(); 732 RaiseError();
(...skipping 16 matching lines...) Expand all
554 *inserted = true; 749 *inserted = true;
555 } else { 750 } else {
556 endpoint = iter->second.get(); 751 endpoint = iter->second.get();
557 } 752 }
558 753
559 return endpoint; 754 return endpoint;
560 } 755 }
561 756
562 } // namespace internal 757 } // namespace internal
563 } // namespace mojo 758 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | mojo/public/cpp/bindings/lib/router.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698