| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/system/message_pipe.h" | 5 #include "mojo/system/message_pipe.h" |
| 6 | 6 |
| 7 #include "base/logging.h" | 7 #include "base/logging.h" |
| 8 #include "mojo/system/channel.h" | 8 #include "mojo/system/channel.h" |
| 9 #include "mojo/system/local_message_pipe_endpoint.h" | 9 #include "mojo/system/local_message_pipe_endpoint.h" |
| 10 #include "mojo/system/message_in_transit.h" | 10 #include "mojo/system/message_in_transit.h" |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 50 | 50 |
| 51 void MessagePipe::Close(unsigned port) { | 51 void MessagePipe::Close(unsigned port) { |
| 52 DCHECK(port == 0 || port == 1); | 52 DCHECK(port == 0 || port == 1); |
| 53 | 53 |
| 54 unsigned destination_port = GetPeerPort(port); | 54 unsigned destination_port = GetPeerPort(port); |
| 55 | 55 |
| 56 base::AutoLock locker(lock_); | 56 base::AutoLock locker(lock_); |
| 57 DCHECK(endpoints_[port].get()); | 57 DCHECK(endpoints_[port].get()); |
| 58 | 58 |
| 59 endpoints_[port]->Close(); | 59 endpoints_[port]->Close(); |
| 60 if (endpoints_[destination_port].get()) | 60 if (endpoints_[destination_port].get()) { |
| 61 endpoints_[destination_port]->OnPeerClose(); | 61 if (!endpoints_[destination_port]->OnPeerClose()) |
| 62 endpoints_[destination_port].reset(); |
| 63 } |
| 62 endpoints_[port].reset(); | 64 endpoints_[port].reset(); |
| 63 } | 65 } |
| 64 | 66 |
| 65 // TODO(vtl): Handle flags. | 67 // TODO(vtl): Handle flags. |
| 66 MojoResult MessagePipe::WriteMessage( | 68 MojoResult MessagePipe::WriteMessage( |
| 67 unsigned port, | 69 unsigned port, |
| 68 const void* bytes, | 70 const void* bytes, |
| 69 uint32_t num_bytes, | 71 uint32_t num_bytes, |
| 70 std::vector<DispatcherTransport>* transports, | 72 std::vector<DispatcherTransport>* transports, |
| 71 MojoWriteMessageFlags flags) { | 73 MojoWriteMessageFlags flags) { |
| (...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 216 } | 218 } |
| 217 } | 219 } |
| 218 message->SetDispatchers(dispatchers.Pass()); | 220 message->SetDispatchers(dispatchers.Pass()); |
| 219 } | 221 } |
| 220 | 222 |
| 221 // The endpoint's |EnqueueMessage()| may not report failure. | 223 // The endpoint's |EnqueueMessage()| may not report failure. |
| 222 endpoints_[port]->EnqueueMessage(message.Pass()); | 224 endpoints_[port]->EnqueueMessage(message.Pass()); |
| 223 return MOJO_RESULT_OK; | 225 return MOJO_RESULT_OK; |
| 224 } | 226 } |
| 225 | 227 |
| 226 void MessagePipe::Attach(unsigned port, | 228 bool MessagePipe::Attach(unsigned port, |
| 227 scoped_refptr<Channel> channel, | 229 scoped_refptr<Channel> channel, |
| 228 MessageInTransit::EndpointId local_id) { | 230 MessageInTransit::EndpointId local_id) { |
| 229 DCHECK(port == 0 || port == 1); | 231 DCHECK(port == 0 || port == 1); |
| 230 DCHECK(channel.get()); | 232 DCHECK(channel.get()); |
| 231 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); | 233 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); |
| 232 | 234 |
| 233 base::AutoLock locker(lock_); | 235 base::AutoLock locker(lock_); |
| 234 DCHECK(endpoints_[port].get()); | 236 if (!endpoints_[port].get()) |
| 237 return false; |
| 235 | 238 |
| 239 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy); |
| 236 endpoints_[port]->Attach(channel, local_id); | 240 endpoints_[port]->Attach(channel, local_id); |
| 241 return true; |
| 237 } | 242 } |
| 238 | 243 |
| 239 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) { | 244 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) { |
| 240 DCHECK(port == 0 || port == 1); | 245 DCHECK(port == 0 || port == 1); |
| 241 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); | 246 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); |
| 242 | 247 |
| 243 base::AutoLock locker(lock_); | 248 base::AutoLock locker(lock_); |
| 244 DCHECK(endpoints_[port].get()); | 249 DCHECK(endpoints_[port].get()); |
| 245 endpoints_[port]->Run(remote_id); | 250 if (!endpoints_[port]->Run(remote_id)) |
| 251 endpoints_[port].reset(); |
| 252 } |
| 253 |
| 254 void MessagePipe::OnRemove(unsigned port) { |
| 255 unsigned destination_port = GetPeerPort(port); |
| 256 |
| 257 base::AutoLock locker(lock_); |
| 258 // A |OnPeerClose()| can come in first, before |OnRemove()| gets called. |
| 259 if (!endpoints_[port].get()) |
| 260 return; |
| 261 |
| 262 endpoints_[port]->OnRemove(); |
| 263 if (endpoints_[destination_port].get()) { |
| 264 if (!endpoints_[destination_port]->OnPeerClose()) |
| 265 endpoints_[destination_port].reset(); |
| 266 } |
| 267 endpoints_[port].reset(); |
| 246 } | 268 } |
| 247 | 269 |
| 248 MessagePipe::~MessagePipe() { | 270 MessagePipe::~MessagePipe() { |
| 249 // Owned by the dispatchers. The owning dispatchers should only release us via | 271 // Owned by the dispatchers. The owning dispatchers should only release us via |
| 250 // their |Close()| method, which should inform us of being closed via our | 272 // their |Close()| method, which should inform us of being closed via our |
| 251 // |Close()|. Thus these should already be null. | 273 // |Close()|. Thus these should already be null. |
| 252 DCHECK(!endpoints_[0].get()); | 274 DCHECK(!endpoints_[0].get()); |
| 253 DCHECK(!endpoints_[1].get()); | 275 DCHECK(!endpoints_[1].get()); |
| 254 } | 276 } |
| 255 | 277 |
| 256 MojoResult MessagePipe::HandleControlMessage( | 278 MojoResult MessagePipe::HandleControlMessage( |
| 257 unsigned port, | 279 unsigned /*port*/, |
| 258 scoped_ptr<MessageInTransit> message) { | 280 scoped_ptr<MessageInTransit> message) { |
| 259 DCHECK(port == 0 || port == 1); | |
| 260 DCHECK(message.get()); | |
| 261 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipe); | |
| 262 | |
| 263 switch (message->subtype()) { | |
| 264 case MessageInTransit::kSubtypeMessagePipePeerClosed: { | |
| 265 unsigned source_port = GetPeerPort(port); | |
| 266 | |
| 267 base::AutoLock locker(lock_); | |
| 268 DCHECK(endpoints_[source_port].get()); | |
| 269 | |
| 270 endpoints_[source_port]->Close(); | |
| 271 if (endpoints_[port].get()) | |
| 272 endpoints_[port]->OnPeerClose(); | |
| 273 | |
| 274 endpoints_[source_port].reset(); | |
| 275 return MOJO_RESULT_OK; | |
| 276 } | |
| 277 } | |
| 278 | |
| 279 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " | 281 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " |
| 280 << message->subtype(); | 282 << message->subtype(); |
| 281 return MOJO_RESULT_UNKNOWN; | 283 return MOJO_RESULT_UNKNOWN; |
| 282 } | 284 } |
| 283 | 285 |
| 284 } // namespace system | 286 } // namespace system |
| 285 } // namespace mojo | 287 } // namespace mojo |
| OLD | NEW |