Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(153)

Side by Side Diff: mojo/edk/system/message_pipe.cc

Issue 782693004: Update mojo sdk to rev f6c8ec07c01deebc13178d516225fd12695c3dc2 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: hack mojo_system_impl gypi for android :| Created 6 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe.h" 5 #include "mojo/edk/system/message_pipe.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "mojo/edk/system/channel.h" 8 #include "mojo/edk/system/channel.h"
9 #include "mojo/edk/system/channel_endpoint.h" 9 #include "mojo/edk/system/channel_endpoint.h"
10 #include "mojo/edk/system/channel_endpoint_id.h" 10 #include "mojo/edk/system/channel_endpoint_id.h"
11 #include "mojo/edk/system/endpoint_relayer.h"
11 #include "mojo/edk/system/local_message_pipe_endpoint.h" 12 #include "mojo/edk/system/local_message_pipe_endpoint.h"
12 #include "mojo/edk/system/message_in_transit.h" 13 #include "mojo/edk/system/message_in_transit.h"
13 #include "mojo/edk/system/message_pipe_dispatcher.h" 14 #include "mojo/edk/system/message_pipe_dispatcher.h"
14 #include "mojo/edk/system/message_pipe_endpoint.h" 15 #include "mojo/edk/system/message_pipe_endpoint.h"
15 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" 16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h"
16 17
17 namespace mojo { 18 namespace mojo {
18 namespace system { 19 namespace system {
19 20
20 namespace { 21 namespace {
(...skipping 13 matching lines...) Expand all
34 MessagePipe* MessagePipe::CreateLocalLocal() { 35 MessagePipe* MessagePipe::CreateLocalLocal() {
35 MessagePipe* message_pipe = new MessagePipe(); 36 MessagePipe* message_pipe = new MessagePipe();
36 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); 37 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
37 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); 38 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
38 return message_pipe; 39 return message_pipe;
39 } 40 }
40 41
41 // static 42 // static
42 MessagePipe* MessagePipe::CreateLocalProxy( 43 MessagePipe* MessagePipe::CreateLocalProxy(
43 scoped_refptr<ChannelEndpoint>* channel_endpoint) { 44 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
44 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. 45 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
45 MessagePipe* message_pipe = new MessagePipe(); 46 MessagePipe* message_pipe = new MessagePipe();
46 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); 47 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
47 *channel_endpoint = new ChannelEndpoint(message_pipe, 1); 48 *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
48 message_pipe->endpoints_[1].reset( 49 message_pipe->endpoints_[1].reset(
49 new ProxyMessagePipeEndpoint(channel_endpoint->get())); 50 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
50 return message_pipe; 51 return message_pipe;
51 } 52 }
52 53
53 // static 54 // static
54 MessagePipe* MessagePipe::CreateProxyLocal( 55 MessagePipe* MessagePipe::CreateProxyLocal(
55 scoped_refptr<ChannelEndpoint>* channel_endpoint) { 56 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
56 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. 57 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
57 MessagePipe* message_pipe = new MessagePipe(); 58 MessagePipe* message_pipe = new MessagePipe();
58 *channel_endpoint = new ChannelEndpoint(message_pipe, 0); 59 *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
59 message_pipe->endpoints_[0].reset( 60 message_pipe->endpoints_[0].reset(
60 new ProxyMessagePipeEndpoint(channel_endpoint->get())); 61 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
61 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); 62 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
62 return message_pipe; 63 return message_pipe;
63 } 64 }
64 65
65 // static 66 // static
66 unsigned MessagePipe::GetPeerPort(unsigned port) { 67 unsigned MessagePipe::GetPeerPort(unsigned port) {
67 DCHECK(port == 0 || port == 1); 68 DCHECK(port == 0 || port == 1);
68 return port ^ 1; 69 return port ^ 1;
69 } 70 }
70 71
71 // static 72 // static
72 bool MessagePipe::Deserialize(Channel* channel, 73 bool MessagePipe::Deserialize(Channel* channel,
73 const void* source, 74 const void* source,
74 size_t size, 75 size_t size,
75 scoped_refptr<MessagePipe>* message_pipe, 76 scoped_refptr<MessagePipe>* message_pipe,
76 unsigned* port) { 77 unsigned* port) {
77 DCHECK(!message_pipe->get()); // Not technically wrong, but unlikely. 78 DCHECK(!*message_pipe); // Not technically wrong, but unlikely.
78 79
79 if (size != sizeof(SerializedMessagePipe)) { 80 if (size != sizeof(SerializedMessagePipe)) {
80 LOG(ERROR) << "Invalid serialized message pipe"; 81 LOG(ERROR) << "Invalid serialized message pipe";
81 return false; 82 return false;
82 } 83 }
83 84
84 const SerializedMessagePipe* s = 85 const SerializedMessagePipe* s =
85 static_cast<const SerializedMessagePipe*>(source); 86 static_cast<const SerializedMessagePipe*>(source);
86 *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id); 87 *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id);
87 if (!message_pipe->get()) { 88 if (!*message_pipe) {
88 LOG(ERROR) << "Failed to deserialize message pipe (ID = " 89 LOG(ERROR) << "Failed to deserialize message pipe (ID = "
89 << s->receiver_endpoint_id << ")"; 90 << s->receiver_endpoint_id << ")";
90 return false; 91 return false;
91 } 92 }
92 93
93 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = " 94 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = "
94 << s->receiver_endpoint_id << ")"; 95 << s->receiver_endpoint_id << ")";
95 *port = 0; 96 *port = 0;
96 return true; 97 return true;
97 } 98 }
98 99
99 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { 100 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
100 DCHECK(port == 0 || port == 1); 101 DCHECK(port == 0 || port == 1);
101 base::AutoLock locker(lock_); 102 base::AutoLock locker(lock_);
102 DCHECK(endpoints_[port]); 103 DCHECK(endpoints_[port]);
103 104
104 return endpoints_[port]->GetType(); 105 return endpoints_[port]->GetType();
105 } 106 }
106 107
107 void MessagePipe::CancelAllWaiters(unsigned port) { 108 void MessagePipe::CancelAllAwakables(unsigned port) {
108 DCHECK(port == 0 || port == 1); 109 DCHECK(port == 0 || port == 1);
109 110
110 base::AutoLock locker(lock_); 111 base::AutoLock locker(lock_);
111 DCHECK(endpoints_[port]); 112 DCHECK(endpoints_[port]);
112 endpoints_[port]->CancelAllWaiters(); 113 endpoints_[port]->CancelAllAwakables();
113 } 114 }
114 115
115 void MessagePipe::Close(unsigned port) { 116 void MessagePipe::Close(unsigned port) {
116 DCHECK(port == 0 || port == 1); 117 DCHECK(port == 0 || port == 1);
117 118
118 unsigned destination_port = GetPeerPort(port); 119 unsigned peer_port = GetPeerPort(port);
119 120
120 base::AutoLock locker(lock_); 121 base::AutoLock locker(lock_);
121 // The endpoint's |OnPeerClose()| may have been called first and returned 122 // The endpoint's |OnPeerClose()| may have been called first and returned
122 // false, which would have resulted in its destruction. 123 // false, which would have resulted in its destruction.
123 if (!endpoints_[port]) 124 if (!endpoints_[port])
124 return; 125 return;
125 126
126 endpoints_[port]->Close(); 127 endpoints_[port]->Close();
127 if (endpoints_[destination_port]) { 128 if (endpoints_[peer_port]) {
128 if (!endpoints_[destination_port]->OnPeerClose()) 129 if (!endpoints_[peer_port]->OnPeerClose())
129 endpoints_[destination_port].reset(); 130 endpoints_[peer_port].reset();
130 } 131 }
131 endpoints_[port].reset(); 132 endpoints_[port].reset();
132 } 133 }
133 134
134 // TODO(vtl): Handle flags. 135 // TODO(vtl): Handle flags.
135 MojoResult MessagePipe::WriteMessage( 136 MojoResult MessagePipe::WriteMessage(
136 unsigned port, 137 unsigned port,
137 UserPointer<const void> bytes, 138 UserPointer<const void> bytes,
138 uint32_t num_bytes, 139 uint32_t num_bytes,
139 std::vector<DispatcherTransport>* transports, 140 std::vector<DispatcherTransport>* transports,
140 MojoWriteMessageFlags flags) { 141 MojoWriteMessageFlags flags) {
141 DCHECK(port == 0 || port == 1); 142 DCHECK(port == 0 || port == 1);
142 return EnqueueMessage( 143
144 base::AutoLock locker(lock_);
145 return EnqueueMessageNoLock(
143 GetPeerPort(port), 146 GetPeerPort(port),
144 make_scoped_ptr(new MessageInTransit( 147 make_scoped_ptr(new MessageInTransit(
145 MessageInTransit::kTypeEndpoint, 148 MessageInTransit::kTypeEndpoint,
146 MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)), 149 MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)),
147 transports); 150 transports);
148 } 151 }
149 152
150 MojoResult MessagePipe::ReadMessage(unsigned port, 153 MojoResult MessagePipe::ReadMessage(unsigned port,
151 UserPointer<void> bytes, 154 UserPointer<void> bytes,
152 UserPointer<uint32_t> num_bytes, 155 UserPointer<uint32_t> num_bytes,
(...skipping 11 matching lines...) Expand all
164 167
165 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { 168 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
166 DCHECK(port == 0 || port == 1); 169 DCHECK(port == 0 || port == 1);
167 170
168 base::AutoLock locker(const_cast<base::Lock&>(lock_)); 171 base::AutoLock locker(const_cast<base::Lock&>(lock_));
169 DCHECK(endpoints_[port]); 172 DCHECK(endpoints_[port]);
170 173
171 return endpoints_[port]->GetHandleSignalsState(); 174 return endpoints_[port]->GetHandleSignalsState();
172 } 175 }
173 176
174 MojoResult MessagePipe::AddWaiter(unsigned port, 177 MojoResult MessagePipe::AddAwakable(unsigned port,
175 Waiter* waiter, 178 Awakable* awakable,
176 MojoHandleSignals signals, 179 MojoHandleSignals signals,
177 uint32_t context, 180 uint32_t context,
178 HandleSignalsState* signals_state) { 181 HandleSignalsState* signals_state) {
179 DCHECK(port == 0 || port == 1); 182 DCHECK(port == 0 || port == 1);
180 183
181 base::AutoLock locker(lock_); 184 base::AutoLock locker(lock_);
182 DCHECK(endpoints_[port]); 185 DCHECK(endpoints_[port]);
183 186
184 return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state); 187 return endpoints_[port]->AddAwakable(awakable, signals, context,
188 signals_state);
185 } 189 }
186 190
187 void MessagePipe::RemoveWaiter(unsigned port, 191 void MessagePipe::RemoveAwakable(unsigned port,
188 Waiter* waiter, 192 Awakable* awakable,
189 HandleSignalsState* signals_state) { 193 HandleSignalsState* signals_state) {
190 DCHECK(port == 0 || port == 1); 194 DCHECK(port == 0 || port == 1);
191 195
192 base::AutoLock locker(lock_); 196 base::AutoLock locker(lock_);
193 DCHECK(endpoints_[port]); 197 DCHECK(endpoints_[port]);
194 198
195 endpoints_[port]->RemoveWaiter(waiter, signals_state); 199 endpoints_[port]->RemoveAwakable(awakable, signals_state);
196 } 200 }
197 201
198 void MessagePipe::StartSerialize(unsigned /*port*/, 202 void MessagePipe::StartSerialize(unsigned /*port*/,
199 Channel* /*channel*/, 203 Channel* /*channel*/,
200 size_t* max_size, 204 size_t* max_size,
201 size_t* max_platform_handles) { 205 size_t* max_platform_handles) {
202 *max_size = sizeof(SerializedMessagePipe); 206 *max_size = sizeof(SerializedMessagePipe);
203 *max_platform_handles = 0; 207 *max_platform_handles = 0;
204 } 208 }
205 209
(...skipping 30 matching lines...) Expand all
236 // |ProxyMessagePipeEndpoint| to replace |port|'s 240 // |ProxyMessagePipeEndpoint| to replace |port|'s
237 // |LocalMessagePipeEndpoint|. We continue to exist, since the peer 241 // |LocalMessagePipeEndpoint|. We continue to exist, since the peer
238 // port's message pipe dispatcher will continue to hold a reference to 242 // port's message pipe dispatcher will continue to hold a reference to
239 // us. 243 // us.
240 // 244 //
241 // 3. The peer port is remote. 245 // 3. The peer port is remote.
242 // 246 //
243 // We also pass its |ChannelEndpoint| to the channel, which then decides 247 // We also pass its |ChannelEndpoint| to the channel, which then decides
244 // what to do. We have no reason to continue to exist. 248 // what to do. We have no reason to continue to exist.
245 // 249 //
246 // TODO(vtl): Factor some of this out to |ChannelEndpoint|. 250 // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|).
247 251
248 if (!endpoints_[GetPeerPort(port)]) { 252 unsigned peer_port = GetPeerPort(port);
253 if (!endpoints_[peer_port]) {
249 // Case 1. 254 // Case 1.
250 channel_endpoint = new ChannelEndpoint( 255 channel_endpoint = new ChannelEndpoint(
251 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( 256 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
252 endpoints_[port].get())->message_queue()); 257 endpoints_[port].get())->message_queue());
253 endpoints_[port]->Close(); 258 endpoints_[port]->Close();
254 endpoints_[port].reset(); 259 endpoints_[port].reset();
255 } else if (endpoints_[GetPeerPort(port)]->GetType() == 260 } else if (endpoints_[peer_port]->GetType() ==
256 MessagePipeEndpoint::kTypeLocal) { 261 MessagePipeEndpoint::kTypeLocal) {
257 // Case 2. 262 // Case 2.
258 channel_endpoint = new ChannelEndpoint( 263 channel_endpoint = new ChannelEndpoint(
259 this, port, static_cast<LocalMessagePipeEndpoint*>( 264 this, port, static_cast<LocalMessagePipeEndpoint*>(
260 endpoints_[port].get())->message_queue()); 265 endpoints_[port].get())->message_queue());
261 endpoints_[port]->Close(); 266 endpoints_[port]->Close();
262 endpoints_[port].reset( 267 endpoints_[port].reset(
263 new ProxyMessagePipeEndpoint(channel_endpoint.get())); 268 new ProxyMessagePipeEndpoint(channel_endpoint.get()));
264 } else { 269 } else {
265 // Case 3. 270 // Case 3.
266 // TODO(vtl): Temporarily the same as case 2.
267 DLOG(WARNING) << "Direct message pipe passing across multiple channels " 271 DLOG(WARNING) << "Direct message pipe passing across multiple channels "
268 "not yet implemented; will proxy"; 272 "not yet implemented; will proxy";
273
274 // Create an |EndpointRelayer| to replace ourselves (rather than having a
275 // |MessagePipe| object that exists solely to relay messages between two
276 // |ChannelEndpoint|s, owned by the |Channel| through them.
277 //
278 // This reduces overhead somewhat, and more importantly restores some
279 // invariants, e.g., that |MessagePipe|s are owned by dispatchers.
280 //
281 // TODO(vtl): If we get the |Channel| to own/track the relayer directly,
282 // then possibly we could make |ChannelEndpoint|'s |client_| pointer a raw
283 // pointer (and not have the |Channel| owning the relayer via its
284 // |ChannelEndpoint|s.
285 //
286 // TODO(vtl): This is not obviously the right place for (all of) this
287 // logic, nor is it obviously factored correctly.
288
289 DCHECK_EQ(endpoints_[peer_port]->GetType(),
290 MessagePipeEndpoint::kTypeProxy);
291 ProxyMessagePipeEndpoint* peer_endpoint =
292 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get());
293 scoped_refptr<ChannelEndpoint> peer_channel_endpoint =
294 peer_endpoint->ReleaseChannelEndpoint();
295
296 scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer());
297 // We'll assign our peer port's endpoint to the relayer's port 1, and this
298 // port's endpoint to the relayer's port 0.
269 channel_endpoint = new ChannelEndpoint( 299 channel_endpoint = new ChannelEndpoint(
270 this, port, static_cast<LocalMessagePipeEndpoint*>( 300 relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>(
271 endpoints_[port].get())->message_queue()); 301 endpoints_[port].get())->message_queue());
302 relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get());
303 peer_channel_endpoint->ReplaceClient(relayer.get(), 1);
304
272 endpoints_[port]->Close(); 305 endpoints_[port]->Close();
273 endpoints_[port].reset( 306 endpoints_[port].reset();
274 new ProxyMessagePipeEndpoint(channel_endpoint.get())); 307 // No need to call |Close()| after |ReleaseChannelEndpoint()|.
308 endpoints_[peer_port].reset();
275 } 309 }
276 } 310 }
277 311
278 SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination); 312 SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination);
279 313
280 // Convert the local endpoint to a proxy endpoint (moving the message queue) 314 // Convert the local endpoint to a proxy endpoint (moving the message queue)
281 // and attach it to the channel. 315 // and attach it to the channel.
282 s->receiver_endpoint_id = 316 s->receiver_endpoint_id =
283 channel->AttachAndRunEndpoint(channel_endpoint, false); 317 channel->AttachAndRunEndpoint(channel_endpoint, false);
284 DVLOG(2) << "Serializing message pipe (remote ID = " 318 DVLOG(2) << "Serializing message pipe (remote ID = "
285 << s->receiver_endpoint_id << ")"; 319 << s->receiver_endpoint_id << ")";
286 *actual_size = sizeof(SerializedMessagePipe); 320 *actual_size = sizeof(SerializedMessagePipe);
287 return true; 321 return true;
288 } 322 }
289 323
290 void MessagePipe::OnReadMessage(unsigned port, 324 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) {
291 scoped_ptr<MessageInTransit> message) { 325 base::AutoLock locker(lock_);
326
327 if (!endpoints_[port]) {
328 // This will happen only on the rare occasion that the call to
329 // |OnReadMessage()| is racing with us calling
330 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message,
331 // and the |ChannelEndpoint| can retry (calling the new client's
332 // |OnReadMessage()|).
333 return false;
334 }
335
292 // This is called when the |ChannelEndpoint| for the 336 // This is called when the |ChannelEndpoint| for the
293 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|). 337 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|).
294 // We need to pass this message on to its peer port (typically a 338 // We need to pass this message on to its peer port (typically a
295 // |LocalMessagePipeEndpoint|). 339 // |LocalMessagePipeEndpoint|).
296 MojoResult result = 340 MojoResult result = EnqueueMessageNoLock(GetPeerPort(port),
297 EnqueueMessage(GetPeerPort(port), message.Pass(), nullptr); 341 make_scoped_ptr(message), nullptr);
298 DLOG_IF(WARNING, result != MOJO_RESULT_OK) 342 DLOG_IF(WARNING, result != MOJO_RESULT_OK)
299 << "EnqueueMessage() failed (result = " << result << ")"; 343 << "EnqueueMessageNoLock() failed (result = " << result << ")";
344 return true;
300 } 345 }
301 346
302 void MessagePipe::OnDetachFromChannel(unsigned port) { 347 void MessagePipe::OnDetachFromChannel(unsigned port) {
303 Close(port); 348 Close(port);
304 } 349 }
305 350
306 MessagePipe::MessagePipe() { 351 MessagePipe::MessagePipe() {
307 } 352 }
308 353
309 MessagePipe::~MessagePipe() { 354 MessagePipe::~MessagePipe() {
310 // Owned by the dispatchers. The owning dispatchers should only release us via 355 // Owned by the dispatchers. The owning dispatchers should only release us via
311 // their |Close()| method, which should inform us of being closed via our 356 // their |Close()| method, which should inform us of being closed via our
312 // |Close()|. Thus these should already be null. 357 // |Close()|. Thus these should already be null.
313 DCHECK(!endpoints_[0]); 358 DCHECK(!endpoints_[0]);
314 DCHECK(!endpoints_[1]); 359 DCHECK(!endpoints_[1]);
315 } 360 }
316 361
317 MojoResult MessagePipe::EnqueueMessage( 362 MojoResult MessagePipe::EnqueueMessageNoLock(
318 unsigned port, 363 unsigned port,
319 scoped_ptr<MessageInTransit> message, 364 scoped_ptr<MessageInTransit> message,
320 std::vector<DispatcherTransport>* transports) { 365 std::vector<DispatcherTransport>* transports) {
321 DCHECK(port == 0 || port == 1); 366 DCHECK(port == 0 || port == 1);
322 DCHECK(message); 367 DCHECK(message);
323 368
324 DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint); 369 DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint);
325
326 base::AutoLock locker(lock_);
327 DCHECK(endpoints_[GetPeerPort(port)]); 370 DCHECK(endpoints_[GetPeerPort(port)]);
328 371
329 // The destination port need not be open, unlike the source port. 372 // The destination port need not be open, unlike the source port.
330 if (!endpoints_[port]) 373 if (!endpoints_[port])
331 return MOJO_RESULT_FAILED_PRECONDITION; 374 return MOJO_RESULT_FAILED_PRECONDITION;
332 375
333 if (transports) { 376 if (transports) {
334 MojoResult result = AttachTransportsNoLock(port, message.get(), transports); 377 MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
335 if (result != MOJO_RESULT_OK) 378 if (result != MOJO_RESULT_OK)
336 return result; 379 return result;
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
380 LOG(WARNING) << "Enqueueing null dispatcher"; 423 LOG(WARNING) << "Enqueueing null dispatcher";
381 dispatchers->push_back(nullptr); 424 dispatchers->push_back(nullptr);
382 } 425 }
383 } 426 }
384 message->SetDispatchers(dispatchers.Pass()); 427 message->SetDispatchers(dispatchers.Pass());
385 return MOJO_RESULT_OK; 428 return MOJO_RESULT_OK;
386 } 429 }
387 430
388 } // namespace system 431 } // namespace system
389 } // namespace mojo 432 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698