Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(155)

Side by Side Diff: mojo/system/message_pipe.cc

Issue 240133005: Mojo: Make some attempts towards fixing remote message pipe closure. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: fix some locking issues Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « mojo/system/message_pipe.h ('k') | mojo/system/message_pipe_dispatcher.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/message_pipe.h" 5 #include "mojo/system/message_pipe.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "mojo/system/channel.h" 8 #include "mojo/system/channel.h"
9 #include "mojo/system/local_message_pipe_endpoint.h" 9 #include "mojo/system/local_message_pipe_endpoint.h"
10 #include "mojo/system/message_in_transit.h" 10 #include "mojo/system/message_in_transit.h"
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
50 50
51 void MessagePipe::Close(unsigned port) { 51 void MessagePipe::Close(unsigned port) {
52 DCHECK(port == 0 || port == 1); 52 DCHECK(port == 0 || port == 1);
53 53
54 unsigned destination_port = GetPeerPort(port); 54 unsigned destination_port = GetPeerPort(port);
55 55
56 base::AutoLock locker(lock_); 56 base::AutoLock locker(lock_);
57 DCHECK(endpoints_[port].get()); 57 DCHECK(endpoints_[port].get());
58 58
59 endpoints_[port]->Close(); 59 endpoints_[port]->Close();
60 if (endpoints_[destination_port].get()) 60 if (endpoints_[destination_port].get()) {
61 endpoints_[destination_port]->OnPeerClose(); 61 if (!endpoints_[destination_port]->OnPeerClose())
62 endpoints_[destination_port].reset();
63 }
62 endpoints_[port].reset(); 64 endpoints_[port].reset();
63 } 65 }
64 66
65 // TODO(vtl): Handle flags. 67 // TODO(vtl): Handle flags.
66 MojoResult MessagePipe::WriteMessage( 68 MojoResult MessagePipe::WriteMessage(
67 unsigned port, 69 unsigned port,
68 const void* bytes, 70 const void* bytes,
69 uint32_t num_bytes, 71 uint32_t num_bytes,
70 std::vector<DispatcherTransport>* transports, 72 std::vector<DispatcherTransport>* transports,
71 MojoWriteMessageFlags flags) { 73 MojoWriteMessageFlags flags) {
(...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after
216 } 218 }
217 } 219 }
218 message->SetDispatchers(dispatchers.Pass()); 220 message->SetDispatchers(dispatchers.Pass());
219 } 221 }
220 222
221 // The endpoint's |EnqueueMessage()| may not report failure. 223 // The endpoint's |EnqueueMessage()| may not report failure.
222 endpoints_[port]->EnqueueMessage(message.Pass()); 224 endpoints_[port]->EnqueueMessage(message.Pass());
223 return MOJO_RESULT_OK; 225 return MOJO_RESULT_OK;
224 } 226 }
225 227
226 void MessagePipe::Attach(unsigned port, 228 bool MessagePipe::Attach(unsigned port,
227 scoped_refptr<Channel> channel, 229 scoped_refptr<Channel> channel,
228 MessageInTransit::EndpointId local_id) { 230 MessageInTransit::EndpointId local_id) {
229 DCHECK(port == 0 || port == 1); 231 DCHECK(port == 0 || port == 1);
230 DCHECK(channel.get()); 232 DCHECK(channel.get());
231 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); 233 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
232 234
233 base::AutoLock locker(lock_); 235 base::AutoLock locker(lock_);
234 DCHECK(endpoints_[port].get()); 236 if (!endpoints_[port].get())
237 return false;
235 238
239 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy);
236 endpoints_[port]->Attach(channel, local_id); 240 endpoints_[port]->Attach(channel, local_id);
241 return true;
237 } 242 }
238 243
239 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) { 244 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
240 DCHECK(port == 0 || port == 1); 245 DCHECK(port == 0 || port == 1);
241 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); 246 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
242 247
243 base::AutoLock locker(lock_); 248 base::AutoLock locker(lock_);
244 DCHECK(endpoints_[port].get()); 249 DCHECK(endpoints_[port].get());
245 endpoints_[port]->Run(remote_id); 250 if (!endpoints_[port]->Run(remote_id))
251 endpoints_[port].reset();
252 }
253
254 void MessagePipe::OnRemove(unsigned port) {
255 unsigned destination_port = GetPeerPort(port);
256
257 base::AutoLock locker(lock_);
258 // A |OnPeerClose()| can come in first, before |OnRemove()| gets called.
259 if (!endpoints_[port].get())
260 return;
261
262 endpoints_[port]->OnRemove();
263 if (endpoints_[destination_port].get()) {
264 if (!endpoints_[destination_port]->OnPeerClose())
265 endpoints_[destination_port].reset();
266 }
267 endpoints_[port].reset();
246 } 268 }
247 269
248 MessagePipe::~MessagePipe() { 270 MessagePipe::~MessagePipe() {
249 // Owned by the dispatchers. The owning dispatchers should only release us via 271 // Owned by the dispatchers. The owning dispatchers should only release us via
250 // their |Close()| method, which should inform us of being closed via our 272 // their |Close()| method, which should inform us of being closed via our
251 // |Close()|. Thus these should already be null. 273 // |Close()|. Thus these should already be null.
252 DCHECK(!endpoints_[0].get()); 274 DCHECK(!endpoints_[0].get());
253 DCHECK(!endpoints_[1].get()); 275 DCHECK(!endpoints_[1].get());
254 } 276 }
255 277
256 MojoResult MessagePipe::HandleControlMessage( 278 MojoResult MessagePipe::HandleControlMessage(
257 unsigned port, 279 unsigned /*port*/,
258 scoped_ptr<MessageInTransit> message) { 280 scoped_ptr<MessageInTransit> message) {
259 DCHECK(port == 0 || port == 1);
260 DCHECK(message.get());
261 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipe);
262
263 switch (message->subtype()) {
264 case MessageInTransit::kSubtypeMessagePipePeerClosed: {
265 unsigned source_port = GetPeerPort(port);
266
267 base::AutoLock locker(lock_);
268 DCHECK(endpoints_[source_port].get());
269
270 endpoints_[source_port]->Close();
271 if (endpoints_[port].get())
272 endpoints_[port]->OnPeerClose();
273
274 endpoints_[source_port].reset();
275 return MOJO_RESULT_OK;
276 }
277 }
278
279 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " 281 LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
280 << message->subtype(); 282 << message->subtype();
281 return MOJO_RESULT_UNKNOWN; 283 return MOJO_RESULT_UNKNOWN;
282 } 284 }
283 285
284 } // namespace system 286 } // namespace system
285 } // namespace mojo 287 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/system/message_pipe.h ('k') | mojo/system/message_pipe_dispatcher.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698