OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/remote_producer_data_pipe_impl.h" | 5 #include "mojo/edk/system/remote_producer_data_pipe_impl.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 | 10 |
11 #include "base/logging.h" | 11 #include "base/logging.h" |
12 #include "base/memory/scoped_ptr.h" | 12 #include "base/memory/scoped_ptr.h" |
| 13 #include "mojo/edk/system/channel.h" |
13 #include "mojo/edk/system/channel_endpoint.h" | 14 #include "mojo/edk/system/channel_endpoint.h" |
14 #include "mojo/edk/system/configuration.h" | 15 #include "mojo/edk/system/configuration.h" |
15 #include "mojo/edk/system/data_pipe.h" | 16 #include "mojo/edk/system/data_pipe.h" |
16 #include "mojo/edk/system/message_in_transit.h" | 17 #include "mojo/edk/system/message_in_transit.h" |
17 #include "mojo/edk/system/message_in_transit_queue.h" | 18 #include "mojo/edk/system/message_in_transit_queue.h" |
18 #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" | 19 #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" |
19 #include "mojo/edk/system/remote_data_pipe_ack.h" | 20 #include "mojo/edk/system/remote_data_pipe_ack.h" |
20 | 21 |
21 namespace mojo { | 22 namespace mojo { |
22 namespace system { | 23 namespace system { |
(...skipping 272 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
295 if (!producer_open()) | 296 if (!producer_open()) |
296 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 297 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
297 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 298 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
298 return rv; | 299 return rv; |
299 } | 300 } |
300 | 301 |
301 void RemoteProducerDataPipeImpl::ConsumerStartSerialize( | 302 void RemoteProducerDataPipeImpl::ConsumerStartSerialize( |
302 Channel* channel, | 303 Channel* channel, |
303 size_t* max_size, | 304 size_t* max_size, |
304 size_t* max_platform_handles) { | 305 size_t* max_platform_handles) { |
305 // TODO(vtl): Support serializing consumer data pipe handles. | 306 *max_size = sizeof(SerializedDataPipeConsumerDispatcher) + |
306 NOTIMPLEMENTED(); // FIXME | 307 channel->GetSerializedEndpointSize(); |
307 *max_size = 0; | |
308 *max_platform_handles = 0; | 308 *max_platform_handles = 0; |
309 } | 309 } |
310 | 310 |
311 bool RemoteProducerDataPipeImpl::ConsumerEndSerialize( | 311 bool RemoteProducerDataPipeImpl::ConsumerEndSerialize( |
312 Channel* channel, | 312 Channel* channel, |
313 void* destination, | 313 void* destination, |
314 size_t* actual_size, | 314 size_t* actual_size, |
315 embedder::PlatformHandleVector* platform_handles) { | 315 embedder::PlatformHandleVector* platform_handles) { |
316 // TODO(vtl): Support serializing consumer data pipe handles. | 316 SerializedDataPipeConsumerDispatcher* s = |
317 NOTIMPLEMENTED(); // FIXME | 317 static_cast<SerializedDataPipeConsumerDispatcher*>(destination); |
318 owner()->ConsumerCloseNoLock(); | 318 s->validated_options = validated_options(); |
319 return false; | 319 void* destination_for_endpoint = static_cast<char*>(destination) + |
| 320 sizeof(SerializedDataPipeConsumerDispatcher); |
| 321 |
| 322 MessageInTransitQueue message_queue; |
| 323 ConvertDataToMessages(buffer_.get(), &start_index_, ¤t_num_bytes_, |
| 324 &message_queue); |
| 325 |
| 326 if (!producer_open()) { |
| 327 // Case 1: The producer is closed. |
| 328 channel->SerializeEndpointWithClosedPeer(destination_for_endpoint, |
| 329 &message_queue); |
| 330 *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + |
| 331 channel->GetSerializedEndpointSize(); |
| 332 return true; |
| 333 } |
| 334 |
| 335 // Case 2: The producer isn't closed. We pass |channel_endpoint| back to the |
| 336 // |Channel|. There's no reason for us to continue to exist afterwards. |
| 337 |
| 338 // Note: We don't use |port|. |
| 339 scoped_refptr<ChannelEndpoint> channel_endpoint; |
| 340 channel_endpoint.swap(channel_endpoint_); |
| 341 channel->SerializeEndpointWithRemotePeer(destination_for_endpoint, |
| 342 &message_queue, channel_endpoint); |
| 343 owner()->SetProducerClosedNoLock(); |
| 344 |
| 345 *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + |
| 346 channel->GetSerializedEndpointSize(); |
| 347 return true; |
320 } | 348 } |
321 | 349 |
322 bool RemoteProducerDataPipeImpl::OnReadMessage(unsigned /*port*/, | 350 bool RemoteProducerDataPipeImpl::OnReadMessage(unsigned /*port*/, |
323 MessageInTransit* message) { | 351 MessageInTransit* message) { |
324 // Always take ownership of the message. (This means that we should always | 352 // Always take ownership of the message. (This means that we should always |
325 // return true.) | 353 // return true.) |
326 scoped_ptr<MessageInTransit> msg(message); | 354 scoped_ptr<MessageInTransit> msg(message); |
327 | 355 |
328 if (!producer_open()) { | 356 if (!producer_open()) { |
329 DCHECK(!channel_endpoint_); | 357 DCHECK(!channel_endpoint_); |
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
438 if (!consumer_open() || !current_num_bytes_) { | 466 if (!consumer_open() || !current_num_bytes_) { |
439 // Note: There can only be a two-phase *read* (by the consumer) if we still | 467 // Note: There can only be a two-phase *read* (by the consumer) if we still |
440 // have data. | 468 // have data. |
441 DCHECK(!consumer_in_two_phase_read()); | 469 DCHECK(!consumer_in_two_phase_read()); |
442 DestroyBuffer(); | 470 DestroyBuffer(); |
443 } | 471 } |
444 } | 472 } |
445 | 473 |
446 } // namespace system | 474 } // namespace system |
447 } // namespace mojo | 475 } // namespace mojo |
OLD | NEW |