| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/remote_producer_data_pipe_impl.h" | 5 #include "mojo/edk/system/remote_producer_data_pipe_impl.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include <algorithm> | 9 #include <algorithm> |
| 10 #include <memory> |
| 10 #include <utility> | 11 #include <utility> |
| 11 | 12 |
| 12 #include "base/logging.h" | 13 #include "base/logging.h" |
| 13 #include "base/memory/scoped_ptr.h" | |
| 14 #include "mojo/edk/system/channel.h" | 14 #include "mojo/edk/system/channel.h" |
| 15 #include "mojo/edk/system/channel_endpoint.h" | 15 #include "mojo/edk/system/channel_endpoint.h" |
| 16 #include "mojo/edk/system/configuration.h" | 16 #include "mojo/edk/system/configuration.h" |
| 17 #include "mojo/edk/system/data_pipe.h" | 17 #include "mojo/edk/system/data_pipe.h" |
| 18 #include "mojo/edk/system/message_in_transit.h" | 18 #include "mojo/edk/system/message_in_transit.h" |
| 19 #include "mojo/edk/system/message_in_transit_queue.h" | 19 #include "mojo/edk/system/message_in_transit_queue.h" |
| 20 #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" | 20 #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" |
| 21 #include "mojo/edk/system/remote_data_pipe_ack.h" | 21 #include "mojo/edk/system/remote_data_pipe_ack.h" |
| 22 | 22 |
| 23 namespace mojo { | 23 namespace mojo { |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 90 const size_t element_num_bytes = validated_options.element_num_bytes; | 90 const size_t element_num_bytes = validated_options.element_num_bytes; |
| 91 const size_t capacity_num_bytes = validated_options.capacity_num_bytes; | 91 const size_t capacity_num_bytes = validated_options.capacity_num_bytes; |
| 92 | 92 |
| 93 std::unique_ptr<char, base::AlignedFreeDeleter> new_buffer(static_cast<char*>( | 93 std::unique_ptr<char, base::AlignedFreeDeleter> new_buffer(static_cast<char*>( |
| 94 base::AlignedAlloc(capacity_num_bytes, | 94 base::AlignedAlloc(capacity_num_bytes, |
| 95 GetConfiguration().data_pipe_buffer_alignment_bytes))); | 95 GetConfiguration().data_pipe_buffer_alignment_bytes))); |
| 96 | 96 |
| 97 size_t current_num_bytes = 0; | 97 size_t current_num_bytes = 0; |
| 98 if (messages) { | 98 if (messages) { |
| 99 while (!messages->IsEmpty()) { | 99 while (!messages->IsEmpty()) { |
| 100 scoped_ptr<MessageInTransit> message(messages->GetMessage()); | 100 std::unique_ptr<MessageInTransit> message(messages->GetMessage()); |
| 101 if (!ValidateIncomingMessage(element_num_bytes, capacity_num_bytes, | 101 if (!ValidateIncomingMessage(element_num_bytes, capacity_num_bytes, |
| 102 current_num_bytes, message.get())) { | 102 current_num_bytes, message.get())) { |
| 103 messages->Clear(); | 103 messages->Clear(); |
| 104 return false; | 104 return false; |
| 105 } | 105 } |
| 106 | 106 |
| 107 memcpy(new_buffer.get() + current_num_bytes, message->bytes(), | 107 memcpy(new_buffer.get() + current_num_bytes, message->bytes(), |
| 108 message->num_bytes()); | 108 message->num_bytes()); |
| 109 current_num_bytes += message->num_bytes(); | 109 current_num_bytes += message->num_bytes(); |
| 110 } | 110 } |
| (...skipping 243 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 354 // |OnReadMessage()| is racing with us calling | 354 // |OnReadMessage()| is racing with us calling |
| 355 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, | 355 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, |
| 356 // and the |ChannelEndpoint| can retry (calling the new client's | 356 // and the |ChannelEndpoint| can retry (calling the new client's |
| 357 // |OnReadMessage()|). | 357 // |OnReadMessage()|). |
| 358 DCHECK(!channel_endpoint_); | 358 DCHECK(!channel_endpoint_); |
| 359 return false; | 359 return false; |
| 360 } | 360 } |
| 361 | 361 |
| 362 // Otherwise, we take ownership of the message. (This means that we should | 362 // Otherwise, we take ownership of the message. (This means that we should |
| 363 // always return true below.) | 363 // always return true below.) |
| 364 scoped_ptr<MessageInTransit> msg(message); | 364 std::unique_ptr<MessageInTransit> msg(message); |
| 365 | 365 |
| 366 if (!ValidateIncomingMessage(element_num_bytes(), capacity_num_bytes(), | 366 if (!ValidateIncomingMessage(element_num_bytes(), capacity_num_bytes(), |
| 367 current_num_bytes_, msg.get())) { | 367 current_num_bytes_, msg.get())) { |
| 368 Disconnect(); | 368 Disconnect(); |
| 369 return true; | 369 return true; |
| 370 } | 370 } |
| 371 | 371 |
| 372 size_t num_bytes = msg->num_bytes(); | 372 size_t num_bytes = msg->num_bytes(); |
| 373 // The amount we can write in our first copy. | 373 // The amount we can write in our first copy. |
| 374 size_t num_bytes_to_copy_first = std::min(num_bytes, GetMaxNumBytesToWrite()); | 374 size_t num_bytes_to_copy_first = std::min(num_bytes, GetMaxNumBytesToWrite()); |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 443 start_index_ %= capacity_num_bytes(); | 443 start_index_ %= capacity_num_bytes(); |
| 444 current_num_bytes_ -= num_bytes; | 444 current_num_bytes_ -= num_bytes; |
| 445 | 445 |
| 446 if (!producer_open()) { | 446 if (!producer_open()) { |
| 447 DCHECK(!channel_endpoint_); | 447 DCHECK(!channel_endpoint_); |
| 448 return; | 448 return; |
| 449 } | 449 } |
| 450 | 450 |
| 451 RemoteDataPipeAck ack_data = {}; | 451 RemoteDataPipeAck ack_data = {}; |
| 452 ack_data.num_bytes_consumed = static_cast<uint32_t>(num_bytes); | 452 ack_data.num_bytes_consumed = static_cast<uint32_t>(num_bytes); |
| 453 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 453 std::unique_ptr<MessageInTransit> message(new MessageInTransit( |
| 454 MessageInTransit::Type::ENDPOINT_CLIENT, | 454 MessageInTransit::Type::ENDPOINT_CLIENT, |
| 455 MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA_PIPE_ACK, | 455 MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA_PIPE_ACK, |
| 456 static_cast<uint32_t>(sizeof(ack_data)), &ack_data)); | 456 static_cast<uint32_t>(sizeof(ack_data)), &ack_data)); |
| 457 if (!channel_endpoint_->EnqueueMessage(message.Pass())) | 457 if (!channel_endpoint_->EnqueueMessage(std::move(message))) |
| 458 Disconnect(); | 458 Disconnect(); |
| 459 } | 459 } |
| 460 | 460 |
| 461 void RemoteProducerDataPipeImpl::Disconnect() { | 461 void RemoteProducerDataPipeImpl::Disconnect() { |
| 462 DCHECK(producer_open()); | 462 DCHECK(producer_open()); |
| 463 DCHECK(channel_endpoint_); | 463 DCHECK(channel_endpoint_); |
| 464 SetProducerClosed(); | 464 SetProducerClosed(); |
| 465 channel_endpoint_->DetachFromClient(); | 465 channel_endpoint_->DetachFromClient(); |
| 466 channel_endpoint_ = nullptr; | 466 channel_endpoint_ = nullptr; |
| 467 // If the consumer is still open and we still have data, we have to keep the | 467 // If the consumer is still open and we still have data, we have to keep the |
| 468 // buffer around. Currently, we won't free it even if it empties later. (We | 468 // buffer around. Currently, we won't free it even if it empties later. (We |
| 469 // could do this -- requiring a check on every read -- but that seems to be | 469 // could do this -- requiring a check on every read -- but that seems to be |
| 470 // optimizing for the uncommon case.) | 470 // optimizing for the uncommon case.) |
| 471 if (!consumer_open() || !current_num_bytes_) | 471 if (!consumer_open() || !current_num_bytes_) |
| 472 DestroyBuffer(); | 472 DestroyBuffer(); |
| 473 } | 473 } |
| 474 | 474 |
| 475 } // namespace system | 475 } // namespace system |
| 476 } // namespace mojo | 476 } // namespace mojo |
| OLD | NEW |