OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe.h" | 5 #include "mojo/edk/system/message_pipe.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
| 8 #include "mojo/edk/system/channel.h" |
8 #include "mojo/edk/system/channel_endpoint.h" | 9 #include "mojo/edk/system/channel_endpoint.h" |
| 10 #include "mojo/edk/system/channel_endpoint_id.h" |
9 #include "mojo/edk/system/local_message_pipe_endpoint.h" | 11 #include "mojo/edk/system/local_message_pipe_endpoint.h" |
10 #include "mojo/edk/system/message_in_transit.h" | 12 #include "mojo/edk/system/message_in_transit.h" |
11 #include "mojo/edk/system/message_pipe_dispatcher.h" | 13 #include "mojo/edk/system/message_pipe_dispatcher.h" |
12 #include "mojo/edk/system/message_pipe_endpoint.h" | 14 #include "mojo/edk/system/message_pipe_endpoint.h" |
13 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" | 15 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" |
14 | 16 |
15 namespace mojo { | 17 namespace mojo { |
16 namespace system { | 18 namespace system { |
17 | 19 |
| 20 namespace { |
| 21 |
| 22 // TODO(vtl): Move this into |Channel| (and possible further). |
| 23 struct SerializedMessagePipe { |
| 24 // This is the endpoint ID on the receiving side, and should be a "remote ID". |
| 25 // (The receiving side should already have had an endpoint attached and been |
| 26 // run via the |Channel|s. This endpoint will have both IDs assigned, so this |
| 27 // ID is only needed to associate that endpoint with a particular dispatcher.) |
| 28 ChannelEndpointId receiver_endpoint_id; |
| 29 }; |
| 30 |
| 31 } // namespace |
| 32 |
18 // static | 33 // static |
19 MessagePipe* MessagePipe::CreateLocalLocal() { | 34 MessagePipe* MessagePipe::CreateLocalLocal() { |
20 MessagePipe* message_pipe = new MessagePipe(); | 35 MessagePipe* message_pipe = new MessagePipe(); |
21 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 36 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
22 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 37 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
23 return message_pipe; | 38 return message_pipe; |
24 } | 39 } |
25 | 40 |
26 // static | 41 // static |
27 MessagePipe* MessagePipe::CreateLocalProxy( | 42 MessagePipe* MessagePipe::CreateLocalProxy( |
(...skipping 18 matching lines...) Expand all Loading... |
46 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 61 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
47 return message_pipe; | 62 return message_pipe; |
48 } | 63 } |
49 | 64 |
50 // static | 65 // static |
51 unsigned MessagePipe::GetPeerPort(unsigned port) { | 66 unsigned MessagePipe::GetPeerPort(unsigned port) { |
52 DCHECK(port == 0 || port == 1); | 67 DCHECK(port == 0 || port == 1); |
53 return port ^ 1; | 68 return port ^ 1; |
54 } | 69 } |
55 | 70 |
| 71 // static |
| 72 bool MessagePipe::Deserialize(Channel* channel, |
| 73 const void* source, |
| 74 size_t size, |
| 75 scoped_refptr<MessagePipe>* message_pipe, |
| 76 unsigned* port) { |
| 77 DCHECK(!message_pipe->get()); // Not technically wrong, but unlikely. |
| 78 |
| 79 if (size != sizeof(SerializedMessagePipe)) { |
| 80 LOG(ERROR) << "Invalid serialized message pipe"; |
| 81 return false; |
| 82 } |
| 83 |
| 84 const SerializedMessagePipe* s = |
| 85 static_cast<const SerializedMessagePipe*>(source); |
| 86 *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id); |
| 87 if (!message_pipe->get()) { |
| 88 LOG(ERROR) << "Failed to deserialize message pipe (ID = " |
| 89 << s->receiver_endpoint_id << ")"; |
| 90 return false; |
| 91 } |
| 92 |
| 93 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = " |
| 94 << s->receiver_endpoint_id << ")"; |
| 95 *port = 0; |
| 96 return true; |
| 97 } |
| 98 |
56 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { | 99 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { |
57 DCHECK(port == 0 || port == 1); | 100 DCHECK(port == 0 || port == 1); |
58 base::AutoLock locker(lock_); | 101 base::AutoLock locker(lock_); |
59 DCHECK(endpoints_[port]); | 102 DCHECK(endpoints_[port]); |
60 | 103 |
61 return endpoints_[port]->GetType(); | 104 return endpoints_[port]->GetType(); |
62 } | 105 } |
63 | 106 |
64 void MessagePipe::CancelAllWaiters(unsigned port) { | 107 void MessagePipe::CancelAllWaiters(unsigned port) { |
65 DCHECK(port == 0 || port == 1); | 108 DCHECK(port == 0 || port == 1); |
(...skipping 27 matching lines...) Expand all Loading... |
93 unsigned port, | 136 unsigned port, |
94 UserPointer<const void> bytes, | 137 UserPointer<const void> bytes, |
95 uint32_t num_bytes, | 138 uint32_t num_bytes, |
96 std::vector<DispatcherTransport>* transports, | 139 std::vector<DispatcherTransport>* transports, |
97 MojoWriteMessageFlags flags) { | 140 MojoWriteMessageFlags flags) { |
98 DCHECK(port == 0 || port == 1); | 141 DCHECK(port == 0 || port == 1); |
99 return EnqueueMessageInternal( | 142 return EnqueueMessageInternal( |
100 GetPeerPort(port), | 143 GetPeerPort(port), |
101 make_scoped_ptr(new MessageInTransit( | 144 make_scoped_ptr(new MessageInTransit( |
102 MessageInTransit::kTypeMessagePipeEndpoint, | 145 MessageInTransit::kTypeMessagePipeEndpoint, |
103 MessageInTransit::kSubtypeMessagePipeEndpointData, | 146 MessageInTransit::kSubtypeMessagePipeEndpointData, num_bytes, bytes)), |
104 num_bytes, | |
105 bytes)), | |
106 transports); | 147 transports); |
107 } | 148 } |
108 | 149 |
109 MojoResult MessagePipe::ReadMessage(unsigned port, | 150 MojoResult MessagePipe::ReadMessage(unsigned port, |
110 UserPointer<void> bytes, | 151 UserPointer<void> bytes, |
111 UserPointer<uint32_t> num_bytes, | 152 UserPointer<uint32_t> num_bytes, |
112 DispatcherVector* dispatchers, | 153 DispatcherVector* dispatchers, |
113 uint32_t* num_dispatchers, | 154 uint32_t* num_dispatchers, |
114 MojoReadMessageFlags flags) { | 155 MojoReadMessageFlags flags) { |
115 DCHECK(port == 0 || port == 1); | 156 DCHECK(port == 0 || port == 1); |
116 | 157 |
117 base::AutoLock locker(lock_); | 158 base::AutoLock locker(lock_); |
118 DCHECK(endpoints_[port]); | 159 DCHECK(endpoints_[port]); |
119 | 160 |
120 return endpoints_[port]->ReadMessage( | 161 return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers, |
121 bytes, num_bytes, dispatchers, num_dispatchers, flags); | 162 num_dispatchers, flags); |
122 } | 163 } |
123 | 164 |
124 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { | 165 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { |
125 DCHECK(port == 0 || port == 1); | 166 DCHECK(port == 0 || port == 1); |
126 | 167 |
127 base::AutoLock locker(const_cast<base::Lock&>(lock_)); | 168 base::AutoLock locker(const_cast<base::Lock&>(lock_)); |
128 DCHECK(endpoints_[port]); | 169 DCHECK(endpoints_[port]); |
129 | 170 |
130 return endpoints_[port]->GetHandleSignalsState(); | 171 return endpoints_[port]->GetHandleSignalsState(); |
131 } | 172 } |
(...skipping 15 matching lines...) Expand all Loading... |
147 Waiter* waiter, | 188 Waiter* waiter, |
148 HandleSignalsState* signals_state) { | 189 HandleSignalsState* signals_state) { |
149 DCHECK(port == 0 || port == 1); | 190 DCHECK(port == 0 || port == 1); |
150 | 191 |
151 base::AutoLock locker(lock_); | 192 base::AutoLock locker(lock_); |
152 DCHECK(endpoints_[port]); | 193 DCHECK(endpoints_[port]); |
153 | 194 |
154 endpoints_[port]->RemoveWaiter(waiter, signals_state); | 195 endpoints_[port]->RemoveWaiter(waiter, signals_state); |
155 } | 196 } |
156 | 197 |
| 198 void MessagePipe::StartSerialize(unsigned /*port*/, |
| 199 Channel* /*channel*/, |
| 200 size_t* max_size, |
| 201 size_t* max_platform_handles) { |
| 202 *max_size = sizeof(SerializedMessagePipe); |
| 203 *max_platform_handles = 0; |
| 204 } |
| 205 |
| 206 bool MessagePipe::EndSerialize( |
| 207 unsigned port, |
| 208 Channel* channel, |
| 209 void* destination, |
| 210 size_t* actual_size, |
| 211 embedder::PlatformHandleVector* /*platform_handles*/) { |
| 212 SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination); |
| 213 |
| 214 // Convert the local endpoint to a proxy endpoint (moving the message queue) |
| 215 // and attach it to the channel. |
| 216 s->receiver_endpoint_id = |
| 217 channel->AttachAndRunEndpoint(ConvertLocalToProxy(port), false); |
| 218 DVLOG(2) << "Serializing message pipe (remote ID = " |
| 219 << s->receiver_endpoint_id << ")"; |
| 220 *actual_size = sizeof(SerializedMessagePipe); |
| 221 return true; |
| 222 } |
| 223 |
157 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) { | 224 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) { |
158 DCHECK(port == 0 || port == 1); | 225 DCHECK(port == 0 || port == 1); |
159 | 226 |
160 base::AutoLock locker(lock_); | 227 base::AutoLock locker(lock_); |
161 DCHECK(endpoints_[port]); | 228 DCHECK(endpoints_[port]); |
162 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); | 229 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); |
163 | 230 |
164 // The local peer is already closed, so just make a |ChannelEndpoint| that'll | 231 // The local peer is already closed, so just make a |ChannelEndpoint| that'll |
165 // send the already-queued messages. | 232 // send the already-queued messages. |
166 if (!endpoints_[GetPeerPort(port)]) { | 233 if (!endpoints_[GetPeerPort(port)]) { |
167 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( | 234 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( |
168 nullptr, | 235 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( |
169 0, | 236 endpoints_[port].get())->message_queue())); |
170 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()) | |
171 ->message_queue())); | |
172 endpoints_[port]->Close(); | 237 endpoints_[port]->Close(); |
173 endpoints_[port].reset(); | 238 endpoints_[port].reset(); |
174 return channel_endpoint; | 239 return channel_endpoint; |
175 } | 240 } |
176 | 241 |
177 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a | 242 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a |
178 // |MessagePipe| with two proxy endpoints, which will then act as a proxy | 243 // |MessagePipe| with two proxy endpoints, which will then act as a proxy |
179 // (rather than trying to connect the two ends directly). | 244 // (rather than trying to connect the two ends directly). |
180 DLOG_IF(WARNING, | 245 DLOG_IF(WARNING, endpoints_[GetPeerPort(port)]->GetType() != |
181 endpoints_[GetPeerPort(port)]->GetType() != | 246 MessagePipeEndpoint::kTypeLocal) |
182 MessagePipeEndpoint::kTypeLocal) | |
183 << "Direct message pipe passing across multiple channels not yet " | 247 << "Direct message pipe passing across multiple channels not yet " |
184 "implemented; will proxy"; | 248 "implemented; will proxy"; |
185 | 249 |
186 scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass()); | 250 scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass()); |
187 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( | 251 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( |
188 this, | 252 this, port, static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get()) |
189 port, | 253 ->message_queue())); |
190 static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get()) | |
191 ->message_queue())); | |
192 endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get())); | 254 endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get())); |
193 old_endpoint->Close(); | 255 old_endpoint->Close(); |
194 | 256 |
195 return channel_endpoint; | 257 return channel_endpoint; |
196 } | 258 } |
197 | 259 |
198 MojoResult MessagePipe::EnqueueMessage(unsigned port, | 260 MojoResult MessagePipe::EnqueueMessage(unsigned port, |
199 scoped_ptr<MessageInTransit> message) { | 261 scoped_ptr<MessageInTransit> message) { |
200 return EnqueueMessageInternal(port, message.Pass(), nullptr); | 262 return EnqueueMessageInternal(port, message.Pass(), nullptr); |
201 } | 263 } |
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
273 // Clone the dispatchers and attach them to the message. (This must be done as | 335 // Clone the dispatchers and attach them to the message. (This must be done as |
274 // a separate loop, since we want to leave the dispatchers alone on failure.) | 336 // a separate loop, since we want to leave the dispatchers alone on failure.) |
275 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); | 337 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); |
276 dispatchers->reserve(transports->size()); | 338 dispatchers->reserve(transports->size()); |
277 for (size_t i = 0; i < transports->size(); i++) { | 339 for (size_t i = 0; i < transports->size(); i++) { |
278 if ((*transports)[i].is_valid()) { | 340 if ((*transports)[i].is_valid()) { |
279 dispatchers->push_back( | 341 dispatchers->push_back( |
280 (*transports)[i].CreateEquivalentDispatcherAndClose()); | 342 (*transports)[i].CreateEquivalentDispatcherAndClose()); |
281 } else { | 343 } else { |
282 LOG(WARNING) << "Enqueueing null dispatcher"; | 344 LOG(WARNING) << "Enqueueing null dispatcher"; |
283 dispatchers->push_back(scoped_refptr<Dispatcher>()); | 345 dispatchers->push_back(nullptr); |
284 } | 346 } |
285 } | 347 } |
286 message->SetDispatchers(dispatchers.Pass()); | 348 message->SetDispatchers(dispatchers.Pass()); |
287 return MOJO_RESULT_OK; | 349 return MOJO_RESULT_OK; |
288 } | 350 } |
289 | 351 |
290 MojoResult MessagePipe::HandleControlMessage( | 352 MojoResult MessagePipe::HandleControlMessage( |
291 unsigned /*port*/, | 353 unsigned /*port*/, |
292 scoped_ptr<MessageInTransit> message) { | 354 scoped_ptr<MessageInTransit> message) { |
293 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " | 355 LOG(WARNING) << "Unrecognized MessagePipe control message subtype " |
294 << message->subtype(); | 356 << message->subtype(); |
295 return MOJO_RESULT_UNKNOWN; | 357 return MOJO_RESULT_UNKNOWN; |
296 } | 358 } |
297 | 359 |
298 } // namespace system | 360 } // namespace system |
299 } // namespace mojo | 361 } // namespace mojo |
OLD | NEW |