| OLD | NEW |
| (Empty) |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "mojo/edk/system/message_pipe.h" | |
| 6 | |
| 7 #include "base/logging.h" | |
| 8 #include "mojo/edk/system/channel.h" | |
| 9 #include "mojo/edk/system/channel_endpoint.h" | |
| 10 #include "mojo/edk/system/channel_endpoint_id.h" | |
| 11 #include "mojo/edk/system/incoming_endpoint.h" | |
| 12 #include "mojo/edk/system/local_message_pipe_endpoint.h" | |
| 13 #include "mojo/edk/system/message_in_transit.h" | |
| 14 #include "mojo/edk/system/message_pipe_dispatcher.h" | |
| 15 #include "mojo/edk/system/message_pipe_endpoint.h" | |
| 16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" | |
| 17 | |
| 18 namespace mojo { | |
| 19 namespace system { | |
| 20 | |
| 21 // static | |
| 22 MessagePipe* MessagePipe::CreateLocalLocal() { | |
| 23 MessagePipe* message_pipe = new MessagePipe(); | |
| 24 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | |
| 25 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | |
| 26 return message_pipe; | |
| 27 } | |
| 28 | |
| 29 // static | |
| 30 MessagePipe* MessagePipe::CreateLocalProxy( | |
| 31 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | |
| 32 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. | |
| 33 MessagePipe* message_pipe = new MessagePipe(); | |
| 34 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | |
| 35 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); | |
| 36 message_pipe->endpoints_[1].reset( | |
| 37 new ProxyMessagePipeEndpoint(channel_endpoint->get())); | |
| 38 return message_pipe; | |
| 39 } | |
| 40 | |
| 41 // static | |
| 42 MessagePipe* MessagePipe::CreateLocalProxyFromExisting( | |
| 43 MessageInTransitQueue* message_queue, | |
| 44 ChannelEndpoint* channel_endpoint) { | |
| 45 DCHECK(message_queue); | |
| 46 MessagePipe* message_pipe = new MessagePipe(); | |
| 47 message_pipe->endpoints_[0].reset( | |
| 48 new LocalMessagePipeEndpoint(message_queue)); | |
| 49 if (channel_endpoint) { | |
| 50 bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1); | |
| 51 message_pipe->endpoints_[1].reset( | |
| 52 new ProxyMessagePipeEndpoint(channel_endpoint)); | |
| 53 if (!attached_to_channel) | |
| 54 message_pipe->OnDetachFromChannel(1); | |
| 55 } else { | |
| 56 // This means that the proxy side was already closed; we only need to inform | |
| 57 // the local side of this. | |
| 58 // TODO(vtl): This is safe to do without locking (but perhaps slightly | |
| 59 // dubious), since no other thread has access to |message_pipe| yet. | |
| 60 message_pipe->endpoints_[0]->OnPeerClose(); | |
| 61 } | |
| 62 return message_pipe; | |
| 63 } | |
| 64 | |
| 65 // static | |
| 66 MessagePipe* MessagePipe::CreateProxyLocal( | |
| 67 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | |
| 68 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. | |
| 69 MessagePipe* message_pipe = new MessagePipe(); | |
| 70 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); | |
| 71 message_pipe->endpoints_[0].reset( | |
| 72 new ProxyMessagePipeEndpoint(channel_endpoint->get())); | |
| 73 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | |
| 74 return message_pipe; | |
| 75 } | |
| 76 | |
| 77 // static | |
| 78 unsigned MessagePipe::GetPeerPort(unsigned port) { | |
| 79 DCHECK(port == 0 || port == 1); | |
| 80 return port ^ 1; | |
| 81 } | |
| 82 | |
| 83 // static | |
| 84 bool MessagePipe::Deserialize(Channel* channel, | |
| 85 const void* source, | |
| 86 size_t size, | |
| 87 scoped_refptr<MessagePipe>* message_pipe, | |
| 88 unsigned* port) { | |
| 89 DCHECK(!*message_pipe); // Not technically wrong, but unlikely. | |
| 90 | |
| 91 if (size != channel->GetSerializedEndpointSize()) { | |
| 92 LOG(ERROR) << "Invalid serialized message pipe"; | |
| 93 return false; | |
| 94 } | |
| 95 | |
| 96 scoped_refptr<IncomingEndpoint> incoming_endpoint = | |
| 97 channel->DeserializeEndpoint(source); | |
| 98 if (!incoming_endpoint) | |
| 99 return false; | |
| 100 | |
| 101 *message_pipe = incoming_endpoint->ConvertToMessagePipe(); | |
| 102 DCHECK(*message_pipe); | |
| 103 *port = 0; | |
| 104 return true; | |
| 105 } | |
| 106 | |
| 107 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { | |
| 108 DCHECK(port == 0 || port == 1); | |
| 109 base::AutoLock locker(lock_); | |
| 110 DCHECK(endpoints_[port]); | |
| 111 | |
| 112 return endpoints_[port]->GetType(); | |
| 113 } | |
| 114 | |
| 115 void MessagePipe::CancelAllAwakables(unsigned port) { | |
| 116 DCHECK(port == 0 || port == 1); | |
| 117 | |
| 118 base::AutoLock locker(lock_); | |
| 119 DCHECK(endpoints_[port]); | |
| 120 endpoints_[port]->CancelAllAwakables(); | |
| 121 } | |
| 122 | |
| 123 void MessagePipe::Close(unsigned port) { | |
| 124 DCHECK(port == 0 || port == 1); | |
| 125 | |
| 126 unsigned peer_port = GetPeerPort(port); | |
| 127 | |
| 128 base::AutoLock locker(lock_); | |
| 129 // The endpoint's |OnPeerClose()| may have been called first and returned | |
| 130 // false, which would have resulted in its destruction. | |
| 131 if (!endpoints_[port]) | |
| 132 return; | |
| 133 | |
| 134 endpoints_[port]->Close(); | |
| 135 if (endpoints_[peer_port]) { | |
| 136 if (!endpoints_[peer_port]->OnPeerClose()) | |
| 137 endpoints_[peer_port].reset(); | |
| 138 } | |
| 139 endpoints_[port].reset(); | |
| 140 } | |
| 141 | |
| 142 // TODO(vtl): Handle flags. | |
| 143 MojoResult MessagePipe::WriteMessage( | |
| 144 unsigned port, | |
| 145 UserPointer<const void> bytes, | |
| 146 uint32_t num_bytes, | |
| 147 std::vector<DispatcherTransport>* transports, | |
| 148 MojoWriteMessageFlags flags) { | |
| 149 DCHECK(port == 0 || port == 1); | |
| 150 | |
| 151 base::AutoLock locker(lock_); | |
| 152 return EnqueueMessageNoLock( | |
| 153 GetPeerPort(port), | |
| 154 make_scoped_ptr(new MessageInTransit( | |
| 155 MessageInTransit::kTypeEndpoint, | |
| 156 MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)), | |
| 157 transports); | |
| 158 } | |
| 159 | |
| 160 MojoResult MessagePipe::ReadMessage(unsigned port, | |
| 161 UserPointer<void> bytes, | |
| 162 UserPointer<uint32_t> num_bytes, | |
| 163 DispatcherVector* dispatchers, | |
| 164 uint32_t* num_dispatchers, | |
| 165 MojoReadMessageFlags flags) { | |
| 166 DCHECK(port == 0 || port == 1); | |
| 167 | |
| 168 base::AutoLock locker(lock_); | |
| 169 DCHECK(endpoints_[port]); | |
| 170 | |
| 171 return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers, | |
| 172 num_dispatchers, flags); | |
| 173 } | |
| 174 | |
| 175 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { | |
| 176 DCHECK(port == 0 || port == 1); | |
| 177 | |
| 178 base::AutoLock locker(const_cast<base::Lock&>(lock_)); | |
| 179 DCHECK(endpoints_[port]); | |
| 180 | |
| 181 return endpoints_[port]->GetHandleSignalsState(); | |
| 182 } | |
| 183 | |
| 184 MojoResult MessagePipe::AddAwakable(unsigned port, | |
| 185 Awakable* awakable, | |
| 186 MojoHandleSignals signals, | |
| 187 uint32_t context, | |
| 188 HandleSignalsState* signals_state) { | |
| 189 DCHECK(port == 0 || port == 1); | |
| 190 | |
| 191 base::AutoLock locker(lock_); | |
| 192 DCHECK(endpoints_[port]); | |
| 193 | |
| 194 return endpoints_[port]->AddAwakable(awakable, signals, context, | |
| 195 signals_state); | |
| 196 } | |
| 197 | |
| 198 void MessagePipe::RemoveAwakable(unsigned port, | |
| 199 Awakable* awakable, | |
| 200 HandleSignalsState* signals_state) { | |
| 201 DCHECK(port == 0 || port == 1); | |
| 202 | |
| 203 base::AutoLock locker(lock_); | |
| 204 DCHECK(endpoints_[port]); | |
| 205 | |
| 206 endpoints_[port]->RemoveAwakable(awakable, signals_state); | |
| 207 } | |
| 208 | |
| 209 void MessagePipe::StartSerialize(unsigned /*port*/, | |
| 210 Channel* channel, | |
| 211 size_t* max_size, | |
| 212 size_t* max_platform_handles) { | |
| 213 *max_size = channel->GetSerializedEndpointSize(); | |
| 214 *max_platform_handles = 0; | |
| 215 } | |
| 216 | |
| 217 bool MessagePipe::EndSerialize( | |
| 218 unsigned port, | |
| 219 Channel* channel, | |
| 220 void* destination, | |
| 221 size_t* actual_size, | |
| 222 embedder::PlatformHandleVector* /*platform_handles*/) { | |
| 223 DCHECK(port == 0 || port == 1); | |
| 224 | |
| 225 base::AutoLock locker(lock_); | |
| 226 DCHECK(endpoints_[port]); | |
| 227 | |
| 228 // The port being serialized must be local. | |
| 229 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); | |
| 230 | |
| 231 unsigned peer_port = GetPeerPort(port); | |
| 232 MessageInTransitQueue* message_queue = | |
| 233 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()) | |
| 234 ->message_queue(); | |
| 235 // The replacement for |endpoints_[port]|, if any. | |
| 236 MessagePipeEndpoint* replacement_endpoint = nullptr; | |
| 237 | |
| 238 // The three cases below correspond to the ones described above | |
| 239 // |Channel::SerializeEndpoint...()| (in channel.h). | |
| 240 if (!endpoints_[peer_port]) { | |
| 241 // Case 1: (known-)closed peer port. There's no reason for us to continue to | |
| 242 // exist afterwards. | |
| 243 channel->SerializeEndpointWithClosedPeer(destination, message_queue); | |
| 244 } else if (endpoints_[peer_port]->GetType() == | |
| 245 MessagePipeEndpoint::kTypeLocal) { | |
| 246 // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint| | |
| 247 // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that | |
| 248 // the |Channel| returns to us. | |
| 249 scoped_refptr<ChannelEndpoint> channel_endpoint = | |
| 250 channel->SerializeEndpointWithLocalPeer(destination, message_queue, | |
| 251 this, port); | |
| 252 replacement_endpoint = new ProxyMessagePipeEndpoint(channel_endpoint.get()); | |
| 253 } else { | |
| 254 // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and | |
| 255 // pass it to the |Channel|. There's no reason for us to continue to exist | |
| 256 // afterwards. | |
| 257 DCHECK_EQ(endpoints_[peer_port]->GetType(), | |
| 258 MessagePipeEndpoint::kTypeProxy); | |
| 259 ProxyMessagePipeEndpoint* peer_endpoint = | |
| 260 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); | |
| 261 scoped_refptr<ChannelEndpoint> peer_channel_endpoint = | |
| 262 peer_endpoint->ReleaseChannelEndpoint(); | |
| 263 channel->SerializeEndpointWithRemotePeer(destination, message_queue, | |
| 264 peer_channel_endpoint); | |
| 265 // No need to call |Close()| after |ReleaseChannelEndpoint()|. | |
| 266 endpoints_[peer_port].reset(); | |
| 267 } | |
| 268 | |
| 269 endpoints_[port]->Close(); | |
| 270 endpoints_[port].reset(replacement_endpoint); | |
| 271 | |
| 272 *actual_size = channel->GetSerializedEndpointSize(); | |
| 273 return true; | |
| 274 } | |
| 275 | |
| 276 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { | |
| 277 base::AutoLock locker(lock_); | |
| 278 | |
| 279 if (!endpoints_[port]) { | |
| 280 // This will happen only on the rare occasion that the call to | |
| 281 // |OnReadMessage()| is racing with us calling | |
| 282 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, | |
| 283 // and the |ChannelEndpoint| can retry (calling the new client's | |
| 284 // |OnReadMessage()|). | |
| 285 return false; | |
| 286 } | |
| 287 | |
| 288 // This is called when the |ChannelEndpoint| for the | |
| 289 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|). | |
| 290 // We need to pass this message on to its peer port (typically a | |
| 291 // |LocalMessagePipeEndpoint|). | |
| 292 MojoResult result = EnqueueMessageNoLock(GetPeerPort(port), | |
| 293 make_scoped_ptr(message), nullptr); | |
| 294 DLOG_IF(WARNING, result != MOJO_RESULT_OK) | |
| 295 << "EnqueueMessageNoLock() failed (result = " << result << ")"; | |
| 296 return true; | |
| 297 } | |
| 298 | |
| 299 void MessagePipe::OnDetachFromChannel(unsigned port) { | |
| 300 Close(port); | |
| 301 } | |
| 302 | |
| 303 MessagePipe::MessagePipe() { | |
| 304 } | |
| 305 | |
| 306 MessagePipe::~MessagePipe() { | |
| 307 // Owned by the dispatchers. The owning dispatchers should only release us via | |
| 308 // their |Close()| method, which should inform us of being closed via our | |
| 309 // |Close()|. Thus these should already be null. | |
| 310 DCHECK(!endpoints_[0]); | |
| 311 DCHECK(!endpoints_[1]); | |
| 312 } | |
| 313 | |
| 314 MojoResult MessagePipe::EnqueueMessageNoLock( | |
| 315 unsigned port, | |
| 316 scoped_ptr<MessageInTransit> message, | |
| 317 std::vector<DispatcherTransport>* transports) { | |
| 318 DCHECK(port == 0 || port == 1); | |
| 319 DCHECK(message); | |
| 320 | |
| 321 DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint); | |
| 322 DCHECK(endpoints_[GetPeerPort(port)]); | |
| 323 | |
| 324 // The destination port need not be open, unlike the source port. | |
| 325 if (!endpoints_[port]) | |
| 326 return MOJO_RESULT_FAILED_PRECONDITION; | |
| 327 | |
| 328 if (transports) { | |
| 329 MojoResult result = AttachTransportsNoLock(port, message.get(), transports); | |
| 330 if (result != MOJO_RESULT_OK) | |
| 331 return result; | |
| 332 } | |
| 333 | |
| 334 // The endpoint's |EnqueueMessage()| may not report failure. | |
| 335 endpoints_[port]->EnqueueMessage(message.Pass()); | |
| 336 return MOJO_RESULT_OK; | |
| 337 } | |
| 338 | |
| 339 MojoResult MessagePipe::AttachTransportsNoLock( | |
| 340 unsigned port, | |
| 341 MessageInTransit* message, | |
| 342 std::vector<DispatcherTransport>* transports) { | |
| 343 DCHECK(!message->has_dispatchers()); | |
| 344 | |
| 345 // You're not allowed to send either handle to a message pipe over the message | |
| 346 // pipe, so check for this. (The case of trying to write a handle to itself is | |
| 347 // taken care of by |Core|. That case kind of makes sense, but leads to | |
| 348 // complications if, e.g., both sides try to do the same thing with their | |
| 349 // respective handles simultaneously. The other case, of trying to write the | |
| 350 // peer handle to a handle, doesn't make sense -- since no handle will be | |
| 351 // available to read the message from.) | |
| 352 for (size_t i = 0; i < transports->size(); i++) { | |
| 353 if (!(*transports)[i].is_valid()) | |
| 354 continue; | |
| 355 if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) { | |
| 356 MessagePipeDispatcherTransport mp_transport((*transports)[i]); | |
| 357 if (mp_transport.GetMessagePipe() == this) { | |
| 358 // The other case should have been disallowed by |Core|. (Note: |port| | |
| 359 // is the peer port of the handle given to |WriteMessage()|.) | |
| 360 DCHECK_EQ(mp_transport.GetPort(), port); | |
| 361 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 362 } | |
| 363 } | |
| 364 } | |
| 365 | |
| 366 // Clone the dispatchers and attach them to the message. (This must be done as | |
| 367 // a separate loop, since we want to leave the dispatchers alone on failure.) | |
| 368 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); | |
| 369 dispatchers->reserve(transports->size()); | |
| 370 for (size_t i = 0; i < transports->size(); i++) { | |
| 371 if ((*transports)[i].is_valid()) { | |
| 372 dispatchers->push_back( | |
| 373 (*transports)[i].CreateEquivalentDispatcherAndClose()); | |
| 374 } else { | |
| 375 LOG(WARNING) << "Enqueueing null dispatcher"; | |
| 376 dispatchers->push_back(nullptr); | |
| 377 } | |
| 378 } | |
| 379 message->SetDispatchers(dispatchers.Pass()); | |
| 380 return MOJO_RESULT_OK; | |
| 381 } | |
| 382 | |
| 383 } // namespace system | |
| 384 } // namespace mojo | |
| OLD | NEW |