OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| 6 |
| 7 #include "base/bind.h" |
| 8 #include "base/message_loop/message_loop.h" |
| 9 #include "base/stl_util.h" |
| 10 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" |
| 11 |
| 12 namespace mojo { |
| 13 namespace internal { |
| 14 |
| 15 // InterfaceEndpoint stores the information of an interface endpoint registered |
| 16 // with the router. Always accessed under the router's lock. |
| 17 // No one other than the router's |endpoints_| and |tasks_| should hold refs to |
| 18 // this object. |
| 19 class MultiplexRouter::InterfaceEndpoint |
| 20 : public base::RefCounted<InterfaceEndpoint> { |
| 21 public: |
| 22 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) |
| 23 : router_lock_(&router->lock_), |
| 24 id_(id), |
| 25 closed_(false), |
| 26 peer_closed_(false), |
| 27 client_(nullptr) { |
| 28 router_lock_->AssertAcquired(); |
| 29 } |
| 30 |
| 31 InterfaceId id() const { return id_; } |
| 32 |
| 33 bool closed() const { return closed_; } |
| 34 void set_closed() { |
| 35 router_lock_->AssertAcquired(); |
| 36 closed_ = true; |
| 37 } |
| 38 |
| 39 bool peer_closed() const { return peer_closed_; } |
| 40 void set_peer_closed() { |
| 41 router_lock_->AssertAcquired(); |
| 42 peer_closed_ = true; |
| 43 } |
| 44 |
| 45 const scoped_refptr<base::SingleThreadTaskRunner> task_runner() const { |
| 46 return task_runner_; |
| 47 } |
| 48 void set_task_runner( |
| 49 scoped_refptr<base::SingleThreadTaskRunner> task_runner) { |
| 50 router_lock_->AssertAcquired(); |
| 51 task_runner_ = task_runner.Pass(); |
| 52 } |
| 53 |
| 54 InterfaceEndpointClient* client() const { return client_; } |
| 55 void set_client(InterfaceEndpointClient* client) { |
| 56 router_lock_->AssertAcquired(); |
| 57 client_ = client; |
| 58 } |
| 59 |
| 60 private: |
| 61 friend class base::RefCounted<InterfaceEndpoint>; |
| 62 |
| 63 ~InterfaceEndpoint() { |
| 64 router_lock_->AssertAcquired(); |
| 65 |
| 66 DCHECK(!client_); |
| 67 DCHECK(closed_); |
| 68 DCHECK(peer_closed_); |
| 69 } |
| 70 |
| 71 base::Lock* const router_lock_; |
| 72 const InterfaceId id_; |
| 73 |
| 74 // Whether the endpoint has been closed. |
| 75 bool closed_; |
| 76 // Whether the peer endpoint has been closed. |
| 77 bool peer_closed_; |
| 78 |
| 79 // The task runner on which |client_| can be accessed. |
| 80 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 81 // Not owned. It is null if no client is attached to this endpoint. |
| 82 InterfaceEndpointClient* client_; |
| 83 |
| 84 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); |
| 85 }; |
| 86 |
| 87 struct MultiplexRouter::Task { |
| 88 public: |
| 89 // Doesn't take ownership of |message| but takes its contents. |
| 90 static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) { |
| 91 Task* task = new Task(); |
| 92 task->message.reset(new Message); |
| 93 message->MoveTo(task->message.get()); |
| 94 return make_scoped_ptr(task); |
| 95 } |
| 96 static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { |
| 97 Task* task = new Task(); |
| 98 task->endpoint_to_notify = endpoint; |
| 99 return make_scoped_ptr(task); |
| 100 } |
| 101 |
| 102 ~Task() {} |
| 103 |
| 104 bool IsIncomingMessageTask() const { return !!message; } |
| 105 bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } |
| 106 |
| 107 scoped_ptr<Message> message; |
| 108 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
| 109 |
| 110 private: |
| 111 Task() {} |
| 112 }; |
| 113 |
| 114 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
| 115 ScopedMessagePipeHandle message_pipe, |
| 116 const MojoAsyncWaiter* waiter) |
| 117 : RefCountedDeleteOnMessageLoop(base::MessageLoop::current() |
| 118 ->task_runner()), |
| 119 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
| 120 header_validator_(this), |
| 121 connector_(message_pipe.Pass(), Connector::MULTI_THREADED_SEND, waiter), |
| 122 encountered_error_(false), |
| 123 control_message_handler_(this), |
| 124 control_message_proxy_(&connector_), |
| 125 next_interface_id_value_(1), |
| 126 testing_mode_(false) { |
| 127 connector_.set_incoming_receiver(&header_validator_); |
| 128 connector_.set_connection_error_handler( |
| 129 [this]() { OnPipeConnectionError(); }); |
| 130 } |
| 131 |
// Teardown runs under the lock: pending tasks are dropped first (they may
// hold endpoint refs), then every remaining endpoint is force-released by
// marking its peer closed.
MultiplexRouter::~MultiplexRouter() {
  base::AutoLock locker(lock_);

  tasks_.clear();

  for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
    InterfaceEndpoint* endpoint = iter->second.get();
    // Increment the iterator before calling UpdateEndpointStateMayRemove()
    // because it may remove the corresponding value from the map.
    ++iter;

    // Local sides must all have been closed by this point.
    DCHECK(endpoint->closed());
    UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
  }

  DCHECK(endpoints_.empty());
}
| 149 |
| 150 void MultiplexRouter::CreateEndpointHandlePair( |
| 151 ScopedInterfaceEndpointHandle* local_endpoint, |
| 152 ScopedInterfaceEndpointHandle* remote_endpoint) { |
| 153 base::AutoLock locker(lock_); |
| 154 uint32_t id = 0; |
| 155 do { |
| 156 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) |
| 157 next_interface_id_value_ = 1; |
| 158 id = next_interface_id_value_++; |
| 159 if (set_interface_id_namespace_bit_) |
| 160 id |= kInterfaceIdNamespaceMask; |
| 161 } while (ContainsKey(endpoints_, id)); |
| 162 |
| 163 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); |
| 164 endpoints_[id] = endpoint; |
| 165 if (encountered_error_) |
| 166 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 167 |
| 168 *local_endpoint = ScopedInterfaceEndpointHandle(id, true, this); |
| 169 *remote_endpoint = ScopedInterfaceEndpointHandle(id, false, this); |
| 170 } |
| 171 |
| 172 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( |
| 173 InterfaceId id) { |
| 174 if (!IsValidInterfaceId(id)) |
| 175 return ScopedInterfaceEndpointHandle(); |
| 176 |
| 177 base::AutoLock locker(lock_); |
| 178 if (ContainsKey(endpoints_, id)) { |
| 179 // If the endpoint already exist, it is because we have received a |
| 180 // notification that the peer endpoint has closed. |
| 181 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 182 CHECK(!endpoint->closed()); |
| 183 CHECK(endpoint->peer_closed()); |
| 184 } else { |
| 185 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); |
| 186 endpoints_[id] = endpoint; |
| 187 if (encountered_error_) |
| 188 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 189 } |
| 190 return ScopedInterfaceEndpointHandle(id, true, this); |
| 191 } |
| 192 |
// Closes one half of an endpoint pair. |is_local| is true when the handle
// being closed lives on this side of the pipe; a non-local handle is only
// announced to the peer via a control message.
void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) {
  if (!IsValidInterfaceId(id))
    return;

  base::AutoLock locker(lock_);

  if (!is_local) {
    DCHECK(ContainsKey(endpoints_, id));
    DCHECK(!IsMasterInterfaceId(id));

    // We will receive a NotifyPeerEndpointClosed message from the other side.
    control_message_proxy_.NotifyEndpointClosedBeforeSent(id);

    return;
  }

  DCHECK(ContainsKey(endpoints_, id));
  InterfaceEndpoint* endpoint = endpoints_[id].get();
  DCHECK(!endpoint->client());
  DCHECK(!endpoint->closed());
  UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);

  // Only non-master endpoints notify the peer with a control message.
  if (!IsMasterInterfaceId(id))
    control_message_proxy_.NotifyPeerEndpointClosed(id);

  // State changed; give queued tasks a chance to make progress (async-only).
  ProcessTasks(true);
}
| 220 |
// Attaches |client| to the endpoint identified by |handle| and records the
// current thread's task runner as the only place |client| may be reached.
void MultiplexRouter::AttachEndpointClient(
    const ScopedInterfaceEndpointHandle& handle,
    InterfaceEndpointClient* client) {
  const InterfaceId id = handle.id();

  DCHECK(IsValidInterfaceId(id));
  DCHECK(client);

  base::AutoLock locker(lock_);
  DCHECK(ContainsKey(endpoints_, id));

  InterfaceEndpoint* endpoint = endpoints_[id].get();
  DCHECK(!endpoint->client());
  DCHECK(!endpoint->closed());

  endpoint->set_task_runner(base::MessageLoop::current()->task_runner());
  endpoint->set_client(client);

  // If the peer went away before a client attached, queue an error
  // notification so the new client still hears about it.
  if (endpoint->peer_closed())
    tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
  ProcessTasks(true);
}
| 243 |
| 244 void MultiplexRouter::DetachEndpointClient( |
| 245 const ScopedInterfaceEndpointHandle& handle) { |
| 246 const InterfaceId id = handle.id(); |
| 247 |
| 248 DCHECK(IsValidInterfaceId(id)); |
| 249 |
| 250 base::AutoLock locker(lock_); |
| 251 DCHECK(ContainsKey(endpoints_, id)); |
| 252 |
| 253 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 254 DCHECK(endpoint->client()); |
| 255 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 256 DCHECK(!endpoint->closed()); |
| 257 |
| 258 endpoint->set_task_runner(nullptr); |
| 259 endpoint->set_client(nullptr); |
| 260 } |
| 261 |
| 262 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, |
| 263 Message* message) { |
| 264 const InterfaceId id = handle.id(); |
| 265 |
| 266 base::AutoLock locker(lock_); |
| 267 if (!ContainsKey(endpoints_, id)) |
| 268 return false; |
| 269 |
| 270 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 271 if (endpoint->peer_closed()) |
| 272 return false; |
| 273 |
| 274 message->set_interface_id(id); |
| 275 return connector_.Accept(message); |
| 276 } |
| 277 |
| 278 void MultiplexRouter::RaiseError() { |
| 279 if (task_runner_->BelongsToCurrentThread()) { |
| 280 connector_.RaiseError(); |
| 281 } else { |
| 282 task_runner_->PostTask(FROM_HERE, |
| 283 base::Bind(&MultiplexRouter::RaiseError, this)); |
| 284 } |
| 285 } |
| 286 |
| 287 ScopedMessagePipeHandle MultiplexRouter::PassMessagePipe() { |
| 288 DCHECK(thread_checker_.CalledOnValidThread()); |
| 289 { |
| 290 base::AutoLock locker(lock_); |
| 291 DCHECK(endpoints_.empty() || (endpoints_.size() == 1 && |
| 292 ContainsKey(endpoints_, kMasterInterfaceId))); |
| 293 } |
| 294 return connector_.PassMessagePipe(); |
| 295 } |
| 296 |
| 297 void MultiplexRouter::EnableTestingMode() { |
| 298 DCHECK(thread_checker_.CalledOnValidThread()); |
| 299 base::AutoLock locker(lock_); |
| 300 |
| 301 testing_mode_ = true; |
| 302 connector_.set_enforce_errors_from_incoming_receiver(false); |
| 303 } |
| 304 |
| 305 bool MultiplexRouter::Accept(Message* message) { |
| 306 DCHECK(thread_checker_.CalledOnValidThread()); |
| 307 |
| 308 scoped_refptr<MultiplexRouter> protector(this); |
| 309 base::AutoLock locker(lock_); |
| 310 tasks_.push_back(Task::CreateIncomingMessageTask(message)); |
| 311 ProcessTasks(false); |
| 312 |
| 313 // Always return true. If we see errors during message processing, we will |
| 314 // explicitly call Connector::RaiseError() to disconnect the message pipe. |
| 315 return true; |
| 316 } |
| 317 |
| 318 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { |
| 319 lock_.AssertAcquired(); |
| 320 |
| 321 if (IsMasterInterfaceId(id)) |
| 322 return false; |
| 323 |
| 324 if (!ContainsKey(endpoints_, id)) |
| 325 endpoints_[id] = new InterfaceEndpoint(this, id); |
| 326 |
| 327 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 328 DCHECK(!endpoint->peer_closed()); |
| 329 |
| 330 if (endpoint->client()) |
| 331 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| 332 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 333 |
| 334 // No need to trigger a ProcessTasks() because it is already on the stack. |
| 335 |
| 336 return true; |
| 337 } |
| 338 |
| 339 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { |
| 340 lock_.AssertAcquired(); |
| 341 |
| 342 if (IsMasterInterfaceId(id)) |
| 343 return false; |
| 344 |
| 345 if (!ContainsKey(endpoints_, id)) |
| 346 endpoints_[id] = new InterfaceEndpoint(this, id); |
| 347 |
| 348 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 349 DCHECK(!endpoint->closed()); |
| 350 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| 351 |
| 352 control_message_proxy_.NotifyPeerEndpointClosed(id); |
| 353 |
| 354 return true; |
| 355 } |
| 356 |
// Invoked via |connector_|'s error handler (set in the constructor) when the
// message pipe breaks: marks every endpoint's peer closed and queues error
// notifications for endpoints that currently have clients.
void MultiplexRouter::OnPipeConnectionError() {
  DCHECK(thread_checker_.CalledOnValidThread());

  // Keep |this| alive: the task processing below may call out and drop refs.
  scoped_refptr<MultiplexRouter> protector(this);
  base::AutoLock locker(lock_);

  // Endpoints created after this point start with peer_closed set; see
  // CreateEndpointHandlePair() and CreateLocalEndpointHandle().
  encountered_error_ = true;

  for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
    InterfaceEndpoint* endpoint = iter->second.get();
    // Increment the iterator before calling UpdateEndpointStateMayRemove()
    // because it may remove the corresponding value from the map.
    ++iter;

    if (endpoint->client())
      tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));

    UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
  }

  ProcessTasks(false);
}
| 379 |
// Drains |tasks_| in FIFO order, stopping (and leaving the current task at
// the front of the queue) as soon as one cannot be processed right now.
// |force_async| starts the drain in async mode; the Process* helpers may also
// switch it on mid-drain through their out-param. Called under |lock_|.
void MultiplexRouter::ProcessTasks(bool force_async) {
  lock_.AssertAcquired();

  while (!tasks_.empty()) {
    scoped_ptr<Task> task(tasks_.front().Pass());
    tasks_.pop_front();

    bool processed = task->IsNotifyErrorTask()
                         ? ProcessNotifyErrorTask(task.get(), &force_async)
                         : ProcessIncomingMessageTask(task.get(), &force_async);

    if (!processed) {
      // Put the task back where it was so ordering is preserved for the next
      // drain attempt.
      tasks_.push_front(task.Pass());
      break;
    }
  }
}
| 397 |
// Delivers a connection-error notification to the endpoint's client. Returns
// false when the task cannot run here and the drain has been re-posted to the
// client's thread. Called under |lock_|, which is released around the client
// callback.
bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) {
  lock_.AssertAcquired();
  InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
  // No client attached means there is nobody left to notify; task is done.
  if (!endpoint->client())
    return true;

  // Wrong thread, or async mode forced: bounce the whole queue drain to the
  // client's task runner and keep this task queued.
  if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) {
    endpoint->task_runner()->PostTask(
        FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
    return false;
  }

  // NOTE(review): the remainder of this drain is forced async — presumably to
  // bound synchronous re-entry after calling out to the client; confirm.
  *force_async = true;
  InterfaceEndpointClient* client = endpoint->client();
  {
    // We must unlock before calling into |client| because it may call this
    // object within NotifyError(). Holding the lock will lead to deadlock.
    //
    // It is safe to call into |client| without the lock. Because |client| is
    // always accessed on the same thread, including DetachEndpointClient().
    base::AutoUnlock unlocker(lock_);
    client->NotifyError();
  }
  return true;
}
| 423 |
// Dispatches a single queued incoming message. Returns true when the task is
// finished (dispatched, consumed as a control message, or deliberately
// dropped) and false when it must remain queued — either no client is
// attached yet or dispatch has to happen on the client's thread. Called under
// |lock_|, which is released around the client callback.
bool MultiplexRouter::ProcessIncomingMessageTask(Task* task,
                                                 bool* force_async) {
  lock_.AssertAcquired();
  Message* message = task->message.get();

  // Pipe control messages go straight to |control_message_handler_|.
  if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
    if (!control_message_handler_.Accept(message))
      RaiseErrorInNonTestingMode();
    return true;
  }

  InterfaceId id = message->interface_id();
  DCHECK(IsValidInterfaceId(id));

  if (!ContainsKey(endpoints_, id)) {
    DCHECK(!IsMasterInterfaceId(id));

    // Currently, it is legitimate to receive messages for an endpoint
    // that is not registered. For example, the endpoint is transferred in
    // a message that is discarded. Once we add support to specify all
    // enclosing endpoints in message header, we should be able to remove
    // this.
    InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
    endpoints_[id] = endpoint;
    UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);

    control_message_proxy_.NotifyPeerEndpointClosed(id);
    return true;
  }

  InterfaceEndpoint* endpoint = endpoints_[id].get();
  // Messages to an endpoint we've already closed are silently dropped.
  if (endpoint->closed())
    return true;

  if (!endpoint->client()) {
    // We need to wait until a client is attached in order to dispatch further
    // messages.
    return false;
  }

  // Wrong thread, or async mode forced: re-post the queue drain to the
  // client's task runner and keep this task queued.
  if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) {
    endpoint->task_runner()->PostTask(
        FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
    return false;
  }

  // NOTE(review): the remainder of this drain is forced async — presumably to
  // bound synchronous re-entry after calling out to the client; confirm.
  *force_async = true;
  InterfaceEndpointClient* client = endpoint->client();
  // Take ownership of the message so it stays valid while the lock is down.
  scoped_ptr<Message> owned_message = task->message.Pass();
  bool result = false;
  {
    // We must unlock before calling into |client| because it may call this
    // object within HandleIncomingMessage(). Holding the lock will lead to
    // deadlock.
    //
    // It is safe to call into |client| without the lock. Because |client| is
    // always accessed on the same thread, including DetachEndpointClient().
    base::AutoUnlock unlocker(lock_);
    result = client->HandleIncomingMessage(owned_message.get());
  }
  if (!result)
    RaiseErrorInNonTestingMode();

  return true;
}
| 489 |
| 490 void MultiplexRouter::LockAndCallProcessTasks() { |
| 491 // There is no need to hold a ref to this class in this case because this is |
| 492 // always called using base::Bind(), which holds a ref. |
| 493 base::AutoLock locker(lock_); |
| 494 ProcessTasks(false); |
| 495 } |
| 496 |
| 497 void MultiplexRouter::UpdateEndpointStateMayRemove( |
| 498 InterfaceEndpoint* endpoint, |
| 499 EndpointStateUpdateType type) { |
| 500 switch (type) { |
| 501 case ENDPOINT_CLOSED: |
| 502 endpoint->set_closed(); |
| 503 break; |
| 504 case PEER_ENDPOINT_CLOSED: |
| 505 endpoint->set_peer_closed(); |
| 506 break; |
| 507 } |
| 508 if (endpoint->closed() && endpoint->peer_closed()) |
| 509 endpoints_.erase(endpoint->id()); |
| 510 } |
| 511 |
| 512 void MultiplexRouter::RaiseErrorInNonTestingMode() { |
| 513 lock_.AssertAcquired(); |
| 514 if (!testing_mode_) |
| 515 RaiseError(); |
| 516 } |
| 517 |
| 518 } // namespace internal |
| 519 } // namespace mojo |
OLD | NEW |