OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 // TODO(vtl): I currently potentially overflow in doing index calculations. | 5 // TODO(vtl): I currently potentially overflow in doing index calculations. |
6 // E.g., |start_index_| and |current_num_bytes_| fit into a |uint32_t|, but | 6 // E.g., |start_index_| and |current_num_bytes_| fit into a |uint32_t|, but |
7 // their sum may not. This is bad and poses a security risk. (We're currently | 7 // their sum may not. This is bad and poses a security risk. (We're currently |
8 // saved by the limit on capacity -- the maximum size of the buffer, checked in | 8 // saved by the limit on capacity -- the maximum size of the buffer, checked in |
9 // |DataPipe::ValidateOptions()|, is currently sufficiently small.) | 9 // |DataPipe::ValidateOptions()|, is currently sufficiently small.) |
10 | 10 |
(...skipping 161 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
172 embedder::PlatformHandleVector* platform_handles) { | 172 embedder::PlatformHandleVector* platform_handles) { |
173 SerializedDataPipeProducerDispatcher* s = | 173 SerializedDataPipeProducerDispatcher* s = |
174 static_cast<SerializedDataPipeProducerDispatcher*>(destination); | 174 static_cast<SerializedDataPipeProducerDispatcher*>(destination); |
175 s->validated_options = validated_options(); | 175 s->validated_options = validated_options(); |
176 void* destination_for_endpoint = static_cast<char*>(destination) + | 176 void* destination_for_endpoint = static_cast<char*>(destination) + |
177 sizeof(SerializedDataPipeProducerDispatcher); | 177 sizeof(SerializedDataPipeProducerDispatcher); |
178 | 178 |
179 if (!consumer_open()) { | 179 if (!consumer_open()) { |
180 // Case 1: The consumer is closed. | 180 // Case 1: The consumer is closed. |
181 s->consumer_num_bytes = static_cast<size_t>(-1); | 181 s->consumer_num_bytes = static_cast<size_t>(-1); |
182 owner()->ProducerCloseNoLock(); | |
183 *actual_size = sizeof(SerializedDataPipeProducerDispatcher); | 182 *actual_size = sizeof(SerializedDataPipeProducerDispatcher); |
184 return true; | 183 return true; |
185 } | 184 } |
186 | 185 |
187 // Case 2: The consumer isn't closed. We'll replace ourselves with a | 186 // Case 2: The consumer isn't closed. We'll replace ourselves with a |
188 // |RemoteProducerDataPipeImpl|. | 187 // |RemoteProducerDataPipeImpl|. |
189 | 188 |
| 189 s->consumer_num_bytes = current_num_bytes_; |
190 // Note: We don't use |port|. | 190 // Note: We don't use |port|. |
191 scoped_refptr<ChannelEndpoint> channel_endpoint = | 191 scoped_refptr<ChannelEndpoint> channel_endpoint = |
192 channel->SerializeEndpointWithLocalPeer(destination_for_endpoint, nullptr, | 192 channel->SerializeEndpointWithLocalPeer(destination_for_endpoint, nullptr, |
193 owner(), 0); | 193 owner(), 0); |
194 // Note: Keep |*this| alive until the end of this method, to make things | 194 // Note: Keep |*this| alive until the end of this method, to make things |
195 // slightly easier on ourselves. | 195 // slightly easier on ourselves. |
196 scoped_ptr<DataPipeImpl> self(owner()->ReplaceImplNoLock(make_scoped_ptr( | 196 scoped_ptr<DataPipeImpl> self(owner()->ReplaceImplNoLock(make_scoped_ptr( |
197 new RemoteProducerDataPipeImpl(channel_endpoint.get(), buffer_.Pass(), | 197 new RemoteProducerDataPipeImpl(channel_endpoint.get(), buffer_.Pass(), |
198 start_index_, current_num_bytes_)))); | 198 start_index_, current_num_bytes_)))); |
199 | 199 |
(...skipping 149 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
349 size_t* actual_size, | 349 size_t* actual_size, |
350 embedder::PlatformHandleVector* platform_handles) { | 350 embedder::PlatformHandleVector* platform_handles) { |
351 SerializedDataPipeConsumerDispatcher* s = | 351 SerializedDataPipeConsumerDispatcher* s = |
352 static_cast<SerializedDataPipeConsumerDispatcher*>(destination); | 352 static_cast<SerializedDataPipeConsumerDispatcher*>(destination); |
353 s->validated_options = validated_options(); | 353 s->validated_options = validated_options(); |
354 void* destination_for_endpoint = static_cast<char*>(destination) + | 354 void* destination_for_endpoint = static_cast<char*>(destination) + |
355 sizeof(SerializedDataPipeConsumerDispatcher); | 355 sizeof(SerializedDataPipeConsumerDispatcher); |
356 | 356 |
357 size_t old_num_bytes = current_num_bytes_; | 357 size_t old_num_bytes = current_num_bytes_; |
358 MessageInTransitQueue message_queue; | 358 MessageInTransitQueue message_queue; |
359 ConvertDataToMessages(&message_queue); | 359 ConvertDataToMessages(buffer_.get(), &start_index_, ¤t_num_bytes_, |
| 360 &message_queue); |
| 361 start_index_ = 0; |
| 362 current_num_bytes_ = 0; |
360 | 363 |
361 if (!producer_open()) { | 364 if (!producer_open()) { |
362 // Case 1: The producer is closed. | 365 // Case 1: The producer is closed. |
363 channel->SerializeEndpointWithClosedPeer(destination_for_endpoint, | 366 channel->SerializeEndpointWithClosedPeer(destination_for_endpoint, |
364 &message_queue); | 367 &message_queue); |
365 owner()->ConsumerCloseNoLock(); | |
366 *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + | 368 *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + |
367 channel->GetSerializedEndpointSize(); | 369 channel->GetSerializedEndpointSize(); |
368 return true; | 370 return true; |
369 } | 371 } |
370 | 372 |
371 // Case 2: The producer isn't closed. We'll replace ourselves with a | 373 // Case 2: The producer isn't closed. We'll replace ourselves with a |
372 // |RemoteConsumerDataPipeImpl|. | 374 // |RemoteConsumerDataPipeImpl|. |
373 | 375 |
374 // Note: We don't use |port|. | 376 // Note: We don't use |port|. |
375 scoped_refptr<ChannelEndpoint> channel_endpoint = | 377 scoped_refptr<ChannelEndpoint> channel_endpoint = |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
432 return current_num_bytes_; | 434 return current_num_bytes_; |
433 } | 435 } |
434 | 436 |
435 void LocalDataPipeImpl::MarkDataAsConsumed(size_t num_bytes) { | 437 void LocalDataPipeImpl::MarkDataAsConsumed(size_t num_bytes) { |
436 DCHECK_LE(num_bytes, current_num_bytes_); | 438 DCHECK_LE(num_bytes, current_num_bytes_); |
437 start_index_ += num_bytes; | 439 start_index_ += num_bytes; |
438 start_index_ %= capacity_num_bytes(); | 440 start_index_ %= capacity_num_bytes(); |
439 current_num_bytes_ -= num_bytes; | 441 current_num_bytes_ -= num_bytes; |
440 } | 442 } |
441 | 443 |
442 void LocalDataPipeImpl::ConvertDataToMessages( | |
443 MessageInTransitQueue* message_queue) { | |
444 // The maximum amount of data to send per message (make it a multiple of the | |
445 // element size. | |
446 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | |
447 max_message_num_bytes -= max_message_num_bytes % element_num_bytes(); | |
448 DCHECK_GT(max_message_num_bytes, 0u); | |
449 | |
450 while (current_num_bytes_ > 0) { | |
451 size_t message_num_bytes = | |
452 std::min(max_message_num_bytes, GetMaxNumBytesToRead()); | |
453 | |
454 // Note: |message_num_bytes| fits in a |uint32_t| since the capacity does. | |
455 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
456 MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData, | |
457 static_cast<uint32_t>(message_num_bytes), | |
458 buffer_.get() + start_index_)); | |
459 message_queue->AddMessage(message.Pass()); | |
460 | |
461 MarkDataAsConsumed(message_num_bytes); | |
462 } | |
463 } | |
464 | |
465 } // namespace system | 444 } // namespace system |
466 } // namespace mojo | 445 } // namespace mojo |
OLD | NEW |