OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe.h" | 5 #include "mojo/edk/system/message_pipe.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
8 #include "mojo/edk/system/channel.h" | 8 #include "mojo/edk/system/channel.h" |
9 #include "mojo/edk/system/channel_endpoint.h" | 9 #include "mojo/edk/system/channel_endpoint.h" |
10 #include "mojo/edk/system/channel_endpoint_id.h" | 10 #include "mojo/edk/system/channel_endpoint_id.h" |
(...skipping 188 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
199 Awakable* awakable, | 199 Awakable* awakable, |
200 HandleSignalsState* signals_state) { | 200 HandleSignalsState* signals_state) { |
201 DCHECK(port == 0 || port == 1); | 201 DCHECK(port == 0 || port == 1); |
202 | 202 |
203 base::AutoLock locker(lock_); | 203 base::AutoLock locker(lock_); |
204 DCHECK(endpoints_[port]); | 204 DCHECK(endpoints_[port]); |
205 | 205 |
206 endpoints_[port]->RemoveAwakable(awakable, signals_state); | 206 endpoints_[port]->RemoveAwakable(awakable, signals_state); |
207 } | 207 } |
208 | 208 |
| 209 MojoResult MessagePipe::SetAsyncMessageCallback( |
| 210 unsigned port, |
| 211 const AsyncMessageCallback& callback) { |
| 212 base::AutoLock locker(lock_); |
| 213 DCHECK(endpoints_[port]); |
| 214 return endpoints_[port]->SetAsyncMessageCallback(callback); |
| 215 } |
| 216 |
209 void MessagePipe::StartSerialize(unsigned /*port*/, | 217 void MessagePipe::StartSerialize(unsigned /*port*/, |
210 Channel* channel, | 218 Channel* channel, |
211 size_t* max_size, | 219 size_t* max_size, |
212 size_t* max_platform_handles) { | 220 size_t* max_platform_handles) { |
213 *max_size = channel->GetSerializedEndpointSize(); | 221 *max_size = channel->GetSerializedEndpointSize(); |
214 *max_platform_handles = 0; | 222 *max_platform_handles = 0; |
215 } | 223 } |
216 | 224 |
217 bool MessagePipe::EndSerialize( | 225 bool MessagePipe::EndSerialize( |
218 unsigned port, | 226 unsigned port, |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
266 endpoints_[peer_port].reset(); | 274 endpoints_[peer_port].reset(); |
267 } | 275 } |
268 | 276 |
269 endpoints_[port]->Close(); | 277 endpoints_[port]->Close(); |
270 endpoints_[port].reset(replacement_endpoint); | 278 endpoints_[port].reset(replacement_endpoint); |
271 | 279 |
272 *actual_size = channel->GetSerializedEndpointSize(); | 280 *actual_size = channel->GetSerializedEndpointSize(); |
273 return true; | 281 return true; |
274 } | 282 } |
275 | 283 |
276 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { | 284 bool MessagePipe::OnReadMessage( |
| 285 unsigned port, |
| 286 MessageInTransit::ReadContext& reading_message) { |
277 base::AutoLock locker(lock_); | 287 base::AutoLock locker(lock_); |
278 | 288 |
279 if (!endpoints_[port]) { | 289 if (!endpoints_[port]) { |
280 // This will happen only on the rare occasion that the call to | 290 // This will happen only on the rare occasion that the call to |
281 // |OnReadMessage()| is racing with us calling | 291 // |OnReadMessage()| is racing with us calling |
282 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, | 292 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, |
283 // and the |ChannelEndpoint| can retry (calling the new client's | 293 // and the |ChannelEndpoint| can retry (calling the new client's |
284 // |OnReadMessage()|). | 294 // |OnReadMessage()|). |
285 return false; | 295 return false; |
286 } | 296 } |
287 | 297 |
288 // This is called when the |ChannelEndpoint| for the | 298 // This is called when the |ChannelEndpoint| for the |
289 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|). | 299 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|). |
290 // We need to pass this message on to its peer port (typically a | 300 // We need to pass this message on to its peer port (typically a |
291 // |LocalMessagePipeEndpoint|). | 301 // |LocalMessagePipeEndpoint|). |
292 MojoResult result = EnqueueMessageNoLock(GetPeerPort(port), | 302 MojoResult result = |
293 make_scoped_ptr(message), nullptr); | 303 DispatchOnReadMessageNoLock(GetPeerPort(port), reading_message); |
294 DLOG_IF(WARNING, result != MOJO_RESULT_OK) | 304 DLOG_IF(WARNING, result != MOJO_RESULT_OK) |
295 << "EnqueueMessageNoLock() failed (result = " << result << ")"; | 305 << "DispatchOnReadMessageNoLock() failed (result = " << result << ")"; |
296 return true; | 306 return true; |
297 } | 307 } |
298 | 308 |
299 void MessagePipe::OnDetachFromChannel(unsigned port) { | 309 void MessagePipe::OnDetachFromChannel(unsigned port) { |
300 Close(port); | 310 Close(port); |
301 } | 311 } |
302 | 312 |
303 MessagePipe::MessagePipe() { | 313 MessagePipe::MessagePipe() { |
304 } | 314 } |
305 | 315 |
306 MessagePipe::~MessagePipe() { | 316 MessagePipe::~MessagePipe() { |
307 // Owned by the dispatchers. The owning dispatchers should only release us via | 317 // Owned by the dispatchers. The owning dispatchers should only release us via |
308 // their |Close()| method, which should inform us of being closed via our | 318 // their |Close()| method, which should inform us of being closed via our |
309 // |Close()|. Thus these should already be null. | 319 // |Close()|. Thus these should already be null. |
310 DCHECK(!endpoints_[0]); | 320 DCHECK(!endpoints_[0]); |
311 DCHECK(!endpoints_[1]); | 321 DCHECK(!endpoints_[1]); |
312 } | 322 } |
313 | 323 |
| 324 MojoResult MessagePipe::DispatchOnReadMessageNoLock( |
| 325 unsigned port, |
| 326 MessageInTransit::ReadContext& reading_message) { |
| 327 DCHECK(port == 0 || port == 1); |
| 328 |
| 329 DCHECK_EQ(reading_message.view().type(), MessageInTransit::kTypeEndpoint); |
| 330 DCHECK(endpoints_[GetPeerPort(port)]); |
| 331 |
| 332 // The destination port need not be open, unlike the source port. |
| 333 if (!endpoints_[port]) |
| 334 return MOJO_RESULT_FAILED_PRECONDITION; |
| 335 |
| 336 // The endpoint's |EnqueueMessage()| may not report failure. |
| 337 endpoints_[port]->OnReadMessage(reading_message); |
| 338 return MOJO_RESULT_OK; |
| 339 } |
| 340 |
314 MojoResult MessagePipe::EnqueueMessageNoLock( | 341 MojoResult MessagePipe::EnqueueMessageNoLock( |
315 unsigned port, | 342 unsigned port, |
316 scoped_ptr<MessageInTransit> message, | 343 scoped_ptr<MessageInTransit> message, |
317 std::vector<DispatcherTransport>* transports) { | 344 std::vector<DispatcherTransport>* transports) { |
318 DCHECK(port == 0 || port == 1); | 345 DCHECK(port == 0 || port == 1); |
319 DCHECK(message); | 346 DCHECK(message); |
320 | 347 |
321 DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint); | 348 DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint); |
322 DCHECK(endpoints_[GetPeerPort(port)]); | 349 DCHECK(endpoints_[GetPeerPort(port)]); |
323 | 350 |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
375 LOG(WARNING) << "Enqueueing null dispatcher"; | 402 LOG(WARNING) << "Enqueueing null dispatcher"; |
376 dispatchers->push_back(nullptr); | 403 dispatchers->push_back(nullptr); |
377 } | 404 } |
378 } | 405 } |
379 message->SetDispatchers(dispatchers.Pass()); | 406 message->SetDispatchers(dispatchers.Pass()); |
380 return MOJO_RESULT_OK; | 407 return MOJO_RESULT_OK; |
381 } | 408 } |
382 | 409 |
383 } // namespace system | 410 } // namespace system |
384 } // namespace mojo | 411 } // namespace mojo |
OLD | NEW |