Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1257)

Side by Side Diff: ipc/ipc_channel_proxy.cc

Issue 5513001: Add a base class for objects that want to filter messages on the UI thread. ... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Fix possible race condition Created 10 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « ipc/ipc_channel_proxy.h ('k') | ipc/ipc_sync_channel.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/message_loop.h" 5 #include "base/message_loop.h"
6 #include "base/ref_counted.h" 6 #include "base/ref_counted.h"
7 #include "base/scoped_ptr.h" 7 #include "base/scoped_ptr.h"
8 #include "base/thread.h" 8 #include "base/thread.h"
9 #include "ipc/ipc_channel_proxy.h" 9 #include "ipc/ipc_channel_proxy.h"
10 #include "ipc/ipc_logging.h" 10 #include "ipc/ipc_logging.h"
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
54 return false; 54 return false;
55 } 55 }
56 56
57 void ChannelProxy::MessageFilter::OnDestruct() const { 57 void ChannelProxy::MessageFilter::OnDestruct() const {
58 delete this; 58 delete this;
59 } 59 }
60 60
61 //------------------------------------------------------------------------------ 61 //------------------------------------------------------------------------------
62 62
63 ChannelProxy::Context::Context(Channel::Listener* listener, 63 ChannelProxy::Context::Context(Channel::Listener* listener,
64 MessageFilter* filter,
65 MessageLoop* ipc_message_loop) 64 MessageLoop* ipc_message_loop)
66 : listener_message_loop_(MessageLoop::current()), 65 : listener_message_loop_(MessageLoop::current()),
67 listener_(listener), 66 listener_(listener),
68 ipc_message_loop_(ipc_message_loop), 67 ipc_message_loop_(ipc_message_loop),
69 channel_(NULL), 68 channel_(NULL),
70 peer_pid_(0), 69 peer_pid_(0),
71 channel_connected_called_(false) { 70 channel_connected_called_(false) {
72 if (filter)
73 filters_.push_back(make_scoped_refptr(filter));
74 } 71 }
75 72
76 void ChannelProxy::Context::CreateChannel(const std::string& id, 73 void ChannelProxy::Context::CreateChannel(const std::string& id,
77 const Channel::Mode& mode) { 74 const Channel::Mode& mode) {
78 DCHECK(channel_ == NULL); 75 DCHECK(channel_ == NULL);
79 channel_id_ = id; 76 channel_id_ = id;
80 channel_ = new Channel(id, mode, this); 77 channel_ = new Channel(id, mode, this);
81 } 78 }
82 79
83 bool ChannelProxy::Context::TryFilters(const Message& message) { 80 bool ChannelProxy::Context::TryFilters(const Message& message) {
(...skipping 27 matching lines...) Expand all
111 // NOTE: This code relies on the listener's message loop not going away while 108 // NOTE: This code relies on the listener's message loop not going away while
112 // this thread is active. That should be a reasonable assumption, but it 109 // this thread is active. That should be a reasonable assumption, but it
113 // feels risky. We may want to invent some more indirect way of referring to 110 // feels risky. We may want to invent some more indirect way of referring to
114 // a MessageLoop if this becomes a problem. 111 // a MessageLoop if this becomes a problem.
115 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( 112 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
116 this, &Context::OnDispatchMessage, message)); 113 this, &Context::OnDispatchMessage, message));
117 } 114 }
118 115
119 // Called on the IPC::Channel thread 116 // Called on the IPC::Channel thread
120 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) { 117 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
118 // Add any pending filters. This avoids a race condition where someone
119 // creates a ChannelProxy, calls AddFilter, and then right after starts the
120 // peer process. The IO thread could receive a message before the task to add
121 // the filter is run on the IO thread.
122 OnAddFilter();
123
121 peer_pid_ = peer_pid; 124 peer_pid_ = peer_pid;
122 for (size_t i = 0; i < filters_.size(); ++i) 125 for (size_t i = 0; i < filters_.size(); ++i)
123 filters_[i]->OnChannelConnected(peer_pid); 126 filters_[i]->OnChannelConnected(peer_pid);
124 127
125 // See above comment about using listener_message_loop_ here. 128 // See above comment about using listener_message_loop_ here.
126 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( 129 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
127 this, &Context::OnDispatchConnected)); 130 this, &Context::OnDispatchConnected));
128 } 131 }
129 132
130 // Called on the IPC::Channel thread 133 // Called on the IPC::Channel thread
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
182 if (!channel_) { 185 if (!channel_) {
183 delete message; 186 delete message;
184 OnChannelClosed(); 187 OnChannelClosed();
185 return; 188 return;
186 } 189 }
187 if (!channel_->Send(message)) 190 if (!channel_->Send(message))
188 OnChannelError(); 191 OnChannelError();
189 } 192 }
190 193
191 // Called on the IPC::Channel thread 194 // Called on the IPC::Channel thread
192 void ChannelProxy::Context::OnAddFilter(MessageFilter* filter) { 195 void ChannelProxy::Context::OnAddFilter() {
193 filters_.push_back(make_scoped_refptr(filter)); 196 std::vector<scoped_refptr<MessageFilter> > filters;
197 {
198 AutoLock auto_lock(pending_filters_lock_);
199 filters.swap(pending_filters_);
200 }
194 201
195 // If the channel has already been created, then we need to send this message 202 for (size_t i = 0; i < filters.size(); ++i) {
196 // so that the filter gets access to the Channel. 203 filters_.push_back(filters[i]);
197 if (channel_) 204
198 filter->OnFilterAdded(channel_); 205 // If the channel has already been created, then we need to send this
206 // message so that the filter gets access to the Channel.
207 if (channel_)
208 filters[i]->OnFilterAdded(channel_);
209 // Ditto for the peer process id.
210 if (peer_pid_)
211 filters[i]->OnChannelConnected(peer_pid_);
212 }
199 } 213 }
200 214
201 // Called on the IPC::Channel thread 215 // Called on the IPC::Channel thread
202 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) { 216 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
203 for (size_t i = 0; i < filters_.size(); ++i) { 217 for (size_t i = 0; i < filters_.size(); ++i) {
204 if (filters_[i].get() == filter) { 218 if (filters_[i].get() == filter) {
205 filter->OnFilterRemoved(); 219 filter->OnFilterRemoved();
206 filters_.erase(filters_.begin() + i); 220 filters_.erase(filters_.begin() + i);
207 return; 221 return;
208 } 222 }
209 } 223 }
210 224
211 NOTREACHED() << "filter to be removed not found"; 225 NOTREACHED() << "filter to be removed not found";
212 } 226 }
213 227
214 // Called on the listener's thread 228 // Called on the listener's thread
229 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
230 AutoLock auto_lock(pending_filters_lock_);
231 pending_filters_.push_back(make_scoped_refptr(filter));
232 ipc_message_loop_->PostTask(
233 FROM_HERE,
234 NewRunnableMethod(this, &Context::OnAddFilter));
235 }
236
237 // Called on the listener's thread
215 void ChannelProxy::Context::OnDispatchMessage(const Message& message) { 238 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
216 if (!listener_) 239 if (!listener_)
217 return; 240 return;
218 241
219 OnDispatchConnected(); 242 OnDispatchConnected();
220 243
221 #ifdef IPC_MESSAGE_LOG_ENABLED 244 #ifdef IPC_MESSAGE_LOG_ENABLED
222 Logging* logger = Logging::current(); 245 Logging* logger = Logging::current();
223 if (message.type() == IPC_LOGGING_ID) { 246 if (message.type() == IPC_LOGGING_ID) {
224 logger->OnReceivedLoggingMessage(message); 247 logger->OnReceivedLoggingMessage(message);
(...skipping 23 matching lines...) Expand all
248 } 271 }
249 272
250 // Called on the listener's thread 273 // Called on the listener's thread
251 void ChannelProxy::Context::OnDispatchError() { 274 void ChannelProxy::Context::OnDispatchError() {
252 if (listener_) 275 if (listener_)
253 listener_->OnChannelError(); 276 listener_->OnChannelError();
254 } 277 }
255 278
256 //----------------------------------------------------------------------------- 279 //-----------------------------------------------------------------------------
257 280
258 ChannelProxy::ChannelProxy(const std::string& channel_id, Channel::Mode mode, 281 ChannelProxy::ChannelProxy(const std::string& channel_id,
259 Channel::Listener* listener, MessageFilter* filter, 282 Channel::Mode mode,
283 Channel::Listener* listener,
260 MessageLoop* ipc_thread) 284 MessageLoop* ipc_thread)
261 : context_(new Context(listener, filter, ipc_thread)) { 285 : context_(new Context(listener, ipc_thread)) {
262 Init(channel_id, mode, ipc_thread, true); 286 Init(channel_id, mode, ipc_thread, true);
263 } 287 }
264 288
265 ChannelProxy::ChannelProxy(const std::string& channel_id, Channel::Mode mode, 289 ChannelProxy::ChannelProxy(const std::string& channel_id,
266 MessageLoop* ipc_thread, Context* context, 290 Channel::Mode mode,
291 MessageLoop* ipc_thread,
292 Context* context,
267 bool create_pipe_now) 293 bool create_pipe_now)
268 : context_(context) { 294 : context_(context) {
269 Init(channel_id, mode, ipc_thread, create_pipe_now); 295 Init(channel_id, mode, ipc_thread, create_pipe_now);
270 } 296 }
271 297
272 ChannelProxy::~ChannelProxy() { 298 ChannelProxy::~ChannelProxy() {
273 Close(); 299 Close();
274 } 300 }
275 301
276 void ChannelProxy::Init(const std::string& channel_id, Channel::Mode mode, 302 void ChannelProxy::Init(const std::string& channel_id, Channel::Mode mode,
277 MessageLoop* ipc_thread_loop, bool create_pipe_now) { 303 MessageLoop* ipc_thread_loop, bool create_pipe_now) {
278 if (create_pipe_now) { 304 if (create_pipe_now) {
279 // Create the channel immediately. This effectively sets up the 305 // Create the channel immediately. This effectively sets up the
280 // low-level pipe so that the client can connect. Without creating 306 // low-level pipe so that the client can connect. Without creating
281 // the pipe immediately, it is possible for a listener to attempt 307 // the pipe immediately, it is possible for a listener to attempt
282 // to connect and get an error since the pipe doesn't exist yet. 308 // to connect and get an error since the pipe doesn't exist yet.
283 context_->CreateChannel(channel_id, mode); 309 context_->CreateChannel(channel_id, mode);
284 } else { 310 } else {
285 context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 311 context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
286 context_.get(), &Context::CreateChannel, channel_id, mode)); 312 context_.get(), &Context::CreateChannel, channel_id, mode));
287 } 313 }
288 314
289 // complete initialization on the background thread 315 // complete initialization on the background thread
290 context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 316 context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
291 context_.get(), &Context::OnChannelOpened)); 317 context_.get(), &Context::OnChannelOpened));
Matt Perry 2010/12/02 19:00:39 Couldn't this cause OnChannelOpened to run, then O
jam 2010/12/02 19:08:53 That's fine, since OnAddFilter will call OnChannel
292 } 318 }
293 319
294 void ChannelProxy::Close() { 320 void ChannelProxy::Close() {
295 // Clear the backpointer to the listener so that any pending calls to 321 // Clear the backpointer to the listener so that any pending calls to
296 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is 322 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
297 // possible that the channel could be closed while it is receiving messages! 323 // possible that the channel could be closed while it is receiving messages!
298 context_->Clear(); 324 context_->Clear();
299 325
300 if (context_->ipc_message_loop()) { 326 if (context_->ipc_message_loop()) {
301 context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 327 context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
302 context_.get(), &Context::OnChannelClosed)); 328 context_.get(), &Context::OnChannelClosed));
303 } 329 }
304 } 330 }
305 331
306 bool ChannelProxy::Send(Message* message) { 332 bool ChannelProxy::Send(Message* message) {
307 #ifdef IPC_MESSAGE_LOG_ENABLED 333 #ifdef IPC_MESSAGE_LOG_ENABLED
308 Logging::current()->OnSendMessage(message, context_->channel_id()); 334 Logging::current()->OnSendMessage(message, context_->channel_id());
309 #endif 335 #endif
310 336
311 context_->ipc_message_loop()->PostTask(FROM_HERE, 337 context_->ipc_message_loop()->PostTask(FROM_HERE,
312 new SendTask(context_.get(), message)); 338 new SendTask(context_.get(), message));
313 return true; 339 return true;
314 } 340 }
315 341
316 void ChannelProxy::AddFilter(MessageFilter* filter) { 342 void ChannelProxy::AddFilter(MessageFilter* filter) {
317 context_->ipc_message_loop()->PostTask( 343 context_->AddFilter(filter);
318 FROM_HERE,
319 NewRunnableMethod(
320 context_.get(),
321 &Context::OnAddFilter,
322 make_scoped_refptr(filter)));
323 } 344 }
324 345
325 void ChannelProxy::RemoveFilter(MessageFilter* filter) { 346 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
326 context_->ipc_message_loop()->PostTask( 347 context_->ipc_message_loop()->PostTask(
327 FROM_HERE, NewRunnableMethod( 348 FROM_HERE, NewRunnableMethod(
328 context_.get(), 349 context_.get(),
329 &Context::OnRemoveFilter, 350 &Context::OnRemoveFilter,
330 make_scoped_refptr(filter))); 351 make_scoped_refptr(filter)));
331 } 352 }
332 353
333 void ChannelProxy::ClearIPCMessageLoop() { 354 void ChannelProxy::ClearIPCMessageLoop() {
334 context()->ClearIPCMessageLoop(); 355 context()->ClearIPCMessageLoop();
335 } 356 }
336 357
337 #if defined(OS_POSIX) && !defined(OS_NACL) 358 #if defined(OS_POSIX) && !defined(OS_NACL)
338 // See the TODO regarding lazy initialization of the channel in 359 // See the TODO regarding lazy initialization of the channel in
339 // ChannelProxy::Init(). 360 // ChannelProxy::Init().
340 // We assume that IPC::Channel::GetClientFileDescriptorMapping() is thread-safe. 361 // We assume that IPC::Channel::GetClientFileDescriptorMapping() is thread-safe.
341 int ChannelProxy::GetClientFileDescriptor() const { 362 int ChannelProxy::GetClientFileDescriptor() const {
342 Channel *channel = context_.get()->channel_; 363 Channel *channel = context_.get()->channel_;
343 DCHECK(channel); // Channel must have been created first. 364 DCHECK(channel); // Channel must have been created first.
344 return channel->GetClientFileDescriptor(); 365 return channel->GetClientFileDescriptor();
345 } 366 }
346 #endif 367 #endif
347 368
348 //----------------------------------------------------------------------------- 369 //-----------------------------------------------------------------------------
349 370
350 } // namespace IPC 371 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_channel_proxy.h ('k') | ipc/ipc_sync_channel.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698