| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| 11 #include "base/bind.h" | 11 #include "base/bind.h" |
| 12 #include "base/location.h" | 12 #include "base/location.h" |
| 13 #include "base/macros.h" | 13 #include "base/macros.h" |
| 14 #include "base/memory/ptr_util.h" | 14 #include "base/memory/ptr_util.h" |
| 15 #include "base/single_thread_task_runner.h" | 15 #include "base/sequenced_task_runner.h" |
| 16 #include "base/stl_util.h" | 16 #include "base/stl_util.h" |
| 17 #include "base/threading/sequenced_task_runner_handle.h" |
| 17 #include "base/threading/thread_task_runner_handle.h" | 18 #include "base/threading/thread_task_runner_handle.h" |
| 18 #include "mojo/public/cpp/bindings/associated_group.h" | 19 #include "mojo/public/cpp/bindings/associated_group.h" |
| 19 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" | 20 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" |
| 20 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" | 21 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" |
| 21 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" | 22 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" |
| 22 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" | 23 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" |
| 23 | 24 |
| 24 namespace mojo { | 25 namespace mojo { |
| 25 namespace internal { | 26 namespace internal { |
| 26 | 27 |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 63 | 64 |
| 64 const base::Optional<DisconnectReason>& disconnect_reason() const { | 65 const base::Optional<DisconnectReason>& disconnect_reason() const { |
| 65 return disconnect_reason_; | 66 return disconnect_reason_; |
| 66 } | 67 } |
| 67 void set_disconnect_reason( | 68 void set_disconnect_reason( |
| 68 const base::Optional<DisconnectReason>& disconnect_reason) { | 69 const base::Optional<DisconnectReason>& disconnect_reason) { |
| 69 router_->AssertLockAcquired(); | 70 router_->AssertLockAcquired(); |
| 70 disconnect_reason_ = disconnect_reason; | 71 disconnect_reason_ = disconnect_reason; |
| 71 } | 72 } |
| 72 | 73 |
| 73 base::SingleThreadTaskRunner* task_runner() const { | 74 base::SequencedTaskRunner* task_runner() const { return task_runner_.get(); } |
| 74 return task_runner_.get(); | |
| 75 } | |
| 76 | 75 |
| 77 InterfaceEndpointClient* client() const { return client_; } | 76 InterfaceEndpointClient* client() const { return client_; } |
| 78 | 77 |
| 79 void AttachClient(InterfaceEndpointClient* client, | 78 void AttachClient(InterfaceEndpointClient* client, |
| 80 scoped_refptr<base::SingleThreadTaskRunner> runner) { | 79 scoped_refptr<base::SequencedTaskRunner> runner) { |
| 81 router_->AssertLockAcquired(); | 80 router_->AssertLockAcquired(); |
| 82 DCHECK(!client_); | 81 DCHECK(!client_); |
| 83 DCHECK(!closed_); | 82 DCHECK(!closed_); |
| 84 DCHECK(runner->BelongsToCurrentThread()); | 83 DCHECK(runner->RunsTasksOnCurrentThread()); |
| 85 | 84 |
| 86 task_runner_ = std::move(runner); | 85 task_runner_ = std::move(runner); |
| 87 client_ = client; | 86 client_ = client; |
| 88 } | 87 } |
| 89 | 88 |
| 90 // This method must be called on the same thread as the corresponding | 89 // This method must be called on the same thread as the corresponding |
| 91 // AttachClient() call. | 90 // AttachClient() call. |
| 92 void DetachClient() { | 91 void DetachClient() { |
| 93 router_->AssertLockAcquired(); | 92 router_->AssertLockAcquired(); |
| 94 DCHECK(client_); | 93 DCHECK(client_); |
| 95 DCHECK(task_runner_->BelongsToCurrentThread()); | 94 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
| 96 DCHECK(!closed_); | 95 DCHECK(!closed_); |
| 97 | 96 |
| 98 task_runner_ = nullptr; | 97 task_runner_ = nullptr; |
| 99 client_ = nullptr; | 98 client_ = nullptr; |
| 100 sync_watcher_.reset(); | 99 sync_watcher_.reset(); |
| 101 } | 100 } |
| 102 | 101 |
| 103 void SignalSyncMessageEvent() { | 102 void SignalSyncMessageEvent() { |
| 104 router_->AssertLockAcquired(); | 103 router_->AssertLockAcquired(); |
| 105 if (event_signalled_) | 104 if (event_signalled_) |
| (...skipping 20 matching lines...) Expand all Loading... |
| 126 DCHECK_EQ(MOJO_RESULT_OK, result); | 125 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 127 event_signalled_ = false; | 126 event_signalled_ = false; |
| 128 } | 127 } |
| 129 | 128 |
| 130 // --------------------------------------------------------------------------- | 129 // --------------------------------------------------------------------------- |
| 131 // The following public methods (i.e., InterfaceEndpointController | 130 // The following public methods (i.e., InterfaceEndpointController |
| 132 // implementation) are called by the client on the same thread as the | 131 // implementation) are called by the client on the same thread as the |
| 133 // AttachClient() call. They are called outside of the router's lock. | 132 // AttachClient() call. They are called outside of the router's lock. |
| 134 | 133 |
| 135 bool SendMessage(Message* message) override { | 134 bool SendMessage(Message* message) override { |
| 136 DCHECK(task_runner_->BelongsToCurrentThread()); | 135 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
| 137 message->set_interface_id(id_); | 136 message->set_interface_id(id_); |
| 138 return router_->connector_.Accept(message); | 137 return router_->connector_.Accept(message); |
| 139 } | 138 } |
| 140 | 139 |
| 141 void AllowWokenUpBySyncWatchOnSameThread() override { | 140 void AllowWokenUpBySyncWatchOnSameThread() override { |
| 142 DCHECK(task_runner_->BelongsToCurrentThread()); | 141 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
| 143 | 142 |
| 144 EnsureSyncWatcherExists(); | 143 EnsureSyncWatcherExists(); |
| 145 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 144 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| 146 } | 145 } |
| 147 | 146 |
| 148 bool SyncWatch(const bool* should_stop) override { | 147 bool SyncWatch(const bool* should_stop) override { |
| 149 DCHECK(task_runner_->BelongsToCurrentThread()); | 148 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
| 150 | 149 |
| 151 EnsureSyncWatcherExists(); | 150 EnsureSyncWatcherExists(); |
| 152 return sync_watcher_->SyncWatch(should_stop); | 151 return sync_watcher_->SyncWatch(should_stop); |
| 153 } | 152 } |
| 154 | 153 |
| 155 private: | 154 private: |
| 156 friend class base::RefCounted<InterfaceEndpoint>; | 155 friend class base::RefCounted<InterfaceEndpoint>; |
| 157 | 156 |
| 158 ~InterfaceEndpoint() override { | 157 ~InterfaceEndpoint() override { |
| 159 router_->AssertLockAcquired(); | 158 router_->AssertLockAcquired(); |
| 160 | 159 |
| 161 DCHECK(!client_); | 160 DCHECK(!client_); |
| 162 DCHECK(closed_); | 161 DCHECK(closed_); |
| 163 DCHECK(peer_closed_); | 162 DCHECK(peer_closed_); |
| 164 DCHECK(!sync_watcher_); | 163 DCHECK(!sync_watcher_); |
| 165 } | 164 } |
| 166 | 165 |
| 167 void OnHandleReady(MojoResult result) { | 166 void OnHandleReady(MojoResult result) { |
| 168 DCHECK(task_runner_->BelongsToCurrentThread()); | 167 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
| 169 scoped_refptr<InterfaceEndpoint> self_protector(this); | 168 scoped_refptr<InterfaceEndpoint> self_protector(this); |
| 170 scoped_refptr<MultiplexRouter> router_protector(router_); | 169 scoped_refptr<MultiplexRouter> router_protector(router_); |
| 171 | 170 |
| 172 // Because we never close |sync_message_event_{sender,receiver}_| before | 171 // Because we never close |sync_message_event_{sender,receiver}_| before |
| 173 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. | 172 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. |
| 174 DCHECK_EQ(MOJO_RESULT_OK, result); | 173 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 175 bool reset_sync_watcher = false; | 174 bool reset_sync_watcher = false; |
| 176 { | 175 { |
| 177 MayAutoLock locker(router_->lock_.get()); | 176 MayAutoLock locker(router_->lock_.get()); |
| 178 | 177 |
| 179 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); | 178 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); |
| 180 | 179 |
| 181 if (!more_to_process) | 180 if (!more_to_process) |
| 182 ResetSyncMessageSignal(); | 181 ResetSyncMessageSignal(); |
| 183 | 182 |
| 184 // Currently there are no queued sync messages and the peer has closed so | 183 // Currently there are no queued sync messages and the peer has closed so |
| 185 // there won't be incoming sync messages in the future. | 184 // there won't be incoming sync messages in the future. |
| 186 reset_sync_watcher = !more_to_process && peer_closed_; | 185 reset_sync_watcher = !more_to_process && peer_closed_; |
| 187 } | 186 } |
| 188 if (reset_sync_watcher) { | 187 if (reset_sync_watcher) { |
| 189 // If a SyncWatch() call (or multiple ones) of this interface endpoint is | 188 // If a SyncWatch() call (or multiple ones) of this interface endpoint is |
| 190 // on the call stack, resetting the sync watcher will allow it to exit | 189 // on the call stack, resetting the sync watcher will allow it to exit |
| 191 // when the call stack unwinds to that frame. | 190 // when the call stack unwinds to that frame. |
| 192 sync_watcher_.reset(); | 191 sync_watcher_.reset(); |
| 193 } | 192 } |
| 194 } | 193 } |
| 195 | 194 |
| 196 void EnsureSyncWatcherExists() { | 195 void EnsureSyncWatcherExists() { |
| 197 DCHECK(task_runner_->BelongsToCurrentThread()); | 196 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
| 198 if (sync_watcher_) | 197 if (sync_watcher_) |
| 199 return; | 198 return; |
| 200 | 199 |
| 201 { | 200 { |
| 202 MayAutoLock locker(router_->lock_.get()); | 201 MayAutoLock locker(router_->lock_.get()); |
| 203 EnsureEventMessagePipeExists(); | 202 EnsureEventMessagePipeExists(); |
| 204 | 203 |
| 205 auto iter = router_->sync_message_tasks_.find(id_); | 204 auto iter = router_->sync_message_tasks_.find(id_); |
| 206 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) | 205 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) |
| 207 SignalSyncMessageEvent(); | 206 SignalSyncMessageEvent(); |
| (...skipping 25 matching lines...) Expand all Loading... |
| 233 // The following members are accessed under the router's lock. | 232 // The following members are accessed under the router's lock. |
| 234 | 233 |
| 235 // Whether the endpoint has been closed. | 234 // Whether the endpoint has been closed. |
| 236 bool closed_; | 235 bool closed_; |
| 237 // Whether the peer endpoint has been closed. | 236 // Whether the peer endpoint has been closed. |
| 238 bool peer_closed_; | 237 bool peer_closed_; |
| 239 | 238 |
| 240 base::Optional<DisconnectReason> disconnect_reason_; | 239 base::Optional<DisconnectReason> disconnect_reason_; |
| 241 | 240 |
| 242 // The task runner on which |client_|'s methods can be called. | 241 // The task runner on which |client_|'s methods can be called. |
| 243 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 242 scoped_refptr<base::SequencedTaskRunner> task_runner_; |
| 244 // Not owned. It is null if no client is attached to this endpoint. | 243 // Not owned. It is null if no client is attached to this endpoint. |
| 245 InterfaceEndpointClient* client_; | 244 InterfaceEndpointClient* client_; |
| 246 | 245 |
| 247 // A message pipe used as an event to signal that sync messages are available. | 246 // A message pipe used as an event to signal that sync messages are available. |
| 248 // The message pipe handles are initialized under the router's lock and remain | 247 // The message pipe handles are initialized under the router's lock and remain |
| 249 // unchanged afterwards. They may be accessed outside of the router's lock | 248 // unchanged afterwards. They may be accessed outside of the router's lock |
| 250 // later. | 249 // later. |
| 251 ScopedMessagePipeHandle sync_message_event_sender_; | 250 ScopedMessagePipeHandle sync_message_event_sender_; |
| 252 ScopedMessagePipeHandle sync_message_event_receiver_; | 251 ScopedMessagePipeHandle sync_message_event_receiver_; |
| 253 bool event_signalled_; | 252 bool event_signalled_; |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 289 Type type; | 288 Type type; |
| 290 | 289 |
| 291 private: | 290 private: |
| 292 explicit Task(Type in_type) : type(in_type) {} | 291 explicit Task(Type in_type) : type(in_type) {} |
| 293 }; | 292 }; |
| 294 | 293 |
| 295 MultiplexRouter::MultiplexRouter( | 294 MultiplexRouter::MultiplexRouter( |
| 296 ScopedMessagePipeHandle message_pipe, | 295 ScopedMessagePipeHandle message_pipe, |
| 297 Config config, | 296 Config config, |
| 298 bool set_interface_id_namesapce_bit, | 297 bool set_interface_id_namesapce_bit, |
| 299 scoped_refptr<base::SingleThreadTaskRunner> runner) | 298 scoped_refptr<base::SequencedTaskRunner> runner) |
| 300 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 299 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
| 301 task_runner_(runner), | 300 task_runner_(runner), |
| 302 header_validator_(nullptr), | 301 header_validator_(nullptr), |
| 303 filters_(this), | 302 filters_(this), |
| 304 connector_(std::move(message_pipe), | 303 connector_(std::move(message_pipe), |
| 305 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND | 304 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND |
| 306 : Connector::SINGLE_THREADED_SEND, | 305 : Connector::SINGLE_THREADED_SEND, |
| 307 std::move(runner)), | 306 std::move(runner)), |
| 308 lock_(config == MULTI_INTERFACE ? new base::Lock : nullptr), | 307 lock_(config == MULTI_INTERFACE ? new base::Lock : nullptr), |
| 309 control_message_handler_(this), | 308 control_message_handler_(this), |
| 310 control_message_proxy_(&connector_), | 309 control_message_proxy_(&connector_), |
| 311 next_interface_id_value_(1), | 310 next_interface_id_value_(1), |
| 312 posted_to_process_tasks_(false), | 311 posted_to_process_tasks_(false), |
| 313 encountered_error_(false), | 312 encountered_error_(false), |
| 314 paused_(false), | 313 paused_(false), |
| 315 testing_mode_(false) { | 314 testing_mode_(false) { |
| 316 DCHECK(task_runner_->BelongsToCurrentThread()); | 315 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
| 316 DCHECK(config == SINGLE_INTERFACE || base::ThreadTaskRunnerHandle::IsSet()); |
| 317 | 317 |
| 318 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS || | 318 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS || |
| 319 config == MULTI_INTERFACE) { | 319 config == MULTI_INTERFACE) { |
| 320 // Always participate in sync handle watching in multi-interface mode, | 320 // Always participate in sync handle watching in multi-interface mode, |
| 321 // because even if it doesn't expect sync requests during sync handle | 321 // because even if it doesn't expect sync requests during sync handle |
| 322 // watching, it may still need to dispatch messages to associated endpoints | 322 // watching, it may still need to dispatch messages to associated endpoints |
| 323 // on a different thread. | 323 // on a different thread. |
| 324 connector_.AllowWokenUpBySyncWatchOnSameThread(); | 324 connector_.AllowWokenUpBySyncWatchOnSameThread(); |
| 325 } | 325 } |
| 326 connector_.set_incoming_receiver(&filters_); | 326 connector_.set_incoming_receiver(&filters_); |
| (...skipping 28 matching lines...) Expand all Loading... |
| 355 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 355 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| 356 } else { | 356 } else { |
| 357 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 357 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 358 } | 358 } |
| 359 } | 359 } |
| 360 | 360 |
| 361 DCHECK(endpoints_.empty()); | 361 DCHECK(endpoints_.empty()); |
| 362 } | 362 } |
| 363 | 363 |
| 364 void MultiplexRouter::SetMasterInterfaceName(const char* name) { | 364 void MultiplexRouter::SetMasterInterfaceName(const char* name) { |
| 365 DCHECK(thread_checker_.CalledOnValidThread()); | 365 DCHECK(sequence_checker_.CalledOnValidSequence()); |
| 366 header_validator_->SetDescription( | 366 header_validator_->SetDescription( |
| 367 std::string(name) + " [master] MessageHeaderValidator"); | 367 std::string(name) + " [master] MessageHeaderValidator"); |
| 368 control_message_handler_.SetDescription( | 368 control_message_handler_.SetDescription( |
| 369 std::string(name) + " [master] PipeControlMessageHandler"); | 369 std::string(name) + " [master] PipeControlMessageHandler"); |
| 370 connector_.SetWatcherHeapProfilerTag(name); | 370 connector_.SetWatcherHeapProfilerTag(name); |
| 371 } | 371 } |
| 372 | 372 |
| 373 void MultiplexRouter::CreateEndpointHandlePair( | 373 void MultiplexRouter::CreateEndpointHandlePair( |
| 374 ScopedInterfaceEndpointHandle* local_endpoint, | 374 ScopedInterfaceEndpointHandle* local_endpoint, |
| 375 ScopedInterfaceEndpointHandle* remote_endpoint) { | 375 ScopedInterfaceEndpointHandle* remote_endpoint) { |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 442 MayAutoUnlock unlocker(lock_.get()); | 442 MayAutoUnlock unlocker(lock_.get()); |
| 443 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); | 443 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); |
| 444 } | 444 } |
| 445 | 445 |
| 446 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); | 446 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
| 447 } | 447 } |
| 448 | 448 |
| 449 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( | 449 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( |
| 450 const ScopedInterfaceEndpointHandle& handle, | 450 const ScopedInterfaceEndpointHandle& handle, |
| 451 InterfaceEndpointClient* client, | 451 InterfaceEndpointClient* client, |
| 452 scoped_refptr<base::SingleThreadTaskRunner> runner) { | 452 scoped_refptr<base::SequencedTaskRunner> runner) { |
| 453 const InterfaceId id = handle.id(); | 453 const InterfaceId id = handle.id(); |
| 454 | 454 |
| 455 DCHECK(IsValidInterfaceId(id)); | 455 DCHECK(IsValidInterfaceId(id)); |
| 456 DCHECK(client); | 456 DCHECK(client); |
| 457 | 457 |
| 458 MayAutoLock locker(lock_.get()); | 458 MayAutoLock locker(lock_.get()); |
| 459 DCHECK(base::ContainsKey(endpoints_, id)); | 459 DCHECK(base::ContainsKey(endpoints_, id)); |
| 460 | 460 |
| 461 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 461 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 462 endpoint->AttachClient(client, std::move(runner)); | 462 endpoint->AttachClient(client, std::move(runner)); |
| (...skipping 12 matching lines...) Expand all Loading... |
| 475 DCHECK(IsValidInterfaceId(id)); | 475 DCHECK(IsValidInterfaceId(id)); |
| 476 | 476 |
| 477 MayAutoLock locker(lock_.get()); | 477 MayAutoLock locker(lock_.get()); |
| 478 DCHECK(base::ContainsKey(endpoints_, id)); | 478 DCHECK(base::ContainsKey(endpoints_, id)); |
| 479 | 479 |
| 480 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 480 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 481 endpoint->DetachClient(); | 481 endpoint->DetachClient(); |
| 482 } | 482 } |
| 483 | 483 |
| 484 void MultiplexRouter::RaiseError() { | 484 void MultiplexRouter::RaiseError() { |
| 485 if (task_runner_->BelongsToCurrentThread()) { | 485 if (task_runner_->RunsTasksOnCurrentThread()) { |
| 486 connector_.RaiseError(); | 486 connector_.RaiseError(); |
| 487 } else { | 487 } else { |
| 488 task_runner_->PostTask(FROM_HERE, | 488 task_runner_->PostTask(FROM_HERE, |
| 489 base::Bind(&MultiplexRouter::RaiseError, this)); | 489 base::Bind(&MultiplexRouter::RaiseError, this)); |
| 490 } | 490 } |
| 491 } | 491 } |
| 492 | 492 |
| 493 void MultiplexRouter::CloseMessagePipe() { | 493 void MultiplexRouter::CloseMessagePipe() { |
| 494 DCHECK(thread_checker_.CalledOnValidThread()); | 494 DCHECK(sequence_checker_.CalledOnValidSequence()); |
| 495 connector_.CloseMessagePipe(); | 495 connector_.CloseMessagePipe(); |
| 496 // CloseMessagePipe() above won't trigger connection error handler. | 496 // CloseMessagePipe() above won't trigger connection error handler. |
| 497 // Explicitly call OnPipeConnectionError() so that associated endpoints will | 497 // Explicitly call OnPipeConnectionError() so that associated endpoints will |
| 498 // get notified. | 498 // get notified. |
| 499 OnPipeConnectionError(); | 499 OnPipeConnectionError(); |
| 500 } | 500 } |
| 501 | 501 |
| 502 void MultiplexRouter::PauseIncomingMethodCallProcessing() { | 502 void MultiplexRouter::PauseIncomingMethodCallProcessing() { |
| 503 DCHECK(thread_checker_.CalledOnValidThread()); | 503 DCHECK(sequence_checker_.CalledOnValidSequence()); |
| 504 connector_.PauseIncomingMethodCallProcessing(); | 504 connector_.PauseIncomingMethodCallProcessing(); |
| 505 | 505 |
| 506 MayAutoLock locker(lock_.get()); | 506 MayAutoLock locker(lock_.get()); |
| 507 paused_ = true; | 507 paused_ = true; |
| 508 | 508 |
| 509 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) | 509 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) |
| 510 iter->second->ResetSyncMessageSignal(); | 510 iter->second->ResetSyncMessageSignal(); |
| 511 } | 511 } |
| 512 | 512 |
| 513 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { | 513 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { |
| 514 DCHECK(thread_checker_.CalledOnValidThread()); | 514 DCHECK(sequence_checker_.CalledOnValidSequence()); |
| 515 connector_.ResumeIncomingMethodCallProcessing(); | 515 connector_.ResumeIncomingMethodCallProcessing(); |
| 516 | 516 |
| 517 MayAutoLock locker(lock_.get()); | 517 MayAutoLock locker(lock_.get()); |
| 518 paused_ = false; | 518 paused_ = false; |
| 519 | 519 |
| 520 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { | 520 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { |
| 521 auto sync_iter = sync_message_tasks_.find(iter->first); | 521 auto sync_iter = sync_message_tasks_.find(iter->first); |
| 522 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) | 522 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) |
| 523 iter->second->SignalSyncMessageEvent(); | 523 iter->second->SignalSyncMessageEvent(); |
| 524 } | 524 } |
| 525 | 525 |
| 526 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); | 526 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
| 527 } | 527 } |
| 528 | 528 |
| 529 bool MultiplexRouter::HasAssociatedEndpoints() const { | 529 bool MultiplexRouter::HasAssociatedEndpoints() const { |
| 530 DCHECK(thread_checker_.CalledOnValidThread()); | 530 DCHECK(sequence_checker_.CalledOnValidSequence()); |
| 531 MayAutoLock locker(lock_.get()); | 531 MayAutoLock locker(lock_.get()); |
| 532 | 532 |
| 533 if (endpoints_.size() > 1) | 533 if (endpoints_.size() > 1) |
| 534 return true; | 534 return true; |
| 535 if (endpoints_.size() == 0) | 535 if (endpoints_.size() == 0) |
| 536 return false; | 536 return false; |
| 537 | 537 |
| 538 return !base::ContainsKey(endpoints_, kMasterInterfaceId); | 538 return !base::ContainsKey(endpoints_, kMasterInterfaceId); |
| 539 } | 539 } |
| 540 | 540 |
| 541 void MultiplexRouter::EnableTestingMode() { | 541 void MultiplexRouter::EnableTestingMode() { |
| 542 DCHECK(thread_checker_.CalledOnValidThread()); | 542 DCHECK(sequence_checker_.CalledOnValidSequence()); |
| 543 MayAutoLock locker(lock_.get()); | 543 MayAutoLock locker(lock_.get()); |
| 544 | 544 |
| 545 testing_mode_ = true; | 545 testing_mode_ = true; |
| 546 connector_.set_enforce_errors_from_incoming_receiver(false); | 546 connector_.set_enforce_errors_from_incoming_receiver(false); |
| 547 } | 547 } |
| 548 | 548 |
| 549 bool MultiplexRouter::Accept(Message* message) { | 549 bool MultiplexRouter::Accept(Message* message) { |
| 550 DCHECK(thread_checker_.CalledOnValidThread()); | 550 DCHECK(sequence_checker_.CalledOnValidSequence()); |
| 551 | 551 |
| 552 scoped_refptr<MultiplexRouter> protector(this); | 552 scoped_refptr<MultiplexRouter> protector(this); |
| 553 MayAutoLock locker(lock_.get()); | 553 MayAutoLock locker(lock_.get()); |
| 554 | 554 |
| 555 DCHECK(!paused_); | 555 DCHECK(!paused_); |
| 556 | 556 |
| 557 ClientCallBehavior client_call_behavior = | 557 ClientCallBehavior client_call_behavior = |
| 558 connector_.during_sync_handle_watcher_callback() | 558 connector_.during_sync_handle_watcher_callback() |
| 559 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 559 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| 560 : ALLOW_DIRECT_CLIENT_CALLS; | 560 : ALLOW_DIRECT_CLIENT_CALLS; |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 627 DCHECK(!endpoint->closed()); | 627 DCHECK(!endpoint->closed()); |
| 628 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 628 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| 629 | 629 |
| 630 MayAutoUnlock unlocker(lock_.get()); | 630 MayAutoUnlock unlocker(lock_.get()); |
| 631 control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt); | 631 control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt); |
| 632 | 632 |
| 633 return true; | 633 return true; |
| 634 } | 634 } |
| 635 | 635 |
| 636 void MultiplexRouter::OnPipeConnectionError() { | 636 void MultiplexRouter::OnPipeConnectionError() { |
| 637 DCHECK(thread_checker_.CalledOnValidThread()); | 637 DCHECK(sequence_checker_.CalledOnValidSequence()); |
| 638 | 638 |
| 639 scoped_refptr<MultiplexRouter> protector(this); | 639 scoped_refptr<MultiplexRouter> protector(this); |
| 640 MayAutoLock locker(lock_.get()); | 640 MayAutoLock locker(lock_.get()); |
| 641 | 641 |
| 642 encountered_error_ = true; | 642 encountered_error_ = true; |
| 643 | 643 |
| 644 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | 644 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
| 645 InterfaceEndpoint* endpoint = iter->second.get(); | 645 InterfaceEndpoint* endpoint = iter->second.get(); |
| 646 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 646 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
| 647 // because it may remove the corresponding value from the map. | 647 // because it may remove the corresponding value from the map. |
| 648 ++iter; | 648 ++iter; |
| 649 | 649 |
| 650 if (endpoint->client()) | 650 if (endpoint->client()) |
| 651 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 651 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| 652 | 652 |
| 653 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 653 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 654 } | 654 } |
| 655 | 655 |
| 656 ProcessTasks(connector_.during_sync_handle_watcher_callback() | 656 ProcessTasks(connector_.during_sync_handle_watcher_callback() |
| 657 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 657 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| 658 : ALLOW_DIRECT_CLIENT_CALLS, | 658 : ALLOW_DIRECT_CLIENT_CALLS, |
| 659 connector_.task_runner()); | 659 connector_.task_runner()); |
| 660 } | 660 } |
| 661 | 661 |
| 662 void MultiplexRouter::ProcessTasks( | 662 void MultiplexRouter::ProcessTasks( |
| 663 ClientCallBehavior client_call_behavior, | 663 ClientCallBehavior client_call_behavior, |
| 664 base::SingleThreadTaskRunner* current_task_runner) { | 664 base::SequencedTaskRunner* current_task_runner) { |
| 665 AssertLockAcquired(); | 665 AssertLockAcquired(); |
| 666 | 666 |
| 667 if (posted_to_process_tasks_) | 667 if (posted_to_process_tasks_) |
| 668 return; | 668 return; |
| 669 | 669 |
| 670 while (!tasks_.empty() && !paused_) { | 670 while (!tasks_.empty() && !paused_) { |
| 671 std::unique_ptr<Task> task(std::move(tasks_.front())); | 671 std::unique_ptr<Task> task(std::move(tasks_.front())); |
| 672 tasks_.pop_front(); | 672 tasks_.pop_front(); |
| 673 | 673 |
| 674 InterfaceId id = kInvalidInterfaceId; | 674 InterfaceId id = kInvalidInterfaceId; |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 734 sync_message_tasks_.erase(iter); | 734 sync_message_tasks_.erase(iter); |
| 735 return false; | 735 return false; |
| 736 } | 736 } |
| 737 | 737 |
| 738 return true; | 738 return true; |
| 739 } | 739 } |
| 740 | 740 |
| 741 bool MultiplexRouter::ProcessNotifyErrorTask( | 741 bool MultiplexRouter::ProcessNotifyErrorTask( |
| 742 Task* task, | 742 Task* task, |
| 743 ClientCallBehavior client_call_behavior, | 743 ClientCallBehavior client_call_behavior, |
| 744 base::SingleThreadTaskRunner* current_task_runner) { | 744 base::SequencedTaskRunner* current_task_runner) { |
| 745 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 745 DCHECK(!current_task_runner || |
| 746 current_task_runner->RunsTasksOnCurrentThread()); |
| 746 DCHECK(!paused_); | 747 DCHECK(!paused_); |
| 747 | 748 |
| 748 AssertLockAcquired(); | 749 AssertLockAcquired(); |
| 749 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 750 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
| 750 if (!endpoint->client()) | 751 if (!endpoint->client()) |
| 751 return true; | 752 return true; |
| 752 | 753 |
| 753 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || | 754 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || |
| 754 endpoint->task_runner() != current_task_runner) { | 755 endpoint->task_runner() != current_task_runner) { |
| 755 MaybePostToProcessTasks(endpoint->task_runner()); | 756 MaybePostToProcessTasks(endpoint->task_runner()); |
| 756 return false; | 757 return false; |
| 757 } | 758 } |
| 758 | 759 |
| 759 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 760 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); |
| 760 | 761 |
| 761 InterfaceEndpointClient* client = endpoint->client(); | 762 InterfaceEndpointClient* client = endpoint->client(); |
| 762 base::Optional<DisconnectReason> disconnect_reason( | 763 base::Optional<DisconnectReason> disconnect_reason( |
| 763 endpoint->disconnect_reason()); | 764 endpoint->disconnect_reason()); |
| 764 | 765 |
| 765 { | 766 { |
| 766 // We must unlock before calling into |client| because it may call this | 767 // We must unlock before calling into |client| because it may call this |
| 767 // object within NotifyError(). Holding the lock will lead to deadlock. | 768 // object within NotifyError(). Holding the lock will lead to deadlock. |
| 768 // | 769 // |
| 769 // It is safe to call into |client| without the lock. Because |client| is | 770 // It is safe to call into |client| without the lock. Because |client| is |
| 770 // always accessed on the same thread, including DetachEndpointClient(). | 771 // always accessed on the same thread, including DetachEndpointClient(). |
| 771 MayAutoUnlock unlocker(lock_.get()); | 772 MayAutoUnlock unlocker(lock_.get()); |
| 772 client->NotifyError(disconnect_reason); | 773 client->NotifyError(disconnect_reason); |
| 773 } | 774 } |
| 774 return true; | 775 return true; |
| 775 } | 776 } |
| 776 | 777 |
| 777 bool MultiplexRouter::ProcessIncomingMessage( | 778 bool MultiplexRouter::ProcessIncomingMessage( |
| 778 Message* message, | 779 Message* message, |
| 779 ClientCallBehavior client_call_behavior, | 780 ClientCallBehavior client_call_behavior, |
| 780 base::SingleThreadTaskRunner* current_task_runner) { | 781 base::SequencedTaskRunner* current_task_runner) { |
| 781 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 782 DCHECK(!current_task_runner || |
| 783 current_task_runner->RunsTasksOnCurrentThread()); |
| 782 DCHECK(!paused_); | 784 DCHECK(!paused_); |
| 783 DCHECK(message); | 785 DCHECK(message); |
| 784 AssertLockAcquired(); | 786 AssertLockAcquired(); |
| 785 | 787 |
| 786 if (message->IsNull()) { | 788 if (message->IsNull()) { |
| 787 // This is a sync message and has been processed during sync handle | 789 // This is a sync message and has been processed during sync handle |
| 788 // watching. | 790 // watching. |
| 789 return true; | 791 return true; |
| 790 } | 792 } |
| 791 | 793 |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 827 | 829 |
| 828 if (!endpoint->client()) { | 830 if (!endpoint->client()) { |
| 829 // We need to wait until a client is attached in order to dispatch further | 831 // We need to wait until a client is attached in order to dispatch further |
| 830 // messages. | 832 // messages. |
| 831 return false; | 833 return false; |
| 832 } | 834 } |
| 833 | 835 |
| 834 bool can_direct_call; | 836 bool can_direct_call; |
| 835 if (message->has_flag(Message::kFlagIsSync)) { | 837 if (message->has_flag(Message::kFlagIsSync)) { |
| 836 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && | 838 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && |
| 837 endpoint->task_runner()->BelongsToCurrentThread(); | 839 endpoint->task_runner()->RunsTasksOnCurrentThread(); |
| 838 } else { | 840 } else { |
| 839 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && | 841 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && |
| 840 endpoint->task_runner() == current_task_runner; | 842 endpoint->task_runner() == current_task_runner; |
| 841 } | 843 } |
| 842 | 844 |
| 843 if (!can_direct_call) { | 845 if (!can_direct_call) { |
| 844 MaybePostToProcessTasks(endpoint->task_runner()); | 846 MaybePostToProcessTasks(endpoint->task_runner()); |
| 845 return false; | 847 return false; |
| 846 } | 848 } |
| 847 | 849 |
| 848 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 850 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); |
| 849 | 851 |
| 850 InterfaceEndpointClient* client = endpoint->client(); | 852 InterfaceEndpointClient* client = endpoint->client(); |
| 851 bool result = false; | 853 bool result = false; |
| 852 { | 854 { |
| 853 // We must unlock before calling into |client| because it may call this | 855 // We must unlock before calling into |client| because it may call this |
| 854 // object within HandleIncomingMessage(). Holding the lock will lead to | 856 // object within HandleIncomingMessage(). Holding the lock will lead to |
| 855 // deadlock. | 857 // deadlock. |
| 856 // | 858 // |
| 857 // It is safe to call into |client| without the lock. Because |client| is | 859 // It is safe to call into |client| without the lock. Because |client| is |
| 858 // always accessed on the same thread, including DetachEndpointClient(). | 860 // always accessed on the same thread, including DetachEndpointClient(). |
| 859 MayAutoUnlock unlocker(lock_.get()); | 861 MayAutoUnlock unlocker(lock_.get()); |
| 860 result = client->HandleIncomingMessage(message); | 862 result = client->HandleIncomingMessage(message); |
| 861 } | 863 } |
| 862 if (!result) | 864 if (!result) |
| 863 RaiseErrorInNonTestingMode(); | 865 RaiseErrorInNonTestingMode(); |
| 864 | 866 |
| 865 return true; | 867 return true; |
| 866 } | 868 } |
| 867 | 869 |
| 868 void MultiplexRouter::MaybePostToProcessTasks( | 870 void MultiplexRouter::MaybePostToProcessTasks( |
| 869 base::SingleThreadTaskRunner* task_runner) { | 871 base::SequencedTaskRunner* task_runner) { |
| 870 AssertLockAcquired(); | 872 AssertLockAcquired(); |
| 871 if (posted_to_process_tasks_) | 873 if (posted_to_process_tasks_) |
| 872 return; | 874 return; |
| 873 | 875 |
| 874 posted_to_process_tasks_ = true; | 876 posted_to_process_tasks_ = true; |
| 875 posted_to_task_runner_ = task_runner; | 877 posted_to_task_runner_ = task_runner; |
| 876 task_runner->PostTask( | 878 task_runner->PostTask( |
| 877 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | 879 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| 878 } | 880 } |
| 879 | 881 |
| 880 void MultiplexRouter::LockAndCallProcessTasks() { | 882 void MultiplexRouter::LockAndCallProcessTasks() { |
| 881 // There is no need to hold a ref to this class in this case because this is | 883 // There is no need to hold a ref to this class in this case because this is |
| 882 // always called using base::Bind(), which holds a ref. | 884 // always called using base::Bind(), which holds a ref. |
| 883 MayAutoLock locker(lock_.get()); | 885 MayAutoLock locker(lock_.get()); |
| 884 posted_to_process_tasks_ = false; | 886 posted_to_process_tasks_ = false; |
| 885 scoped_refptr<base::SingleThreadTaskRunner> runner( | 887 scoped_refptr<base::SequencedTaskRunner> runner( |
| 886 std::move(posted_to_task_runner_)); | 888 std::move(posted_to_task_runner_)); |
| 887 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); | 889 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); |
| 888 } | 890 } |
| 889 | 891 |
| 890 void MultiplexRouter::UpdateEndpointStateMayRemove( | 892 void MultiplexRouter::UpdateEndpointStateMayRemove( |
| 891 InterfaceEndpoint* endpoint, | 893 InterfaceEndpoint* endpoint, |
| 892 EndpointStateUpdateType type) { | 894 EndpointStateUpdateType type) { |
| 893 switch (type) { | 895 switch (type) { |
| 894 case ENDPOINT_CLOSED: | 896 case ENDPOINT_CLOSED: |
| 895 endpoint->set_closed(); | 897 endpoint->set_closed(); |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 935 | 937 |
| 936 void MultiplexRouter::AssertLockAcquired() { | 938 void MultiplexRouter::AssertLockAcquired() { |
| 937 #if DCHECK_IS_ON() | 939 #if DCHECK_IS_ON() |
| 938 if (lock_) | 940 if (lock_) |
| 939 lock_->AssertAcquired(); | 941 lock_->AssertAcquired(); |
| 940 #endif | 942 #endif |
| 941 } | 943 } |
| 942 | 944 |
| 943 } // namespace internal | 945 } // namespace internal |
| 944 } // namespace mojo | 946 } // namespace mojo |
| OLD | NEW |