| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| 11 #include "base/bind.h" | 11 #include "base/bind.h" |
| 12 #include "base/macros.h" | 12 #include "base/macros.h" |
| 13 #include "base/message_loop/message_loop.h" | 13 #include "base/message_loop/message_loop.h" |
| 14 #include "base/single_thread_task_runner.h" |
| 14 #include "base/stl_util.h" | 15 #include "base/stl_util.h" |
| 15 #include "mojo/public/cpp/bindings/associated_group.h" | 16 #include "mojo/public/cpp/bindings/associated_group.h" |
| 16 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" | 17 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" |
| 17 | 18 |
| 18 namespace mojo { | 19 namespace mojo { |
| 19 namespace internal { | 20 namespace internal { |
| 20 | 21 |
| 21 // InterfaceEndpoint stores the information of an interface endpoint registered | 22 // InterfaceEndpoint stores the information of an interface endpoint registered |
| 22 // with the router. Always accessed under the router's lock. | 23 // with the router. Always accessed under the router's lock. |
| 23 // No one other than the router's |endpoints_| and |tasks_| should hold refs to | 24 // No one other than the router's |endpoints_| and |tasks_| should hold refs to |
| (...skipping 17 matching lines...) Expand all Loading... |
| 41 router_lock_->AssertAcquired(); | 42 router_lock_->AssertAcquired(); |
| 42 closed_ = true; | 43 closed_ = true; |
| 43 } | 44 } |
| 44 | 45 |
| 45 bool peer_closed() const { return peer_closed_; } | 46 bool peer_closed() const { return peer_closed_; } |
| 46 void set_peer_closed() { | 47 void set_peer_closed() { |
| 47 router_lock_->AssertAcquired(); | 48 router_lock_->AssertAcquired(); |
| 48 peer_closed_ = true; | 49 peer_closed_ = true; |
| 49 } | 50 } |
| 50 | 51 |
| 51 const scoped_refptr<base::SingleThreadTaskRunner> task_runner() const { | 52 base::SingleThreadTaskRunner* task_runner() const { |
| 52 return task_runner_; | 53 return task_runner_.get(); |
| 53 } | 54 } |
| 54 void set_task_runner( | 55 void set_task_runner( |
| 55 scoped_refptr<base::SingleThreadTaskRunner> task_runner) { | 56 scoped_refptr<base::SingleThreadTaskRunner> task_runner) { |
| 56 router_lock_->AssertAcquired(); | 57 router_lock_->AssertAcquired(); |
| 57 task_runner_ = std::move(task_runner); | 58 task_runner_ = std::move(task_runner); |
| 58 } | 59 } |
| 59 | 60 |
| 60 InterfaceEndpointClient* client() const { return client_; } | 61 InterfaceEndpointClient* client() const { return client_; } |
| 61 void set_client(InterfaceEndpointClient* client) { | 62 void set_client(InterfaceEndpointClient* client) { |
| 62 router_lock_->AssertAcquired(); | 63 router_lock_->AssertAcquired(); |
| (...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 121 ScopedMessagePipeHandle message_pipe) | 122 ScopedMessagePipeHandle message_pipe) |
| 122 : RefCountedDeleteOnMessageLoop( | 123 : RefCountedDeleteOnMessageLoop( |
| 123 base::MessageLoop::current()->task_runner()), | 124 base::MessageLoop::current()->task_runner()), |
| 124 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 125 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
| 125 header_validator_(this), | 126 header_validator_(this), |
| 126 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), | 127 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), |
| 127 encountered_error_(false), | 128 encountered_error_(false), |
| 128 control_message_handler_(this), | 129 control_message_handler_(this), |
| 129 control_message_proxy_(&connector_), | 130 control_message_proxy_(&connector_), |
| 130 next_interface_id_value_(1), | 131 next_interface_id_value_(1), |
| 132 posted_to_process_tasks_(false), |
| 131 testing_mode_(false) { | 133 testing_mode_(false) { |
| 132 connector_.set_incoming_receiver(&header_validator_); | 134 connector_.set_incoming_receiver(&header_validator_); |
| 133 connector_.set_connection_error_handler( | 135 connector_.set_connection_error_handler( |
| 134 [this]() { OnPipeConnectionError(); }); | 136 [this]() { OnPipeConnectionError(); }); |
| 135 } | 137 } |
| 136 | 138 |
| 137 MultiplexRouter::~MultiplexRouter() { | 139 MultiplexRouter::~MultiplexRouter() { |
| 138 base::AutoLock locker(lock_); | 140 base::AutoLock locker(lock_); |
| 139 | 141 |
| 140 tasks_.clear(); | 142 tasks_.clear(); |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 173 *local_endpoint = ScopedInterfaceEndpointHandle(id, true, this); | 175 *local_endpoint = ScopedInterfaceEndpointHandle(id, true, this); |
| 174 *remote_endpoint = ScopedInterfaceEndpointHandle(id, false, this); | 176 *remote_endpoint = ScopedInterfaceEndpointHandle(id, false, this); |
| 175 } | 177 } |
| 176 | 178 |
| 177 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( | 179 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( |
| 178 InterfaceId id) { | 180 InterfaceId id) { |
| 179 if (!IsValidInterfaceId(id)) | 181 if (!IsValidInterfaceId(id)) |
| 180 return ScopedInterfaceEndpointHandle(); | 182 return ScopedInterfaceEndpointHandle(); |
| 181 | 183 |
| 182 base::AutoLock locker(lock_); | 184 base::AutoLock locker(lock_); |
| 183 if (ContainsKey(endpoints_, id)) { | 185 bool inserted = false; |
| 186 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| 187 if (inserted) { |
| 188 if (encountered_error_) |
| 189 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 190 } else { |
| 184 // If the endpoint already exist, it is because we have received a | 191 // If the endpoint already exist, it is because we have received a |
| 185 // notification that the peer endpoint has closed. | 192 // notification that the peer endpoint has closed. |
| 186 InterfaceEndpoint* endpoint = endpoints_[id].get(); | |
| 187 CHECK(!endpoint->closed()); | 193 CHECK(!endpoint->closed()); |
| 188 CHECK(endpoint->peer_closed()); | 194 CHECK(endpoint->peer_closed()); |
| 189 } else { | |
| 190 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); | |
| 191 endpoints_[id] = endpoint; | |
| 192 if (encountered_error_) | |
| 193 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | |
| 194 } | 195 } |
| 195 return ScopedInterfaceEndpointHandle(id, true, this); | 196 return ScopedInterfaceEndpointHandle(id, true, this); |
| 196 } | 197 } |
| 197 | 198 |
| 198 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { | 199 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { |
| 199 if (!IsValidInterfaceId(id)) | 200 if (!IsValidInterfaceId(id)) |
| 200 return; | 201 return; |
| 201 | 202 |
| 202 base::AutoLock locker(lock_); | 203 base::AutoLock locker(lock_); |
| 203 | 204 |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 259 DCHECK(endpoint->client()); | 260 DCHECK(endpoint->client()); |
| 260 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 261 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 261 DCHECK(!endpoint->closed()); | 262 DCHECK(!endpoint->closed()); |
| 262 | 263 |
| 263 endpoint->set_task_runner(nullptr); | 264 endpoint->set_task_runner(nullptr); |
| 264 endpoint->set_client(nullptr); | 265 endpoint->set_client(nullptr); |
| 265 } | 266 } |
| 266 | 267 |
| 267 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, | 268 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, |
| 268 Message* message) { | 269 Message* message) { |
| 269 const InterfaceId id = handle.id(); | 270 message->set_interface_id(handle.id()); |
| 270 | |
| 271 base::AutoLock locker(lock_); | |
| 272 if (!ContainsKey(endpoints_, id)) | |
| 273 return false; | |
| 274 | |
| 275 InterfaceEndpoint* endpoint = endpoints_[id].get(); | |
| 276 if (endpoint->peer_closed()) | |
| 277 return false; | |
| 278 | |
| 279 message->set_interface_id(id); | |
| 280 return connector_.Accept(message); | 271 return connector_.Accept(message); |
| 281 } | 272 } |
| 282 | 273 |
| 283 void MultiplexRouter::RaiseError() { | 274 void MultiplexRouter::RaiseError() { |
| 284 if (task_runner_->BelongsToCurrentThread()) { | 275 if (task_runner_->BelongsToCurrentThread()) { |
| 285 connector_.RaiseError(); | 276 connector_.RaiseError(); |
| 286 } else { | 277 } else { |
| 287 task_runner_->PostTask(FROM_HERE, | 278 task_runner_->PostTask(FROM_HERE, |
| 288 base::Bind(&MultiplexRouter::RaiseError, this)); | 279 base::Bind(&MultiplexRouter::RaiseError, this)); |
| 289 } | 280 } |
| (...skipping 28 matching lines...) Expand all Loading... |
| 318 | 309 |
| 319 testing_mode_ = true; | 310 testing_mode_ = true; |
| 320 connector_.set_enforce_errors_from_incoming_receiver(false); | 311 connector_.set_enforce_errors_from_incoming_receiver(false); |
| 321 } | 312 } |
| 322 | 313 |
| 323 bool MultiplexRouter::Accept(Message* message) { | 314 bool MultiplexRouter::Accept(Message* message) { |
| 324 DCHECK(thread_checker_.CalledOnValidThread()); | 315 DCHECK(thread_checker_.CalledOnValidThread()); |
| 325 | 316 |
| 326 scoped_refptr<MultiplexRouter> protector(this); | 317 scoped_refptr<MultiplexRouter> protector(this); |
| 327 base::AutoLock locker(lock_); | 318 base::AutoLock locker(lock_); |
| 328 tasks_.push_back(Task::CreateIncomingMessageTask(message)); | 319 |
| 329 ProcessTasks(false); | 320 bool processed = tasks_.empty() && ProcessIncomingMessage(message, false); |
| 321 |
| 322 if (!processed) { |
| 323 // Either the task queue is not empty or we cannot process the message |
| 324 // directly. In both cases, there is no need to call ProcessTasks(). |
| 325 tasks_.push_back(Task::CreateIncomingMessageTask(message)); |
| 326 } else if (!tasks_.empty()) { |
| 327 // Processing the message may result in new tasks (for error notification) |
| 328 // being added to the queue. In this case, we have to attempt to process the |
| 329 // tasks. |
| 330 ProcessTasks(false); |
| 331 } |
| 330 | 332 |
| 331 // Always return true. If we see errors during message processing, we will | 333 // Always return true. If we see errors during message processing, we will |
| 332 // explicitly call Connector::RaiseError() to disconnect the message pipe. | 334 // explicitly call Connector::RaiseError() to disconnect the message pipe. |
| 333 return true; | 335 return true; |
| 334 } | 336 } |
| 335 | 337 |
| 336 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { | 338 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { |
| 337 lock_.AssertAcquired(); | 339 lock_.AssertAcquired(); |
| 338 | 340 |
| 339 if (IsMasterInterfaceId(id)) | 341 if (IsMasterInterfaceId(id)) |
| 340 return false; | 342 return false; |
| 341 | 343 |
| 342 if (!ContainsKey(endpoints_, id)) | 344 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); |
| 343 endpoints_[id] = new InterfaceEndpoint(this, id); | |
| 344 | |
| 345 InterfaceEndpoint* endpoint = endpoints_[id].get(); | |
| 346 DCHECK(!endpoint->peer_closed()); | 345 DCHECK(!endpoint->peer_closed()); |
| 347 | 346 |
| 348 if (endpoint->client()) | 347 if (endpoint->client()) |
| 349 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 348 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| 350 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 349 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 351 | 350 |
| 352 // No need to trigger a ProcessTasks() because it is already on the stack. | 351 // No need to trigger a ProcessTasks() because it is already on the stack. |
| 353 | 352 |
| 354 return true; | 353 return true; |
| 355 } | 354 } |
| 356 | 355 |
| 357 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { | 356 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { |
| 358 lock_.AssertAcquired(); | 357 lock_.AssertAcquired(); |
| 359 | 358 |
| 360 if (IsMasterInterfaceId(id)) | 359 if (IsMasterInterfaceId(id)) |
| 361 return false; | 360 return false; |
| 362 | 361 |
| 363 if (!ContainsKey(endpoints_, id)) | 362 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); |
| 364 endpoints_[id] = new InterfaceEndpoint(this, id); | |
| 365 | |
| 366 InterfaceEndpoint* endpoint = endpoints_[id].get(); | |
| 367 DCHECK(!endpoint->closed()); | 363 DCHECK(!endpoint->closed()); |
| 368 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 364 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| 369 | 365 |
| 370 control_message_proxy_.NotifyPeerEndpointClosed(id); | 366 control_message_proxy_.NotifyPeerEndpointClosed(id); |
| 371 | 367 |
| 372 return true; | 368 return true; |
| 373 } | 369 } |
| 374 | 370 |
| 375 void MultiplexRouter::OnPipeConnectionError() { | 371 void MultiplexRouter::OnPipeConnectionError() { |
| 376 DCHECK(thread_checker_.CalledOnValidThread()); | 372 DCHECK(thread_checker_.CalledOnValidThread()); |
| (...skipping 14 matching lines...) Expand all Loading... |
| 391 | 387 |
| 392 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 388 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 393 } | 389 } |
| 394 | 390 |
| 395 ProcessTasks(false); | 391 ProcessTasks(false); |
| 396 } | 392 } |
| 397 | 393 |
| 398 void MultiplexRouter::ProcessTasks(bool force_async) { | 394 void MultiplexRouter::ProcessTasks(bool force_async) { |
| 399 lock_.AssertAcquired(); | 395 lock_.AssertAcquired(); |
| 400 | 396 |
| 397 if (posted_to_process_tasks_) |
| 398 return; |
| 399 |
| 401 while (!tasks_.empty()) { | 400 while (!tasks_.empty()) { |
| 402 scoped_ptr<Task> task(std::move(tasks_.front())); | 401 scoped_ptr<Task> task(std::move(tasks_.front())); |
| 403 tasks_.pop_front(); | 402 tasks_.pop_front(); |
| 404 | 403 |
| 405 bool processed = task->IsNotifyErrorTask() | 404 bool processed = |
| 406 ? ProcessNotifyErrorTask(task.get(), force_async) | 405 task->IsNotifyErrorTask() |
| 407 : ProcessIncomingMessageTask(task.get(), force_async); | 406 ? ProcessNotifyErrorTask(task.get(), force_async) |
| 407 : ProcessIncomingMessage(task->message.get(), force_async); |
| 408 | 408 |
| 409 if (!processed) { | 409 if (!processed) { |
| 410 tasks_.push_front(std::move(task)); | 410 tasks_.push_front(std::move(task)); |
| 411 break; | 411 break; |
| 412 } | 412 } |
| 413 } | 413 } |
| 414 } | 414 } |
| 415 | 415 |
| 416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { | 416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { |
| 417 lock_.AssertAcquired(); | 417 lock_.AssertAcquired(); |
| 418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
| 419 if (!endpoint->client()) | 419 if (!endpoint->client()) |
| 420 return true; | 420 return true; |
| 421 | 421 |
| 422 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { | 422 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { |
| 423 endpoint->task_runner()->PostTask( | 423 MaybePostToProcessTasks(endpoint->task_runner()); |
| 424 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | |
| 425 return false; | 424 return false; |
| 426 } | 425 } |
| 427 | 426 |
| 428 InterfaceEndpointClient* client = endpoint->client(); | 427 InterfaceEndpointClient* client = endpoint->client(); |
| 429 { | 428 { |
| 430 // We must unlock before calling into |client| because it may call this | 429 // We must unlock before calling into |client| because it may call this |
| 431 // object within NotifyError(). Holding the lock will lead to deadlock. | 430 // object within NotifyError(). Holding the lock will lead to deadlock. |
| 432 // | 431 // |
| 433 // It is safe to call into |client| without the lock. Because |client| is | 432 // It is safe to call into |client| without the lock. Because |client| is |
| 434 // always accessed on the same thread, including DetachEndpointClient(). | 433 // always accessed on the same thread, including DetachEndpointClient(). |
| 435 base::AutoUnlock unlocker(lock_); | 434 base::AutoUnlock unlocker(lock_); |
| 436 client->NotifyError(); | 435 client->NotifyError(); |
| 437 } | 436 } |
| 438 return true; | 437 return true; |
| 439 } | 438 } |
| 440 | 439 |
| 441 bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { | 440 bool MultiplexRouter::ProcessIncomingMessage(Message* message, |
| 441 bool force_async) { |
| 442 lock_.AssertAcquired(); | 442 lock_.AssertAcquired(); |
| 443 Message* message = task->message.get(); | |
| 444 | |
| 445 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | 443 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
| 446 if (!control_message_handler_.Accept(message)) | 444 if (!control_message_handler_.Accept(message)) |
| 447 RaiseErrorInNonTestingMode(); | 445 RaiseErrorInNonTestingMode(); |
| 448 return true; | 446 return true; |
| 449 } | 447 } |
| 450 | 448 |
| 451 InterfaceId id = message->interface_id(); | 449 InterfaceId id = message->interface_id(); |
| 452 DCHECK(IsValidInterfaceId(id)); | 450 DCHECK(IsValidInterfaceId(id)); |
| 453 | 451 |
| 454 if (!ContainsKey(endpoints_, id)) { | 452 bool inserted = false; |
| 453 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| 454 if (inserted) { |
| 455 DCHECK(!IsMasterInterfaceId(id)); | 455 DCHECK(!IsMasterInterfaceId(id)); |
| 456 | 456 |
| 457 // Currently, it is legitimate to receive messages for an endpoint | 457 // Currently, it is legitimate to receive messages for an endpoint |
| 458 // that is not registered. For example, the endpoint is transferred in | 458 // that is not registered. For example, the endpoint is transferred in |
| 459 // a message that is discarded. Once we add support to specify all | 459 // a message that is discarded. Once we add support to specify all |
| 460 // enclosing endpoints in message header, we should be able to remove | 460 // enclosing endpoints in message header, we should be able to remove |
| 461 // this. | 461 // this. |
| 462 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); | |
| 463 endpoints_[id] = endpoint; | |
| 464 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | 462 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| 465 | 463 |
| 466 control_message_proxy_.NotifyPeerEndpointClosed(id); | 464 control_message_proxy_.NotifyPeerEndpointClosed(id); |
| 467 return true; | 465 return true; |
| 468 } | 466 } |
| 469 | 467 |
| 470 InterfaceEndpoint* endpoint = endpoints_[id].get(); | |
| 471 if (endpoint->closed()) | 468 if (endpoint->closed()) |
| 472 return true; | 469 return true; |
| 473 | 470 |
| 474 if (!endpoint->client()) { | 471 if (!endpoint->client()) { |
| 475 // We need to wait until a client is attached in order to dispatch further | 472 // We need to wait until a client is attached in order to dispatch further |
| 476 // messages. | 473 // messages. |
| 477 return false; | 474 return false; |
| 478 } | 475 } |
| 479 | 476 |
| 480 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { | 477 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { |
| 481 endpoint->task_runner()->PostTask( | 478 MaybePostToProcessTasks(endpoint->task_runner()); |
| 482 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | |
| 483 return false; | 479 return false; |
| 484 } | 480 } |
| 485 | 481 |
| 486 InterfaceEndpointClient* client = endpoint->client(); | 482 InterfaceEndpointClient* client = endpoint->client(); |
| 487 scoped_ptr<Message> owned_message = std::move(task->message); | |
| 488 bool result = false; | 483 bool result = false; |
| 489 { | 484 { |
| 490 // We must unlock before calling into |client| because it may call this | 485 // We must unlock before calling into |client| because it may call this |
| 491 // object within HandleIncomingMessage(). Holding the lock will lead to | 486 // object within HandleIncomingMessage(). Holding the lock will lead to |
| 492 // deadlock. | 487 // deadlock. |
| 493 // | 488 // |
| 494 // It is safe to call into |client| without the lock. Because |client| is | 489 // It is safe to call into |client| without the lock. Because |client| is |
| 495 // always accessed on the same thread, including DetachEndpointClient(). | 490 // always accessed on the same thread, including DetachEndpointClient(). |
| 496 base::AutoUnlock unlocker(lock_); | 491 base::AutoUnlock unlocker(lock_); |
| 497 result = client->HandleIncomingMessage(owned_message.get()); | 492 result = client->HandleIncomingMessage(message); |
| 498 } | 493 } |
| 499 if (!result) | 494 if (!result) |
| 500 RaiseErrorInNonTestingMode(); | 495 RaiseErrorInNonTestingMode(); |
| 501 | 496 |
| 502 return true; | 497 return true; |
| 503 } | 498 } |
| 504 | 499 |
| 500 void MultiplexRouter::MaybePostToProcessTasks( |
| 501 base::SingleThreadTaskRunner* task_runner) { |
| 502 lock_.AssertAcquired(); |
| 503 if (posted_to_process_tasks_) |
| 504 return; |
| 505 |
| 506 posted_to_process_tasks_ = true; |
| 507 task_runner->PostTask( |
| 508 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| 509 } |
| 510 |
| 505 void MultiplexRouter::LockAndCallProcessTasks() { | 511 void MultiplexRouter::LockAndCallProcessTasks() { |
| 506 // There is no need to hold a ref to this class in this case because this is | 512 // There is no need to hold a ref to this class in this case because this is |
| 507 // always called using base::Bind(), which holds a ref. | 513 // always called using base::Bind(), which holds a ref. |
| 508 base::AutoLock locker(lock_); | 514 base::AutoLock locker(lock_); |
| 515 posted_to_process_tasks_ = false; |
| 509 ProcessTasks(false); | 516 ProcessTasks(false); |
| 510 } | 517 } |
| 511 | 518 |
| 512 void MultiplexRouter::UpdateEndpointStateMayRemove( | 519 void MultiplexRouter::UpdateEndpointStateMayRemove( |
| 513 InterfaceEndpoint* endpoint, | 520 InterfaceEndpoint* endpoint, |
| 514 EndpointStateUpdateType type) { | 521 EndpointStateUpdateType type) { |
| 515 switch (type) { | 522 switch (type) { |
| 516 case ENDPOINT_CLOSED: | 523 case ENDPOINT_CLOSED: |
| 517 endpoint->set_closed(); | 524 endpoint->set_closed(); |
| 518 break; | 525 break; |
| 519 case PEER_ENDPOINT_CLOSED: | 526 case PEER_ENDPOINT_CLOSED: |
| 520 endpoint->set_peer_closed(); | 527 endpoint->set_peer_closed(); |
| 521 break; | 528 break; |
| 522 } | 529 } |
| 523 if (endpoint->closed() && endpoint->peer_closed()) | 530 if (endpoint->closed() && endpoint->peer_closed()) |
| 524 endpoints_.erase(endpoint->id()); | 531 endpoints_.erase(endpoint->id()); |
| 525 } | 532 } |
| 526 | 533 |
| 527 void MultiplexRouter::RaiseErrorInNonTestingMode() { | 534 void MultiplexRouter::RaiseErrorInNonTestingMode() { |
| 528 lock_.AssertAcquired(); | 535 lock_.AssertAcquired(); |
| 529 if (!testing_mode_) | 536 if (!testing_mode_) |
| 530 RaiseError(); | 537 RaiseError(); |
| 531 } | 538 } |
| 532 | 539 |
| 540 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( |
| 541 InterfaceId id, |
| 542 bool* inserted) { |
| 543 lock_.AssertAcquired(); |
| 544 // Either |inserted| is nullptr or it points to a boolean initialized as |
| 545 // false. |
| 546 DCHECK(!inserted || !*inserted); |
| 547 |
| 548 auto iter = endpoints_.find(id); |
| 549 InterfaceEndpoint* endpoint; |
| 550 if (iter == endpoints_.end()) { |
| 551 endpoint = new InterfaceEndpoint(this, id); |
| 552 endpoints_[id] = endpoint; |
| 553 if (inserted) |
| 554 *inserted = true; |
| 555 } else { |
| 556 endpoint = iter->second.get(); |
| 557 } |
| 558 |
| 559 return endpoint; |
| 560 } |
| 561 |
| 533 } // namespace internal | 562 } // namespace internal |
| 534 } // namespace mojo | 563 } // namespace mojo |
| OLD | NEW |