Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(505)

Side by Side Diff: ipc/ipc_channel_proxy.cc

Issue 2301123004: Mojo Channel: Fix deferred proxy dispatch; support paused channels (Closed)
Patch Set: . Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_channel_proxy.h ('k') | ipc/ipc_message_pipe_reader.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_channel_proxy.h" 5 #include "ipc/ipc_channel_proxy.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
108 #ifdef IPC_MESSAGE_LOG_ENABLED 108 #ifdef IPC_MESSAGE_LOG_ENABLED
109 if (logger->Enabled()) 109 if (logger->Enabled())
110 logger->OnPostDispatchMessage(message, channel_id_); 110 logger->OnPostDispatchMessage(message, channel_id_);
111 #endif 111 #endif
112 return true; 112 return true;
113 } 113 }
114 return false; 114 return false;
115 } 115 }
116 116
117 // Called on the IPC::Channel thread 117 // Called on the IPC::Channel thread
118 void ChannelProxy::Context::UnpauseChannel(bool flush) {
119 DCHECK(channel_);
120 channel_->Unpause(flush);
121 }
122
123 // Called on the IPC::Channel thread
124 void ChannelProxy::Context::FlushChannel() {
125 DCHECK(channel_);
126 channel_->Flush();
127 }
128
129 // Called on the IPC::Channel thread
118 bool ChannelProxy::Context::OnMessageReceived(const Message& message) { 130 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
119 // First give a chance to the filters to process this message. 131 // First give a chance to the filters to process this message.
120 if (!TryFilters(message)) 132 if (!TryFilters(message))
121 OnMessageReceivedNoFilter(message); 133 OnMessageReceivedNoFilter(message);
122 return true; 134 return true;
123 } 135 }
124 136
125 // Called on the IPC::Channel thread 137 // Called on the IPC::Channel thread
126 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { 138 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
127 listener_task_runner_->PostTask( 139 listener_task_runner_->PostTask(
(...skipping 24 matching lines...) Expand all
152 void ChannelProxy::Context::OnChannelError() { 164 void ChannelProxy::Context::OnChannelError() {
153 for (size_t i = 0; i < filters_.size(); ++i) 165 for (size_t i = 0; i < filters_.size(); ++i)
154 filters_[i]->OnChannelError(); 166 filters_[i]->OnChannelError();
155 167
156 // See above comment about using listener_task_runner_ here. 168 // See above comment about using listener_task_runner_ here.
157 listener_task_runner_->PostTask( 169 listener_task_runner_->PostTask(
158 FROM_HERE, base::Bind(&Context::OnDispatchError, this)); 170 FROM_HERE, base::Bind(&Context::OnDispatchError, this));
159 } 171 }
160 172
161 // Called on the IPC::Channel thread 173 // Called on the IPC::Channel thread
162 void ChannelProxy::Context::OnChannelOpened() { 174 void ChannelProxy::Context::OnChannelOpened(bool pause) {
163 DCHECK(channel_ != NULL); 175 DCHECK(channel_ != NULL);
164 176
165 // Assume a reference to ourselves on behalf of this thread. This reference 177 // Assume a reference to ourselves on behalf of this thread. This reference
166 // will be released when we are closed. 178 // will be released when we are closed.
167 AddRef(); 179 AddRef();
168 180
169 if (!channel_->Connect()) { 181 bool success = pause ? channel_->ConnectPaused() : channel_->Connect();
182 if (!success) {
170 OnChannelError(); 183 OnChannelError();
171 return; 184 return;
172 } 185 }
173 186
174 for (size_t i = 0; i < filters_.size(); ++i) 187 for (size_t i = 0; i < filters_.size(); ++i)
175 filters_[i]->OnFilterAdded(channel_.get()); 188 filters_[i]->OnFilterAdded(channel_.get());
176 } 189 }
177 190
178 // Called on the IPC::Channel thread 191 // Called on the IPC::Channel thread
179 void ChannelProxy::Context::OnChannelClosed() { 192 void ChannelProxy::Context::OnChannelClosed() {
(...skipping 275 matching lines...) Expand 10 before | Expand all | Expand 10 after
455 } 468 }
456 469
457 ChannelProxy::~ChannelProxy() { 470 ChannelProxy::~ChannelProxy() {
458 DCHECK(CalledOnValidThread()); 471 DCHECK(CalledOnValidThread());
459 472
460 Close(); 473 Close();
461 } 474 }
462 475
463 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle, 476 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
464 Channel::Mode mode, 477 Channel::Mode mode,
465 bool create_pipe_now) { 478 bool create_pipe_now,
479 bool create_paused) {
466 #if defined(OS_POSIX) 480 #if defined(OS_POSIX)
467 // When we are creating a server on POSIX, we need its file descriptor 481 // When we are creating a server on POSIX, we need its file descriptor
468 // to be created immediately so that it can be accessed and passed 482 // to be created immediately so that it can be accessed and passed
469 // to other processes. Forcing it to be created immediately avoids 483 // to other processes. Forcing it to be created immediately avoids
470 // race conditions that may otherwise arise. 484 // race conditions that may otherwise arise.
471 if (mode & Channel::MODE_SERVER_FLAG) { 485 if (mode & Channel::MODE_SERVER_FLAG) {
472 create_pipe_now = true; 486 create_pipe_now = true;
473 } 487 }
474 #endif // defined(OS_POSIX) 488 #endif // defined(OS_POSIX)
475 Init( 489 Init(
476 ChannelFactory::Create(channel_handle, mode, context_->ipc_task_runner()), 490 ChannelFactory::Create(channel_handle, mode, context_->ipc_task_runner()),
477 create_pipe_now); 491 create_pipe_now, create_paused);
478 } 492 }
479 493
480 void ChannelProxy::Init(std::unique_ptr<ChannelFactory> factory, 494 void ChannelProxy::Init(std::unique_ptr<ChannelFactory> factory,
481 bool create_pipe_now) { 495 bool create_pipe_now,
496 bool create_paused) {
482 DCHECK(CalledOnValidThread()); 497 DCHECK(CalledOnValidThread());
483 DCHECK(!did_init_); 498 DCHECK(!did_init_);
484 499
485 if (create_pipe_now) { 500 if (create_pipe_now) {
486 // Create the channel immediately. This effectively sets up the 501 // Create the channel immediately. This effectively sets up the
487 // low-level pipe so that the client can connect. Without creating 502 // low-level pipe so that the client can connect. Without creating
488 // the pipe immediately, it is possible for a listener to attempt 503 // the pipe immediately, it is possible for a listener to attempt
489 // to connect and get an error since the pipe doesn't exist yet. 504 // to connect and get an error since the pipe doesn't exist yet.
490 context_->CreateChannel(std::move(factory)); 505 context_->CreateChannel(std::move(factory));
491 } else { 506 } else {
492 context_->ipc_task_runner()->PostTask( 507 context_->ipc_task_runner()->PostTask(
493 FROM_HERE, base::Bind(&Context::CreateChannel, context_, 508 FROM_HERE, base::Bind(&Context::CreateChannel, context_,
494 base::Passed(&factory))); 509 base::Passed(&factory)));
495 } 510 }
496 511
497 // complete initialization on the background thread 512 // complete initialization on the background thread
498 context_->ipc_task_runner()->PostTask( 513 context_->ipc_task_runner()->PostTask(
499 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_)); 514 FROM_HERE,
515 base::Bind(&Context::OnChannelOpened, context_, create_paused));
500 516
501 did_init_ = true; 517 did_init_ = true;
502 OnChannelInit(); 518 OnChannelInit();
503 } 519 }
504 520
521 void ChannelProxy::Unpause(bool flush) {
522 context_->ipc_task_runner()->PostTask(
523 FROM_HERE, base::Bind(&Context::UnpauseChannel, context_, flush));
524 }
525
526 void ChannelProxy::Flush() {
527 context_->ipc_task_runner()->PostTask(
528 FROM_HERE, base::Bind(&Context::FlushChannel, context_));
529 }
530
505 void ChannelProxy::Close() { 531 void ChannelProxy::Close() {
506 DCHECK(CalledOnValidThread()); 532 DCHECK(CalledOnValidThread());
507 533
508 // Clear the backpointer to the listener so that any pending calls to 534 // Clear the backpointer to the listener so that any pending calls to
509 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is 535 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
510 // possible that the channel could be closed while it is receiving messages! 536 // possible that the channel could be closed while it is receiving messages!
511 context_->Clear(); 537 context_->Clear();
512 538
513 if (context_->ipc_task_runner()) { 539 if (context_->ipc_task_runner()) {
514 context_->ipc_task_runner()->PostTask( 540 context_->ipc_task_runner()->PostTask(
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after
614 return channel->TakeClientFileDescriptor(); 640 return channel->TakeClientFileDescriptor();
615 } 641 }
616 #endif 642 #endif
617 643
618 void ChannelProxy::OnChannelInit() { 644 void ChannelProxy::OnChannelInit() {
619 } 645 }
620 646
621 //----------------------------------------------------------------------------- 647 //-----------------------------------------------------------------------------
622 648
623 } // namespace IPC 649 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_channel_proxy.h ('k') | ipc/ipc_message_pipe_reader.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698