OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/remote_producer_data_pipe_impl.h" | 5 #include "mojo/edk/system/remote_producer_data_pipe_impl.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
| 10 #include <memory> |
10 #include <utility> | 11 #include <utility> |
11 | 12 |
12 #include "base/logging.h" | 13 #include "base/logging.h" |
13 #include "base/memory/scoped_ptr.h" | |
14 #include "mojo/edk/system/channel.h" | 14 #include "mojo/edk/system/channel.h" |
15 #include "mojo/edk/system/channel_endpoint.h" | 15 #include "mojo/edk/system/channel_endpoint.h" |
16 #include "mojo/edk/system/configuration.h" | 16 #include "mojo/edk/system/configuration.h" |
17 #include "mojo/edk/system/data_pipe.h" | 17 #include "mojo/edk/system/data_pipe.h" |
18 #include "mojo/edk/system/message_in_transit.h" | 18 #include "mojo/edk/system/message_in_transit.h" |
19 #include "mojo/edk/system/message_in_transit_queue.h" | 19 #include "mojo/edk/system/message_in_transit_queue.h" |
20 #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" | 20 #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" |
21 #include "mojo/edk/system/remote_data_pipe_ack.h" | 21 #include "mojo/edk/system/remote_data_pipe_ack.h" |
22 | 22 |
23 namespace mojo { | 23 namespace mojo { |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
90 const size_t element_num_bytes = validated_options.element_num_bytes; | 90 const size_t element_num_bytes = validated_options.element_num_bytes; |
91 const size_t capacity_num_bytes = validated_options.capacity_num_bytes; | 91 const size_t capacity_num_bytes = validated_options.capacity_num_bytes; |
92 | 92 |
93 std::unique_ptr<char, base::AlignedFreeDeleter> new_buffer(static_cast<char*>( | 93 std::unique_ptr<char, base::AlignedFreeDeleter> new_buffer(static_cast<char*>( |
94 base::AlignedAlloc(capacity_num_bytes, | 94 base::AlignedAlloc(capacity_num_bytes, |
95 GetConfiguration().data_pipe_buffer_alignment_bytes))); | 95 GetConfiguration().data_pipe_buffer_alignment_bytes))); |
96 | 96 |
97 size_t current_num_bytes = 0; | 97 size_t current_num_bytes = 0; |
98 if (messages) { | 98 if (messages) { |
99 while (!messages->IsEmpty()) { | 99 while (!messages->IsEmpty()) { |
100 scoped_ptr<MessageInTransit> message(messages->GetMessage()); | 100 std::unique_ptr<MessageInTransit> message(messages->GetMessage()); |
101 if (!ValidateIncomingMessage(element_num_bytes, capacity_num_bytes, | 101 if (!ValidateIncomingMessage(element_num_bytes, capacity_num_bytes, |
102 current_num_bytes, message.get())) { | 102 current_num_bytes, message.get())) { |
103 messages->Clear(); | 103 messages->Clear(); |
104 return false; | 104 return false; |
105 } | 105 } |
106 | 106 |
107 memcpy(new_buffer.get() + current_num_bytes, message->bytes(), | 107 memcpy(new_buffer.get() + current_num_bytes, message->bytes(), |
108 message->num_bytes()); | 108 message->num_bytes()); |
109 current_num_bytes += message->num_bytes(); | 109 current_num_bytes += message->num_bytes(); |
110 } | 110 } |
(...skipping 243 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
354 // |OnReadMessage()| is racing with us calling | 354 // |OnReadMessage()| is racing with us calling |
355 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, | 355 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, |
356 // and the |ChannelEndpoint| can retry (calling the new client's | 356 // and the |ChannelEndpoint| can retry (calling the new client's |
357 // |OnReadMessage()|). | 357 // |OnReadMessage()|). |
358 DCHECK(!channel_endpoint_); | 358 DCHECK(!channel_endpoint_); |
359 return false; | 359 return false; |
360 } | 360 } |
361 | 361 |
362 // Otherwise, we take ownership of the message. (This means that we should | 362 // Otherwise, we take ownership of the message. (This means that we should |
363 // always return true below.) | 363 // always return true below.) |
364 scoped_ptr<MessageInTransit> msg(message); | 364 std::unique_ptr<MessageInTransit> msg(message); |
365 | 365 |
366 if (!ValidateIncomingMessage(element_num_bytes(), capacity_num_bytes(), | 366 if (!ValidateIncomingMessage(element_num_bytes(), capacity_num_bytes(), |
367 current_num_bytes_, msg.get())) { | 367 current_num_bytes_, msg.get())) { |
368 Disconnect(); | 368 Disconnect(); |
369 return true; | 369 return true; |
370 } | 370 } |
371 | 371 |
372 size_t num_bytes = msg->num_bytes(); | 372 size_t num_bytes = msg->num_bytes(); |
373 // The amount we can write in our first copy. | 373 // The amount we can write in our first copy. |
374 size_t num_bytes_to_copy_first = std::min(num_bytes, GetMaxNumBytesToWrite()); | 374 size_t num_bytes_to_copy_first = std::min(num_bytes, GetMaxNumBytesToWrite()); |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
443 start_index_ %= capacity_num_bytes(); | 443 start_index_ %= capacity_num_bytes(); |
444 current_num_bytes_ -= num_bytes; | 444 current_num_bytes_ -= num_bytes; |
445 | 445 |
446 if (!producer_open()) { | 446 if (!producer_open()) { |
447 DCHECK(!channel_endpoint_); | 447 DCHECK(!channel_endpoint_); |
448 return; | 448 return; |
449 } | 449 } |
450 | 450 |
451 RemoteDataPipeAck ack_data = {}; | 451 RemoteDataPipeAck ack_data = {}; |
452 ack_data.num_bytes_consumed = static_cast<uint32_t>(num_bytes); | 452 ack_data.num_bytes_consumed = static_cast<uint32_t>(num_bytes); |
453 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 453 std::unique_ptr<MessageInTransit> message(new MessageInTransit( |
454 MessageInTransit::Type::ENDPOINT_CLIENT, | 454 MessageInTransit::Type::ENDPOINT_CLIENT, |
455 MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA_PIPE_ACK, | 455 MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA_PIPE_ACK, |
456 static_cast<uint32_t>(sizeof(ack_data)), &ack_data)); | 456 static_cast<uint32_t>(sizeof(ack_data)), &ack_data)); |
457 if (!channel_endpoint_->EnqueueMessage(message.Pass())) | 457 if (!channel_endpoint_->EnqueueMessage(std::move(message))) |
458 Disconnect(); | 458 Disconnect(); |
459 } | 459 } |
460 | 460 |
461 void RemoteProducerDataPipeImpl::Disconnect() { | 461 void RemoteProducerDataPipeImpl::Disconnect() { |
462 DCHECK(producer_open()); | 462 DCHECK(producer_open()); |
463 DCHECK(channel_endpoint_); | 463 DCHECK(channel_endpoint_); |
464 SetProducerClosed(); | 464 SetProducerClosed(); |
465 channel_endpoint_->DetachFromClient(); | 465 channel_endpoint_->DetachFromClient(); |
466 channel_endpoint_ = nullptr; | 466 channel_endpoint_ = nullptr; |
467 // If the consumer is still open and we still have data, we have to keep the | 467 // If the consumer is still open and we still have data, we have to keep the |
468 // buffer around. Currently, we won't free it even if it empties later. (We | 468 // buffer around. Currently, we won't free it even if it empties later. (We |
469 // could do this -- requiring a check on every read -- but that seems to be | 469 // could do this -- requiring a check on every read -- but that seems to be |
470 // optimizing for the uncommon case.) | 470 // optimizing for the uncommon case.) |
471 if (!consumer_open() || !current_num_bytes_) | 471 if (!consumer_open() || !current_num_bytes_) |
472 DestroyBuffer(); | 472 DestroyBuffer(); |
473 } | 473 } |
474 | 474 |
475 } // namespace system | 475 } // namespace system |
476 } // namespace mojo | 476 } // namespace mojo |
OLD | NEW |