OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" | 5 #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
| 10 #include <memory> |
| 11 #include <utility> |
10 | 12 |
11 #include "base/logging.h" | 13 #include "base/logging.h" |
12 #include "base/memory/scoped_ptr.h" | |
13 #include "mojo/edk/system/channel.h" | 14 #include "mojo/edk/system/channel.h" |
14 #include "mojo/edk/system/channel_endpoint.h" | 15 #include "mojo/edk/system/channel_endpoint.h" |
15 #include "mojo/edk/system/configuration.h" | 16 #include "mojo/edk/system/configuration.h" |
16 #include "mojo/edk/system/data_pipe.h" | 17 #include "mojo/edk/system/data_pipe.h" |
17 #include "mojo/edk/system/message_in_transit.h" | 18 #include "mojo/edk/system/message_in_transit.h" |
18 #include "mojo/edk/system/remote_data_pipe_ack.h" | 19 #include "mojo/edk/system/remote_data_pipe_ack.h" |
19 | 20 |
20 namespace mojo { | 21 namespace mojo { |
21 namespace system { | 22 namespace system { |
22 | 23 |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
89 // static | 90 // static |
90 bool RemoteConsumerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( | 91 bool RemoteConsumerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( |
91 const MojoCreateDataPipeOptions& validated_options, | 92 const MojoCreateDataPipeOptions& validated_options, |
92 size_t* consumer_num_bytes, | 93 size_t* consumer_num_bytes, |
93 MessageInTransitQueue* messages) { | 94 MessageInTransitQueue* messages) { |
94 const size_t element_num_bytes = validated_options.element_num_bytes; | 95 const size_t element_num_bytes = validated_options.element_num_bytes; |
95 const size_t capacity_num_bytes = validated_options.capacity_num_bytes; | 96 const size_t capacity_num_bytes = validated_options.capacity_num_bytes; |
96 | 97 |
97 if (messages) { | 98 if (messages) { |
98 while (!messages->IsEmpty()) { | 99 while (!messages->IsEmpty()) { |
99 scoped_ptr<MessageInTransit> message(messages->GetMessage()); | 100 std::unique_ptr<MessageInTransit> message(messages->GetMessage()); |
100 if (!ValidateIncomingMessage(element_num_bytes, capacity_num_bytes, | 101 if (!ValidateIncomingMessage(element_num_bytes, capacity_num_bytes, |
101 *consumer_num_bytes, message.get())) { | 102 *consumer_num_bytes, message.get())) { |
102 messages->Clear(); | 103 messages->Clear(); |
103 return false; | 104 return false; |
104 } | 105 } |
105 | 106 |
106 const RemoteDataPipeAck* ack = | 107 const RemoteDataPipeAck* ack = |
107 static_cast<const RemoteDataPipeAck*>(message->bytes()); | 108 static_cast<const RemoteDataPipeAck*>(message->bytes()); |
108 size_t num_bytes_consumed = ack->num_bytes_consumed; | 109 size_t num_bytes_consumed = ack->num_bytes_consumed; |
109 *consumer_num_bytes -= num_bytes_consumed; | 110 *consumer_num_bytes -= num_bytes_consumed; |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
141 // element size. | 142 // element size. |
142 // TODO(vtl): Copied from |LocalDataPipeImpl::ConvertDataToMessages()|. | 143 // TODO(vtl): Copied from |LocalDataPipeImpl::ConvertDataToMessages()|. |
143 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | 144 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; |
144 max_message_num_bytes -= max_message_num_bytes % element_num_bytes(); | 145 max_message_num_bytes -= max_message_num_bytes % element_num_bytes(); |
145 DCHECK_GT(max_message_num_bytes, 0u); | 146 DCHECK_GT(max_message_num_bytes, 0u); |
146 | 147 |
147 size_t offset = 0; | 148 size_t offset = 0; |
148 while (offset < num_bytes_to_write) { | 149 while (offset < num_bytes_to_write) { |
149 size_t message_num_bytes = | 150 size_t message_num_bytes = |
150 std::min(max_message_num_bytes, num_bytes_to_write - offset); | 151 std::min(max_message_num_bytes, num_bytes_to_write - offset); |
151 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 152 std::unique_ptr<MessageInTransit> message(new MessageInTransit( |
152 MessageInTransit::Type::ENDPOINT_CLIENT, | 153 MessageInTransit::Type::ENDPOINT_CLIENT, |
153 MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA, | 154 MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA, |
154 static_cast<uint32_t>(message_num_bytes), elements.At(offset))); | 155 static_cast<uint32_t>(message_num_bytes), elements.At(offset))); |
155 if (!channel_endpoint_->EnqueueMessage(message.Pass())) { | 156 if (!channel_endpoint_->EnqueueMessage(std::move(message))) { |
156 Disconnect(); | 157 Disconnect(); |
157 break; | 158 break; |
158 } | 159 } |
159 | 160 |
160 offset += message_num_bytes; | 161 offset += message_num_bytes; |
161 consumer_num_bytes_ += message_num_bytes; | 162 consumer_num_bytes_ += message_num_bytes; |
162 } | 163 } |
163 | 164 |
164 DCHECK_LE(consumer_num_bytes_, capacity_num_bytes()); | 165 DCHECK_LE(consumer_num_bytes_, capacity_num_bytes()); |
165 // TODO(vtl): We report |num_bytes_to_write|, instead of |offset|, even if we | 166 // TODO(vtl): We report |num_bytes_to_write|, instead of |offset|, even if we |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
220 // element size. | 221 // element size. |
221 // TODO(vtl): Mostly copied from |LocalDataPipeImpl::ConvertDataToMessages()|. | 222 // TODO(vtl): Mostly copied from |LocalDataPipeImpl::ConvertDataToMessages()|. |
222 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | 223 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; |
223 max_message_num_bytes -= max_message_num_bytes % element_num_bytes(); | 224 max_message_num_bytes -= max_message_num_bytes % element_num_bytes(); |
224 DCHECK_GT(max_message_num_bytes, 0u); | 225 DCHECK_GT(max_message_num_bytes, 0u); |
225 | 226 |
226 size_t offset = 0; | 227 size_t offset = 0; |
227 while (offset < num_bytes_written) { | 228 while (offset < num_bytes_written) { |
228 size_t message_num_bytes = | 229 size_t message_num_bytes = |
229 std::min(max_message_num_bytes, num_bytes_written - offset); | 230 std::min(max_message_num_bytes, num_bytes_written - offset); |
230 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 231 std::unique_ptr<MessageInTransit> message(new MessageInTransit( |
231 MessageInTransit::Type::ENDPOINT_CLIENT, | 232 MessageInTransit::Type::ENDPOINT_CLIENT, |
232 MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA, | 233 MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA, |
233 static_cast<uint32_t>(message_num_bytes), buffer_.get() + offset)); | 234 static_cast<uint32_t>(message_num_bytes), buffer_.get() + offset)); |
234 if (!channel_endpoint_->EnqueueMessage(message.Pass())) { | 235 if (!channel_endpoint_->EnqueueMessage(std::move(message))) { |
235 set_producer_two_phase_max_num_bytes_written(0); | 236 set_producer_two_phase_max_num_bytes_written(0); |
236 Disconnect(); | 237 Disconnect(); |
237 return MOJO_RESULT_OK; | 238 return MOJO_RESULT_OK; |
238 } | 239 } |
239 | 240 |
240 offset += message_num_bytes; | 241 offset += message_num_bytes; |
241 consumer_num_bytes_ += message_num_bytes; | 242 consumer_num_bytes_ += message_num_bytes; |
242 } | 243 } |
243 | 244 |
244 DCHECK_LE(consumer_num_bytes_, capacity_num_bytes()); | 245 DCHECK_LE(consumer_num_bytes_, capacity_num_bytes()); |
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
366 size_t* /*actual_size*/, | 367 size_t* /*actual_size*/, |
367 embedder::PlatformHandleVector* /*platform_handles*/) { | 368 embedder::PlatformHandleVector* /*platform_handles*/) { |
368 NOTREACHED(); | 369 NOTREACHED(); |
369 return false; | 370 return false; |
370 } | 371 } |
371 | 372 |
372 bool RemoteConsumerDataPipeImpl::OnReadMessage(unsigned /*port*/, | 373 bool RemoteConsumerDataPipeImpl::OnReadMessage(unsigned /*port*/, |
373 MessageInTransit* message) { | 374 MessageInTransit* message) { |
374 // Always take ownership of the message. (This means that we should always | 375 // Always take ownership of the message. (This means that we should always |
375 // return true.) | 376 // return true.) |
376 scoped_ptr<MessageInTransit> msg(message); | 377 std::unique_ptr<MessageInTransit> msg(message); |
377 | 378 |
378 if (!ValidateIncomingMessage(element_num_bytes(), capacity_num_bytes(), | 379 if (!ValidateIncomingMessage(element_num_bytes(), capacity_num_bytes(), |
379 consumer_num_bytes_, msg.get())) { | 380 consumer_num_bytes_, msg.get())) { |
380 Disconnect(); | 381 Disconnect(); |
381 return true; | 382 return true; |
382 } | 383 } |
383 | 384 |
384 const RemoteDataPipeAck* ack = | 385 const RemoteDataPipeAck* ack = |
385 static_cast<const RemoteDataPipeAck*>(msg->bytes()); | 386 static_cast<const RemoteDataPipeAck*>(msg->bytes()); |
386 size_t num_bytes_consumed = ack->num_bytes_consumed; | 387 size_t num_bytes_consumed = ack->num_bytes_consumed; |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
421 DCHECK(channel_endpoint_); | 422 DCHECK(channel_endpoint_); |
422 SetConsumerClosed(); | 423 SetConsumerClosed(); |
423 channel_endpoint_->DetachFromClient(); | 424 channel_endpoint_->DetachFromClient(); |
424 channel_endpoint_ = nullptr; | 425 channel_endpoint_ = nullptr; |
425 if (!producer_in_two_phase_write()) | 426 if (!producer_in_two_phase_write()) |
426 DestroyBuffer(); | 427 DestroyBuffer(); |
427 } | 428 } |
428 | 429 |
429 } // namespace system | 430 } // namespace system |
430 } // namespace mojo | 431 } // namespace mojo |
OLD | NEW |