Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(356)

Side by Side Diff: ipc/ipc_channel_proxy.cc

Issue 10694014: Cleanup IPC::ChannelProxy to use SingleThreadTaskRunner (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « ipc/ipc_channel_proxy.h ('k') | ipc/ipc_sync_channel.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/bind.h" 5 #include "base/bind.h"
6 #include "base/compiler_specific.h" 6 #include "base/compiler_specific.h"
7 #include "base/debug/trace_event.h" 7 #include "base/debug/trace_event.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/memory/ref_counted.h" 9 #include "base/memory/ref_counted.h"
10 #include "base/memory/scoped_ptr.h" 10 #include "base/memory/scoped_ptr.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/thread_task_runner_handle.h"
11 #include "ipc/ipc_channel_proxy.h" 13 #include "ipc/ipc_channel_proxy.h"
12 #include "ipc/ipc_listener.h" 14 #include "ipc/ipc_listener.h"
13 #include "ipc/ipc_logging.h" 15 #include "ipc/ipc_logging.h"
14 #include "ipc/ipc_message_macros.h" 16 #include "ipc/ipc_message_macros.h"
15 #include "ipc/ipc_message_utils.h" 17 #include "ipc/ipc_message_utils.h"
16 18
17 namespace IPC { 19 namespace IPC {
18 20
19 //------------------------------------------------------------------------------ 21 //------------------------------------------------------------------------------
20 22
(...skipping 15 matching lines...) Expand all
36 38
37 void ChannelProxy::MessageFilter::OnDestruct() const { 39 void ChannelProxy::MessageFilter::OnDestruct() const {
38 delete this; 40 delete this;
39 } 41 }
40 42
41 ChannelProxy::MessageFilter::~MessageFilter() {} 43 ChannelProxy::MessageFilter::~MessageFilter() {}
42 44
43 //------------------------------------------------------------------------------ 45 //------------------------------------------------------------------------------
44 46
45 ChannelProxy::Context::Context(Listener* listener, 47 ChannelProxy::Context::Context(Listener* listener,
46 base::MessageLoopProxy* ipc_message_loop) 48 base::SingleThreadTaskRunner* ipc_task_runner)
47 : listener_message_loop_(base::MessageLoopProxy::current()), 49 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
48 listener_(listener), 50 listener_(listener),
49 ipc_message_loop_(ipc_message_loop), 51 ipc_task_runner_(ipc_task_runner),
50 channel_connected_called_(false), 52 channel_connected_called_(false),
51 peer_pid_(base::kNullProcessId) { 53 peer_pid_(base::kNullProcessId) {
54 DCHECK(ipc_task_runner_);
52 } 55 }
53 56
54 ChannelProxy::Context::~Context() { 57 ChannelProxy::Context::~Context() {
55 } 58 }
56 59
60 void ChannelProxy::Context::ClearIPCTaskRunner() {
61 ipc_task_runner_ = NULL;
62 }
63
57 void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle, 64 void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
58 const Channel::Mode& mode) { 65 const Channel::Mode& mode) {
59 DCHECK(channel_.get() == NULL); 66 DCHECK(channel_.get() == NULL);
60 channel_id_ = handle.name; 67 channel_id_ = handle.name;
61 channel_.reset(new Channel(handle, mode, this)); 68 channel_.reset(new Channel(handle, mode, this));
62 } 69 }
63 70
64 bool ChannelProxy::Context::TryFilters(const Message& message) { 71 bool ChannelProxy::Context::TryFilters(const Message& message) {
65 #ifdef IPC_MESSAGE_LOG_ENABLED 72 #ifdef IPC_MESSAGE_LOG_ENABLED
66 Logging* logger = Logging::GetInstance(); 73 Logging* logger = Logging::GetInstance();
(...skipping 20 matching lines...) Expand all
87 OnMessageReceivedNoFilter(message); 94 OnMessageReceivedNoFilter(message);
88 return true; 95 return true;
89 } 96 }
90 97
91 // Called on the IPC::Channel thread 98 // Called on the IPC::Channel thread
92 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { 99 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
93 // NOTE: This code relies on the listener's message loop not going away while 100 // NOTE: This code relies on the listener's message loop not going away while
94 // this thread is active. That should be a reasonable assumption, but it 101 // this thread is active. That should be a reasonable assumption, but it
95 // feels risky. We may want to invent some more indirect way of referring to 102 // feels risky. We may want to invent some more indirect way of referring to
96 // a MessageLoop if this becomes a problem. 103 // a MessageLoop if this becomes a problem.
97 listener_message_loop_->PostTask( 104 listener_task_runner_->PostTask(
98 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message)); 105 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
99 return true; 106 return true;
100 } 107 }
101 108
102 // Called on the IPC::Channel thread 109 // Called on the IPC::Channel thread
103 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) { 110 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
104 // Add any pending filters. This avoids a race condition where someone 111 // Add any pending filters. This avoids a race condition where someone
105 // creates a ChannelProxy, calls AddFilter, and then right after starts the 112 // creates a ChannelProxy, calls AddFilter, and then right after starts the
106 // peer process. The IO thread could receive a message before the task to add 113 // peer process. The IO thread could receive a message before the task to add
107 // the filter is run on the IO thread. 114 // the filter is run on the IO thread.
108 OnAddFilter(); 115 OnAddFilter();
109 116
110 // We cache off the peer_pid so it can be safely accessed from both threads. 117 // We cache off the peer_pid so it can be safely accessed from both threads.
111 peer_pid_ = channel_->peer_pid(); 118 peer_pid_ = channel_->peer_pid();
112 for (size_t i = 0; i < filters_.size(); ++i) 119 for (size_t i = 0; i < filters_.size(); ++i)
113 filters_[i]->OnChannelConnected(peer_pid); 120 filters_[i]->OnChannelConnected(peer_pid);
114 121
115 // See above comment about using listener_message_loop_ here. 122 // See above comment about using listener_task_runner_ here.
116 listener_message_loop_->PostTask( 123 listener_task_runner_->PostTask(
117 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this)); 124 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
118 } 125 }
119 126
120 // Called on the IPC::Channel thread 127 // Called on the IPC::Channel thread
121 void ChannelProxy::Context::OnChannelError() { 128 void ChannelProxy::Context::OnChannelError() {
122 for (size_t i = 0; i < filters_.size(); ++i) 129 for (size_t i = 0; i < filters_.size(); ++i)
123 filters_[i]->OnChannelError(); 130 filters_[i]->OnChannelError();
124 131
125 // See above comment about using listener_message_loop_ here. 132 // See above comment about using listener_task_runner_ here.
126 listener_message_loop_->PostTask( 133 listener_task_runner_->PostTask(
127 FROM_HERE, base::Bind(&Context::OnDispatchError, this)); 134 FROM_HERE, base::Bind(&Context::OnDispatchError, this));
128 } 135 }
129 136
130 // Called on the IPC::Channel thread 137 // Called on the IPC::Channel thread
131 void ChannelProxy::Context::OnChannelOpened() { 138 void ChannelProxy::Context::OnChannelOpened() {
132 DCHECK(channel_ != NULL); 139 DCHECK(channel_ != NULL);
133 140
134 // Assume a reference to ourselves on behalf of this thread. This reference 141 // Assume a reference to ourselves on behalf of this thread. This reference
135 // will be released when we are closed. 142 // will be released when we are closed.
136 AddRef(); 143 AddRef();
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
207 } 214 }
208 } 215 }
209 216
210 NOTREACHED() << "filter to be removed not found"; 217 NOTREACHED() << "filter to be removed not found";
211 } 218 }
212 219
213 // Called on the listener's thread 220 // Called on the listener's thread
214 void ChannelProxy::Context::AddFilter(MessageFilter* filter) { 221 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
215 base::AutoLock auto_lock(pending_filters_lock_); 222 base::AutoLock auto_lock(pending_filters_lock_);
216 pending_filters_.push_back(make_scoped_refptr(filter)); 223 pending_filters_.push_back(make_scoped_refptr(filter));
217 ipc_message_loop_->PostTask( 224 ipc_task_runner_->PostTask(
218 FROM_HERE, base::Bind(&Context::OnAddFilter, this)); 225 FROM_HERE, base::Bind(&Context::OnAddFilter, this));
219 } 226 }
220 227
221 // Called on the listener's thread 228 // Called on the listener's thread
222 void ChannelProxy::Context::OnDispatchMessage(const Message& message) { 229 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
223 #ifdef IPC_MESSAGE_LOG_ENABLED 230 #ifdef IPC_MESSAGE_LOG_ENABLED
224 Logging* logger = Logging::GetInstance(); 231 Logging* logger = Logging::GetInstance();
225 std::string name; 232 std::string name;
226 logger->GetMessageText(message.type(), &name, &message, NULL); 233 logger->GetMessageText(message.type(), &name, &message, NULL);
227 TRACE_EVENT1("task", "ChannelProxy::Context::OnDispatchMessage", 234 TRACE_EVENT1("task", "ChannelProxy::Context::OnDispatchMessage",
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
269 void ChannelProxy::Context::OnDispatchError() { 276 void ChannelProxy::Context::OnDispatchError() {
270 if (listener_) 277 if (listener_)
271 listener_->OnChannelError(); 278 listener_->OnChannelError();
272 } 279 }
273 280
274 //----------------------------------------------------------------------------- 281 //-----------------------------------------------------------------------------
275 282
276 ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle, 283 ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle,
277 Channel::Mode mode, 284 Channel::Mode mode,
278 Listener* listener, 285 Listener* listener,
279 base::MessageLoopProxy* ipc_thread) 286 base::SingleThreadTaskRunner* ipc_task_runner)
280 : context_(new Context(listener, ipc_thread)), 287 : context_(new Context(listener, ipc_task_runner)),
281 outgoing_message_filter_(NULL), 288 outgoing_message_filter_(NULL),
282 did_init_(false) { 289 did_init_(false) {
283 Init(channel_handle, mode, true); 290 Init(channel_handle, mode, true);
284 } 291 }
285 292
286 ChannelProxy::ChannelProxy(Context* context) 293 ChannelProxy::ChannelProxy(Context* context)
287 : context_(context), 294 : context_(context),
288 outgoing_message_filter_(NULL), 295 outgoing_message_filter_(NULL),
289 did_init_(false) { 296 did_init_(false) {
290 } 297 }
(...skipping 16 matching lines...) Expand all
307 } 314 }
308 #endif // defined(OS_POSIX) 315 #endif // defined(OS_POSIX)
309 316
310 if (create_pipe_now) { 317 if (create_pipe_now) {
311 // Create the channel immediately. This effectively sets up the 318 // Create the channel immediately. This effectively sets up the
312 // low-level pipe so that the client can connect. Without creating 319 // low-level pipe so that the client can connect. Without creating
313 // the pipe immediately, it is possible for a listener to attempt 320 // the pipe immediately, it is possible for a listener to attempt
314 // to connect and get an error since the pipe doesn't exist yet. 321 // to connect and get an error since the pipe doesn't exist yet.
315 context_->CreateChannel(channel_handle, mode); 322 context_->CreateChannel(channel_handle, mode);
316 } else { 323 } else {
317 context_->ipc_message_loop()->PostTask( 324 context_->ipc_task_runner()->PostTask(
318 FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(), 325 FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(),
319 channel_handle, mode)); 326 channel_handle, mode));
320 } 327 }
321 328
322 // complete initialization on the background thread 329 // complete initialization on the background thread
323 context_->ipc_message_loop()->PostTask( 330 context_->ipc_task_runner()->PostTask(
324 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get())); 331 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
325 332
326 did_init_ = true; 333 did_init_ = true;
327 } 334 }
328 335
329 void ChannelProxy::Close() { 336 void ChannelProxy::Close() {
330 // Clear the backpointer to the listener so that any pending calls to 337 // Clear the backpointer to the listener so that any pending calls to
331 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is 338 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
332 // possible that the channel could be closed while it is receiving messages! 339 // possible that the channel could be closed while it is receiving messages!
333 context_->Clear(); 340 context_->Clear();
334 341
335 if (context_->ipc_message_loop()) { 342 if (context_->ipc_task_runner()) {
336 context_->ipc_message_loop()->PostTask( 343 context_->ipc_task_runner()->PostTask(
337 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get())); 344 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
338 } 345 }
339 } 346 }
340 347
341 bool ChannelProxy::Send(Message* message) { 348 bool ChannelProxy::Send(Message* message) {
342 DCHECK(did_init_); 349 DCHECK(did_init_);
343 if (outgoing_message_filter()) 350 if (outgoing_message_filter())
344 message = outgoing_message_filter()->Rewrite(message); 351 message = outgoing_message_filter()->Rewrite(message);
345 352
346 #ifdef IPC_MESSAGE_LOG_ENABLED 353 #ifdef IPC_MESSAGE_LOG_ENABLED
347 Logging::GetInstance()->OnSendMessage(message, context_->channel_id()); 354 Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
348 #endif 355 #endif
349 356
350 context_->ipc_message_loop()->PostTask( 357 context_->ipc_task_runner()->PostTask(
351 FROM_HERE, 358 FROM_HERE,
352 base::Bind(&ChannelProxy::Context::OnSendMessage, 359 base::Bind(&ChannelProxy::Context::OnSendMessage,
353 context_, base::Passed(scoped_ptr<Message>(message)))); 360 context_, base::Passed(scoped_ptr<Message>(message))));
354 return true; 361 return true;
355 } 362 }
356 363
357 void ChannelProxy::AddFilter(MessageFilter* filter) { 364 void ChannelProxy::AddFilter(MessageFilter* filter) {
358 context_->AddFilter(filter); 365 context_->AddFilter(filter);
359 } 366 }
360 367
361 void ChannelProxy::RemoveFilter(MessageFilter* filter) { 368 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
362 context_->ipc_message_loop()->PostTask( 369 context_->ipc_task_runner()->PostTask(
363 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(), 370 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
364 make_scoped_refptr(filter))); 371 make_scoped_refptr(filter)));
365 } 372 }
366 373
367 void ChannelProxy::ClearIPCMessageLoop() { 374 void ChannelProxy::ClearIPCTaskRunner() {
368 context()->ClearIPCMessageLoop(); 375 context()->ClearIPCTaskRunner();
369 } 376 }
370 377
371 #if defined(OS_POSIX) && !defined(OS_NACL) 378 #if defined(OS_POSIX) && !defined(OS_NACL)
372 // See the TODO regarding lazy initialization of the channel in 379 // See the TODO regarding lazy initialization of the channel in
373 // ChannelProxy::Init(). 380 // ChannelProxy::Init().
374 int ChannelProxy::GetClientFileDescriptor() { 381 int ChannelProxy::GetClientFileDescriptor() {
375 Channel* channel = context_.get()->channel_.get(); 382 Channel* channel = context_.get()->channel_.get();
376 // Channel must have been created first. 383 // Channel must have been created first.
377 DCHECK(channel) << context_.get()->channel_id_; 384 DCHECK(channel) << context_.get()->channel_id_;
378 return channel->GetClientFileDescriptor(); 385 return channel->GetClientFileDescriptor();
(...skipping 10 matching lines...) Expand all
389 Channel* channel = context_.get()->channel_.get(); 396 Channel* channel = context_.get()->channel_.get();
390 // Channel must have been created first. 397 // Channel must have been created first.
391 DCHECK(channel) << context_.get()->channel_id_; 398 DCHECK(channel) << context_.get()->channel_id_;
392 return channel->GetClientEuid(client_euid); 399 return channel->GetClientEuid(client_euid);
393 } 400 }
394 #endif 401 #endif
395 402
396 //----------------------------------------------------------------------------- 403 //-----------------------------------------------------------------------------
397 404
398 } // namespace IPC 405 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_channel_proxy.h ('k') | ipc/ipc_sync_channel.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698