| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe.h" | 5 #include "mojo/edk/system/message_pipe.h" |
| 6 | 6 |
| 7 #include "base/logging.h" | 7 #include "base/logging.h" |
| 8 #include "mojo/edk/system/channel.h" | 8 #include "mojo/edk/system/channel.h" |
| 9 #include "mojo/edk/system/channel_endpoint.h" | 9 #include "mojo/edk/system/channel_endpoint.h" |
| 10 #include "mojo/edk/system/channel_endpoint_id.h" | 10 #include "mojo/edk/system/channel_endpoint_id.h" |
| 11 #include "mojo/edk/system/endpoint_relayer.h" |
| 11 #include "mojo/edk/system/local_message_pipe_endpoint.h" | 12 #include "mojo/edk/system/local_message_pipe_endpoint.h" |
| 12 #include "mojo/edk/system/message_in_transit.h" | 13 #include "mojo/edk/system/message_in_transit.h" |
| 13 #include "mojo/edk/system/message_pipe_dispatcher.h" | 14 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 14 #include "mojo/edk/system/message_pipe_endpoint.h" | 15 #include "mojo/edk/system/message_pipe_endpoint.h" |
| 15 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" | 16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" |
| 16 | 17 |
| 17 namespace mojo { | 18 namespace mojo { |
| 18 namespace system { | 19 namespace system { |
| 19 | 20 |
| 20 namespace { | 21 namespace { |
| (...skipping 13 matching lines...) Expand all Loading... |
| 34 MessagePipe* MessagePipe::CreateLocalLocal() { | 35 MessagePipe* MessagePipe::CreateLocalLocal() { |
| 35 MessagePipe* message_pipe = new MessagePipe(); | 36 MessagePipe* message_pipe = new MessagePipe(); |
| 36 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 37 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
| 37 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 38 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
| 38 return message_pipe; | 39 return message_pipe; |
| 39 } | 40 } |
| 40 | 41 |
| 41 // static | 42 // static |
| 42 MessagePipe* MessagePipe::CreateLocalProxy( | 43 MessagePipe* MessagePipe::CreateLocalProxy( |
| 43 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | 44 scoped_refptr<ChannelEndpoint>* channel_endpoint) { |
| 44 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. | 45 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
| 45 MessagePipe* message_pipe = new MessagePipe(); | 46 MessagePipe* message_pipe = new MessagePipe(); |
| 46 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 47 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
| 47 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); | 48 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); |
| 48 message_pipe->endpoints_[1].reset( | 49 message_pipe->endpoints_[1].reset( |
| 49 new ProxyMessagePipeEndpoint(channel_endpoint->get())); | 50 new ProxyMessagePipeEndpoint(channel_endpoint->get())); |
| 50 return message_pipe; | 51 return message_pipe; |
| 51 } | 52 } |
| 52 | 53 |
| 53 // static | 54 // static |
| 54 MessagePipe* MessagePipe::CreateProxyLocal( | 55 MessagePipe* MessagePipe::CreateProxyLocal( |
| 55 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | 56 scoped_refptr<ChannelEndpoint>* channel_endpoint) { |
| 56 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. | 57 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
| 57 MessagePipe* message_pipe = new MessagePipe(); | 58 MessagePipe* message_pipe = new MessagePipe(); |
| 58 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); | 59 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); |
| 59 message_pipe->endpoints_[0].reset( | 60 message_pipe->endpoints_[0].reset( |
| 60 new ProxyMessagePipeEndpoint(channel_endpoint->get())); | 61 new ProxyMessagePipeEndpoint(channel_endpoint->get())); |
| 61 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 62 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
| 62 return message_pipe; | 63 return message_pipe; |
| 63 } | 64 } |
| 64 | 65 |
| 65 // static | 66 // static |
| 66 unsigned MessagePipe::GetPeerPort(unsigned port) { | 67 unsigned MessagePipe::GetPeerPort(unsigned port) { |
| 67 DCHECK(port == 0 || port == 1); | 68 DCHECK(port == 0 || port == 1); |
| 68 return port ^ 1; | 69 return port ^ 1; |
| 69 } | 70 } |
| 70 | 71 |
| 71 // static | 72 // static |
| 72 bool MessagePipe::Deserialize(Channel* channel, | 73 bool MessagePipe::Deserialize(Channel* channel, |
| 73 const void* source, | 74 const void* source, |
| 74 size_t size, | 75 size_t size, |
| 75 scoped_refptr<MessagePipe>* message_pipe, | 76 scoped_refptr<MessagePipe>* message_pipe, |
| 76 unsigned* port) { | 77 unsigned* port) { |
| 77 DCHECK(!message_pipe->get()); // Not technically wrong, but unlikely. | 78 DCHECK(!*message_pipe); // Not technically wrong, but unlikely. |
| 78 | 79 |
| 79 if (size != sizeof(SerializedMessagePipe)) { | 80 if (size != sizeof(SerializedMessagePipe)) { |
| 80 LOG(ERROR) << "Invalid serialized message pipe"; | 81 LOG(ERROR) << "Invalid serialized message pipe"; |
| 81 return false; | 82 return false; |
| 82 } | 83 } |
| 83 | 84 |
| 84 const SerializedMessagePipe* s = | 85 const SerializedMessagePipe* s = |
| 85 static_cast<const SerializedMessagePipe*>(source); | 86 static_cast<const SerializedMessagePipe*>(source); |
| 86 *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id); | 87 *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id); |
| 87 if (!message_pipe->get()) { | 88 if (!*message_pipe) { |
| 88 LOG(ERROR) << "Failed to deserialize message pipe (ID = " | 89 LOG(ERROR) << "Failed to deserialize message pipe (ID = " |
| 89 << s->receiver_endpoint_id << ")"; | 90 << s->receiver_endpoint_id << ")"; |
| 90 return false; | 91 return false; |
| 91 } | 92 } |
| 92 | 93 |
| 93 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = " | 94 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = " |
| 94 << s->receiver_endpoint_id << ")"; | 95 << s->receiver_endpoint_id << ")"; |
| 95 *port = 0; | 96 *port = 0; |
| 96 return true; | 97 return true; |
| 97 } | 98 } |
| 98 | 99 |
| 99 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { | 100 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { |
| 100 DCHECK(port == 0 || port == 1); | 101 DCHECK(port == 0 || port == 1); |
| 101 base::AutoLock locker(lock_); | 102 base::AutoLock locker(lock_); |
| 102 DCHECK(endpoints_[port]); | 103 DCHECK(endpoints_[port]); |
| 103 | 104 |
| 104 return endpoints_[port]->GetType(); | 105 return endpoints_[port]->GetType(); |
| 105 } | 106 } |
| 106 | 107 |
| 107 void MessagePipe::CancelAllWaiters(unsigned port) { | 108 void MessagePipe::CancelAllAwakables(unsigned port) { |
| 108 DCHECK(port == 0 || port == 1); | 109 DCHECK(port == 0 || port == 1); |
| 109 | 110 |
| 110 base::AutoLock locker(lock_); | 111 base::AutoLock locker(lock_); |
| 111 DCHECK(endpoints_[port]); | 112 DCHECK(endpoints_[port]); |
| 112 endpoints_[port]->CancelAllWaiters(); | 113 endpoints_[port]->CancelAllAwakables(); |
| 113 } | 114 } |
| 114 | 115 |
| 115 void MessagePipe::Close(unsigned port) { | 116 void MessagePipe::Close(unsigned port) { |
| 116 DCHECK(port == 0 || port == 1); | 117 DCHECK(port == 0 || port == 1); |
| 117 | 118 |
| 118 unsigned destination_port = GetPeerPort(port); | 119 unsigned peer_port = GetPeerPort(port); |
| 119 | 120 |
| 120 base::AutoLock locker(lock_); | 121 base::AutoLock locker(lock_); |
| 121 // The endpoint's |OnPeerClose()| may have been called first and returned | 122 // The endpoint's |OnPeerClose()| may have been called first and returned |
| 122 // false, which would have resulted in its destruction. | 123 // false, which would have resulted in its destruction. |
| 123 if (!endpoints_[port]) | 124 if (!endpoints_[port]) |
| 124 return; | 125 return; |
| 125 | 126 |
| 126 endpoints_[port]->Close(); | 127 endpoints_[port]->Close(); |
| 127 if (endpoints_[destination_port]) { | 128 if (endpoints_[peer_port]) { |
| 128 if (!endpoints_[destination_port]->OnPeerClose()) | 129 if (!endpoints_[peer_port]->OnPeerClose()) |
| 129 endpoints_[destination_port].reset(); | 130 endpoints_[peer_port].reset(); |
| 130 } | 131 } |
| 131 endpoints_[port].reset(); | 132 endpoints_[port].reset(); |
| 132 } | 133 } |
| 133 | 134 |
| 134 // TODO(vtl): Handle flags. | 135 // TODO(vtl): Handle flags. |
| 135 MojoResult MessagePipe::WriteMessage( | 136 MojoResult MessagePipe::WriteMessage( |
| 136 unsigned port, | 137 unsigned port, |
| 137 UserPointer<const void> bytes, | 138 UserPointer<const void> bytes, |
| 138 uint32_t num_bytes, | 139 uint32_t num_bytes, |
| 139 std::vector<DispatcherTransport>* transports, | 140 std::vector<DispatcherTransport>* transports, |
| 140 MojoWriteMessageFlags flags) { | 141 MojoWriteMessageFlags flags) { |
| 141 DCHECK(port == 0 || port == 1); | 142 DCHECK(port == 0 || port == 1); |
| 142 return EnqueueMessage( | 143 |
| 144 base::AutoLock locker(lock_); |
| 145 return EnqueueMessageNoLock( |
| 143 GetPeerPort(port), | 146 GetPeerPort(port), |
| 144 make_scoped_ptr(new MessageInTransit( | 147 make_scoped_ptr(new MessageInTransit( |
| 145 MessageInTransit::kTypeEndpoint, | 148 MessageInTransit::kTypeEndpoint, |
| 146 MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)), | 149 MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)), |
| 147 transports); | 150 transports); |
| 148 } | 151 } |
| 149 | 152 |
| 150 MojoResult MessagePipe::ReadMessage(unsigned port, | 153 MojoResult MessagePipe::ReadMessage(unsigned port, |
| 151 UserPointer<void> bytes, | 154 UserPointer<void> bytes, |
| 152 UserPointer<uint32_t> num_bytes, | 155 UserPointer<uint32_t> num_bytes, |
| (...skipping 11 matching lines...) Expand all Loading... |
| 164 | 167 |
| 165 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { | 168 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { |
| 166 DCHECK(port == 0 || port == 1); | 169 DCHECK(port == 0 || port == 1); |
| 167 | 170 |
| 168 base::AutoLock locker(const_cast<base::Lock&>(lock_)); | 171 base::AutoLock locker(const_cast<base::Lock&>(lock_)); |
| 169 DCHECK(endpoints_[port]); | 172 DCHECK(endpoints_[port]); |
| 170 | 173 |
| 171 return endpoints_[port]->GetHandleSignalsState(); | 174 return endpoints_[port]->GetHandleSignalsState(); |
| 172 } | 175 } |
| 173 | 176 |
| 174 MojoResult MessagePipe::AddWaiter(unsigned port, | 177 MojoResult MessagePipe::AddAwakable(unsigned port, |
| 175 Waiter* waiter, | 178 Awakable* awakable, |
| 176 MojoHandleSignals signals, | 179 MojoHandleSignals signals, |
| 177 uint32_t context, | 180 uint32_t context, |
| 178 HandleSignalsState* signals_state) { | 181 HandleSignalsState* signals_state) { |
| 179 DCHECK(port == 0 || port == 1); | 182 DCHECK(port == 0 || port == 1); |
| 180 | 183 |
| 181 base::AutoLock locker(lock_); | 184 base::AutoLock locker(lock_); |
| 182 DCHECK(endpoints_[port]); | 185 DCHECK(endpoints_[port]); |
| 183 | 186 |
| 184 return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state); | 187 return endpoints_[port]->AddAwakable(awakable, signals, context, |
| 188 signals_state); |
| 185 } | 189 } |
| 186 | 190 |
| 187 void MessagePipe::RemoveWaiter(unsigned port, | 191 void MessagePipe::RemoveAwakable(unsigned port, |
| 188 Waiter* waiter, | 192 Awakable* awakable, |
| 189 HandleSignalsState* signals_state) { | 193 HandleSignalsState* signals_state) { |
| 190 DCHECK(port == 0 || port == 1); | 194 DCHECK(port == 0 || port == 1); |
| 191 | 195 |
| 192 base::AutoLock locker(lock_); | 196 base::AutoLock locker(lock_); |
| 193 DCHECK(endpoints_[port]); | 197 DCHECK(endpoints_[port]); |
| 194 | 198 |
| 195 endpoints_[port]->RemoveWaiter(waiter, signals_state); | 199 endpoints_[port]->RemoveAwakable(awakable, signals_state); |
| 196 } | 200 } |
| 197 | 201 |
| 198 void MessagePipe::StartSerialize(unsigned /*port*/, | 202 void MessagePipe::StartSerialize(unsigned /*port*/, |
| 199 Channel* /*channel*/, | 203 Channel* /*channel*/, |
| 200 size_t* max_size, | 204 size_t* max_size, |
| 201 size_t* max_platform_handles) { | 205 size_t* max_platform_handles) { |
| 202 *max_size = sizeof(SerializedMessagePipe); | 206 *max_size = sizeof(SerializedMessagePipe); |
| 203 *max_platform_handles = 0; | 207 *max_platform_handles = 0; |
| 204 } | 208 } |
| 205 | 209 |
| (...skipping 30 matching lines...) Expand all Loading... |
| 236 // |ProxyMessagePipeEndpoint| to replace |port|'s | 240 // |ProxyMessagePipeEndpoint| to replace |port|'s |
| 237 // |LocalMessagePipeEndpoint|. We continue to exist, since the peer | 241 // |LocalMessagePipeEndpoint|. We continue to exist, since the peer |
| 238 // port's message pipe dispatcher will continue to hold a reference to | 242 // port's message pipe dispatcher will continue to hold a reference to |
| 239 // us. | 243 // us. |
| 240 // | 244 // |
| 241 // 3. The peer port is remote. | 245 // 3. The peer port is remote. |
| 242 // | 246 // |
| 243 // We also pass its |ChannelEndpoint| to the channel, which then decides | 247 // We also pass its |ChannelEndpoint| to the channel, which then decides |
| 244 // what to do. We have no reason to continue to exist. | 248 // what to do. We have no reason to continue to exist. |
| 245 // | 249 // |
| 246 // TODO(vtl): Factor some of this out to |ChannelEndpoint|. | 250 // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|). |
| 247 | 251 |
| 248 if (!endpoints_[GetPeerPort(port)]) { | 252 unsigned peer_port = GetPeerPort(port); |
| 253 if (!endpoints_[peer_port]) { |
| 249 // Case 1. | 254 // Case 1. |
| 250 channel_endpoint = new ChannelEndpoint( | 255 channel_endpoint = new ChannelEndpoint( |
| 251 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( | 256 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( |
| 252 endpoints_[port].get())->message_queue()); | 257 endpoints_[port].get())->message_queue()); |
| 253 endpoints_[port]->Close(); | 258 endpoints_[port]->Close(); |
| 254 endpoints_[port].reset(); | 259 endpoints_[port].reset(); |
| 255 } else if (endpoints_[GetPeerPort(port)]->GetType() == | 260 } else if (endpoints_[peer_port]->GetType() == |
| 256 MessagePipeEndpoint::kTypeLocal) { | 261 MessagePipeEndpoint::kTypeLocal) { |
| 257 // Case 2. | 262 // Case 2. |
| 258 channel_endpoint = new ChannelEndpoint( | 263 channel_endpoint = new ChannelEndpoint( |
| 259 this, port, static_cast<LocalMessagePipeEndpoint*>( | 264 this, port, static_cast<LocalMessagePipeEndpoint*>( |
| 260 endpoints_[port].get())->message_queue()); | 265 endpoints_[port].get())->message_queue()); |
| 261 endpoints_[port]->Close(); | 266 endpoints_[port]->Close(); |
| 262 endpoints_[port].reset( | 267 endpoints_[port].reset( |
| 263 new ProxyMessagePipeEndpoint(channel_endpoint.get())); | 268 new ProxyMessagePipeEndpoint(channel_endpoint.get())); |
| 264 } else { | 269 } else { |
| 265 // Case 3. | 270 // Case 3. |
| 266 // TODO(vtl): Temporarily the same as case 2. | |
| 267 DLOG(WARNING) << "Direct message pipe passing across multiple channels " | 271 DLOG(WARNING) << "Direct message pipe passing across multiple channels " |
| 268 "not yet implemented; will proxy"; | 272 "not yet implemented; will proxy"; |
| 273 |
| 274 // Create an |EndpointRelayer| to replace ourselves (rather than having a |
| 275 // |MessagePipe| object that exists solely to relay messages between two |
| 276 // |ChannelEndpoint|s, owned by the |Channel| through them. |
| 277 // |
| 278 // This reduces overhead somewhat, and more importantly restores some |
| 279 // invariants, e.g., that |MessagePipe|s are owned by dispatchers. |
| 280 // |
| 281 // TODO(vtl): If we get the |Channel| to own/track the relayer directly, |
| 282 // then possibly we could make |ChannelEndpoint|'s |client_| pointer a raw |
| 283 // pointer (and not have the |Channel| owning the relayer via its |
| 284 // |ChannelEndpoint|s. |
| 285 // |
| 286 // TODO(vtl): This is not obviously the right place for (all of) this |
| 287 // logic, nor is it obviously factored correctly. |
| 288 |
| 289 DCHECK_EQ(endpoints_[peer_port]->GetType(), |
| 290 MessagePipeEndpoint::kTypeProxy); |
| 291 ProxyMessagePipeEndpoint* peer_endpoint = |
| 292 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); |
| 293 scoped_refptr<ChannelEndpoint> peer_channel_endpoint = |
| 294 peer_endpoint->ReleaseChannelEndpoint(); |
| 295 |
| 296 scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer()); |
| 297 // We'll assign our peer port's endpoint to the relayer's port 1, and this |
| 298 // port's endpoint to the relayer's port 0. |
| 269 channel_endpoint = new ChannelEndpoint( | 299 channel_endpoint = new ChannelEndpoint( |
| 270 this, port, static_cast<LocalMessagePipeEndpoint*>( | 300 relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>( |
| 271 endpoints_[port].get())->message_queue()); | 301 endpoints_[port].get())->message_queue()); |
| 302 relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get()); |
| 303 peer_channel_endpoint->ReplaceClient(relayer.get(), 1); |
| 304 |
| 272 endpoints_[port]->Close(); | 305 endpoints_[port]->Close(); |
| 273 endpoints_[port].reset( | 306 endpoints_[port].reset(); |
| 274 new ProxyMessagePipeEndpoint(channel_endpoint.get())); | 307 // No need to call |Close()| after |ReleaseChannelEndpoint()|. |
| 308 endpoints_[peer_port].reset(); |
| 275 } | 309 } |
| 276 } | 310 } |
| 277 | 311 |
| 278 SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination); | 312 SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination); |
| 279 | 313 |
| 280 // Convert the local endpoint to a proxy endpoint (moving the message queue) | 314 // Convert the local endpoint to a proxy endpoint (moving the message queue) |
| 281 // and attach it to the channel. | 315 // and attach it to the channel. |
| 282 s->receiver_endpoint_id = | 316 s->receiver_endpoint_id = |
| 283 channel->AttachAndRunEndpoint(channel_endpoint, false); | 317 channel->AttachAndRunEndpoint(channel_endpoint, false); |
| 284 DVLOG(2) << "Serializing message pipe (remote ID = " | 318 DVLOG(2) << "Serializing message pipe (remote ID = " |
| 285 << s->receiver_endpoint_id << ")"; | 319 << s->receiver_endpoint_id << ")"; |
| 286 *actual_size = sizeof(SerializedMessagePipe); | 320 *actual_size = sizeof(SerializedMessagePipe); |
| 287 return true; | 321 return true; |
| 288 } | 322 } |
| 289 | 323 |
| 290 void MessagePipe::OnReadMessage(unsigned port, | 324 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { |
| 291 scoped_ptr<MessageInTransit> message) { | 325 base::AutoLock locker(lock_); |
| 326 |
| 327 if (!endpoints_[port]) { |
| 328 // This will happen only on the rare occasion that the call to |
| 329 // |OnReadMessage()| is racing with us calling |
| 330 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, |
| 331 // and the |ChannelEndpoint| can retry (calling the new client's |
| 332 // |OnReadMessage()|). |
| 333 return false; |
| 334 } |
| 335 |
| 292 // This is called when the |ChannelEndpoint| for the | 336 // This is called when the |ChannelEndpoint| for the |
| 293 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|). | 337 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|). |
| 294 // We need to pass this message on to its peer port (typically a | 338 // We need to pass this message on to its peer port (typically a |
| 295 // |LocalMessagePipeEndpoint|). | 339 // |LocalMessagePipeEndpoint|). |
| 296 MojoResult result = | 340 MojoResult result = EnqueueMessageNoLock(GetPeerPort(port), |
| 297 EnqueueMessage(GetPeerPort(port), message.Pass(), nullptr); | 341 make_scoped_ptr(message), nullptr); |
| 298 DLOG_IF(WARNING, result != MOJO_RESULT_OK) | 342 DLOG_IF(WARNING, result != MOJO_RESULT_OK) |
| 299 << "EnqueueMessage() failed (result = " << result << ")"; | 343 << "EnqueueMessageNoLock() failed (result = " << result << ")"; |
| 344 return true; |
| 300 } | 345 } |
| 301 | 346 |
| 302 void MessagePipe::OnDetachFromChannel(unsigned port) { | 347 void MessagePipe::OnDetachFromChannel(unsigned port) { |
| 303 Close(port); | 348 Close(port); |
| 304 } | 349 } |
| 305 | 350 |
| 306 MessagePipe::MessagePipe() { | 351 MessagePipe::MessagePipe() { |
| 307 } | 352 } |
| 308 | 353 |
| 309 MessagePipe::~MessagePipe() { | 354 MessagePipe::~MessagePipe() { |
| 310 // Owned by the dispatchers. The owning dispatchers should only release us via | 355 // Owned by the dispatchers. The owning dispatchers should only release us via |
| 311 // their |Close()| method, which should inform us of being closed via our | 356 // their |Close()| method, which should inform us of being closed via our |
| 312 // |Close()|. Thus these should already be null. | 357 // |Close()|. Thus these should already be null. |
| 313 DCHECK(!endpoints_[0]); | 358 DCHECK(!endpoints_[0]); |
| 314 DCHECK(!endpoints_[1]); | 359 DCHECK(!endpoints_[1]); |
| 315 } | 360 } |
| 316 | 361 |
| 317 MojoResult MessagePipe::EnqueueMessage( | 362 MojoResult MessagePipe::EnqueueMessageNoLock( |
| 318 unsigned port, | 363 unsigned port, |
| 319 scoped_ptr<MessageInTransit> message, | 364 scoped_ptr<MessageInTransit> message, |
| 320 std::vector<DispatcherTransport>* transports) { | 365 std::vector<DispatcherTransport>* transports) { |
| 321 DCHECK(port == 0 || port == 1); | 366 DCHECK(port == 0 || port == 1); |
| 322 DCHECK(message); | 367 DCHECK(message); |
| 323 | 368 |
| 324 DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint); | 369 DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint); |
| 325 | |
| 326 base::AutoLock locker(lock_); | |
| 327 DCHECK(endpoints_[GetPeerPort(port)]); | 370 DCHECK(endpoints_[GetPeerPort(port)]); |
| 328 | 371 |
| 329 // The destination port need not be open, unlike the source port. | 372 // The destination port need not be open, unlike the source port. |
| 330 if (!endpoints_[port]) | 373 if (!endpoints_[port]) |
| 331 return MOJO_RESULT_FAILED_PRECONDITION; | 374 return MOJO_RESULT_FAILED_PRECONDITION; |
| 332 | 375 |
| 333 if (transports) { | 376 if (transports) { |
| 334 MojoResult result = AttachTransportsNoLock(port, message.get(), transports); | 377 MojoResult result = AttachTransportsNoLock(port, message.get(), transports); |
| 335 if (result != MOJO_RESULT_OK) | 378 if (result != MOJO_RESULT_OK) |
| 336 return result; | 379 return result; |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 380 LOG(WARNING) << "Enqueueing null dispatcher"; | 423 LOG(WARNING) << "Enqueueing null dispatcher"; |
| 381 dispatchers->push_back(nullptr); | 424 dispatchers->push_back(nullptr); |
| 382 } | 425 } |
| 383 } | 426 } |
| 384 message->SetDispatchers(dispatchers.Pass()); | 427 message->SetDispatchers(dispatchers.Pass()); |
| 385 return MOJO_RESULT_OK; | 428 return MOJO_RESULT_OK; |
| 386 } | 429 } |
| 387 | 430 |
| 388 } // namespace system | 431 } // namespace system |
| 389 } // namespace mojo | 432 } // namespace mojo |
| OLD | NEW |