OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
11 #include "base/bind.h" | 11 #include "base/bind.h" |
12 #include "base/location.h" | 12 #include "base/location.h" |
13 #include "base/macros.h" | 13 #include "base/macros.h" |
14 #include "base/memory/ptr_util.h" | 14 #include "base/memory/ptr_util.h" |
15 #include "base/single_thread_task_runner.h" | 15 #include "base/sequenced_task_runner.h" |
16 #include "base/stl_util.h" | 16 #include "base/stl_util.h" |
| 17 #include "base/threading/sequenced_task_runner_handle.h" |
17 #include "base/threading/thread_task_runner_handle.h" | 18 #include "base/threading/thread_task_runner_handle.h" |
18 #include "mojo/public/cpp/bindings/associated_group.h" | 19 #include "mojo/public/cpp/bindings/associated_group.h" |
19 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" | 20 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" |
20 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" | 21 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" |
21 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" | 22 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" |
22 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" | 23 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" |
23 | 24 |
24 namespace mojo { | 25 namespace mojo { |
25 namespace internal { | 26 namespace internal { |
26 | 27 |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
63 | 64 |
64 const base::Optional<DisconnectReason>& disconnect_reason() const { | 65 const base::Optional<DisconnectReason>& disconnect_reason() const { |
65 return disconnect_reason_; | 66 return disconnect_reason_; |
66 } | 67 } |
67 void set_disconnect_reason( | 68 void set_disconnect_reason( |
68 const base::Optional<DisconnectReason>& disconnect_reason) { | 69 const base::Optional<DisconnectReason>& disconnect_reason) { |
69 router_->AssertLockAcquired(); | 70 router_->AssertLockAcquired(); |
70 disconnect_reason_ = disconnect_reason; | 71 disconnect_reason_ = disconnect_reason; |
71 } | 72 } |
72 | 73 |
73 base::SingleThreadTaskRunner* task_runner() const { | 74 base::SequencedTaskRunner* task_runner() const { return task_runner_.get(); } |
74 return task_runner_.get(); | |
75 } | |
76 | 75 |
77 InterfaceEndpointClient* client() const { return client_; } | 76 InterfaceEndpointClient* client() const { return client_; } |
78 | 77 |
79 void AttachClient(InterfaceEndpointClient* client, | 78 void AttachClient(InterfaceEndpointClient* client, |
80 scoped_refptr<base::SingleThreadTaskRunner> runner) { | 79 scoped_refptr<base::SequencedTaskRunner> runner) { |
81 router_->AssertLockAcquired(); | 80 router_->AssertLockAcquired(); |
82 DCHECK(!client_); | 81 DCHECK(!client_); |
83 DCHECK(!closed_); | 82 DCHECK(!closed_); |
84 DCHECK(runner->BelongsToCurrentThread()); | 83 DCHECK(runner->RunsTasksOnCurrentThread()); |
85 | 84 |
86 task_runner_ = std::move(runner); | 85 task_runner_ = std::move(runner); |
87 client_ = client; | 86 client_ = client; |
88 } | 87 } |
89 | 88 |
90 // This method must be called on the same thread as the corresponding | 89 // This method must be called on the same thread as the corresponding |
91 // AttachClient() call. | 90 // AttachClient() call. |
92 void DetachClient() { | 91 void DetachClient() { |
93 router_->AssertLockAcquired(); | 92 router_->AssertLockAcquired(); |
94 DCHECK(client_); | 93 DCHECK(client_); |
95 DCHECK(task_runner_->BelongsToCurrentThread()); | 94 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
96 DCHECK(!closed_); | 95 DCHECK(!closed_); |
97 | 96 |
98 task_runner_ = nullptr; | 97 task_runner_ = nullptr; |
99 client_ = nullptr; | 98 client_ = nullptr; |
100 sync_watcher_.reset(); | 99 sync_watcher_.reset(); |
101 } | 100 } |
102 | 101 |
103 void SignalSyncMessageEvent() { | 102 void SignalSyncMessageEvent() { |
104 router_->AssertLockAcquired(); | 103 router_->AssertLockAcquired(); |
105 if (event_signalled_) | 104 if (event_signalled_) |
(...skipping 20 matching lines...) Expand all Loading... |
126 DCHECK_EQ(MOJO_RESULT_OK, result); | 125 DCHECK_EQ(MOJO_RESULT_OK, result); |
127 event_signalled_ = false; | 126 event_signalled_ = false; |
128 } | 127 } |
129 | 128 |
130 // --------------------------------------------------------------------------- | 129 // --------------------------------------------------------------------------- |
131 // The following public methods (i.e., InterfaceEndpointController | 130 // The following public methods (i.e., InterfaceEndpointController |
132 // implementation) are called by the client on the same thread as the | 131 // implementation) are called by the client on the same thread as the |
133 // AttachClient() call. They are called outside of the router's lock. | 132 // AttachClient() call. They are called outside of the router's lock. |
134 | 133 |
135 bool SendMessage(Message* message) override { | 134 bool SendMessage(Message* message) override { |
136 DCHECK(task_runner_->BelongsToCurrentThread()); | 135 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
137 message->set_interface_id(id_); | 136 message->set_interface_id(id_); |
138 return router_->connector_.Accept(message); | 137 return router_->connector_.Accept(message); |
139 } | 138 } |
140 | 139 |
141 void AllowWokenUpBySyncWatchOnSameThread() override { | 140 void AllowWokenUpBySyncWatchOnSameThread() override { |
142 DCHECK(task_runner_->BelongsToCurrentThread()); | 141 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
143 | 142 |
144 EnsureSyncWatcherExists(); | 143 EnsureSyncWatcherExists(); |
145 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 144 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
146 } | 145 } |
147 | 146 |
148 bool SyncWatch(const bool* should_stop) override { | 147 bool SyncWatch(const bool* should_stop) override { |
149 DCHECK(task_runner_->BelongsToCurrentThread()); | 148 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
150 | 149 |
151 EnsureSyncWatcherExists(); | 150 EnsureSyncWatcherExists(); |
152 return sync_watcher_->SyncWatch(should_stop); | 151 return sync_watcher_->SyncWatch(should_stop); |
153 } | 152 } |
154 | 153 |
155 private: | 154 private: |
156 friend class base::RefCounted<InterfaceEndpoint>; | 155 friend class base::RefCounted<InterfaceEndpoint>; |
157 | 156 |
158 ~InterfaceEndpoint() override { | 157 ~InterfaceEndpoint() override { |
159 router_->AssertLockAcquired(); | 158 router_->AssertLockAcquired(); |
160 | 159 |
161 DCHECK(!client_); | 160 DCHECK(!client_); |
162 DCHECK(closed_); | 161 DCHECK(closed_); |
163 DCHECK(peer_closed_); | 162 DCHECK(peer_closed_); |
164 DCHECK(!sync_watcher_); | 163 DCHECK(!sync_watcher_); |
165 } | 164 } |
166 | 165 |
167 void OnHandleReady(MojoResult result) { | 166 void OnHandleReady(MojoResult result) { |
168 DCHECK(task_runner_->BelongsToCurrentThread()); | 167 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
169 scoped_refptr<InterfaceEndpoint> self_protector(this); | 168 scoped_refptr<InterfaceEndpoint> self_protector(this); |
170 scoped_refptr<MultiplexRouter> router_protector(router_); | 169 scoped_refptr<MultiplexRouter> router_protector(router_); |
171 | 170 |
172 // Because we never close |sync_message_event_{sender,receiver}_| before | 171 // Because we never close |sync_message_event_{sender,receiver}_| before |
173 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. | 172 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. |
174 DCHECK_EQ(MOJO_RESULT_OK, result); | 173 DCHECK_EQ(MOJO_RESULT_OK, result); |
175 bool reset_sync_watcher = false; | 174 bool reset_sync_watcher = false; |
176 { | 175 { |
177 MayAutoLock locker(router_->lock_.get()); | 176 MayAutoLock locker(router_->lock_.get()); |
178 | 177 |
179 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); | 178 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); |
180 | 179 |
181 if (!more_to_process) | 180 if (!more_to_process) |
182 ResetSyncMessageSignal(); | 181 ResetSyncMessageSignal(); |
183 | 182 |
184 // Currently there are no queued sync messages and the peer has closed so | 183 // Currently there are no queued sync messages and the peer has closed so |
185 // there won't be incoming sync messages in the future. | 184 // there won't be incoming sync messages in the future. |
186 reset_sync_watcher = !more_to_process && peer_closed_; | 185 reset_sync_watcher = !more_to_process && peer_closed_; |
187 } | 186 } |
188 if (reset_sync_watcher) { | 187 if (reset_sync_watcher) { |
189 // If a SyncWatch() call (or multiple ones) of this interface endpoint is | 188 // If a SyncWatch() call (or multiple ones) of this interface endpoint is |
190 // on the call stack, resetting the sync watcher will allow it to exit | 189 // on the call stack, resetting the sync watcher will allow it to exit |
191 // when the call stack unwinds to that frame. | 190 // when the call stack unwinds to that frame. |
192 sync_watcher_.reset(); | 191 sync_watcher_.reset(); |
193 } | 192 } |
194 } | 193 } |
195 | 194 |
196 void EnsureSyncWatcherExists() { | 195 void EnsureSyncWatcherExists() { |
197 DCHECK(task_runner_->BelongsToCurrentThread()); | 196 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
198 if (sync_watcher_) | 197 if (sync_watcher_) |
199 return; | 198 return; |
200 | 199 |
201 { | 200 { |
202 MayAutoLock locker(router_->lock_.get()); | 201 MayAutoLock locker(router_->lock_.get()); |
203 EnsureEventMessagePipeExists(); | 202 EnsureEventMessagePipeExists(); |
204 | 203 |
205 auto iter = router_->sync_message_tasks_.find(id_); | 204 auto iter = router_->sync_message_tasks_.find(id_); |
206 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) | 205 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) |
207 SignalSyncMessageEvent(); | 206 SignalSyncMessageEvent(); |
(...skipping 25 matching lines...) Expand all Loading... |
233 // The following members are accessed under the router's lock. | 232 // The following members are accessed under the router's lock. |
234 | 233 |
235 // Whether the endpoint has been closed. | 234 // Whether the endpoint has been closed. |
236 bool closed_; | 235 bool closed_; |
237 // Whether the peer endpoint has been closed. | 236 // Whether the peer endpoint has been closed. |
238 bool peer_closed_; | 237 bool peer_closed_; |
239 | 238 |
240 base::Optional<DisconnectReason> disconnect_reason_; | 239 base::Optional<DisconnectReason> disconnect_reason_; |
241 | 240 |
242 // The task runner on which |client_|'s methods can be called. | 241 // The task runner on which |client_|'s methods can be called. |
243 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 242 scoped_refptr<base::SequencedTaskRunner> task_runner_; |
244 // Not owned. It is null if no client is attached to this endpoint. | 243 // Not owned. It is null if no client is attached to this endpoint. |
245 InterfaceEndpointClient* client_; | 244 InterfaceEndpointClient* client_; |
246 | 245 |
247 // A message pipe used as an event to signal that sync messages are available. | 246 // A message pipe used as an event to signal that sync messages are available. |
248 // The message pipe handles are initialized under the router's lock and remain | 247 // The message pipe handles are initialized under the router's lock and remain |
249 // unchanged afterwards. They may be accessed outside of the router's lock | 248 // unchanged afterwards. They may be accessed outside of the router's lock |
250 // later. | 249 // later. |
251 ScopedMessagePipeHandle sync_message_event_sender_; | 250 ScopedMessagePipeHandle sync_message_event_sender_; |
252 ScopedMessagePipeHandle sync_message_event_receiver_; | 251 ScopedMessagePipeHandle sync_message_event_receiver_; |
253 bool event_signalled_; | 252 bool event_signalled_; |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
289 Type type; | 288 Type type; |
290 | 289 |
291 private: | 290 private: |
292 explicit Task(Type in_type) : type(in_type) {} | 291 explicit Task(Type in_type) : type(in_type) {} |
293 }; | 292 }; |
294 | 293 |
295 MultiplexRouter::MultiplexRouter( | 294 MultiplexRouter::MultiplexRouter( |
296 ScopedMessagePipeHandle message_pipe, | 295 ScopedMessagePipeHandle message_pipe, |
297 Config config, | 296 Config config, |
298 bool set_interface_id_namesapce_bit, | 297 bool set_interface_id_namesapce_bit, |
299 scoped_refptr<base::SingleThreadTaskRunner> runner) | 298 scoped_refptr<base::SequencedTaskRunner> runner) |
300 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 299 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
301 task_runner_(runner), | 300 task_runner_(runner), |
302 header_validator_(nullptr), | 301 header_validator_(nullptr), |
303 filters_(this), | 302 filters_(this), |
304 connector_(std::move(message_pipe), | 303 connector_(std::move(message_pipe), |
305 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND | 304 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND |
306 : Connector::SINGLE_THREADED_SEND, | 305 : Connector::SINGLE_THREADED_SEND, |
307 std::move(runner)), | 306 std::move(runner)), |
308 lock_(config == MULTI_INTERFACE ? new base::Lock : nullptr), | 307 lock_(config == MULTI_INTERFACE ? new base::Lock : nullptr), |
309 control_message_handler_(this), | 308 control_message_handler_(this), |
310 control_message_proxy_(&connector_), | 309 control_message_proxy_(&connector_), |
311 next_interface_id_value_(1), | 310 next_interface_id_value_(1), |
312 posted_to_process_tasks_(false), | 311 posted_to_process_tasks_(false), |
313 encountered_error_(false), | 312 encountered_error_(false), |
314 paused_(false), | 313 paused_(false), |
315 testing_mode_(false) { | 314 testing_mode_(false) { |
316 DCHECK(task_runner_->BelongsToCurrentThread()); | 315 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
| 316 DCHECK(config == SINGLE_INTERFACE || base::ThreadTaskRunnerHandle::IsSet()); |
317 | 317 |
318 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS || | 318 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS || |
319 config == MULTI_INTERFACE) { | 319 config == MULTI_INTERFACE) { |
320 // Always participate in sync handle watching in multi-interface mode, | 320 // Always participate in sync handle watching in multi-interface mode, |
321 // because even if it doesn't expect sync requests during sync handle | 321 // because even if it doesn't expect sync requests during sync handle |
322 // watching, it may still need to dispatch messages to associated endpoints | 322 // watching, it may still need to dispatch messages to associated endpoints |
323 // on a different thread. | 323 // on a different thread. |
324 connector_.AllowWokenUpBySyncWatchOnSameThread(); | 324 connector_.AllowWokenUpBySyncWatchOnSameThread(); |
325 } | 325 } |
326 connector_.set_incoming_receiver(&filters_); | 326 connector_.set_incoming_receiver(&filters_); |
(...skipping 28 matching lines...) Expand all Loading... |
355 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 355 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
356 } else { | 356 } else { |
357 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 357 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
358 } | 358 } |
359 } | 359 } |
360 | 360 |
361 DCHECK(endpoints_.empty()); | 361 DCHECK(endpoints_.empty()); |
362 } | 362 } |
363 | 363 |
364 void MultiplexRouter::SetMasterInterfaceName(const char* name) { | 364 void MultiplexRouter::SetMasterInterfaceName(const char* name) { |
365 DCHECK(thread_checker_.CalledOnValidThread()); | 365 DCHECK(sequence_checker_.CalledOnValidSequence()); |
366 header_validator_->SetDescription( | 366 header_validator_->SetDescription( |
367 std::string(name) + " [master] MessageHeaderValidator"); | 367 std::string(name) + " [master] MessageHeaderValidator"); |
368 control_message_handler_.SetDescription( | 368 control_message_handler_.SetDescription( |
369 std::string(name) + " [master] PipeControlMessageHandler"); | 369 std::string(name) + " [master] PipeControlMessageHandler"); |
370 connector_.SetWatcherHeapProfilerTag(name); | 370 connector_.SetWatcherHeapProfilerTag(name); |
371 } | 371 } |
372 | 372 |
373 void MultiplexRouter::CreateEndpointHandlePair( | 373 void MultiplexRouter::CreateEndpointHandlePair( |
374 ScopedInterfaceEndpointHandle* local_endpoint, | 374 ScopedInterfaceEndpointHandle* local_endpoint, |
375 ScopedInterfaceEndpointHandle* remote_endpoint) { | 375 ScopedInterfaceEndpointHandle* remote_endpoint) { |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
442 MayAutoUnlock unlocker(lock_.get()); | 442 MayAutoUnlock unlocker(lock_.get()); |
443 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); | 443 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); |
444 } | 444 } |
445 | 445 |
446 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); | 446 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
447 } | 447 } |
448 | 448 |
449 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( | 449 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( |
450 const ScopedInterfaceEndpointHandle& handle, | 450 const ScopedInterfaceEndpointHandle& handle, |
451 InterfaceEndpointClient* client, | 451 InterfaceEndpointClient* client, |
452 scoped_refptr<base::SingleThreadTaskRunner> runner) { | 452 scoped_refptr<base::SequencedTaskRunner> runner) { |
453 const InterfaceId id = handle.id(); | 453 const InterfaceId id = handle.id(); |
454 | 454 |
455 DCHECK(IsValidInterfaceId(id)); | 455 DCHECK(IsValidInterfaceId(id)); |
456 DCHECK(client); | 456 DCHECK(client); |
457 | 457 |
458 MayAutoLock locker(lock_.get()); | 458 MayAutoLock locker(lock_.get()); |
459 DCHECK(base::ContainsKey(endpoints_, id)); | 459 DCHECK(base::ContainsKey(endpoints_, id)); |
460 | 460 |
461 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 461 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
462 endpoint->AttachClient(client, std::move(runner)); | 462 endpoint->AttachClient(client, std::move(runner)); |
(...skipping 12 matching lines...) Expand all Loading... |
475 DCHECK(IsValidInterfaceId(id)); | 475 DCHECK(IsValidInterfaceId(id)); |
476 | 476 |
477 MayAutoLock locker(lock_.get()); | 477 MayAutoLock locker(lock_.get()); |
478 DCHECK(base::ContainsKey(endpoints_, id)); | 478 DCHECK(base::ContainsKey(endpoints_, id)); |
479 | 479 |
480 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 480 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
481 endpoint->DetachClient(); | 481 endpoint->DetachClient(); |
482 } | 482 } |
483 | 483 |
484 void MultiplexRouter::RaiseError() { | 484 void MultiplexRouter::RaiseError() { |
485 if (task_runner_->BelongsToCurrentThread()) { | 485 if (task_runner_->RunsTasksOnCurrentThread()) { |
486 connector_.RaiseError(); | 486 connector_.RaiseError(); |
487 } else { | 487 } else { |
488 task_runner_->PostTask(FROM_HERE, | 488 task_runner_->PostTask(FROM_HERE, |
489 base::Bind(&MultiplexRouter::RaiseError, this)); | 489 base::Bind(&MultiplexRouter::RaiseError, this)); |
490 } | 490 } |
491 } | 491 } |
492 | 492 |
493 void MultiplexRouter::CloseMessagePipe() { | 493 void MultiplexRouter::CloseMessagePipe() { |
494 DCHECK(thread_checker_.CalledOnValidThread()); | 494 DCHECK(sequence_checker_.CalledOnValidSequence()); |
495 connector_.CloseMessagePipe(); | 495 connector_.CloseMessagePipe(); |
496 // CloseMessagePipe() above won't trigger connection error handler. | 496 // CloseMessagePipe() above won't trigger connection error handler. |
497 // Explicitly call OnPipeConnectionError() so that associated endpoints will | 497 // Explicitly call OnPipeConnectionError() so that associated endpoints will |
498 // get notified. | 498 // get notified. |
499 OnPipeConnectionError(); | 499 OnPipeConnectionError(); |
500 } | 500 } |
501 | 501 |
502 void MultiplexRouter::PauseIncomingMethodCallProcessing() { | 502 void MultiplexRouter::PauseIncomingMethodCallProcessing() { |
503 DCHECK(thread_checker_.CalledOnValidThread()); | 503 DCHECK(sequence_checker_.CalledOnValidSequence()); |
504 connector_.PauseIncomingMethodCallProcessing(); | 504 connector_.PauseIncomingMethodCallProcessing(); |
505 | 505 |
506 MayAutoLock locker(lock_.get()); | 506 MayAutoLock locker(lock_.get()); |
507 paused_ = true; | 507 paused_ = true; |
508 | 508 |
509 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) | 509 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) |
510 iter->second->ResetSyncMessageSignal(); | 510 iter->second->ResetSyncMessageSignal(); |
511 } | 511 } |
512 | 512 |
513 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { | 513 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { |
514 DCHECK(thread_checker_.CalledOnValidThread()); | 514 DCHECK(sequence_checker_.CalledOnValidSequence()); |
515 connector_.ResumeIncomingMethodCallProcessing(); | 515 connector_.ResumeIncomingMethodCallProcessing(); |
516 | 516 |
517 MayAutoLock locker(lock_.get()); | 517 MayAutoLock locker(lock_.get()); |
518 paused_ = false; | 518 paused_ = false; |
519 | 519 |
520 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { | 520 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { |
521 auto sync_iter = sync_message_tasks_.find(iter->first); | 521 auto sync_iter = sync_message_tasks_.find(iter->first); |
522 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) | 522 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) |
523 iter->second->SignalSyncMessageEvent(); | 523 iter->second->SignalSyncMessageEvent(); |
524 } | 524 } |
525 | 525 |
526 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); | 526 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
527 } | 527 } |
528 | 528 |
529 bool MultiplexRouter::HasAssociatedEndpoints() const { | 529 bool MultiplexRouter::HasAssociatedEndpoints() const { |
530 DCHECK(thread_checker_.CalledOnValidThread()); | 530 DCHECK(sequence_checker_.CalledOnValidSequence()); |
531 MayAutoLock locker(lock_.get()); | 531 MayAutoLock locker(lock_.get()); |
532 | 532 |
533 if (endpoints_.size() > 1) | 533 if (endpoints_.size() > 1) |
534 return true; | 534 return true; |
535 if (endpoints_.size() == 0) | 535 if (endpoints_.size() == 0) |
536 return false; | 536 return false; |
537 | 537 |
538 return !base::ContainsKey(endpoints_, kMasterInterfaceId); | 538 return !base::ContainsKey(endpoints_, kMasterInterfaceId); |
539 } | 539 } |
540 | 540 |
541 void MultiplexRouter::EnableTestingMode() { | 541 void MultiplexRouter::EnableTestingMode() { |
542 DCHECK(thread_checker_.CalledOnValidThread()); | 542 DCHECK(sequence_checker_.CalledOnValidSequence()); |
543 MayAutoLock locker(lock_.get()); | 543 MayAutoLock locker(lock_.get()); |
544 | 544 |
545 testing_mode_ = true; | 545 testing_mode_ = true; |
546 connector_.set_enforce_errors_from_incoming_receiver(false); | 546 connector_.set_enforce_errors_from_incoming_receiver(false); |
547 } | 547 } |
548 | 548 |
549 bool MultiplexRouter::Accept(Message* message) { | 549 bool MultiplexRouter::Accept(Message* message) { |
550 DCHECK(thread_checker_.CalledOnValidThread()); | 550 DCHECK(sequence_checker_.CalledOnValidSequence()); |
551 | 551 |
552 scoped_refptr<MultiplexRouter> protector(this); | 552 scoped_refptr<MultiplexRouter> protector(this); |
553 MayAutoLock locker(lock_.get()); | 553 MayAutoLock locker(lock_.get()); |
554 | 554 |
555 DCHECK(!paused_); | 555 DCHECK(!paused_); |
556 | 556 |
557 ClientCallBehavior client_call_behavior = | 557 ClientCallBehavior client_call_behavior = |
558 connector_.during_sync_handle_watcher_callback() | 558 connector_.during_sync_handle_watcher_callback() |
559 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 559 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
560 : ALLOW_DIRECT_CLIENT_CALLS; | 560 : ALLOW_DIRECT_CLIENT_CALLS; |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
627 DCHECK(!endpoint->closed()); | 627 DCHECK(!endpoint->closed()); |
628 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 628 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
629 | 629 |
630 MayAutoUnlock unlocker(lock_.get()); | 630 MayAutoUnlock unlocker(lock_.get()); |
631 control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt); | 631 control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt); |
632 | 632 |
633 return true; | 633 return true; |
634 } | 634 } |
635 | 635 |
636 void MultiplexRouter::OnPipeConnectionError() { | 636 void MultiplexRouter::OnPipeConnectionError() { |
637 DCHECK(thread_checker_.CalledOnValidThread()); | 637 DCHECK(sequence_checker_.CalledOnValidSequence()); |
638 | 638 |
639 scoped_refptr<MultiplexRouter> protector(this); | 639 scoped_refptr<MultiplexRouter> protector(this); |
640 MayAutoLock locker(lock_.get()); | 640 MayAutoLock locker(lock_.get()); |
641 | 641 |
642 encountered_error_ = true; | 642 encountered_error_ = true; |
643 | 643 |
644 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | 644 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
645 InterfaceEndpoint* endpoint = iter->second.get(); | 645 InterfaceEndpoint* endpoint = iter->second.get(); |
646 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 646 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
647 // because it may remove the corresponding value from the map. | 647 // because it may remove the corresponding value from the map. |
648 ++iter; | 648 ++iter; |
649 | 649 |
650 if (endpoint->client()) | 650 if (endpoint->client()) |
651 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 651 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
652 | 652 |
653 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 653 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
654 } | 654 } |
655 | 655 |
656 ProcessTasks(connector_.during_sync_handle_watcher_callback() | 656 ProcessTasks(connector_.during_sync_handle_watcher_callback() |
657 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 657 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
658 : ALLOW_DIRECT_CLIENT_CALLS, | 658 : ALLOW_DIRECT_CLIENT_CALLS, |
659 connector_.task_runner()); | 659 connector_.task_runner()); |
660 } | 660 } |
661 | 661 |
662 void MultiplexRouter::ProcessTasks( | 662 void MultiplexRouter::ProcessTasks( |
663 ClientCallBehavior client_call_behavior, | 663 ClientCallBehavior client_call_behavior, |
664 base::SingleThreadTaskRunner* current_task_runner) { | 664 base::SequencedTaskRunner* current_task_runner) { |
665 AssertLockAcquired(); | 665 AssertLockAcquired(); |
666 | 666 |
667 if (posted_to_process_tasks_) | 667 if (posted_to_process_tasks_) |
668 return; | 668 return; |
669 | 669 |
670 while (!tasks_.empty() && !paused_) { | 670 while (!tasks_.empty() && !paused_) { |
671 std::unique_ptr<Task> task(std::move(tasks_.front())); | 671 std::unique_ptr<Task> task(std::move(tasks_.front())); |
672 tasks_.pop_front(); | 672 tasks_.pop_front(); |
673 | 673 |
674 InterfaceId id = kInvalidInterfaceId; | 674 InterfaceId id = kInvalidInterfaceId; |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
734 sync_message_tasks_.erase(iter); | 734 sync_message_tasks_.erase(iter); |
735 return false; | 735 return false; |
736 } | 736 } |
737 | 737 |
738 return true; | 738 return true; |
739 } | 739 } |
740 | 740 |
741 bool MultiplexRouter::ProcessNotifyErrorTask( | 741 bool MultiplexRouter::ProcessNotifyErrorTask( |
742 Task* task, | 742 Task* task, |
743 ClientCallBehavior client_call_behavior, | 743 ClientCallBehavior client_call_behavior, |
744 base::SingleThreadTaskRunner* current_task_runner) { | 744 base::SequencedTaskRunner* current_task_runner) { |
745 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 745 DCHECK(!current_task_runner || |
| 746 current_task_runner->RunsTasksOnCurrentThread()); |
746 DCHECK(!paused_); | 747 DCHECK(!paused_); |
747 | 748 |
748 AssertLockAcquired(); | 749 AssertLockAcquired(); |
749 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 750 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
750 if (!endpoint->client()) | 751 if (!endpoint->client()) |
751 return true; | 752 return true; |
752 | 753 |
753 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || | 754 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || |
754 endpoint->task_runner() != current_task_runner) { | 755 endpoint->task_runner() != current_task_runner) { |
755 MaybePostToProcessTasks(endpoint->task_runner()); | 756 MaybePostToProcessTasks(endpoint->task_runner()); |
756 return false; | 757 return false; |
757 } | 758 } |
758 | 759 |
759 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 760 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); |
760 | 761 |
761 InterfaceEndpointClient* client = endpoint->client(); | 762 InterfaceEndpointClient* client = endpoint->client(); |
762 base::Optional<DisconnectReason> disconnect_reason( | 763 base::Optional<DisconnectReason> disconnect_reason( |
763 endpoint->disconnect_reason()); | 764 endpoint->disconnect_reason()); |
764 | 765 |
765 { | 766 { |
766 // We must unlock before calling into |client| because it may call this | 767 // We must unlock before calling into |client| because it may call this |
767 // object within NotifyError(). Holding the lock will lead to deadlock. | 768 // object within NotifyError(). Holding the lock will lead to deadlock. |
768 // | 769 // |
769 // It is safe to call into |client| without the lock. Because |client| is | 770 // It is safe to call into |client| without the lock. Because |client| is |
770 // always accessed on the same thread, including DetachEndpointClient(). | 771 // always accessed on the same thread, including DetachEndpointClient(). |
771 MayAutoUnlock unlocker(lock_.get()); | 772 MayAutoUnlock unlocker(lock_.get()); |
772 client->NotifyError(disconnect_reason); | 773 client->NotifyError(disconnect_reason); |
773 } | 774 } |
774 return true; | 775 return true; |
775 } | 776 } |
776 | 777 |
777 bool MultiplexRouter::ProcessIncomingMessage( | 778 bool MultiplexRouter::ProcessIncomingMessage( |
778 Message* message, | 779 Message* message, |
779 ClientCallBehavior client_call_behavior, | 780 ClientCallBehavior client_call_behavior, |
780 base::SingleThreadTaskRunner* current_task_runner) { | 781 base::SequencedTaskRunner* current_task_runner) { |
781 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 782 DCHECK(!current_task_runner || |
| 783 current_task_runner->RunsTasksOnCurrentThread()); |
782 DCHECK(!paused_); | 784 DCHECK(!paused_); |
783 DCHECK(message); | 785 DCHECK(message); |
784 AssertLockAcquired(); | 786 AssertLockAcquired(); |
785 | 787 |
786 if (message->IsNull()) { | 788 if (message->IsNull()) { |
787 // This is a sync message and has been processed during sync handle | 789 // This is a sync message and has been processed during sync handle |
788 // watching. | 790 // watching. |
789 return true; | 791 return true; |
790 } | 792 } |
791 | 793 |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
827 | 829 |
828 if (!endpoint->client()) { | 830 if (!endpoint->client()) { |
829 // We need to wait until a client is attached in order to dispatch further | 831 // We need to wait until a client is attached in order to dispatch further |
830 // messages. | 832 // messages. |
831 return false; | 833 return false; |
832 } | 834 } |
833 | 835 |
834 bool can_direct_call; | 836 bool can_direct_call; |
835 if (message->has_flag(Message::kFlagIsSync)) { | 837 if (message->has_flag(Message::kFlagIsSync)) { |
836 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && | 838 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && |
837 endpoint->task_runner()->BelongsToCurrentThread(); | 839 endpoint->task_runner()->RunsTasksOnCurrentThread(); |
838 } else { | 840 } else { |
839 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && | 841 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && |
840 endpoint->task_runner() == current_task_runner; | 842 endpoint->task_runner() == current_task_runner; |
841 } | 843 } |
842 | 844 |
843 if (!can_direct_call) { | 845 if (!can_direct_call) { |
844 MaybePostToProcessTasks(endpoint->task_runner()); | 846 MaybePostToProcessTasks(endpoint->task_runner()); |
845 return false; | 847 return false; |
846 } | 848 } |
847 | 849 |
848 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 850 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); |
849 | 851 |
850 InterfaceEndpointClient* client = endpoint->client(); | 852 InterfaceEndpointClient* client = endpoint->client(); |
851 bool result = false; | 853 bool result = false; |
852 { | 854 { |
853 // We must unlock before calling into |client| because it may call this | 855 // We must unlock before calling into |client| because it may call this |
854 // object within HandleIncomingMessage(). Holding the lock will lead to | 856 // object within HandleIncomingMessage(). Holding the lock will lead to |
855 // deadlock. | 857 // deadlock. |
856 // | 858 // |
857 // It is safe to call into |client| without the lock. Because |client| is | 859 // It is safe to call into |client| without the lock. Because |client| is |
858 // always accessed on the same thread, including DetachEndpointClient(). | 860 // always accessed on the same thread, including DetachEndpointClient(). |
859 MayAutoUnlock unlocker(lock_.get()); | 861 MayAutoUnlock unlocker(lock_.get()); |
860 result = client->HandleIncomingMessage(message); | 862 result = client->HandleIncomingMessage(message); |
861 } | 863 } |
862 if (!result) | 864 if (!result) |
863 RaiseErrorInNonTestingMode(); | 865 RaiseErrorInNonTestingMode(); |
864 | 866 |
865 return true; | 867 return true; |
866 } | 868 } |
867 | 869 |
868 void MultiplexRouter::MaybePostToProcessTasks( | 870 void MultiplexRouter::MaybePostToProcessTasks( |
869 base::SingleThreadTaskRunner* task_runner) { | 871 base::SequencedTaskRunner* task_runner) { |
870 AssertLockAcquired(); | 872 AssertLockAcquired(); |
871 if (posted_to_process_tasks_) | 873 if (posted_to_process_tasks_) |
872 return; | 874 return; |
873 | 875 |
874 posted_to_process_tasks_ = true; | 876 posted_to_process_tasks_ = true; |
875 posted_to_task_runner_ = task_runner; | 877 posted_to_task_runner_ = task_runner; |
876 task_runner->PostTask( | 878 task_runner->PostTask( |
877 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | 879 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
878 } | 880 } |
879 | 881 |
880 void MultiplexRouter::LockAndCallProcessTasks() { | 882 void MultiplexRouter::LockAndCallProcessTasks() { |
881 // There is no need to hold a ref to this class in this case because this is | 883 // There is no need to hold a ref to this class in this case because this is |
882 // always called using base::Bind(), which holds a ref. | 884 // always called using base::Bind(), which holds a ref. |
883 MayAutoLock locker(lock_.get()); | 885 MayAutoLock locker(lock_.get()); |
884 posted_to_process_tasks_ = false; | 886 posted_to_process_tasks_ = false; |
885 scoped_refptr<base::SingleThreadTaskRunner> runner( | 887 scoped_refptr<base::SequencedTaskRunner> runner( |
886 std::move(posted_to_task_runner_)); | 888 std::move(posted_to_task_runner_)); |
887 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); | 889 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); |
888 } | 890 } |
889 | 891 |
890 void MultiplexRouter::UpdateEndpointStateMayRemove( | 892 void MultiplexRouter::UpdateEndpointStateMayRemove( |
891 InterfaceEndpoint* endpoint, | 893 InterfaceEndpoint* endpoint, |
892 EndpointStateUpdateType type) { | 894 EndpointStateUpdateType type) { |
893 switch (type) { | 895 switch (type) { |
894 case ENDPOINT_CLOSED: | 896 case ENDPOINT_CLOSED: |
895 endpoint->set_closed(); | 897 endpoint->set_closed(); |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
935 | 937 |
936 void MultiplexRouter::AssertLockAcquired() { | 938 void MultiplexRouter::AssertLockAcquired() { |
937 #if DCHECK_IS_ON() | 939 #if DCHECK_IS_ON() |
938 if (lock_) | 940 if (lock_) |
939 lock_->AssertAcquired(); | 941 lock_->AssertAcquired(); |
940 #endif | 942 #endif |
941 } | 943 } |
942 | 944 |
943 } // namespace internal | 945 } // namespace internal |
944 } // namespace mojo | 946 } // namespace mojo |
OLD | NEW |