| OLD | NEW |
| (Empty) |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "mojo/edk/system/message_pipe.h" | |
| 6 | |
| 7 #include "base/logging.h" | |
| 8 #include "mojo/edk/system/channel_endpoint.h" | |
| 9 #include "mojo/edk/system/local_message_pipe_endpoint.h" | |
| 10 #include "mojo/edk/system/message_in_transit.h" | |
| 11 #include "mojo/edk/system/message_pipe_dispatcher.h" | |
| 12 #include "mojo/edk/system/message_pipe_endpoint.h" | |
| 13 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" | |
| 14 | |
| 15 namespace mojo { | |
| 16 namespace system { | |
| 17 | |
| 18 // static | |
| 19 MessagePipe* MessagePipe::CreateLocalLocal() { | |
| 20 MessagePipe* message_pipe = new MessagePipe(); | |
| 21 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | |
| 22 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | |
| 23 return message_pipe; | |
| 24 } | |
| 25 | |
| 26 // static | |
| 27 MessagePipe* MessagePipe::CreateLocalProxy( | |
| 28 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | |
| 29 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. | |
| 30 MessagePipe* message_pipe = new MessagePipe(); | |
| 31 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | |
| 32 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); | |
| 33 message_pipe->endpoints_[1].reset( | |
| 34 new ProxyMessagePipeEndpoint(channel_endpoint->get())); | |
| 35 return message_pipe; | |
| 36 } | |
| 37 | |
| 38 // static | |
| 39 MessagePipe* MessagePipe::CreateProxyLocal( | |
| 40 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | |
| 41 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. | |
| 42 MessagePipe* message_pipe = new MessagePipe(); | |
| 43 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); | |
| 44 message_pipe->endpoints_[0].reset( | |
| 45 new ProxyMessagePipeEndpoint(channel_endpoint->get())); | |
| 46 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | |
| 47 return message_pipe; | |
| 48 } | |
| 49 | |
| 50 // static | |
| 51 unsigned MessagePipe::GetPeerPort(unsigned port) { | |
| 52 DCHECK(port == 0 || port == 1); | |
| 53 return port ^ 1; | |
| 54 } | |
| 55 | |
| 56 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { | |
| 57 DCHECK(port == 0 || port == 1); | |
| 58 base::AutoLock locker(lock_); | |
| 59 DCHECK(endpoints_[port]); | |
| 60 | |
| 61 return endpoints_[port]->GetType(); | |
| 62 } | |
| 63 | |
| 64 void MessagePipe::CancelAllWaiters(unsigned port) { | |
| 65 DCHECK(port == 0 || port == 1); | |
| 66 | |
| 67 base::AutoLock locker(lock_); | |
| 68 DCHECK(endpoints_[port]); | |
| 69 endpoints_[port]->CancelAllWaiters(); | |
| 70 } | |
| 71 | |
| 72 void MessagePipe::Close(unsigned port) { | |
| 73 DCHECK(port == 0 || port == 1); | |
| 74 | |
| 75 unsigned destination_port = GetPeerPort(port); | |
| 76 | |
| 77 base::AutoLock locker(lock_); | |
| 78 // The endpoint's |OnPeerClose()| may have been called first and returned | |
| 79 // false, which would have resulted in its destruction. | |
| 80 if (!endpoints_[port]) | |
| 81 return; | |
| 82 | |
| 83 endpoints_[port]->Close(); | |
| 84 if (endpoints_[destination_port]) { | |
| 85 if (!endpoints_[destination_port]->OnPeerClose()) | |
| 86 endpoints_[destination_port].reset(); | |
| 87 } | |
| 88 endpoints_[port].reset(); | |
| 89 } | |
| 90 | |
| 91 // TODO(vtl): Handle flags. | |
| 92 MojoResult MessagePipe::WriteMessage( | |
| 93 unsigned port, | |
| 94 UserPointer<const void> bytes, | |
| 95 uint32_t num_bytes, | |
| 96 std::vector<DispatcherTransport>* transports, | |
| 97 MojoWriteMessageFlags flags) { | |
| 98 DCHECK(port == 0 || port == 1); | |
| 99 return EnqueueMessageInternal( | |
| 100 GetPeerPort(port), | |
| 101 make_scoped_ptr(new MessageInTransit( | |
| 102 MessageInTransit::kTypeMessagePipeEndpoint, | |
| 103 MessageInTransit::kSubtypeMessagePipeEndpointData, | |
| 104 num_bytes, | |
| 105 bytes)), | |
| 106 transports); | |
| 107 } | |
| 108 | |
| 109 MojoResult MessagePipe::ReadMessage(unsigned port, | |
| 110 UserPointer<void> bytes, | |
| 111 UserPointer<uint32_t> num_bytes, | |
| 112 DispatcherVector* dispatchers, | |
| 113 uint32_t* num_dispatchers, | |
| 114 MojoReadMessageFlags flags) { | |
| 115 DCHECK(port == 0 || port == 1); | |
| 116 | |
| 117 base::AutoLock locker(lock_); | |
| 118 DCHECK(endpoints_[port]); | |
| 119 | |
| 120 return endpoints_[port]->ReadMessage( | |
| 121 bytes, num_bytes, dispatchers, num_dispatchers, flags); | |
| 122 } | |
| 123 | |
| 124 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { | |
| 125 DCHECK(port == 0 || port == 1); | |
| 126 | |
| 127 base::AutoLock locker(const_cast<base::Lock&>(lock_)); | |
| 128 DCHECK(endpoints_[port]); | |
| 129 | |
| 130 return endpoints_[port]->GetHandleSignalsState(); | |
| 131 } | |
| 132 | |
| 133 MojoResult MessagePipe::AddWaiter(unsigned port, | |
| 134 Waiter* waiter, | |
| 135 MojoHandleSignals signals, | |
| 136 uint32_t context, | |
| 137 HandleSignalsState* signals_state) { | |
| 138 DCHECK(port == 0 || port == 1); | |
| 139 | |
| 140 base::AutoLock locker(lock_); | |
| 141 DCHECK(endpoints_[port]); | |
| 142 | |
| 143 return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state); | |
| 144 } | |
| 145 | |
| 146 void MessagePipe::RemoveWaiter(unsigned port, | |
| 147 Waiter* waiter, | |
| 148 HandleSignalsState* signals_state) { | |
| 149 DCHECK(port == 0 || port == 1); | |
| 150 | |
| 151 base::AutoLock locker(lock_); | |
| 152 DCHECK(endpoints_[port]); | |
| 153 | |
| 154 endpoints_[port]->RemoveWaiter(waiter, signals_state); | |
| 155 } | |
| 156 | |
| 157 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) { | |
| 158 DCHECK(port == 0 || port == 1); | |
| 159 | |
| 160 base::AutoLock locker(lock_); | |
| 161 DCHECK(endpoints_[port]); | |
| 162 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); | |
| 163 | |
| 164 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a | |
| 165 // |MessagePipe| with two proxy endpoints, which will then act as a proxy | |
| 166 // (rather than trying to connect the two ends directly). | |
| 167 DLOG_IF(WARNING, | |
| 168 !!endpoints_[GetPeerPort(port)] && | |
| 169 endpoints_[GetPeerPort(port)]->GetType() != | |
| 170 MessagePipeEndpoint::kTypeLocal) | |
| 171 << "Direct message pipe passing across multiple channels not yet " | |
| 172 "implemented; will proxy"; | |
| 173 | |
| 174 scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass()); | |
| 175 scoped_refptr<ChannelEndpoint> channel_endpoint( | |
| 176 new ChannelEndpoint(this, port)); | |
| 177 endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get())); | |
| 178 channel_endpoint->TakeMessages(static_cast<LocalMessagePipeEndpoint*>( | |
| 179 old_endpoint.get())->message_queue()); | |
| 180 old_endpoint->Close(); | |
| 181 | |
| 182 return channel_endpoint; | |
| 183 } | |
| 184 | |
| 185 MojoResult MessagePipe::EnqueueMessage(unsigned port, | |
| 186 scoped_ptr<MessageInTransit> message) { | |
| 187 return EnqueueMessageInternal(port, message.Pass(), nullptr); | |
| 188 } | |
| 189 | |
| 190 MessagePipe::MessagePipe() { | |
| 191 } | |
| 192 | |
| 193 MessagePipe::~MessagePipe() { | |
| 194 // Owned by the dispatchers. The owning dispatchers should only release us via | |
| 195 // their |Close()| method, which should inform us of being closed via our | |
| 196 // |Close()|. Thus these should already be null. | |
| 197 DCHECK(!endpoints_[0]); | |
| 198 DCHECK(!endpoints_[1]); | |
| 199 } | |
| 200 | |
| 201 MojoResult MessagePipe::EnqueueMessageInternal( | |
| 202 unsigned port, | |
| 203 scoped_ptr<MessageInTransit> message, | |
| 204 std::vector<DispatcherTransport>* transports) { | |
| 205 DCHECK(port == 0 || port == 1); | |
| 206 DCHECK(message); | |
| 207 | |
| 208 if (message->type() == MessageInTransit::kTypeMessagePipe) { | |
| 209 DCHECK(!transports); | |
| 210 return HandleControlMessage(port, message.Pass()); | |
| 211 } | |
| 212 | |
| 213 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint); | |
| 214 | |
| 215 base::AutoLock locker(lock_); | |
| 216 DCHECK(endpoints_[GetPeerPort(port)]); | |
| 217 | |
| 218 // The destination port need not be open, unlike the source port. | |
| 219 if (!endpoints_[port]) | |
| 220 return MOJO_RESULT_FAILED_PRECONDITION; | |
| 221 | |
| 222 if (transports) { | |
| 223 MojoResult result = AttachTransportsNoLock(port, message.get(), transports); | |
| 224 if (result != MOJO_RESULT_OK) | |
| 225 return result; | |
| 226 } | |
| 227 | |
| 228 // The endpoint's |EnqueueMessage()| may not report failure. | |
| 229 endpoints_[port]->EnqueueMessage(message.Pass()); | |
| 230 return MOJO_RESULT_OK; | |
| 231 } | |
| 232 | |
| 233 MojoResult MessagePipe::AttachTransportsNoLock( | |
| 234 unsigned port, | |
| 235 MessageInTransit* message, | |
| 236 std::vector<DispatcherTransport>* transports) { | |
| 237 DCHECK(!message->has_dispatchers()); | |
| 238 | |
| 239 // You're not allowed to send either handle to a message pipe over the message | |
| 240 // pipe, so check for this. (The case of trying to write a handle to itself is | |
| 241 // taken care of by |Core|. That case kind of makes sense, but leads to | |
| 242 // complications if, e.g., both sides try to do the same thing with their | |
| 243 // respective handles simultaneously. The other case, of trying to write the | |
| 244 // peer handle to a handle, doesn't make sense -- since no handle will be | |
| 245 // available to read the message from.) | |
| 246 for (size_t i = 0; i < transports->size(); i++) { | |
| 247 if (!(*transports)[i].is_valid()) | |
| 248 continue; | |
| 249 if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) { | |
| 250 MessagePipeDispatcherTransport mp_transport((*transports)[i]); | |
| 251 if (mp_transport.GetMessagePipe() == this) { | |
| 252 // The other case should have been disallowed by |Core|. (Note: |port| | |
| 253 // is the peer port of the handle given to |WriteMessage()|.) | |
| 254 DCHECK_EQ(mp_transport.GetPort(), port); | |
| 255 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 256 } | |
| 257 } | |
| 258 } | |
| 259 | |
| 260 // Clone the dispatchers and attach them to the message. (This must be done as | |
| 261 // a separate loop, since we want to leave the dispatchers alone on failure.) | |
| 262 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); | |
| 263 dispatchers->reserve(transports->size()); | |
| 264 for (size_t i = 0; i < transports->size(); i++) { | |
| 265 if ((*transports)[i].is_valid()) { | |
| 266 dispatchers->push_back( | |
| 267 (*transports)[i].CreateEquivalentDispatcherAndClose()); | |
| 268 } else { | |
| 269 LOG(WARNING) << "Enqueueing null dispatcher"; | |
| 270 dispatchers->push_back(scoped_refptr<Dispatcher>()); | |
| 271 } | |
| 272 } | |
| 273 message->SetDispatchers(dispatchers.Pass()); | |
| 274 return MOJO_RESULT_OK; | |
| 275 } | |
| 276 | |
| 277 MojoResult MessagePipe::HandleControlMessage( | |
| 278 unsigned /*port*/, | |
| 279 scoped_ptr<MessageInTransit> message) { | |
| 280 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " | |
| 281 << message->subtype(); | |
| 282 return MOJO_RESULT_UNKNOWN; | |
| 283 } | |
| 284 | |
| 285 } // namespace system | |
| 286 } // namespace mojo | |
| OLD | NEW |