OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe.h" | 5 #include "mojo/edk/system/message_pipe.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
8 #include "mojo/edk/system/channel.h" | 8 #include "mojo/edk/system/channel.h" |
9 #include "mojo/edk/system/channel_endpoint.h" | 9 #include "mojo/edk/system/channel_endpoint.h" |
10 #include "mojo/edk/system/channel_endpoint_id.h" | 10 #include "mojo/edk/system/channel_endpoint_id.h" |
| 11 #include "mojo/edk/system/endpoint_relayer.h" |
11 #include "mojo/edk/system/local_message_pipe_endpoint.h" | 12 #include "mojo/edk/system/local_message_pipe_endpoint.h" |
12 #include "mojo/edk/system/message_in_transit.h" | 13 #include "mojo/edk/system/message_in_transit.h" |
13 #include "mojo/edk/system/message_pipe_dispatcher.h" | 14 #include "mojo/edk/system/message_pipe_dispatcher.h" |
14 #include "mojo/edk/system/message_pipe_endpoint.h" | 15 #include "mojo/edk/system/message_pipe_endpoint.h" |
15 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" | 16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" |
16 | 17 |
17 namespace mojo { | 18 namespace mojo { |
18 namespace system { | 19 namespace system { |
19 | 20 |
20 namespace { | 21 namespace { |
(...skipping 13 matching lines...) Expand all Loading... |
34 MessagePipe* MessagePipe::CreateLocalLocal() { | 35 MessagePipe* MessagePipe::CreateLocalLocal() { |
35 MessagePipe* message_pipe = new MessagePipe(); | 36 MessagePipe* message_pipe = new MessagePipe(); |
36 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 37 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
37 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 38 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
38 return message_pipe; | 39 return message_pipe; |
39 } | 40 } |
40 | 41 |
41 // static | 42 // static |
42 MessagePipe* MessagePipe::CreateLocalProxy( | 43 MessagePipe* MessagePipe::CreateLocalProxy( |
43 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | 44 scoped_refptr<ChannelEndpoint>* channel_endpoint) { |
44 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. | 45 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
45 MessagePipe* message_pipe = new MessagePipe(); | 46 MessagePipe* message_pipe = new MessagePipe(); |
46 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); | 47 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
47 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); | 48 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); |
48 message_pipe->endpoints_[1].reset( | 49 message_pipe->endpoints_[1].reset( |
49 new ProxyMessagePipeEndpoint(channel_endpoint->get())); | 50 new ProxyMessagePipeEndpoint(channel_endpoint->get())); |
50 return message_pipe; | 51 return message_pipe; |
51 } | 52 } |
52 | 53 |
53 // static | 54 // static |
54 MessagePipe* MessagePipe::CreateProxyLocal( | 55 MessagePipe* MessagePipe::CreateProxyLocal( |
55 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | 56 scoped_refptr<ChannelEndpoint>* channel_endpoint) { |
56 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. | 57 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
57 MessagePipe* message_pipe = new MessagePipe(); | 58 MessagePipe* message_pipe = new MessagePipe(); |
58 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); | 59 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); |
59 message_pipe->endpoints_[0].reset( | 60 message_pipe->endpoints_[0].reset( |
60 new ProxyMessagePipeEndpoint(channel_endpoint->get())); | 61 new ProxyMessagePipeEndpoint(channel_endpoint->get())); |
61 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); | 62 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
62 return message_pipe; | 63 return message_pipe; |
63 } | 64 } |
64 | 65 |
65 // static | 66 // static |
66 unsigned MessagePipe::GetPeerPort(unsigned port) { | 67 unsigned MessagePipe::GetPeerPort(unsigned port) { |
67 DCHECK(port == 0 || port == 1); | 68 DCHECK(port == 0 || port == 1); |
68 return port ^ 1; | 69 return port ^ 1; |
69 } | 70 } |
70 | 71 |
71 // static | 72 // static |
72 bool MessagePipe::Deserialize(Channel* channel, | 73 bool MessagePipe::Deserialize(Channel* channel, |
73 const void* source, | 74 const void* source, |
74 size_t size, | 75 size_t size, |
75 scoped_refptr<MessagePipe>* message_pipe, | 76 scoped_refptr<MessagePipe>* message_pipe, |
76 unsigned* port) { | 77 unsigned* port) { |
77 DCHECK(!message_pipe->get()); // Not technically wrong, but unlikely. | 78 DCHECK(!*message_pipe); // Not technically wrong, but unlikely. |
78 | 79 |
79 if (size != sizeof(SerializedMessagePipe)) { | 80 if (size != sizeof(SerializedMessagePipe)) { |
80 LOG(ERROR) << "Invalid serialized message pipe"; | 81 LOG(ERROR) << "Invalid serialized message pipe"; |
81 return false; | 82 return false; |
82 } | 83 } |
83 | 84 |
84 const SerializedMessagePipe* s = | 85 const SerializedMessagePipe* s = |
85 static_cast<const SerializedMessagePipe*>(source); | 86 static_cast<const SerializedMessagePipe*>(source); |
86 *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id); | 87 *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id); |
87 if (!message_pipe->get()) { | 88 if (!*message_pipe) { |
88 LOG(ERROR) << "Failed to deserialize message pipe (ID = " | 89 LOG(ERROR) << "Failed to deserialize message pipe (ID = " |
89 << s->receiver_endpoint_id << ")"; | 90 << s->receiver_endpoint_id << ")"; |
90 return false; | 91 return false; |
91 } | 92 } |
92 | 93 |
93 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = " | 94 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = " |
94 << s->receiver_endpoint_id << ")"; | 95 << s->receiver_endpoint_id << ")"; |
95 *port = 0; | 96 *port = 0; |
96 return true; | 97 return true; |
97 } | 98 } |
98 | 99 |
99 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { | 100 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { |
100 DCHECK(port == 0 || port == 1); | 101 DCHECK(port == 0 || port == 1); |
101 base::AutoLock locker(lock_); | 102 base::AutoLock locker(lock_); |
102 DCHECK(endpoints_[port]); | 103 DCHECK(endpoints_[port]); |
103 | 104 |
104 return endpoints_[port]->GetType(); | 105 return endpoints_[port]->GetType(); |
105 } | 106 } |
106 | 107 |
107 void MessagePipe::CancelAllWaiters(unsigned port) { | 108 void MessagePipe::CancelAllAwakables(unsigned port) { |
108 DCHECK(port == 0 || port == 1); | 109 DCHECK(port == 0 || port == 1); |
109 | 110 |
110 base::AutoLock locker(lock_); | 111 base::AutoLock locker(lock_); |
111 DCHECK(endpoints_[port]); | 112 DCHECK(endpoints_[port]); |
112 endpoints_[port]->CancelAllWaiters(); | 113 endpoints_[port]->CancelAllAwakables(); |
113 } | 114 } |
114 | 115 |
115 void MessagePipe::Close(unsigned port) { | 116 void MessagePipe::Close(unsigned port) { |
116 DCHECK(port == 0 || port == 1); | 117 DCHECK(port == 0 || port == 1); |
117 | 118 |
118 unsigned destination_port = GetPeerPort(port); | 119 unsigned peer_port = GetPeerPort(port); |
119 | 120 |
120 base::AutoLock locker(lock_); | 121 base::AutoLock locker(lock_); |
121 // The endpoint's |OnPeerClose()| may have been called first and returned | 122 // The endpoint's |OnPeerClose()| may have been called first and returned |
122 // false, which would have resulted in its destruction. | 123 // false, which would have resulted in its destruction. |
123 if (!endpoints_[port]) | 124 if (!endpoints_[port]) |
124 return; | 125 return; |
125 | 126 |
126 endpoints_[port]->Close(); | 127 endpoints_[port]->Close(); |
127 if (endpoints_[destination_port]) { | 128 if (endpoints_[peer_port]) { |
128 if (!endpoints_[destination_port]->OnPeerClose()) | 129 if (!endpoints_[peer_port]->OnPeerClose()) |
129 endpoints_[destination_port].reset(); | 130 endpoints_[peer_port].reset(); |
130 } | 131 } |
131 endpoints_[port].reset(); | 132 endpoints_[port].reset(); |
132 } | 133 } |
133 | 134 |
134 // TODO(vtl): Handle flags. | 135 // TODO(vtl): Handle flags. |
135 MojoResult MessagePipe::WriteMessage( | 136 MojoResult MessagePipe::WriteMessage( |
136 unsigned port, | 137 unsigned port, |
137 UserPointer<const void> bytes, | 138 UserPointer<const void> bytes, |
138 uint32_t num_bytes, | 139 uint32_t num_bytes, |
139 std::vector<DispatcherTransport>* transports, | 140 std::vector<DispatcherTransport>* transports, |
140 MojoWriteMessageFlags flags) { | 141 MojoWriteMessageFlags flags) { |
141 DCHECK(port == 0 || port == 1); | 142 DCHECK(port == 0 || port == 1); |
142 return EnqueueMessage( | 143 |
| 144 base::AutoLock locker(lock_); |
| 145 return EnqueueMessageNoLock( |
143 GetPeerPort(port), | 146 GetPeerPort(port), |
144 make_scoped_ptr(new MessageInTransit( | 147 make_scoped_ptr(new MessageInTransit( |
145 MessageInTransit::kTypeEndpoint, | 148 MessageInTransit::kTypeEndpoint, |
146 MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)), | 149 MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)), |
147 transports); | 150 transports); |
148 } | 151 } |
149 | 152 |
150 MojoResult MessagePipe::ReadMessage(unsigned port, | 153 MojoResult MessagePipe::ReadMessage(unsigned port, |
151 UserPointer<void> bytes, | 154 UserPointer<void> bytes, |
152 UserPointer<uint32_t> num_bytes, | 155 UserPointer<uint32_t> num_bytes, |
(...skipping 11 matching lines...) Expand all Loading... |
164 | 167 |
165 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { | 168 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { |
166 DCHECK(port == 0 || port == 1); | 169 DCHECK(port == 0 || port == 1); |
167 | 170 |
168 base::AutoLock locker(const_cast<base::Lock&>(lock_)); | 171 base::AutoLock locker(const_cast<base::Lock&>(lock_)); |
169 DCHECK(endpoints_[port]); | 172 DCHECK(endpoints_[port]); |
170 | 173 |
171 return endpoints_[port]->GetHandleSignalsState(); | 174 return endpoints_[port]->GetHandleSignalsState(); |
172 } | 175 } |
173 | 176 |
174 MojoResult MessagePipe::AddWaiter(unsigned port, | 177 MojoResult MessagePipe::AddAwakable(unsigned port, |
175 Waiter* waiter, | 178 Awakable* awakable, |
176 MojoHandleSignals signals, | 179 MojoHandleSignals signals, |
177 uint32_t context, | 180 uint32_t context, |
178 HandleSignalsState* signals_state) { | 181 HandleSignalsState* signals_state) { |
179 DCHECK(port == 0 || port == 1); | 182 DCHECK(port == 0 || port == 1); |
180 | 183 |
181 base::AutoLock locker(lock_); | 184 base::AutoLock locker(lock_); |
182 DCHECK(endpoints_[port]); | 185 DCHECK(endpoints_[port]); |
183 | 186 |
184 return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state); | 187 return endpoints_[port]->AddAwakable(awakable, signals, context, |
| 188 signals_state); |
185 } | 189 } |
186 | 190 |
187 void MessagePipe::RemoveWaiter(unsigned port, | 191 void MessagePipe::RemoveAwakable(unsigned port, |
188 Waiter* waiter, | 192 Awakable* awakable, |
189 HandleSignalsState* signals_state) { | 193 HandleSignalsState* signals_state) { |
190 DCHECK(port == 0 || port == 1); | 194 DCHECK(port == 0 || port == 1); |
191 | 195 |
192 base::AutoLock locker(lock_); | 196 base::AutoLock locker(lock_); |
193 DCHECK(endpoints_[port]); | 197 DCHECK(endpoints_[port]); |
194 | 198 |
195 endpoints_[port]->RemoveWaiter(waiter, signals_state); | 199 endpoints_[port]->RemoveAwakable(awakable, signals_state); |
196 } | 200 } |
197 | 201 |
198 void MessagePipe::StartSerialize(unsigned /*port*/, | 202 void MessagePipe::StartSerialize(unsigned /*port*/, |
199 Channel* /*channel*/, | 203 Channel* /*channel*/, |
200 size_t* max_size, | 204 size_t* max_size, |
201 size_t* max_platform_handles) { | 205 size_t* max_platform_handles) { |
202 *max_size = sizeof(SerializedMessagePipe); | 206 *max_size = sizeof(SerializedMessagePipe); |
203 *max_platform_handles = 0; | 207 *max_platform_handles = 0; |
204 } | 208 } |
205 | 209 |
(...skipping 30 matching lines...) Expand all Loading... |
236 // |ProxyMessagePipeEndpoint| to replace |port|'s | 240 // |ProxyMessagePipeEndpoint| to replace |port|'s |
237 // |LocalMessagePipeEndpoint|. We continue to exist, since the peer | 241 // |LocalMessagePipeEndpoint|. We continue to exist, since the peer |
238 // port's message pipe dispatcher will continue to hold a reference to | 242 // port's message pipe dispatcher will continue to hold a reference to |
239 // us. | 243 // us. |
240 // | 244 // |
241 // 3. The peer port is remote. | 245 // 3. The peer port is remote. |
242 // | 246 // |
243 // We also pass its |ChannelEndpoint| to the channel, which then decides | 247 // We also pass its |ChannelEndpoint| to the channel, which then decides |
244 // what to do. We have no reason to continue to exist. | 248 // what to do. We have no reason to continue to exist. |
245 // | 249 // |
246 // TODO(vtl): Factor some of this out to |ChannelEndpoint|. | 250 // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|). |
247 | 251 |
248 if (!endpoints_[GetPeerPort(port)]) { | 252 unsigned peer_port = GetPeerPort(port); |
| 253 if (!endpoints_[peer_port]) { |
249 // Case 1. | 254 // Case 1. |
250 channel_endpoint = new ChannelEndpoint( | 255 channel_endpoint = new ChannelEndpoint( |
251 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( | 256 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( |
252 endpoints_[port].get())->message_queue()); | 257 endpoints_[port].get())->message_queue()); |
253 endpoints_[port]->Close(); | 258 endpoints_[port]->Close(); |
254 endpoints_[port].reset(); | 259 endpoints_[port].reset(); |
255 } else if (endpoints_[GetPeerPort(port)]->GetType() == | 260 } else if (endpoints_[peer_port]->GetType() == |
256 MessagePipeEndpoint::kTypeLocal) { | 261 MessagePipeEndpoint::kTypeLocal) { |
257 // Case 2. | 262 // Case 2. |
258 channel_endpoint = new ChannelEndpoint( | 263 channel_endpoint = new ChannelEndpoint( |
259 this, port, static_cast<LocalMessagePipeEndpoint*>( | 264 this, port, static_cast<LocalMessagePipeEndpoint*>( |
260 endpoints_[port].get())->message_queue()); | 265 endpoints_[port].get())->message_queue()); |
261 endpoints_[port]->Close(); | 266 endpoints_[port]->Close(); |
262 endpoints_[port].reset( | 267 endpoints_[port].reset( |
263 new ProxyMessagePipeEndpoint(channel_endpoint.get())); | 268 new ProxyMessagePipeEndpoint(channel_endpoint.get())); |
264 } else { | 269 } else { |
265 // Case 3. | 270 // Case 3. |
266 // TODO(vtl): Temporarily the same as case 2. | |
267 DLOG(WARNING) << "Direct message pipe passing across multiple channels " | 271 DLOG(WARNING) << "Direct message pipe passing across multiple channels " |
268 "not yet implemented; will proxy"; | 272 "not yet implemented; will proxy"; |
| 273 |
| 274 // Create an |EndpointRelayer| to replace ourselves (rather than having a |
| 275 // |MessagePipe| object that exists solely to relay messages between two |
| 276 // |ChannelEndpoint|s, owned by the |Channel| through them. |
| 277 // |
| 278 // This reduces overhead somewhat, and more importantly restores some |
| 279 // invariants, e.g., that |MessagePipe|s are owned by dispatchers. |
| 280 // |
| 281 // TODO(vtl): If we get the |Channel| to own/track the relayer directly, |
| 282 // then possibly we could make |ChannelEndpoint|'s |client_| pointer a raw |
| 283 // pointer (and not have the |Channel| owning the relayer via its |
| 284 // |ChannelEndpoint|s. |
| 285 // |
| 286 // TODO(vtl): This is not obviously the right place for (all of) this |
| 287 // logic, nor is it obviously factored correctly. |
| 288 |
| 289 DCHECK_EQ(endpoints_[peer_port]->GetType(), |
| 290 MessagePipeEndpoint::kTypeProxy); |
| 291 ProxyMessagePipeEndpoint* peer_endpoint = |
| 292 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); |
| 293 scoped_refptr<ChannelEndpoint> peer_channel_endpoint = |
| 294 peer_endpoint->ReleaseChannelEndpoint(); |
| 295 |
| 296 scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer()); |
| 297 // We'll assign our peer port's endpoint to the relayer's port 1, and this |
| 298 // port's endpoint to the relayer's port 0. |
269 channel_endpoint = new ChannelEndpoint( | 299 channel_endpoint = new ChannelEndpoint( |
270 this, port, static_cast<LocalMessagePipeEndpoint*>( | 300 relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>( |
271 endpoints_[port].get())->message_queue()); | 301 endpoints_[port].get())->message_queue()); |
| 302 relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get()); |
| 303 peer_channel_endpoint->ReplaceClient(relayer.get(), 1); |
| 304 |
272 endpoints_[port]->Close(); | 305 endpoints_[port]->Close(); |
273 endpoints_[port].reset( | 306 endpoints_[port].reset(); |
274 new ProxyMessagePipeEndpoint(channel_endpoint.get())); | 307 // No need to call |Close()| after |ReleaseChannelEndpoint()|. |
| 308 endpoints_[peer_port].reset(); |
275 } | 309 } |
276 } | 310 } |
277 | 311 |
278 SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination); | 312 SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination); |
279 | 313 |
280 // Convert the local endpoint to a proxy endpoint (moving the message queue) | 314 // Convert the local endpoint to a proxy endpoint (moving the message queue) |
281 // and attach it to the channel. | 315 // and attach it to the channel. |
282 s->receiver_endpoint_id = | 316 s->receiver_endpoint_id = |
283 channel->AttachAndRunEndpoint(channel_endpoint, false); | 317 channel->AttachAndRunEndpoint(channel_endpoint, false); |
284 DVLOG(2) << "Serializing message pipe (remote ID = " | 318 DVLOG(2) << "Serializing message pipe (remote ID = " |
285 << s->receiver_endpoint_id << ")"; | 319 << s->receiver_endpoint_id << ")"; |
286 *actual_size = sizeof(SerializedMessagePipe); | 320 *actual_size = sizeof(SerializedMessagePipe); |
287 return true; | 321 return true; |
288 } | 322 } |
289 | 323 |
290 void MessagePipe::OnReadMessage(unsigned port, | 324 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { |
291 scoped_ptr<MessageInTransit> message) { | 325 base::AutoLock locker(lock_); |
| 326 |
| 327 if (!endpoints_[port]) { |
| 328 // This will happen only on the rare occasion that the call to |
| 329 // |OnReadMessage()| is racing with us calling |
| 330 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, |
| 331 // and the |ChannelEndpoint| can retry (calling the new client's |
| 332 // |OnReadMessage()|). |
| 333 return false; |
| 334 } |
| 335 |
292 // This is called when the |ChannelEndpoint| for the | 336 // This is called when the |ChannelEndpoint| for the |
293 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|). | 337 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|). |
294 // We need to pass this message on to its peer port (typically a | 338 // We need to pass this message on to its peer port (typically a |
295 // |LocalMessagePipeEndpoint|). | 339 // |LocalMessagePipeEndpoint|). |
296 MojoResult result = | 340 MojoResult result = EnqueueMessageNoLock(GetPeerPort(port), |
297 EnqueueMessage(GetPeerPort(port), message.Pass(), nullptr); | 341 make_scoped_ptr(message), nullptr); |
298 DLOG_IF(WARNING, result != MOJO_RESULT_OK) | 342 DLOG_IF(WARNING, result != MOJO_RESULT_OK) |
299 << "EnqueueMessage() failed (result = " << result << ")"; | 343 << "EnqueueMessageNoLock() failed (result = " << result << ")"; |
| 344 return true; |
300 } | 345 } |
301 | 346 |
302 void MessagePipe::OnDetachFromChannel(unsigned port) { | 347 void MessagePipe::OnDetachFromChannel(unsigned port) { |
303 Close(port); | 348 Close(port); |
304 } | 349 } |
305 | 350 |
306 MessagePipe::MessagePipe() { | 351 MessagePipe::MessagePipe() { |
307 } | 352 } |
308 | 353 |
309 MessagePipe::~MessagePipe() { | 354 MessagePipe::~MessagePipe() { |
310 // Owned by the dispatchers. The owning dispatchers should only release us via | 355 // Owned by the dispatchers. The owning dispatchers should only release us via |
311 // their |Close()| method, which should inform us of being closed via our | 356 // their |Close()| method, which should inform us of being closed via our |
312 // |Close()|. Thus these should already be null. | 357 // |Close()|. Thus these should already be null. |
313 DCHECK(!endpoints_[0]); | 358 DCHECK(!endpoints_[0]); |
314 DCHECK(!endpoints_[1]); | 359 DCHECK(!endpoints_[1]); |
315 } | 360 } |
316 | 361 |
317 MojoResult MessagePipe::EnqueueMessage( | 362 MojoResult MessagePipe::EnqueueMessageNoLock( |
318 unsigned port, | 363 unsigned port, |
319 scoped_ptr<MessageInTransit> message, | 364 scoped_ptr<MessageInTransit> message, |
320 std::vector<DispatcherTransport>* transports) { | 365 std::vector<DispatcherTransport>* transports) { |
321 DCHECK(port == 0 || port == 1); | 366 DCHECK(port == 0 || port == 1); |
322 DCHECK(message); | 367 DCHECK(message); |
323 | 368 |
324 DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint); | 369 DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint); |
325 | |
326 base::AutoLock locker(lock_); | |
327 DCHECK(endpoints_[GetPeerPort(port)]); | 370 DCHECK(endpoints_[GetPeerPort(port)]); |
328 | 371 |
329 // The destination port need not be open, unlike the source port. | 372 // The destination port need not be open, unlike the source port. |
330 if (!endpoints_[port]) | 373 if (!endpoints_[port]) |
331 return MOJO_RESULT_FAILED_PRECONDITION; | 374 return MOJO_RESULT_FAILED_PRECONDITION; |
332 | 375 |
333 if (transports) { | 376 if (transports) { |
334 MojoResult result = AttachTransportsNoLock(port, message.get(), transports); | 377 MojoResult result = AttachTransportsNoLock(port, message.get(), transports); |
335 if (result != MOJO_RESULT_OK) | 378 if (result != MOJO_RESULT_OK) |
336 return result; | 379 return result; |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
380 LOG(WARNING) << "Enqueueing null dispatcher"; | 423 LOG(WARNING) << "Enqueueing null dispatcher"; |
381 dispatchers->push_back(nullptr); | 424 dispatchers->push_back(nullptr); |
382 } | 425 } |
383 } | 426 } |
384 message->SetDispatchers(dispatchers.Pass()); | 427 message->SetDispatchers(dispatchers.Pass()); |
385 return MOJO_RESULT_OK; | 428 return MOJO_RESULT_OK; |
386 } | 429 } |
387 | 430 |
388 } // namespace system | 431 } // namespace system |
389 } // namespace mojo | 432 } // namespace mojo |
OLD | NEW |