| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| 11 #include "base/bind.h" | 11 #include "base/bind.h" |
| 12 #include "base/location.h" | 12 #include "base/location.h" |
| 13 #include "base/macros.h" | 13 #include "base/macros.h" |
| 14 #include "base/memory/ptr_util.h" | 14 #include "base/memory/ptr_util.h" |
| 15 #include "base/single_thread_task_runner.h" | 15 #include "base/single_thread_task_runner.h" |
| 16 #include "base/stl_util.h" | 16 #include "base/stl_util.h" |
| 17 #include "base/threading/thread_task_runner_handle.h" | 17 #include "base/threading/thread_task_runner_handle.h" |
| 18 #include "mojo/public/cpp/bindings/associated_group.h" | 18 #include "mojo/public/cpp/bindings/associated_group.h" |
| 19 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" | 19 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" |
| 20 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" | 20 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" |
| 21 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" |
| 21 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" | 22 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" |
| 22 | 23 |
| 23 namespace mojo { | 24 namespace mojo { |
| 24 namespace internal { | 25 namespace internal { |
| 25 | 26 |
| 26 // InterfaceEndpoint stores the information of an interface endpoint registered | 27 // InterfaceEndpoint stores the information of an interface endpoint registered |
| 27 // with the router. | 28 // with the router. |
| 28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to | 29 // No one other than the router's |endpoints_| and |tasks_| should hold refs to |
| 29 // this object. | 30 // this object. |
| 30 class MultiplexRouter::InterfaceEndpoint | 31 class MultiplexRouter::InterfaceEndpoint |
| (...skipping 12 matching lines...) Expand all Loading... |
| 43 // The following public methods are safe to call from any threads without | 44 // The following public methods are safe to call from any threads without |
| 44 // locking. | 45 // locking. |
| 45 | 46 |
| 46 InterfaceId id() const { return id_; } | 47 InterfaceId id() const { return id_; } |
| 47 | 48 |
| 48 // --------------------------------------------------------------------------- | 49 // --------------------------------------------------------------------------- |
| 49 // The following public methods are called under the router's lock. | 50 // The following public methods are called under the router's lock. |
| 50 | 51 |
| 51 bool closed() const { return closed_; } | 52 bool closed() const { return closed_; } |
| 52 void set_closed() { | 53 void set_closed() { |
| 53 router_->lock_.AssertAcquired(); | 54 router_->AssertLockAcquired(); |
| 54 closed_ = true; | 55 closed_ = true; |
| 55 } | 56 } |
| 56 | 57 |
| 57 bool peer_closed() const { return peer_closed_; } | 58 bool peer_closed() const { return peer_closed_; } |
| 58 void set_peer_closed() { | 59 void set_peer_closed() { |
| 59 router_->lock_.AssertAcquired(); | 60 router_->AssertLockAcquired(); |
| 60 peer_closed_ = true; | 61 peer_closed_ = true; |
| 61 } | 62 } |
| 62 | 63 |
| 63 base::SingleThreadTaskRunner* task_runner() const { | 64 base::SingleThreadTaskRunner* task_runner() const { |
| 64 return task_runner_.get(); | 65 return task_runner_.get(); |
| 65 } | 66 } |
| 66 | 67 |
| 67 InterfaceEndpointClient* client() const { return client_; } | 68 InterfaceEndpointClient* client() const { return client_; } |
| 68 | 69 |
| 69 void AttachClient(InterfaceEndpointClient* client, | 70 void AttachClient(InterfaceEndpointClient* client, |
| 70 scoped_refptr<base::SingleThreadTaskRunner> runner) { | 71 scoped_refptr<base::SingleThreadTaskRunner> runner) { |
| 71 router_->lock_.AssertAcquired(); | 72 router_->AssertLockAcquired(); |
| 72 DCHECK(!client_); | 73 DCHECK(!client_); |
| 73 DCHECK(!closed_); | 74 DCHECK(!closed_); |
| 74 DCHECK(runner->BelongsToCurrentThread()); | 75 DCHECK(runner->BelongsToCurrentThread()); |
| 75 | 76 |
| 76 task_runner_ = std::move(runner); | 77 task_runner_ = std::move(runner); |
| 77 client_ = client; | 78 client_ = client; |
| 78 } | 79 } |
| 79 | 80 |
| 80 // This method must be called on the same thread as the corresponding | 81 // This method must be called on the same thread as the corresponding |
| 81 // AttachClient() call. | 82 // AttachClient() call. |
| 82 void DetachClient() { | 83 void DetachClient() { |
| 83 router_->lock_.AssertAcquired(); | 84 router_->AssertLockAcquired(); |
| 84 DCHECK(client_); | 85 DCHECK(client_); |
| 85 DCHECK(task_runner_->BelongsToCurrentThread()); | 86 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 86 DCHECK(!closed_); | 87 DCHECK(!closed_); |
| 87 | 88 |
| 88 task_runner_ = nullptr; | 89 task_runner_ = nullptr; |
| 89 client_ = nullptr; | 90 client_ = nullptr; |
| 90 sync_watcher_.reset(); | 91 sync_watcher_.reset(); |
| 91 } | 92 } |
| 92 | 93 |
| 93 void SignalSyncMessageEvent() { | 94 void SignalSyncMessageEvent() { |
| 94 router_->lock_.AssertAcquired(); | 95 router_->AssertLockAcquired(); |
| 95 if (event_signalled_) | 96 if (event_signalled_) |
| 96 return; | 97 return; |
| 97 | 98 |
| 98 EnsureEventMessagePipeExists(); | 99 EnsureEventMessagePipeExists(); |
| 99 event_signalled_ = true; | 100 event_signalled_ = true; |
| 100 MojoResult result = | 101 MojoResult result = |
| 101 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr, | 102 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr, |
| 102 0, MOJO_WRITE_MESSAGE_FLAG_NONE); | 103 0, MOJO_WRITE_MESSAGE_FLAG_NONE); |
| 103 DCHECK_EQ(MOJO_RESULT_OK, result); | 104 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 104 } | 105 } |
| 105 | 106 |
| 106 void ResetSyncMessageSignal() { | 107 void ResetSyncMessageSignal() { |
| 107 router_->lock_.AssertAcquired(); | 108 router_->AssertLockAcquired(); |
| 108 | 109 |
| 109 if (!event_signalled_) | 110 if (!event_signalled_) |
| 110 return; | 111 return; |
| 111 | 112 |
| 112 DCHECK(sync_message_event_receiver_.is_valid()); | 113 DCHECK(sync_message_event_receiver_.is_valid()); |
| 113 MojoResult result = | 114 MojoResult result = |
| 114 ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr, | 115 ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr, |
| 115 nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); | 116 nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
| 116 DCHECK_EQ(MOJO_RESULT_OK, result); | 117 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 117 event_signalled_ = false; | 118 event_signalled_ = false; |
| (...skipping 21 matching lines...) Expand all Loading... |
| 139 DCHECK(task_runner_->BelongsToCurrentThread()); | 140 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 140 | 141 |
| 141 EnsureSyncWatcherExists(); | 142 EnsureSyncWatcherExists(); |
| 142 return sync_watcher_->SyncWatch(should_stop); | 143 return sync_watcher_->SyncWatch(should_stop); |
| 143 } | 144 } |
| 144 | 145 |
| 145 private: | 146 private: |
| 146 friend class base::RefCounted<InterfaceEndpoint>; | 147 friend class base::RefCounted<InterfaceEndpoint>; |
| 147 | 148 |
| 148 ~InterfaceEndpoint() override { | 149 ~InterfaceEndpoint() override { |
| 149 router_->lock_.AssertAcquired(); | 150 router_->AssertLockAcquired(); |
| 150 | 151 |
| 151 DCHECK(!client_); | 152 DCHECK(!client_); |
| 152 DCHECK(closed_); | 153 DCHECK(closed_); |
| 153 DCHECK(peer_closed_); | 154 DCHECK(peer_closed_); |
| 154 DCHECK(!sync_watcher_); | 155 DCHECK(!sync_watcher_); |
| 155 } | 156 } |
| 156 | 157 |
| 157 void OnHandleReady(MojoResult result) { | 158 void OnHandleReady(MojoResult result) { |
| 158 DCHECK(task_runner_->BelongsToCurrentThread()); | 159 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 159 scoped_refptr<InterfaceEndpoint> self_protector(this); | 160 scoped_refptr<InterfaceEndpoint> self_protector(this); |
| 160 scoped_refptr<MultiplexRouter> router_protector(router_); | 161 scoped_refptr<MultiplexRouter> router_protector(router_); |
| 161 | 162 |
| 162 // Because we never close |sync_message_event_{sender,receiver}_| before | 163 // Because we never close |sync_message_event_{sender,receiver}_| before |
| 163 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. | 164 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. |
| 164 DCHECK_EQ(MOJO_RESULT_OK, result); | 165 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 165 bool reset_sync_watcher = false; | 166 bool reset_sync_watcher = false; |
| 166 { | 167 { |
| 167 base::AutoLock locker(router_->lock_); | 168 MayAutoLock locker(router_->lock_.get()); |
| 168 | 169 |
| 169 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); | 170 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); |
| 170 | 171 |
| 171 if (!more_to_process) | 172 if (!more_to_process) |
| 172 ResetSyncMessageSignal(); | 173 ResetSyncMessageSignal(); |
| 173 | 174 |
| 174 // Currently there are no queued sync messages and the peer has closed so | 175 // Currently there are no queued sync messages and the peer has closed so |
| 175 // there won't be incoming sync messages in the future. | 176 // there won't be incoming sync messages in the future. |
| 176 reset_sync_watcher = !more_to_process && peer_closed_; | 177 reset_sync_watcher = !more_to_process && peer_closed_; |
| 177 } | 178 } |
| 178 if (reset_sync_watcher) { | 179 if (reset_sync_watcher) { |
| 179 // If a SyncWatch() call (or multiple ones) of this interface endpoint is | 180 // If a SyncWatch() call (or multiple ones) of this interface endpoint is |
| 180 // on the call stack, resetting the sync watcher will allow it to exit | 181 // on the call stack, resetting the sync watcher will allow it to exit |
| 181 // when the call stack unwinds to that frame. | 182 // when the call stack unwinds to that frame. |
| 182 sync_watcher_.reset(); | 183 sync_watcher_.reset(); |
| 183 } | 184 } |
| 184 } | 185 } |
| 185 | 186 |
| 186 void EnsureSyncWatcherExists() { | 187 void EnsureSyncWatcherExists() { |
| 187 DCHECK(task_runner_->BelongsToCurrentThread()); | 188 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 188 if (sync_watcher_) | 189 if (sync_watcher_) |
| 189 return; | 190 return; |
| 190 | 191 |
| 191 { | 192 { |
| 192 base::AutoLock locker(router_->lock_); | 193 MayAutoLock locker(router_->lock_.get()); |
| 193 EnsureEventMessagePipeExists(); | 194 EnsureEventMessagePipeExists(); |
| 194 | 195 |
| 195 auto iter = router_->sync_message_tasks_.find(id_); | 196 auto iter = router_->sync_message_tasks_.find(id_); |
| 196 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) | 197 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) |
| 197 SignalSyncMessageEvent(); | 198 SignalSyncMessageEvent(); |
| 198 } | 199 } |
| 199 | 200 |
| 200 sync_watcher_.reset(new SyncHandleWatcher( | 201 sync_watcher_.reset(new SyncHandleWatcher( |
| 201 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 202 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| 202 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); | 203 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); |
| 203 } | 204 } |
| 204 | 205 |
| 205 void EnsureEventMessagePipeExists() { | 206 void EnsureEventMessagePipeExists() { |
| 206 router_->lock_.AssertAcquired(); | 207 router_->AssertLockAcquired(); |
| 207 | 208 |
| 208 if (sync_message_event_receiver_.is_valid()) | 209 if (sync_message_event_receiver_.is_valid()) |
| 209 return; | 210 return; |
| 210 | 211 |
| 211 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, | 212 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, |
| 212 &sync_message_event_receiver_); | 213 &sync_message_event_receiver_); |
| 213 DCHECK_EQ(MOJO_RESULT_OK, result); | 214 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 214 } | 215 } |
| 215 | 216 |
| 216 // --------------------------------------------------------------------------- | 217 // --------------------------------------------------------------------------- |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 274 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; | 275 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
| 275 | 276 |
| 276 enum Type { MESSAGE, NOTIFY_ERROR }; | 277 enum Type { MESSAGE, NOTIFY_ERROR }; |
| 277 Type type; | 278 Type type; |
| 278 | 279 |
| 279 private: | 280 private: |
| 280 explicit Task(Type in_type) : type(in_type) {} | 281 explicit Task(Type in_type) : type(in_type) {} |
| 281 }; | 282 }; |
| 282 | 283 |
| 283 MultiplexRouter::MultiplexRouter( | 284 MultiplexRouter::MultiplexRouter( |
| 285 ScopedMessagePipeHandle message_pipe, |
| 286 Config config, |
| 284 bool set_interface_id_namesapce_bit, | 287 bool set_interface_id_namesapce_bit, |
| 285 ScopedMessagePipeHandle message_pipe, | |
| 286 scoped_refptr<base::SingleThreadTaskRunner> runner) | 288 scoped_refptr<base::SingleThreadTaskRunner> runner) |
| 287 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 289 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
| 288 task_runner_(runner), | 290 task_runner_(runner), |
| 289 header_validator_(nullptr), | 291 header_validator_(nullptr), |
| 290 filters_(this), | 292 filters_(this), |
| 291 connector_(std::move(message_pipe), | 293 connector_(std::move(message_pipe), |
| 292 Connector::MULTI_THREADED_SEND, | 294 config == SINGLE_INTERFACE ? Connector::SINGLE_THREADED_SEND |
| 295 : Connector::MULTI_THREADED_SEND, |
| 293 std::move(runner)), | 296 std::move(runner)), |
| 297 lock_(config == SINGLE_INTERFACE ? nullptr : new base::Lock), |
| 294 control_message_handler_(this), | 298 control_message_handler_(this), |
| 295 control_message_proxy_(&connector_), | 299 control_message_proxy_(&connector_), |
| 296 next_interface_id_value_(1), | 300 next_interface_id_value_(1), |
| 297 posted_to_process_tasks_(false), | 301 posted_to_process_tasks_(false), |
| 298 encountered_error_(false), | 302 encountered_error_(false), |
| 299 paused_(false), | 303 paused_(false), |
| 300 testing_mode_(false) { | 304 testing_mode_(false) { |
| 301 DCHECK(task_runner_->BelongsToCurrentThread()); | 305 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 302 // Always participate in sync handle watching, because even if it doesn't | 306 // Always participate in sync handle watching, because even if it doesn't |
| 303 // expect sync requests during sync handle watching, it may still need to | 307 // expect sync requests during sync handle watching, it may still need to |
| 304 // dispatch messages to associated endpoints on a different thread. | 308 // dispatch messages to associated endpoints on a different thread. |
| 305 connector_.AllowWokenUpBySyncWatchOnSameThread(); | 309 connector_.AllowWokenUpBySyncWatchOnSameThread(); |
| 306 connector_.set_incoming_receiver(&filters_); | 310 connector_.set_incoming_receiver(&filters_); |
| 307 connector_.set_connection_error_handler( | 311 connector_.set_connection_error_handler( |
| 308 base::Bind(&MultiplexRouter::OnPipeConnectionError, | 312 base::Bind(&MultiplexRouter::OnPipeConnectionError, |
| 309 base::Unretained(this))); | 313 base::Unretained(this))); |
| 310 | 314 |
| 311 std::unique_ptr<MessageHeaderValidator> header_validator = | 315 std::unique_ptr<MessageHeaderValidator> header_validator = |
| 312 base::MakeUnique<MessageHeaderValidator>(); | 316 base::MakeUnique<MessageHeaderValidator>(); |
| 313 header_validator_ = header_validator.get(); | 317 header_validator_ = header_validator.get(); |
| 314 filters_.Append(std::move(header_validator)); | 318 filters_.Append(std::move(header_validator)); |
| 315 } | 319 } |
| 316 | 320 |
| 317 MultiplexRouter::~MultiplexRouter() { | 321 MultiplexRouter::~MultiplexRouter() { |
| 318 base::AutoLock locker(lock_); | 322 MayAutoLock locker(lock_.get()); |
| 319 | 323 |
| 320 sync_message_tasks_.clear(); | 324 sync_message_tasks_.clear(); |
| 321 tasks_.clear(); | 325 tasks_.clear(); |
| 322 | 326 |
| 323 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | 327 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
| 324 InterfaceEndpoint* endpoint = iter->second.get(); | 328 InterfaceEndpoint* endpoint = iter->second.get(); |
| 325 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 329 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
| 326 // because it may remove the corresponding value from the map. | 330 // because it may remove the corresponding value from the map. |
| 327 ++iter; | 331 ++iter; |
| 328 | 332 |
| 329 DCHECK(endpoint->closed()); | 333 DCHECK(endpoint->closed()); |
| 330 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 334 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 331 } | 335 } |
| 332 | 336 |
| 333 DCHECK(endpoints_.empty()); | 337 DCHECK(endpoints_.empty()); |
| 334 } | 338 } |
| 335 | 339 |
| 336 void MultiplexRouter::SetMasterInterfaceName(const std::string& name) { | 340 void MultiplexRouter::SetMasterInterfaceName(const std::string& name) { |
| 337 DCHECK(thread_checker_.CalledOnValidThread()); | 341 DCHECK(thread_checker_.CalledOnValidThread()); |
| 338 header_validator_->SetDescription(name + " [master] MessageHeaderValidator"); | 342 header_validator_->SetDescription(name + " [master] MessageHeaderValidator"); |
| 339 control_message_handler_.SetDescription( | 343 control_message_handler_.SetDescription( |
| 340 name + " [master] PipeControlMessageHandler"); | 344 name + " [master] PipeControlMessageHandler"); |
| 341 } | 345 } |
| 342 | 346 |
| 343 void MultiplexRouter::CreateEndpointHandlePair( | 347 void MultiplexRouter::CreateEndpointHandlePair( |
| 344 ScopedInterfaceEndpointHandle* local_endpoint, | 348 ScopedInterfaceEndpointHandle* local_endpoint, |
| 345 ScopedInterfaceEndpointHandle* remote_endpoint) { | 349 ScopedInterfaceEndpointHandle* remote_endpoint) { |
| 346 base::AutoLock locker(lock_); | 350 MayAutoLock locker(lock_.get()); |
| 347 uint32_t id = 0; | 351 uint32_t id = 0; |
| 348 do { | 352 do { |
| 349 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) | 353 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) |
| 350 next_interface_id_value_ = 1; | 354 next_interface_id_value_ = 1; |
| 351 id = next_interface_id_value_++; | 355 id = next_interface_id_value_++; |
| 352 if (set_interface_id_namespace_bit_) | 356 if (set_interface_id_namespace_bit_) |
| 353 id |= kInterfaceIdNamespaceMask; | 357 id |= kInterfaceIdNamespaceMask; |
| 354 } while (base::ContainsKey(endpoints_, id)); | 358 } while (base::ContainsKey(endpoints_, id)); |
| 355 | 359 |
| 356 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); | 360 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); |
| 357 endpoints_[id] = endpoint; | 361 endpoints_[id] = endpoint; |
| 358 if (encountered_error_) | 362 if (encountered_error_) |
| 359 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 363 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 360 | 364 |
| 361 *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true); | 365 *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true); |
| 362 *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false); | 366 *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false); |
| 363 } | 367 } |
| 364 | 368 |
| 365 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( | 369 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( |
| 366 InterfaceId id) { | 370 InterfaceId id) { |
| 367 if (!IsValidInterfaceId(id)) | 371 if (!IsValidInterfaceId(id)) |
| 368 return ScopedInterfaceEndpointHandle(); | 372 return ScopedInterfaceEndpointHandle(); |
| 369 | 373 |
| 370 base::AutoLock locker(lock_); | 374 MayAutoLock locker(lock_.get()); |
| 371 bool inserted = false; | 375 bool inserted = false; |
| 372 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); | 376 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| 373 if (inserted) { | 377 if (inserted) { |
| 374 if (encountered_error_) | 378 if (encountered_error_) |
| 375 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 379 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 376 } else { | 380 } else { |
| 377 // If the endpoint already exist, it is because we have received a | 381 // If the endpoint already exist, it is because we have received a |
| 378 // notification that the peer endpoint has closed. | 382 // notification that the peer endpoint has closed. |
| 379 CHECK(!endpoint->closed()); | 383 CHECK(!endpoint->closed()); |
| 380 CHECK(endpoint->peer_closed()); | 384 CHECK(endpoint->peer_closed()); |
| 381 } | 385 } |
| 382 return CreateScopedInterfaceEndpointHandle(id, true); | 386 return CreateScopedInterfaceEndpointHandle(id, true); |
| 383 } | 387 } |
| 384 | 388 |
| 385 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { | 389 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { |
| 386 if (!IsValidInterfaceId(id)) | 390 if (!IsValidInterfaceId(id)) |
| 387 return; | 391 return; |
| 388 | 392 |
| 389 base::AutoLock locker(lock_); | 393 MayAutoLock locker(lock_.get()); |
| 390 | 394 |
| 391 if (!is_local) { | 395 if (!is_local) { |
| 392 DCHECK(base::ContainsKey(endpoints_, id)); | 396 DCHECK(base::ContainsKey(endpoints_, id)); |
| 393 DCHECK(!IsMasterInterfaceId(id)); | 397 DCHECK(!IsMasterInterfaceId(id)); |
| 394 | 398 |
| 395 // We will receive a NotifyPeerEndpointClosed message from the other side. | 399 // We will receive a NotifyPeerEndpointClosed message from the other side. |
| 396 control_message_proxy_.NotifyEndpointClosedBeforeSent(id); | 400 control_message_proxy_.NotifyEndpointClosedBeforeSent(id); |
| 397 | 401 |
| 398 return; | 402 return; |
| 399 } | 403 } |
| (...skipping 12 matching lines...) Expand all Loading... |
| 412 | 416 |
| 413 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( | 417 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( |
| 414 const ScopedInterfaceEndpointHandle& handle, | 418 const ScopedInterfaceEndpointHandle& handle, |
| 415 InterfaceEndpointClient* client, | 419 InterfaceEndpointClient* client, |
| 416 scoped_refptr<base::SingleThreadTaskRunner> runner) { | 420 scoped_refptr<base::SingleThreadTaskRunner> runner) { |
| 417 const InterfaceId id = handle.id(); | 421 const InterfaceId id = handle.id(); |
| 418 | 422 |
| 419 DCHECK(IsValidInterfaceId(id)); | 423 DCHECK(IsValidInterfaceId(id)); |
| 420 DCHECK(client); | 424 DCHECK(client); |
| 421 | 425 |
| 422 base::AutoLock locker(lock_); | 426 MayAutoLock locker(lock_.get()); |
| 423 DCHECK(base::ContainsKey(endpoints_, id)); | 427 DCHECK(base::ContainsKey(endpoints_, id)); |
| 424 | 428 |
| 425 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 429 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 426 endpoint->AttachClient(client, std::move(runner)); | 430 endpoint->AttachClient(client, std::move(runner)); |
| 427 | 431 |
| 428 if (endpoint->peer_closed()) | 432 if (endpoint->peer_closed()) |
| 429 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 433 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| 430 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); | 434 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
| 431 | 435 |
| 432 return endpoint; | 436 return endpoint; |
| 433 } | 437 } |
| 434 | 438 |
| 435 void MultiplexRouter::DetachEndpointClient( | 439 void MultiplexRouter::DetachEndpointClient( |
| 436 const ScopedInterfaceEndpointHandle& handle) { | 440 const ScopedInterfaceEndpointHandle& handle) { |
| 437 const InterfaceId id = handle.id(); | 441 const InterfaceId id = handle.id(); |
| 438 | 442 |
| 439 DCHECK(IsValidInterfaceId(id)); | 443 DCHECK(IsValidInterfaceId(id)); |
| 440 | 444 |
| 441 base::AutoLock locker(lock_); | 445 MayAutoLock locker(lock_.get()); |
| 442 DCHECK(base::ContainsKey(endpoints_, id)); | 446 DCHECK(base::ContainsKey(endpoints_, id)); |
| 443 | 447 |
| 444 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 448 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 445 endpoint->DetachClient(); | 449 endpoint->DetachClient(); |
| 446 } | 450 } |
| 447 | 451 |
| 448 void MultiplexRouter::RaiseError() { | 452 void MultiplexRouter::RaiseError() { |
| 449 if (task_runner_->BelongsToCurrentThread()) { | 453 if (task_runner_->BelongsToCurrentThread()) { |
| 450 connector_.RaiseError(); | 454 connector_.RaiseError(); |
| 451 } else { | 455 } else { |
| 452 task_runner_->PostTask(FROM_HERE, | 456 task_runner_->PostTask(FROM_HERE, |
| 453 base::Bind(&MultiplexRouter::RaiseError, this)); | 457 base::Bind(&MultiplexRouter::RaiseError, this)); |
| 454 } | 458 } |
| 455 } | 459 } |
| 456 | 460 |
| 457 void MultiplexRouter::CloseMessagePipe() { | 461 void MultiplexRouter::CloseMessagePipe() { |
| 458 DCHECK(thread_checker_.CalledOnValidThread()); | 462 DCHECK(thread_checker_.CalledOnValidThread()); |
| 459 connector_.CloseMessagePipe(); | 463 connector_.CloseMessagePipe(); |
| 460 // CloseMessagePipe() above won't trigger connection error handler. | 464 // CloseMessagePipe() above won't trigger connection error handler. |
| 461 // Explicitly call OnPipeConnectionError() so that associated endpoints will | 465 // Explicitly call OnPipeConnectionError() so that associated endpoints will |
| 462 // get notified. | 466 // get notified. |
| 463 OnPipeConnectionError(); | 467 OnPipeConnectionError(); |
| 464 } | 468 } |
| 465 | 469 |
| 466 void MultiplexRouter::PauseIncomingMethodCallProcessing() { | 470 void MultiplexRouter::PauseIncomingMethodCallProcessing() { |
| 467 DCHECK(thread_checker_.CalledOnValidThread()); | 471 DCHECK(thread_checker_.CalledOnValidThread()); |
| 468 connector_.PauseIncomingMethodCallProcessing(); | 472 connector_.PauseIncomingMethodCallProcessing(); |
| 469 | 473 |
| 470 base::AutoLock locker(lock_); | 474 MayAutoLock locker(lock_.get()); |
| 471 paused_ = true; | 475 paused_ = true; |
| 472 | 476 |
| 473 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) | 477 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) |
| 474 iter->second->ResetSyncMessageSignal(); | 478 iter->second->ResetSyncMessageSignal(); |
| 475 } | 479 } |
| 476 | 480 |
| 477 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { | 481 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { |
| 478 DCHECK(thread_checker_.CalledOnValidThread()); | 482 DCHECK(thread_checker_.CalledOnValidThread()); |
| 479 connector_.ResumeIncomingMethodCallProcessing(); | 483 connector_.ResumeIncomingMethodCallProcessing(); |
| 480 | 484 |
| 481 base::AutoLock locker(lock_); | 485 MayAutoLock locker(lock_.get()); |
| 482 paused_ = false; | 486 paused_ = false; |
| 483 | 487 |
| 484 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { | 488 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { |
| 485 auto sync_iter = sync_message_tasks_.find(iter->first); | 489 auto sync_iter = sync_message_tasks_.find(iter->first); |
| 486 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) | 490 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) |
| 487 iter->second->SignalSyncMessageEvent(); | 491 iter->second->SignalSyncMessageEvent(); |
| 488 } | 492 } |
| 489 | 493 |
| 490 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); | 494 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
| 491 } | 495 } |
| 492 | 496 |
| 493 bool MultiplexRouter::HasAssociatedEndpoints() const { | 497 bool MultiplexRouter::HasAssociatedEndpoints() const { |
| 494 DCHECK(thread_checker_.CalledOnValidThread()); | 498 DCHECK(thread_checker_.CalledOnValidThread()); |
| 495 base::AutoLock locker(lock_); | 499 MayAutoLock locker(lock_.get()); |
| 496 | 500 |
| 497 if (endpoints_.size() > 1) | 501 if (endpoints_.size() > 1) |
| 498 return true; | 502 return true; |
| 499 if (endpoints_.size() == 0) | 503 if (endpoints_.size() == 0) |
| 500 return false; | 504 return false; |
| 501 | 505 |
| 502 return !base::ContainsKey(endpoints_, kMasterInterfaceId); | 506 return !base::ContainsKey(endpoints_, kMasterInterfaceId); |
| 503 } | 507 } |
| 504 | 508 |
| 505 void MultiplexRouter::EnableTestingMode() { | 509 void MultiplexRouter::EnableTestingMode() { |
| 506 DCHECK(thread_checker_.CalledOnValidThread()); | 510 DCHECK(thread_checker_.CalledOnValidThread()); |
| 507 base::AutoLock locker(lock_); | 511 MayAutoLock locker(lock_.get()); |
| 508 | 512 |
| 509 testing_mode_ = true; | 513 testing_mode_ = true; |
| 510 connector_.set_enforce_errors_from_incoming_receiver(false); | 514 connector_.set_enforce_errors_from_incoming_receiver(false); |
| 511 } | 515 } |
| 512 | 516 |
| 513 bool MultiplexRouter::Accept(Message* message) { | 517 bool MultiplexRouter::Accept(Message* message) { |
| 514 DCHECK(thread_checker_.CalledOnValidThread()); | 518 DCHECK(thread_checker_.CalledOnValidThread()); |
| 515 | 519 |
| 516 scoped_refptr<MultiplexRouter> protector(this); | 520 scoped_refptr<MultiplexRouter> protector(this); |
| 517 base::AutoLock locker(lock_); | 521 MayAutoLock locker(lock_.get()); |
| 518 | 522 |
| 519 DCHECK(!paused_); | 523 DCHECK(!paused_); |
| 520 | 524 |
| 521 ClientCallBehavior client_call_behavior = | 525 ClientCallBehavior client_call_behavior = |
| 522 connector_.during_sync_handle_watcher_callback() | 526 connector_.during_sync_handle_watcher_callback() |
| 523 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 527 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| 524 : ALLOW_DIRECT_CLIENT_CALLS; | 528 : ALLOW_DIRECT_CLIENT_CALLS; |
| 525 | 529 |
| 526 bool processed = | 530 bool processed = |
| 527 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, | 531 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, |
| (...skipping 18 matching lines...) Expand all Loading... |
| 546 // tasks. | 550 // tasks. |
| 547 ProcessTasks(client_call_behavior, connector_.task_runner()); | 551 ProcessTasks(client_call_behavior, connector_.task_runner()); |
| 548 } | 552 } |
| 549 | 553 |
| 550 // Always return true. If we see errors during message processing, we will | 554 // Always return true. If we see errors during message processing, we will |
| 551 // explicitly call Connector::RaiseError() to disconnect the message pipe. | 555 // explicitly call Connector::RaiseError() to disconnect the message pipe. |
| 552 return true; | 556 return true; |
| 553 } | 557 } |
| 554 | 558 |
| 555 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { | 559 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { |
| 556 lock_.AssertAcquired(); | 560 AssertLockAcquired(); |
| 557 | 561 |
| 558 if (IsMasterInterfaceId(id)) | 562 if (IsMasterInterfaceId(id)) |
| 559 return false; | 563 return false; |
| 560 | 564 |
| 561 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); | 565 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); |
| 562 | 566 |
| 563 // It is possible that this endpoint has been set as peer closed. That is | 567 // It is possible that this endpoint has been set as peer closed. That is |
| 564 // because when the message pipe is closed, all the endpoints are updated with | 568 // because when the message pipe is closed, all the endpoints are updated with |
| 565 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, | 569 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, |
| 566 // as long as there are refs keeping the router alive. If there is a | 570 // as long as there are refs keeping the router alive. If there is a |
| 567 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get | 571 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get |
| 568 // here and see that the endpoint has been marked as peer closed. | 572 // here and see that the endpoint has been marked as peer closed. |
| 569 if (!endpoint->peer_closed()) { | 573 if (!endpoint->peer_closed()) { |
| 570 if (endpoint->client()) | 574 if (endpoint->client()) |
| 571 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 575 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| 572 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 576 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 573 } | 577 } |
| 574 | 578 |
| 575 // No need to trigger a ProcessTasks() because it is already on the stack. | 579 // No need to trigger a ProcessTasks() because it is already on the stack. |
| 576 | 580 |
| 577 return true; | 581 return true; |
| 578 } | 582 } |
| 579 | 583 |
| 580 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { | 584 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { |
| 581 lock_.AssertAcquired(); | 585 AssertLockAcquired(); |
| 582 | 586 |
| 583 if (IsMasterInterfaceId(id)) | 587 if (IsMasterInterfaceId(id)) |
| 584 return false; | 588 return false; |
| 585 | 589 |
| 586 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); | 590 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); |
| 587 DCHECK(!endpoint->closed()); | 591 DCHECK(!endpoint->closed()); |
| 588 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 592 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| 589 | 593 |
| 590 control_message_proxy_.NotifyPeerEndpointClosed(id); | 594 control_message_proxy_.NotifyPeerEndpointClosed(id); |
| 591 | 595 |
| 592 return true; | 596 return true; |
| 593 } | 597 } |
| 594 | 598 |
| 595 void MultiplexRouter::OnPipeConnectionError() { | 599 void MultiplexRouter::OnPipeConnectionError() { |
| 596 DCHECK(thread_checker_.CalledOnValidThread()); | 600 DCHECK(thread_checker_.CalledOnValidThread()); |
| 597 | 601 |
| 598 scoped_refptr<MultiplexRouter> protector(this); | 602 scoped_refptr<MultiplexRouter> protector(this); |
| 599 base::AutoLock locker(lock_); | 603 MayAutoLock locker(lock_.get()); |
| 600 | 604 |
| 601 encountered_error_ = true; | 605 encountered_error_ = true; |
| 602 | 606 |
| 603 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { | 607 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
| 604 InterfaceEndpoint* endpoint = iter->second.get(); | 608 InterfaceEndpoint* endpoint = iter->second.get(); |
| 605 // Increment the iterator before calling UpdateEndpointStateMayRemove() | 609 // Increment the iterator before calling UpdateEndpointStateMayRemove() |
| 606 // because it may remove the corresponding value from the map. | 610 // because it may remove the corresponding value from the map. |
| 607 ++iter; | 611 ++iter; |
| 608 | 612 |
| 609 if (endpoint->client()) | 613 if (endpoint->client()) |
| 610 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 614 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| 611 | 615 |
| 612 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 616 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 613 } | 617 } |
| 614 | 618 |
| 615 ProcessTasks(connector_.during_sync_handle_watcher_callback() | 619 ProcessTasks(connector_.during_sync_handle_watcher_callback() |
| 616 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 620 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| 617 : ALLOW_DIRECT_CLIENT_CALLS, | 621 : ALLOW_DIRECT_CLIENT_CALLS, |
| 618 connector_.task_runner()); | 622 connector_.task_runner()); |
| 619 } | 623 } |
| 620 | 624 |
| 621 void MultiplexRouter::ProcessTasks( | 625 void MultiplexRouter::ProcessTasks( |
| 622 ClientCallBehavior client_call_behavior, | 626 ClientCallBehavior client_call_behavior, |
| 623 base::SingleThreadTaskRunner* current_task_runner) { | 627 base::SingleThreadTaskRunner* current_task_runner) { |
| 624 lock_.AssertAcquired(); | 628 AssertLockAcquired(); |
| 625 | 629 |
| 626 if (posted_to_process_tasks_) | 630 if (posted_to_process_tasks_) |
| 627 return; | 631 return; |
| 628 | 632 |
| 629 while (!tasks_.empty() && !paused_) { | 633 while (!tasks_.empty() && !paused_) { |
| 630 std::unique_ptr<Task> task(std::move(tasks_.front())); | 634 std::unique_ptr<Task> task(std::move(tasks_.front())); |
| 631 tasks_.pop_front(); | 635 tasks_.pop_front(); |
| 632 | 636 |
| 633 InterfaceId id = kInvalidInterfaceId; | 637 InterfaceId id = kInvalidInterfaceId; |
| 634 bool sync_message = task->IsMessageTask() && !task->message.IsNull() && | 638 bool sync_message = task->IsMessageTask() && !task->message.IsNull() && |
| (...skipping 23 matching lines...) Expand all Loading... |
| 658 if (sync_message) { | 662 if (sync_message) { |
| 659 auto iter = sync_message_tasks_.find(id); | 663 auto iter = sync_message_tasks_.find(id); |
| 660 if (iter != sync_message_tasks_.end() && iter->second.empty()) | 664 if (iter != sync_message_tasks_.end() && iter->second.empty()) |
| 661 sync_message_tasks_.erase(iter); | 665 sync_message_tasks_.erase(iter); |
| 662 } | 666 } |
| 663 } | 667 } |
| 664 } | 668 } |
| 665 } | 669 } |
| 666 | 670 |
| 667 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { | 671 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { |
| 668 lock_.AssertAcquired(); | 672 AssertLockAcquired(); |
| 669 | 673 |
| 670 auto iter = sync_message_tasks_.find(id); | 674 auto iter = sync_message_tasks_.find(id); |
| 671 if (iter == sync_message_tasks_.end()) | 675 if (iter == sync_message_tasks_.end()) |
| 672 return false; | 676 return false; |
| 673 | 677 |
| 674 if (paused_) | 678 if (paused_) |
| 675 return true; | 679 return true; |
| 676 | 680 |
| 677 MultiplexRouter::Task* task = iter->second.front(); | 681 MultiplexRouter::Task* task = iter->second.front(); |
| 678 iter->second.pop_front(); | 682 iter->second.pop_front(); |
| (...skipping 18 matching lines...) Expand all Loading... |
| 697 return true; | 701 return true; |
| 698 } | 702 } |
| 699 | 703 |
| 700 bool MultiplexRouter::ProcessNotifyErrorTask( | 704 bool MultiplexRouter::ProcessNotifyErrorTask( |
| 701 Task* task, | 705 Task* task, |
| 702 ClientCallBehavior client_call_behavior, | 706 ClientCallBehavior client_call_behavior, |
| 703 base::SingleThreadTaskRunner* current_task_runner) { | 707 base::SingleThreadTaskRunner* current_task_runner) { |
| 704 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 708 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
| 705 DCHECK(!paused_); | 709 DCHECK(!paused_); |
| 706 | 710 |
| 707 lock_.AssertAcquired(); | 711 AssertLockAcquired(); |
| 708 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 712 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
| 709 if (!endpoint->client()) | 713 if (!endpoint->client()) |
| 710 return true; | 714 return true; |
| 711 | 715 |
| 712 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || | 716 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || |
| 713 endpoint->task_runner() != current_task_runner) { | 717 endpoint->task_runner() != current_task_runner) { |
| 714 MaybePostToProcessTasks(endpoint->task_runner()); | 718 MaybePostToProcessTasks(endpoint->task_runner()); |
| 715 return false; | 719 return false; |
| 716 } | 720 } |
| 717 | 721 |
| 718 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 722 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 719 | 723 |
| 720 InterfaceEndpointClient* client = endpoint->client(); | 724 InterfaceEndpointClient* client = endpoint->client(); |
| 721 { | 725 { |
| 722 // We must unlock before calling into |client| because it may call this | 726 // We must unlock before calling into |client| because it may call this |
| 723 // object within NotifyError(). Holding the lock will lead to deadlock. | 727 // object within NotifyError(). Holding the lock will lead to deadlock. |
| 724 // | 728 // |
| 725 // It is safe to call into |client| without the lock. Because |client| is | 729 // It is safe to call into |client| without the lock. Because |client| is |
| 726 // always accessed on the same thread, including DetachEndpointClient(). | 730 // always accessed on the same thread, including DetachEndpointClient(). |
| 727 base::AutoUnlock unlocker(lock_); | 731 MayAutoUnlock unlocker(lock_.get()); |
| 728 client->NotifyError(); | 732 client->NotifyError(); |
| 729 } | 733 } |
| 730 return true; | 734 return true; |
| 731 } | 735 } |
| 732 | 736 |
| 733 bool MultiplexRouter::ProcessIncomingMessage( | 737 bool MultiplexRouter::ProcessIncomingMessage( |
| 734 Message* message, | 738 Message* message, |
| 735 ClientCallBehavior client_call_behavior, | 739 ClientCallBehavior client_call_behavior, |
| 736 base::SingleThreadTaskRunner* current_task_runner) { | 740 base::SingleThreadTaskRunner* current_task_runner) { |
| 737 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 741 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
| 738 DCHECK(!paused_); | 742 DCHECK(!paused_); |
| 739 DCHECK(message); | 743 DCHECK(message); |
| 740 lock_.AssertAcquired(); | 744 AssertLockAcquired(); |
| 741 | 745 |
| 742 if (message->IsNull()) { | 746 if (message->IsNull()) { |
| 743 // This is a sync message and has been processed during sync handle | 747 // This is a sync message and has been processed during sync handle |
| 744 // watching. | 748 // watching. |
| 745 return true; | 749 return true; |
| 746 } | 750 } |
| 747 | 751 |
| 748 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | 752 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
| 749 if (!control_message_handler_.Accept(message)) | 753 if (!control_message_handler_.Accept(message)) |
| 750 RaiseErrorInNonTestingMode(); | 754 RaiseErrorInNonTestingMode(); |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 804 | 808 |
| 805 InterfaceEndpointClient* client = endpoint->client(); | 809 InterfaceEndpointClient* client = endpoint->client(); |
| 806 bool result = false; | 810 bool result = false; |
| 807 { | 811 { |
| 808 // We must unlock before calling into |client| because it may call this | 812 // We must unlock before calling into |client| because it may call this |
| 809 // object within HandleIncomingMessage(). Holding the lock will lead to | 813 // object within HandleIncomingMessage(). Holding the lock will lead to |
| 810 // deadlock. | 814 // deadlock. |
| 811 // | 815 // |
| 812 // It is safe to call into |client| without the lock. Because |client| is | 816 // It is safe to call into |client| without the lock. Because |client| is |
| 813 // always accessed on the same thread, including DetachEndpointClient(). | 817 // always accessed on the same thread, including DetachEndpointClient(). |
| 814 base::AutoUnlock unlocker(lock_); | 818 MayAutoUnlock unlocker(lock_.get()); |
| 815 result = client->HandleIncomingMessage(message); | 819 result = client->HandleIncomingMessage(message); |
| 816 } | 820 } |
| 817 if (!result) | 821 if (!result) |
| 818 RaiseErrorInNonTestingMode(); | 822 RaiseErrorInNonTestingMode(); |
| 819 | 823 |
| 820 return true; | 824 return true; |
| 821 } | 825 } |
| 822 | 826 |
| 823 void MultiplexRouter::MaybePostToProcessTasks( | 827 void MultiplexRouter::MaybePostToProcessTasks( |
| 824 base::SingleThreadTaskRunner* task_runner) { | 828 base::SingleThreadTaskRunner* task_runner) { |
| 825 lock_.AssertAcquired(); | 829 AssertLockAcquired(); |
| 826 if (posted_to_process_tasks_) | 830 if (posted_to_process_tasks_) |
| 827 return; | 831 return; |
| 828 | 832 |
| 829 posted_to_process_tasks_ = true; | 833 posted_to_process_tasks_ = true; |
| 830 posted_to_task_runner_ = task_runner; | 834 posted_to_task_runner_ = task_runner; |
| 831 task_runner->PostTask( | 835 task_runner->PostTask( |
| 832 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | 836 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| 833 } | 837 } |
| 834 | 838 |
| 835 void MultiplexRouter::LockAndCallProcessTasks() { | 839 void MultiplexRouter::LockAndCallProcessTasks() { |
| 836 // There is no need to hold a ref to this class in this case because this is | 840 // There is no need to hold a ref to this class in this case because this is |
| 837 // always called using base::Bind(), which holds a ref. | 841 // always called using base::Bind(), which holds a ref. |
| 838 base::AutoLock locker(lock_); | 842 MayAutoLock locker(lock_.get()); |
| 839 posted_to_process_tasks_ = false; | 843 posted_to_process_tasks_ = false; |
| 840 scoped_refptr<base::SingleThreadTaskRunner> runner( | 844 scoped_refptr<base::SingleThreadTaskRunner> runner( |
| 841 std::move(posted_to_task_runner_)); | 845 std::move(posted_to_task_runner_)); |
| 842 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); | 846 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); |
| 843 } | 847 } |
| 844 | 848 |
| 845 void MultiplexRouter::UpdateEndpointStateMayRemove( | 849 void MultiplexRouter::UpdateEndpointStateMayRemove( |
| 846 InterfaceEndpoint* endpoint, | 850 InterfaceEndpoint* endpoint, |
| 847 EndpointStateUpdateType type) { | 851 EndpointStateUpdateType type) { |
| 848 switch (type) { | 852 switch (type) { |
| 849 case ENDPOINT_CLOSED: | 853 case ENDPOINT_CLOSED: |
| 850 endpoint->set_closed(); | 854 endpoint->set_closed(); |
| 851 break; | 855 break; |
| 852 case PEER_ENDPOINT_CLOSED: | 856 case PEER_ENDPOINT_CLOSED: |
| 853 endpoint->set_peer_closed(); | 857 endpoint->set_peer_closed(); |
| 854 // If the interface endpoint is performing a sync watch, this makes sure | 858 // If the interface endpoint is performing a sync watch, this makes sure |
| 855 // it is notified and eventually exits the sync watch. | 859 // it is notified and eventually exits the sync watch. |
| 856 endpoint->SignalSyncMessageEvent(); | 860 endpoint->SignalSyncMessageEvent(); |
| 857 break; | 861 break; |
| 858 } | 862 } |
| 859 if (endpoint->closed() && endpoint->peer_closed()) | 863 if (endpoint->closed() && endpoint->peer_closed()) |
| 860 endpoints_.erase(endpoint->id()); | 864 endpoints_.erase(endpoint->id()); |
| 861 } | 865 } |
| 862 | 866 |
| 863 void MultiplexRouter::RaiseErrorInNonTestingMode() { | 867 void MultiplexRouter::RaiseErrorInNonTestingMode() { |
| 864 lock_.AssertAcquired(); | 868 AssertLockAcquired(); |
| 865 if (!testing_mode_) | 869 if (!testing_mode_) |
| 866 RaiseError(); | 870 RaiseError(); |
| 867 } | 871 } |
| 868 | 872 |
| 869 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( | 873 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( |
| 870 InterfaceId id, | 874 InterfaceId id, |
| 871 bool* inserted) { | 875 bool* inserted) { |
| 872 lock_.AssertAcquired(); | 876 AssertLockAcquired(); |
| 873 // Either |inserted| is nullptr or it points to a boolean initialized as | 877 // Either |inserted| is nullptr or it points to a boolean initialized as |
| 874 // false. | 878 // false. |
| 875 DCHECK(!inserted || !*inserted); | 879 DCHECK(!inserted || !*inserted); |
| 876 | 880 |
| 877 auto iter = endpoints_.find(id); | 881 auto iter = endpoints_.find(id); |
| 878 InterfaceEndpoint* endpoint; | 882 InterfaceEndpoint* endpoint; |
| 879 if (iter == endpoints_.end()) { | 883 if (iter == endpoints_.end()) { |
| 880 endpoint = new InterfaceEndpoint(this, id); | 884 endpoint = new InterfaceEndpoint(this, id); |
| 881 endpoints_[id] = endpoint; | 885 endpoints_[id] = endpoint; |
| 882 if (inserted) | 886 if (inserted) |
| 883 *inserted = true; | 887 *inserted = true; |
| 884 } else { | 888 } else { |
| 885 endpoint = iter->second.get(); | 889 endpoint = iter->second.get(); |
| 886 } | 890 } |
| 887 | 891 |
| 888 return endpoint; | 892 return endpoint; |
| 889 } | 893 } |
| 890 | 894 |
| 895 void MultiplexRouter::AssertLockAcquired() { |
| 896 #if DCHECK_IS_ON() |
| 897 if (lock_) |
| 898 lock_->AssertAcquired(); |
| 899 #endif |
| 900 } |
| 901 |
| 891 } // namespace internal | 902 } // namespace internal |
| 892 } // namespace mojo | 903 } // namespace mojo |
| OLD | NEW |