| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe.h" | 5 #include "mojo/edk/system/message_pipe.h" |
| 6 | 6 |
| 7 #include "base/logging.h" | 7 #include "base/logging.h" |
| 8 #include "mojo/edk/system/channel.h" |
| 8 #include "mojo/edk/system/channel_endpoint.h" | 9 #include "mojo/edk/system/channel_endpoint.h" |
| 10 #include "mojo/edk/system/channel_endpoint_id.h" |
| 9 #include "mojo/edk/system/local_message_pipe_endpoint.h" | 11 #include "mojo/edk/system/local_message_pipe_endpoint.h" |
| 10 #include "mojo/edk/system/message_in_transit.h" | 12 #include "mojo/edk/system/message_in_transit.h" |
| 11 #include "mojo/edk/system/message_pipe_dispatcher.h" | 13 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 12 #include "mojo/edk/system/message_pipe_endpoint.h" | 14 #include "mojo/edk/system/message_pipe_endpoint.h" |
| 13 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" | 15 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" |
| 14 | 16 |
| 15 namespace mojo { | 17 namespace mojo { |
| 16 namespace system { | 18 namespace system { |
| 17 | 19 |
| 20 namespace { |
| 21 |
| 22 // TODO(vtl): Move this into |Channel| (and possible further). |
| 23 struct SerializedMessagePipe { |
| 24 // This is the endpoint ID on the receiving side, and should be a "remote ID". |
| 25 // (The receiving side should already have had an endpoint attached and been |
| 26 // run via the |Channel|s. This endpoint will have both IDs assigned, so this |
| 27 // ID is only needed to associate that endpoint with a particular dispatcher.) |
| 28 ChannelEndpointId receiver_endpoint_id; |
| 29 }; |
| 30 |
| 31 } // namespace |
| 32 |
| 18 // static | 33 // static |
| 19 MessagePipe* MessagePipe::CreateLocalLocal() { | 34 MessagePipe* MessagePipe::CreateLocalLocal() { |
| 20 MessagePipe* message_pipe = new MessagePipe(); | 35 MessagePipe* message_pipe = new MessagePipe(); |
| 21 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 36 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
| 22 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 37 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
| 23 return message_pipe; | 38 return message_pipe; |
| 24 } | 39 } |
| 25 | 40 |
| 26 // static | 41 // static |
| 27 MessagePipe* MessagePipe::CreateLocalProxy( | 42 MessagePipe* MessagePipe::CreateLocalProxy( |
| (...skipping 18 matching lines...) Expand all Loading... |
| 46 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 61 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
| 47 return message_pipe; | 62 return message_pipe; |
| 48 } | 63 } |
| 49 | 64 |
| 50 // static | 65 // static |
| 51 unsigned MessagePipe::GetPeerPort(unsigned port) { | 66 unsigned MessagePipe::GetPeerPort(unsigned port) { |
| 52 DCHECK(port == 0 || port == 1); | 67 DCHECK(port == 0 || port == 1); |
| 53 return port ^ 1; | 68 return port ^ 1; |
| 54 } | 69 } |
| 55 | 70 |
| 71 // static |
| 72 bool MessagePipe::Deserialize(Channel* channel, |
| 73 const void* source, |
| 74 size_t size, |
| 75 scoped_refptr<MessagePipe>* message_pipe, |
| 76 unsigned* port) { |
| 77 DCHECK(!message_pipe->get()); // Not technically wrong, but unlikely. |
| 78 |
| 79 if (size != sizeof(SerializedMessagePipe)) { |
| 80 LOG(ERROR) << "Invalid serialized message pipe"; |
| 81 return false; |
| 82 } |
| 83 |
| 84 const SerializedMessagePipe* s = |
| 85 static_cast<const SerializedMessagePipe*>(source); |
| 86 *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id); |
| 87 if (!message_pipe->get()) { |
| 88 LOG(ERROR) << "Failed to deserialize message pipe (ID = " |
| 89 << s->receiver_endpoint_id << ")"; |
| 90 return false; |
| 91 } |
| 92 |
| 93 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = " |
| 94 << s->receiver_endpoint_id << ")"; |
| 95 *port = 0; |
| 96 return true; |
| 97 } |
| 98 |
| 56 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { | 99 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { |
| 57 DCHECK(port == 0 || port == 1); | 100 DCHECK(port == 0 || port == 1); |
| 58 base::AutoLock locker(lock_); | 101 base::AutoLock locker(lock_); |
| 59 DCHECK(endpoints_[port]); | 102 DCHECK(endpoints_[port]); |
| 60 | 103 |
| 61 return endpoints_[port]->GetType(); | 104 return endpoints_[port]->GetType(); |
| 62 } | 105 } |
| 63 | 106 |
| 64 void MessagePipe::CancelAllWaiters(unsigned port) { | 107 void MessagePipe::CancelAllWaiters(unsigned port) { |
| 65 DCHECK(port == 0 || port == 1); | 108 DCHECK(port == 0 || port == 1); |
| (...skipping 27 matching lines...) Expand all Loading... |
| 93 unsigned port, | 136 unsigned port, |
| 94 UserPointer<const void> bytes, | 137 UserPointer<const void> bytes, |
| 95 uint32_t num_bytes, | 138 uint32_t num_bytes, |
| 96 std::vector<DispatcherTransport>* transports, | 139 std::vector<DispatcherTransport>* transports, |
| 97 MojoWriteMessageFlags flags) { | 140 MojoWriteMessageFlags flags) { |
| 98 DCHECK(port == 0 || port == 1); | 141 DCHECK(port == 0 || port == 1); |
| 99 return EnqueueMessageInternal( | 142 return EnqueueMessageInternal( |
| 100 GetPeerPort(port), | 143 GetPeerPort(port), |
| 101 make_scoped_ptr(new MessageInTransit( | 144 make_scoped_ptr(new MessageInTransit( |
| 102 MessageInTransit::kTypeMessagePipeEndpoint, | 145 MessageInTransit::kTypeMessagePipeEndpoint, |
| 103 MessageInTransit::kSubtypeMessagePipeEndpointData, | 146 MessageInTransit::kSubtypeMessagePipeEndpointData, num_bytes, bytes)), |
| 104 num_bytes, | |
| 105 bytes)), | |
| 106 transports); | 147 transports); |
| 107 } | 148 } |
| 108 | 149 |
| 109 MojoResult MessagePipe::ReadMessage(unsigned port, | 150 MojoResult MessagePipe::ReadMessage(unsigned port, |
| 110 UserPointer<void> bytes, | 151 UserPointer<void> bytes, |
| 111 UserPointer<uint32_t> num_bytes, | 152 UserPointer<uint32_t> num_bytes, |
| 112 DispatcherVector* dispatchers, | 153 DispatcherVector* dispatchers, |
| 113 uint32_t* num_dispatchers, | 154 uint32_t* num_dispatchers, |
| 114 MojoReadMessageFlags flags) { | 155 MojoReadMessageFlags flags) { |
| 115 DCHECK(port == 0 || port == 1); | 156 DCHECK(port == 0 || port == 1); |
| 116 | 157 |
| 117 base::AutoLock locker(lock_); | 158 base::AutoLock locker(lock_); |
| 118 DCHECK(endpoints_[port]); | 159 DCHECK(endpoints_[port]); |
| 119 | 160 |
| 120 return endpoints_[port]->ReadMessage( | 161 return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers, |
| 121 bytes, num_bytes, dispatchers, num_dispatchers, flags); | 162 num_dispatchers, flags); |
| 122 } | 163 } |
| 123 | 164 |
| 124 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { | 165 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { |
| 125 DCHECK(port == 0 || port == 1); | 166 DCHECK(port == 0 || port == 1); |
| 126 | 167 |
| 127 base::AutoLock locker(const_cast<base::Lock&>(lock_)); | 168 base::AutoLock locker(const_cast<base::Lock&>(lock_)); |
| 128 DCHECK(endpoints_[port]); | 169 DCHECK(endpoints_[port]); |
| 129 | 170 |
| 130 return endpoints_[port]->GetHandleSignalsState(); | 171 return endpoints_[port]->GetHandleSignalsState(); |
| 131 } | 172 } |
| (...skipping 15 matching lines...) Expand all Loading... |
| 147 Waiter* waiter, | 188 Waiter* waiter, |
| 148 HandleSignalsState* signals_state) { | 189 HandleSignalsState* signals_state) { |
| 149 DCHECK(port == 0 || port == 1); | 190 DCHECK(port == 0 || port == 1); |
| 150 | 191 |
| 151 base::AutoLock locker(lock_); | 192 base::AutoLock locker(lock_); |
| 152 DCHECK(endpoints_[port]); | 193 DCHECK(endpoints_[port]); |
| 153 | 194 |
| 154 endpoints_[port]->RemoveWaiter(waiter, signals_state); | 195 endpoints_[port]->RemoveWaiter(waiter, signals_state); |
| 155 } | 196 } |
| 156 | 197 |
| 198 void MessagePipe::StartSerialize(unsigned /*port*/, |
| 199 Channel* /*channel*/, |
| 200 size_t* max_size, |
| 201 size_t* max_platform_handles) { |
| 202 *max_size = sizeof(SerializedMessagePipe); |
| 203 *max_platform_handles = 0; |
| 204 } |
| 205 |
| 206 bool MessagePipe::EndSerialize( |
| 207 unsigned port, |
| 208 Channel* channel, |
| 209 void* destination, |
| 210 size_t* actual_size, |
| 211 embedder::PlatformHandleVector* /*platform_handles*/) { |
| 212 SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination); |
| 213 |
| 214 // Convert the local endpoint to a proxy endpoint (moving the message queue) |
| 215 // and attach it to the channel. |
| 216 s->receiver_endpoint_id = |
| 217 channel->AttachAndRunEndpoint(ConvertLocalToProxy(port), false); |
| 218 DVLOG(2) << "Serializing message pipe (remote ID = " |
| 219 << s->receiver_endpoint_id << ")"; |
| 220 *actual_size = sizeof(SerializedMessagePipe); |
| 221 return true; |
| 222 } |
| 223 |
| 157 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) { | 224 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) { |
| 158 DCHECK(port == 0 || port == 1); | 225 DCHECK(port == 0 || port == 1); |
| 159 | 226 |
| 160 base::AutoLock locker(lock_); | 227 base::AutoLock locker(lock_); |
| 161 DCHECK(endpoints_[port]); | 228 DCHECK(endpoints_[port]); |
| 162 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); | 229 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); |
| 163 | 230 |
| 164 // The local peer is already closed, so just make a |ChannelEndpoint| that'll | 231 // The local peer is already closed, so just make a |ChannelEndpoint| that'll |
| 165 // send the already-queued messages. | 232 // send the already-queued messages. |
| 166 if (!endpoints_[GetPeerPort(port)]) { | 233 if (!endpoints_[GetPeerPort(port)]) { |
| 167 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( | 234 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( |
| 168 nullptr, | 235 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( |
| 169 0, | 236 endpoints_[port].get())->message_queue())); |
| 170 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()) | |
| 171 ->message_queue())); | |
| 172 endpoints_[port]->Close(); | 237 endpoints_[port]->Close(); |
| 173 endpoints_[port].reset(); | 238 endpoints_[port].reset(); |
| 174 return channel_endpoint; | 239 return channel_endpoint; |
| 175 } | 240 } |
| 176 | 241 |
| 177 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a | 242 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a |
| 178 // |MessagePipe| with two proxy endpoints, which will then act as a proxy | 243 // |MessagePipe| with two proxy endpoints, which will then act as a proxy |
| 179 // (rather than trying to connect the two ends directly). | 244 // (rather than trying to connect the two ends directly). |
| 180 DLOG_IF(WARNING, | 245 DLOG_IF(WARNING, endpoints_[GetPeerPort(port)]->GetType() != |
| 181 endpoints_[GetPeerPort(port)]->GetType() != | 246 MessagePipeEndpoint::kTypeLocal) |
| 182 MessagePipeEndpoint::kTypeLocal) | |
| 183 << "Direct message pipe passing across multiple channels not yet " | 247 << "Direct message pipe passing across multiple channels not yet " |
| 184 "implemented; will proxy"; | 248 "implemented; will proxy"; |
| 185 | 249 |
| 186 scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass()); | 250 scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass()); |
| 187 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( | 251 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( |
| 188 this, | 252 this, port, static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get()) |
| 189 port, | 253 ->message_queue())); |
| 190 static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get()) | |
| 191 ->message_queue())); | |
| 192 endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get())); | 254 endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get())); |
| 193 old_endpoint->Close(); | 255 old_endpoint->Close(); |
| 194 | 256 |
| 195 return channel_endpoint; | 257 return channel_endpoint; |
| 196 } | 258 } |
| 197 | 259 |
| 198 MojoResult MessagePipe::EnqueueMessage(unsigned port, | 260 MojoResult MessagePipe::EnqueueMessage(unsigned port, |
| 199 scoped_ptr<MessageInTransit> message) { | 261 scoped_ptr<MessageInTransit> message) { |
| 200 return EnqueueMessageInternal(port, message.Pass(), nullptr); | 262 return EnqueueMessageInternal(port, message.Pass(), nullptr); |
| 201 } | 263 } |
| (...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 273 // Clone the dispatchers and attach them to the message. (This must be done as | 335 // Clone the dispatchers and attach them to the message. (This must be done as |
| 274 // a separate loop, since we want to leave the dispatchers alone on failure.) | 336 // a separate loop, since we want to leave the dispatchers alone on failure.) |
| 275 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); | 337 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); |
| 276 dispatchers->reserve(transports->size()); | 338 dispatchers->reserve(transports->size()); |
| 277 for (size_t i = 0; i < transports->size(); i++) { | 339 for (size_t i = 0; i < transports->size(); i++) { |
| 278 if ((*transports)[i].is_valid()) { | 340 if ((*transports)[i].is_valid()) { |
| 279 dispatchers->push_back( | 341 dispatchers->push_back( |
| 280 (*transports)[i].CreateEquivalentDispatcherAndClose()); | 342 (*transports)[i].CreateEquivalentDispatcherAndClose()); |
| 281 } else { | 343 } else { |
| 282 LOG(WARNING) << "Enqueueing null dispatcher"; | 344 LOG(WARNING) << "Enqueueing null dispatcher"; |
| 283 dispatchers->push_back(scoped_refptr<Dispatcher>()); | 345 dispatchers->push_back(nullptr); |
| 284 } | 346 } |
| 285 } | 347 } |
| 286 message->SetDispatchers(dispatchers.Pass()); | 348 message->SetDispatchers(dispatchers.Pass()); |
| 287 return MOJO_RESULT_OK; | 349 return MOJO_RESULT_OK; |
| 288 } | 350 } |
| 289 | 351 |
| 290 MojoResult MessagePipe::HandleControlMessage( | 352 MojoResult MessagePipe::HandleControlMessage( |
| 291 unsigned /*port*/, | 353 unsigned /*port*/, |
| 292 scoped_ptr<MessageInTransit> message) { | 354 scoped_ptr<MessageInTransit> message) { |
| 293 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " | 355 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " |
| 294 << message->subtype(); | 356 << message->subtype(); |
| 295 return MOJO_RESULT_UNKNOWN; | 357 return MOJO_RESULT_UNKNOWN; |
| 296 } | 358 } |
| 297 | 359 |
| 298 } // namespace system | 360 } // namespace system |
| 299 } // namespace mojo | 361 } // namespace mojo |
| OLD | NEW |