Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(195)

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 1831513002: Mojo C++ bindings: some MultiplexRouter optimization. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after
121 ScopedMessagePipeHandle message_pipe) 121 ScopedMessagePipeHandle message_pipe)
122 : RefCountedDeleteOnMessageLoop( 122 : RefCountedDeleteOnMessageLoop(
123 base::MessageLoop::current()->task_runner()), 123 base::MessageLoop::current()->task_runner()),
124 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), 124 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
125 header_validator_(this), 125 header_validator_(this),
126 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), 126 connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND),
127 encountered_error_(false), 127 encountered_error_(false),
128 control_message_handler_(this), 128 control_message_handler_(this),
129 control_message_proxy_(&connector_), 129 control_message_proxy_(&connector_),
130 next_interface_id_value_(1), 130 next_interface_id_value_(1),
131 posted_to_process_tasks_(false),
131 testing_mode_(false) { 132 testing_mode_(false) {
132 connector_.set_incoming_receiver(&header_validator_); 133 connector_.set_incoming_receiver(&header_validator_);
133 connector_.set_connection_error_handler( 134 connector_.set_connection_error_handler(
134 [this]() { OnPipeConnectionError(); }); 135 [this]() { OnPipeConnectionError(); });
135 } 136 }
136 137
137 MultiplexRouter::~MultiplexRouter() { 138 MultiplexRouter::~MultiplexRouter() {
138 base::AutoLock locker(lock_); 139 base::AutoLock locker(lock_);
139 140
140 tasks_.clear(); 141 tasks_.clear();
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
173 *local_endpoint = ScopedInterfaceEndpointHandle(id, true, this); 174 *local_endpoint = ScopedInterfaceEndpointHandle(id, true, this);
174 *remote_endpoint = ScopedInterfaceEndpointHandle(id, false, this); 175 *remote_endpoint = ScopedInterfaceEndpointHandle(id, false, this);
175 } 176 }
176 177
177 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( 178 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
178 InterfaceId id) { 179 InterfaceId id) {
179 if (!IsValidInterfaceId(id)) 180 if (!IsValidInterfaceId(id))
180 return ScopedInterfaceEndpointHandle(); 181 return ScopedInterfaceEndpointHandle();
181 182
182 base::AutoLock locker(lock_); 183 base::AutoLock locker(lock_);
183 if (ContainsKey(endpoints_, id)) { 184 bool inserted = false;
185 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
186 if (inserted) {
187 if (encountered_error_)
188 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
189 } else {
184 // If the endpoint already exist, it is because we have received a 190 // If the endpoint already exist, it is because we have received a
185 // notification that the peer endpoint has closed. 191 // notification that the peer endpoint has closed.
186 InterfaceEndpoint* endpoint = endpoints_[id].get();
187 CHECK(!endpoint->closed()); 192 CHECK(!endpoint->closed());
188 CHECK(endpoint->peer_closed()); 193 CHECK(endpoint->peer_closed());
189 } else {
190 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
191 endpoints_[id] = endpoint;
192 if (encountered_error_)
193 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
194 } 194 }
195 return ScopedInterfaceEndpointHandle(id, true, this); 195 return ScopedInterfaceEndpointHandle(id, true, this);
196 } 196 }
197 197
198 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { 198 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) {
199 if (!IsValidInterfaceId(id)) 199 if (!IsValidInterfaceId(id))
200 return; 200 return;
201 201
202 base::AutoLock locker(lock_); 202 base::AutoLock locker(lock_);
203 203
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
259 DCHECK(endpoint->client()); 259 DCHECK(endpoint->client());
260 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 260 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
261 DCHECK(!endpoint->closed()); 261 DCHECK(!endpoint->closed());
262 262
263 endpoint->set_task_runner(nullptr); 263 endpoint->set_task_runner(nullptr);
264 endpoint->set_client(nullptr); 264 endpoint->set_client(nullptr);
265 } 265 }
266 266
267 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, 267 bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle,
268 Message* message) { 268 Message* message) {
269 const InterfaceId id = handle.id(); 269 message->set_interface_id(handle.id());
270
271 base::AutoLock locker(lock_);
272 if (!ContainsKey(endpoints_, id))
273 return false;
274
275 InterfaceEndpoint* endpoint = endpoints_[id].get();
276 if (endpoint->peer_closed())
277 return false;
278
279 message->set_interface_id(id);
280 return connector_.Accept(message); 270 return connector_.Accept(message);
281 } 271 }
282 272
283 void MultiplexRouter::RaiseError() { 273 void MultiplexRouter::RaiseError() {
284 if (task_runner_->BelongsToCurrentThread()) { 274 if (task_runner_->BelongsToCurrentThread()) {
285 connector_.RaiseError(); 275 connector_.RaiseError();
286 } else { 276 } else {
287 task_runner_->PostTask(FROM_HERE, 277 task_runner_->PostTask(FROM_HERE,
288 base::Bind(&MultiplexRouter::RaiseError, this)); 278 base::Bind(&MultiplexRouter::RaiseError, this));
289 } 279 }
(...skipping 28 matching lines...) Expand all
318 308
319 testing_mode_ = true; 309 testing_mode_ = true;
320 connector_.set_enforce_errors_from_incoming_receiver(false); 310 connector_.set_enforce_errors_from_incoming_receiver(false);
321 } 311 }
322 312
323 bool MultiplexRouter::Accept(Message* message) { 313 bool MultiplexRouter::Accept(Message* message) {
324 DCHECK(thread_checker_.CalledOnValidThread()); 314 DCHECK(thread_checker_.CalledOnValidThread());
325 315
326 scoped_refptr<MultiplexRouter> protector(this); 316 scoped_refptr<MultiplexRouter> protector(this);
327 base::AutoLock locker(lock_); 317 base::AutoLock locker(lock_);
328 tasks_.push_back(Task::CreateIncomingMessageTask(message)); 318
329 ProcessTasks(false); 319 bool processed = tasks_.empty() && ProcessIncomingMessage(message, false);
320
321 if (!processed) {
322 // Either the task queue is not empty or we cannot process the message
323 // directly. In both cases, there is no need to call ProcessTasks().
324 tasks_.push_back(Task::CreateIncomingMessageTask(message));
325 } else if (!tasks_.empty()) {
326 // Processing the message may result in new tasks (for error notification)
327 // being added to the queue. In this case, we have to attempt to process the
328 // tasks.
329 ProcessTasks(false);
330 }
330 331
331 // Always return true. If we see errors during message processing, we will 332 // Always return true. If we see errors during message processing, we will
332 // explicitly call Connector::RaiseError() to disconnect the message pipe. 333 // explicitly call Connector::RaiseError() to disconnect the message pipe.
333 return true; 334 return true;
334 } 335 }
335 336
336 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { 337 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
337 lock_.AssertAcquired(); 338 lock_.AssertAcquired();
338 339
339 if (IsMasterInterfaceId(id)) 340 if (IsMasterInterfaceId(id))
340 return false; 341 return false;
341 342
342 if (!ContainsKey(endpoints_, id)) 343 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
343 endpoints_[id] = new InterfaceEndpoint(this, id);
344
345 InterfaceEndpoint* endpoint = endpoints_[id].get();
346 DCHECK(!endpoint->peer_closed()); 344 DCHECK(!endpoint->peer_closed());
347 345
348 if (endpoint->client()) 346 if (endpoint->client())
349 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 347 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
350 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 348 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
351 349
352 // No need to trigger a ProcessTasks() because it is already on the stack. 350 // No need to trigger a ProcessTasks() because it is already on the stack.
353 351
354 return true; 352 return true;
355 } 353 }
356 354
357 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { 355 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
358 lock_.AssertAcquired(); 356 lock_.AssertAcquired();
359 357
360 if (IsMasterInterfaceId(id)) 358 if (IsMasterInterfaceId(id))
361 return false; 359 return false;
362 360
363 if (!ContainsKey(endpoints_, id)) 361 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
364 endpoints_[id] = new InterfaceEndpoint(this, id);
365
366 InterfaceEndpoint* endpoint = endpoints_[id].get();
367 DCHECK(!endpoint->closed()); 362 DCHECK(!endpoint->closed());
368 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 363 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
369 364
370 control_message_proxy_.NotifyPeerEndpointClosed(id); 365 control_message_proxy_.NotifyPeerEndpointClosed(id);
371 366
372 return true; 367 return true;
373 } 368 }
374 369
375 void MultiplexRouter::OnPipeConnectionError() { 370 void MultiplexRouter::OnPipeConnectionError() {
376 DCHECK(thread_checker_.CalledOnValidThread()); 371 DCHECK(thread_checker_.CalledOnValidThread());
(...skipping 14 matching lines...) Expand all
391 386
392 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 387 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
393 } 388 }
394 389
395 ProcessTasks(false); 390 ProcessTasks(false);
396 } 391 }
397 392
398 void MultiplexRouter::ProcessTasks(bool force_async) { 393 void MultiplexRouter::ProcessTasks(bool force_async) {
399 lock_.AssertAcquired(); 394 lock_.AssertAcquired();
400 395
396 if (posted_to_process_tasks_)
397 return;
398
401 while (!tasks_.empty()) { 399 while (!tasks_.empty()) {
402 scoped_ptr<Task> task(std::move(tasks_.front())); 400 scoped_ptr<Task> task(std::move(tasks_.front()));
403 tasks_.pop_front(); 401 tasks_.pop_front();
404 402
405 bool processed = task->IsNotifyErrorTask() 403 bool processed =
406 ? ProcessNotifyErrorTask(task.get(), force_async) 404 task->IsNotifyErrorTask()
407 : ProcessIncomingMessageTask(task.get(), force_async); 405 ? ProcessNotifyErrorTask(task.get(), force_async)
406 : ProcessIncomingMessage(task->message.get(), force_async);
408 407
409 if (!processed) { 408 if (!processed) {
410 tasks_.push_front(std::move(task)); 409 tasks_.push_front(std::move(task));
411 break; 410 break;
412 } 411 }
413 } 412 }
414 } 413 }
415 414
416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { 415 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) {
417 lock_.AssertAcquired(); 416 lock_.AssertAcquired();
418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); 417 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
419 if (!endpoint->client()) 418 if (!endpoint->client())
420 return true; 419 return true;
421 420
422 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { 421 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) {
423 endpoint->task_runner()->PostTask( 422 if (!posted_to_process_tasks_) {
Ken Rockot(use gerrit already) 2016/03/23 19:15:55 nit: Perhaps you could move this logic into anothe
yzshen1 2016/03/23 20:24:12 Done.
424 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); 423 posted_to_process_tasks_ = true;
424 endpoint->task_runner()->PostTask(
425 FROM_HERE,
426 base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
427 }
425 return false; 428 return false;
426 } 429 }
427 430
428 InterfaceEndpointClient* client = endpoint->client(); 431 InterfaceEndpointClient* client = endpoint->client();
429 { 432 {
430 // We must unlock before calling into |client| because it may call this 433 // We must unlock before calling into |client| because it may call this
431 // object within NotifyError(). Holding the lock will lead to deadlock. 434 // object within NotifyError(). Holding the lock will lead to deadlock.
432 // 435 //
433 // It is safe to call into |client| without the lock. Because |client| is 436 // It is safe to call into |client| without the lock. Because |client| is
434 // always accessed on the same thread, including DetachEndpointClient(). 437 // always accessed on the same thread, including DetachEndpointClient().
435 base::AutoUnlock unlocker(lock_); 438 base::AutoUnlock unlocker(lock_);
436 client->NotifyError(); 439 client->NotifyError();
437 } 440 }
438 return true; 441 return true;
439 } 442 }
440 443
441 bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { 444 bool MultiplexRouter::ProcessIncomingMessage(Message* message,
445 bool force_async) {
442 lock_.AssertAcquired(); 446 lock_.AssertAcquired();
443 Message* message = task->message.get();
444
445 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { 447 if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
446 if (!control_message_handler_.Accept(message)) 448 if (!control_message_handler_.Accept(message))
447 RaiseErrorInNonTestingMode(); 449 RaiseErrorInNonTestingMode();
448 return true; 450 return true;
449 } 451 }
450 452
451 InterfaceId id = message->interface_id(); 453 InterfaceId id = message->interface_id();
452 DCHECK(IsValidInterfaceId(id)); 454 DCHECK(IsValidInterfaceId(id));
453 455
454 if (!ContainsKey(endpoints_, id)) { 456 bool inserted = false;
457 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
458 if (inserted) {
455 DCHECK(!IsMasterInterfaceId(id)); 459 DCHECK(!IsMasterInterfaceId(id));
456 460
457 // Currently, it is legitimate to receive messages for an endpoint 461 // Currently, it is legitimate to receive messages for an endpoint
458 // that is not registered. For example, the endpoint is transferred in 462 // that is not registered. For example, the endpoint is transferred in
459 // a message that is discarded. Once we add support to specify all 463 // a message that is discarded. Once we add support to specify all
460 // enclosing endpoints in message header, we should be able to remove 464 // enclosing endpoints in message header, we should be able to remove
461 // this. 465 // this.
462 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
463 endpoints_[id] = endpoint;
464 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 466 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
465 467
466 control_message_proxy_.NotifyPeerEndpointClosed(id); 468 control_message_proxy_.NotifyPeerEndpointClosed(id);
467 return true; 469 return true;
468 } 470 }
469 471
470 InterfaceEndpoint* endpoint = endpoints_[id].get();
471 if (endpoint->closed()) 472 if (endpoint->closed())
472 return true; 473 return true;
473 474
474 if (!endpoint->client()) { 475 if (!endpoint->client()) {
475 // We need to wait until a client is attached in order to dispatch further 476 // We need to wait until a client is attached in order to dispatch further
476 // messages. 477 // messages.
477 return false; 478 return false;
478 } 479 }
479 480
480 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { 481 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) {
481 endpoint->task_runner()->PostTask( 482 if (!posted_to_process_tasks_) {
482 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); 483 posted_to_process_tasks_ = true;
484 endpoint->task_runner()->PostTask(
485 FROM_HERE,
486 base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
487 }
483 return false; 488 return false;
484 } 489 }
485 490
486 InterfaceEndpointClient* client = endpoint->client(); 491 InterfaceEndpointClient* client = endpoint->client();
487 scoped_ptr<Message> owned_message = std::move(task->message);
488 bool result = false; 492 bool result = false;
489 { 493 {
490 // We must unlock before calling into |client| because it may call this 494 // We must unlock before calling into |client| because it may call this
491 // object within HandleIncomingMessage(). Holding the lock will lead to 495 // object within HandleIncomingMessage(). Holding the lock will lead to
492 // deadlock. 496 // deadlock.
493 // 497 //
494 // It is safe to call into |client| without the lock. Because |client| is 498 // It is safe to call into |client| without the lock. Because |client| is
495 // always accessed on the same thread, including DetachEndpointClient(). 499 // always accessed on the same thread, including DetachEndpointClient().
496 base::AutoUnlock unlocker(lock_); 500 base::AutoUnlock unlocker(lock_);
497 result = client->HandleIncomingMessage(owned_message.get()); 501 result = client->HandleIncomingMessage(message);
498 } 502 }
499 if (!result) 503 if (!result)
500 RaiseErrorInNonTestingMode(); 504 RaiseErrorInNonTestingMode();
501 505
502 return true; 506 return true;
503 } 507 }
504 508
505 void MultiplexRouter::LockAndCallProcessTasks() { 509 void MultiplexRouter::LockAndCallProcessTasks() {
506 // There is no need to hold a ref to this class in this case because this is 510 // There is no need to hold a ref to this class in this case because this is
507 // always called using base::Bind(), which holds a ref. 511 // always called using base::Bind(), which holds a ref.
508 base::AutoLock locker(lock_); 512 base::AutoLock locker(lock_);
513 posted_to_process_tasks_ = false;
509 ProcessTasks(false); 514 ProcessTasks(false);
510 } 515 }
511 516
512 void MultiplexRouter::UpdateEndpointStateMayRemove( 517 void MultiplexRouter::UpdateEndpointStateMayRemove(
513 InterfaceEndpoint* endpoint, 518 InterfaceEndpoint* endpoint,
514 EndpointStateUpdateType type) { 519 EndpointStateUpdateType type) {
515 switch (type) { 520 switch (type) {
516 case ENDPOINT_CLOSED: 521 case ENDPOINT_CLOSED:
517 endpoint->set_closed(); 522 endpoint->set_closed();
518 break; 523 break;
519 case PEER_ENDPOINT_CLOSED: 524 case PEER_ENDPOINT_CLOSED:
520 endpoint->set_peer_closed(); 525 endpoint->set_peer_closed();
521 break; 526 break;
522 } 527 }
523 if (endpoint->closed() && endpoint->peer_closed()) 528 if (endpoint->closed() && endpoint->peer_closed())
524 endpoints_.erase(endpoint->id()); 529 endpoints_.erase(endpoint->id());
525 } 530 }
526 531
527 void MultiplexRouter::RaiseErrorInNonTestingMode() { 532 void MultiplexRouter::RaiseErrorInNonTestingMode() {
528 lock_.AssertAcquired(); 533 lock_.AssertAcquired();
529 if (!testing_mode_) 534 if (!testing_mode_)
530 RaiseError(); 535 RaiseError();
531 } 536 }
532 537
538 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
539 InterfaceId id,
540 bool* inserted) {
541 lock_.AssertAcquired();
542 // Either |inserted| is nullptr or it points to a boolean initialized as
543 // false.
544 DCHECK(!inserted || !*inserted);
545
546 auto iter = endpoints_.find(id);
547 InterfaceEndpoint* endpoint;
548 if (iter == endpoints_.end()) {
549 endpoint = new InterfaceEndpoint(this, id);
550 endpoints_[id] = endpoint;
551 if (inserted)
552 *inserted = true;
553 } else {
554 endpoint = iter->second.get();
555 }
556
557 return endpoint;
558 }
559
533 } // namespace internal 560 } // namespace internal
534 } // namespace mojo 561 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698