| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" | 5 #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include <algorithm> | 9 #include <algorithm> |
| 10 #include <memory> |
| 11 #include <utility> |
| 10 | 12 |
| 11 #include "base/logging.h" | 13 #include "base/logging.h" |
| 12 #include "base/memory/scoped_ptr.h" | |
| 13 #include "mojo/edk/system/channel.h" | 14 #include "mojo/edk/system/channel.h" |
| 14 #include "mojo/edk/system/channel_endpoint.h" | 15 #include "mojo/edk/system/channel_endpoint.h" |
| 15 #include "mojo/edk/system/configuration.h" | 16 #include "mojo/edk/system/configuration.h" |
| 16 #include "mojo/edk/system/data_pipe.h" | 17 #include "mojo/edk/system/data_pipe.h" |
| 17 #include "mojo/edk/system/message_in_transit.h" | 18 #include "mojo/edk/system/message_in_transit.h" |
| 18 #include "mojo/edk/system/remote_data_pipe_ack.h" | 19 #include "mojo/edk/system/remote_data_pipe_ack.h" |
| 19 | 20 |
| 20 namespace mojo { | 21 namespace mojo { |
| 21 namespace system { | 22 namespace system { |
| 22 | 23 |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 89 // static | 90 // static |
| 90 bool RemoteConsumerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( | 91 bool RemoteConsumerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( |
| 91 const MojoCreateDataPipeOptions& validated_options, | 92 const MojoCreateDataPipeOptions& validated_options, |
| 92 size_t* consumer_num_bytes, | 93 size_t* consumer_num_bytes, |
| 93 MessageInTransitQueue* messages) { | 94 MessageInTransitQueue* messages) { |
| 94 const size_t element_num_bytes = validated_options.element_num_bytes; | 95 const size_t element_num_bytes = validated_options.element_num_bytes; |
| 95 const size_t capacity_num_bytes = validated_options.capacity_num_bytes; | 96 const size_t capacity_num_bytes = validated_options.capacity_num_bytes; |
| 96 | 97 |
| 97 if (messages) { | 98 if (messages) { |
| 98 while (!messages->IsEmpty()) { | 99 while (!messages->IsEmpty()) { |
| 99 scoped_ptr<MessageInTransit> message(messages->GetMessage()); | 100 std::unique_ptr<MessageInTransit> message(messages->GetMessage()); |
| 100 if (!ValidateIncomingMessage(element_num_bytes, capacity_num_bytes, | 101 if (!ValidateIncomingMessage(element_num_bytes, capacity_num_bytes, |
| 101 *consumer_num_bytes, message.get())) { | 102 *consumer_num_bytes, message.get())) { |
| 102 messages->Clear(); | 103 messages->Clear(); |
| 103 return false; | 104 return false; |
| 104 } | 105 } |
| 105 | 106 |
| 106 const RemoteDataPipeAck* ack = | 107 const RemoteDataPipeAck* ack = |
| 107 static_cast<const RemoteDataPipeAck*>(message->bytes()); | 108 static_cast<const RemoteDataPipeAck*>(message->bytes()); |
| 108 size_t num_bytes_consumed = ack->num_bytes_consumed; | 109 size_t num_bytes_consumed = ack->num_bytes_consumed; |
| 109 *consumer_num_bytes -= num_bytes_consumed; | 110 *consumer_num_bytes -= num_bytes_consumed; |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 141 // element size. | 142 // element size. |
| 142 // TODO(vtl): Copied from |LocalDataPipeImpl::ConvertDataToMessages()|. | 143 // TODO(vtl): Copied from |LocalDataPipeImpl::ConvertDataToMessages()|. |
| 143 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | 144 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; |
| 144 max_message_num_bytes -= max_message_num_bytes % element_num_bytes(); | 145 max_message_num_bytes -= max_message_num_bytes % element_num_bytes(); |
| 145 DCHECK_GT(max_message_num_bytes, 0u); | 146 DCHECK_GT(max_message_num_bytes, 0u); |
| 146 | 147 |
| 147 size_t offset = 0; | 148 size_t offset = 0; |
| 148 while (offset < num_bytes_to_write) { | 149 while (offset < num_bytes_to_write) { |
| 149 size_t message_num_bytes = | 150 size_t message_num_bytes = |
| 150 std::min(max_message_num_bytes, num_bytes_to_write - offset); | 151 std::min(max_message_num_bytes, num_bytes_to_write - offset); |
| 151 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 152 std::unique_ptr<MessageInTransit> message(new MessageInTransit( |
| 152 MessageInTransit::Type::ENDPOINT_CLIENT, | 153 MessageInTransit::Type::ENDPOINT_CLIENT, |
| 153 MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA, | 154 MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA, |
| 154 static_cast<uint32_t>(message_num_bytes), elements.At(offset))); | 155 static_cast<uint32_t>(message_num_bytes), elements.At(offset))); |
| 155 if (!channel_endpoint_->EnqueueMessage(message.Pass())) { | 156 if (!channel_endpoint_->EnqueueMessage(std::move(message))) { |
| 156 Disconnect(); | 157 Disconnect(); |
| 157 break; | 158 break; |
| 158 } | 159 } |
| 159 | 160 |
| 160 offset += message_num_bytes; | 161 offset += message_num_bytes; |
| 161 consumer_num_bytes_ += message_num_bytes; | 162 consumer_num_bytes_ += message_num_bytes; |
| 162 } | 163 } |
| 163 | 164 |
| 164 DCHECK_LE(consumer_num_bytes_, capacity_num_bytes()); | 165 DCHECK_LE(consumer_num_bytes_, capacity_num_bytes()); |
| 165 // TODO(vtl): We report |num_bytes_to_write|, instead of |offset|, even if we | 166 // TODO(vtl): We report |num_bytes_to_write|, instead of |offset|, even if we |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 220 // element size. | 221 // element size. |
| 221 // TODO(vtl): Mostly copied from |LocalDataPipeImpl::ConvertDataToMessages()|. | 222 // TODO(vtl): Mostly copied from |LocalDataPipeImpl::ConvertDataToMessages()|. |
| 222 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | 223 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; |
| 223 max_message_num_bytes -= max_message_num_bytes % element_num_bytes(); | 224 max_message_num_bytes -= max_message_num_bytes % element_num_bytes(); |
| 224 DCHECK_GT(max_message_num_bytes, 0u); | 225 DCHECK_GT(max_message_num_bytes, 0u); |
| 225 | 226 |
| 226 size_t offset = 0; | 227 size_t offset = 0; |
| 227 while (offset < num_bytes_written) { | 228 while (offset < num_bytes_written) { |
| 228 size_t message_num_bytes = | 229 size_t message_num_bytes = |
| 229 std::min(max_message_num_bytes, num_bytes_written - offset); | 230 std::min(max_message_num_bytes, num_bytes_written - offset); |
| 230 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 231 std::unique_ptr<MessageInTransit> message(new MessageInTransit( |
| 231 MessageInTransit::Type::ENDPOINT_CLIENT, | 232 MessageInTransit::Type::ENDPOINT_CLIENT, |
| 232 MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA, | 233 MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA, |
| 233 static_cast<uint32_t>(message_num_bytes), buffer_.get() + offset)); | 234 static_cast<uint32_t>(message_num_bytes), buffer_.get() + offset)); |
| 234 if (!channel_endpoint_->EnqueueMessage(message.Pass())) { | 235 if (!channel_endpoint_->EnqueueMessage(std::move(message))) { |
| 235 set_producer_two_phase_max_num_bytes_written(0); | 236 set_producer_two_phase_max_num_bytes_written(0); |
| 236 Disconnect(); | 237 Disconnect(); |
| 237 return MOJO_RESULT_OK; | 238 return MOJO_RESULT_OK; |
| 238 } | 239 } |
| 239 | 240 |
| 240 offset += message_num_bytes; | 241 offset += message_num_bytes; |
| 241 consumer_num_bytes_ += message_num_bytes; | 242 consumer_num_bytes_ += message_num_bytes; |
| 242 } | 243 } |
| 243 | 244 |
| 244 DCHECK_LE(consumer_num_bytes_, capacity_num_bytes()); | 245 DCHECK_LE(consumer_num_bytes_, capacity_num_bytes()); |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 366 size_t* /*actual_size*/, | 367 size_t* /*actual_size*/, |
| 367 embedder::PlatformHandleVector* /*platform_handles*/) { | 368 embedder::PlatformHandleVector* /*platform_handles*/) { |
| 368 NOTREACHED(); | 369 NOTREACHED(); |
| 369 return false; | 370 return false; |
| 370 } | 371 } |
| 371 | 372 |
| 372 bool RemoteConsumerDataPipeImpl::OnReadMessage(unsigned /*port*/, | 373 bool RemoteConsumerDataPipeImpl::OnReadMessage(unsigned /*port*/, |
| 373 MessageInTransit* message) { | 374 MessageInTransit* message) { |
| 374 // Always take ownership of the message. (This means that we should always | 375 // Always take ownership of the message. (This means that we should always |
| 375 // return true.) | 376 // return true.) |
| 376 scoped_ptr<MessageInTransit> msg(message); | 377 std::unique_ptr<MessageInTransit> msg(message); |
| 377 | 378 |
| 378 if (!ValidateIncomingMessage(element_num_bytes(), capacity_num_bytes(), | 379 if (!ValidateIncomingMessage(element_num_bytes(), capacity_num_bytes(), |
| 379 consumer_num_bytes_, msg.get())) { | 380 consumer_num_bytes_, msg.get())) { |
| 380 Disconnect(); | 381 Disconnect(); |
| 381 return true; | 382 return true; |
| 382 } | 383 } |
| 383 | 384 |
| 384 const RemoteDataPipeAck* ack = | 385 const RemoteDataPipeAck* ack = |
| 385 static_cast<const RemoteDataPipeAck*>(msg->bytes()); | 386 static_cast<const RemoteDataPipeAck*>(msg->bytes()); |
| 386 size_t num_bytes_consumed = ack->num_bytes_consumed; | 387 size_t num_bytes_consumed = ack->num_bytes_consumed; |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 421 DCHECK(channel_endpoint_); | 422 DCHECK(channel_endpoint_); |
| 422 SetConsumerClosed(); | 423 SetConsumerClosed(); |
| 423 channel_endpoint_->DetachFromClient(); | 424 channel_endpoint_->DetachFromClient(); |
| 424 channel_endpoint_ = nullptr; | 425 channel_endpoint_ = nullptr; |
| 425 if (!producer_in_two_phase_write()) | 426 if (!producer_in_two_phase_write()) |
| 426 DestroyBuffer(); | 427 DestroyBuffer(); |
| 427 } | 428 } |
| 428 | 429 |
| 429 } // namespace system | 430 } // namespace system |
| 430 } // namespace mojo | 431 } // namespace mojo |
| OLD | NEW |