| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // TODO(vtl): I currently potentially overflow in doing index calculations. | 5 // TODO(vtl): I currently potentially overflow in doing index calculations. |
| 6 // E.g., |start_index_| and |current_num_bytes_| fit into a |uint32_t|, but | 6 // E.g., |start_index_| and |current_num_bytes_| fit into a |uint32_t|, but |
| 7 // their sum may not. This is bad and poses a security risk. (We're currently | 7 // their sum may not. This is bad and poses a security risk. (We're currently |
| 8 // saved by the limit on capacity -- the maximum size of the buffer, checked in | 8 // saved by the limit on capacity -- the maximum size of the buffer, checked in |
| 9 // |DataPipe::ValidateOptions()|, is currently sufficiently small.) | 9 // |DataPipe::ValidateOptions()|, is currently sufficiently small.) |
| 10 | 10 |
| (...skipping 161 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 172 embedder::PlatformHandleVector* platform_handles) { | 172 embedder::PlatformHandleVector* platform_handles) { |
| 173 SerializedDataPipeProducerDispatcher* s = | 173 SerializedDataPipeProducerDispatcher* s = |
| 174 static_cast<SerializedDataPipeProducerDispatcher*>(destination); | 174 static_cast<SerializedDataPipeProducerDispatcher*>(destination); |
| 175 s->validated_options = validated_options(); | 175 s->validated_options = validated_options(); |
| 176 void* destination_for_endpoint = static_cast<char*>(destination) + | 176 void* destination_for_endpoint = static_cast<char*>(destination) + |
| 177 sizeof(SerializedDataPipeProducerDispatcher); | 177 sizeof(SerializedDataPipeProducerDispatcher); |
| 178 | 178 |
| 179 if (!consumer_open()) { | 179 if (!consumer_open()) { |
| 180 // Case 1: The consumer is closed. | 180 // Case 1: The consumer is closed. |
| 181 s->consumer_num_bytes = static_cast<size_t>(-1); | 181 s->consumer_num_bytes = static_cast<size_t>(-1); |
| 182 owner()->ProducerCloseNoLock(); | |
| 183 *actual_size = sizeof(SerializedDataPipeProducerDispatcher); | 182 *actual_size = sizeof(SerializedDataPipeProducerDispatcher); |
| 184 return true; | 183 return true; |
| 185 } | 184 } |
| 186 | 185 |
| 187 // Case 2: The consumer isn't closed. We'll replace ourselves with a | 186 // Case 2: The consumer isn't closed. We'll replace ourselves with a |
| 188 // |RemoteProducerDataPipeImpl|. | 187 // |RemoteProducerDataPipeImpl|. |
| 189 | 188 |
| 189 s->consumer_num_bytes = current_num_bytes_; |
| 190 // Note: We don't use |port|. | 190 // Note: We don't use |port|. |
| 191 scoped_refptr<ChannelEndpoint> channel_endpoint = | 191 scoped_refptr<ChannelEndpoint> channel_endpoint = |
| 192 channel->SerializeEndpointWithLocalPeer(destination_for_endpoint, nullptr, | 192 channel->SerializeEndpointWithLocalPeer(destination_for_endpoint, nullptr, |
| 193 owner(), 0); | 193 owner(), 0); |
| 194 // Note: Keep |*this| alive until the end of this method, to make things | 194 // Note: Keep |*this| alive until the end of this method, to make things |
| 195 // slightly easier on ourselves. | 195 // slightly easier on ourselves. |
| 196 scoped_ptr<DataPipeImpl> self(owner()->ReplaceImplNoLock(make_scoped_ptr( | 196 scoped_ptr<DataPipeImpl> self(owner()->ReplaceImplNoLock(make_scoped_ptr( |
| 197 new RemoteProducerDataPipeImpl(channel_endpoint.get(), buffer_.Pass(), | 197 new RemoteProducerDataPipeImpl(channel_endpoint.get(), buffer_.Pass(), |
| 198 start_index_, current_num_bytes_)))); | 198 start_index_, current_num_bytes_)))); |
| 199 | 199 |
| (...skipping 149 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 349 size_t* actual_size, | 349 size_t* actual_size, |
| 350 embedder::PlatformHandleVector* platform_handles) { | 350 embedder::PlatformHandleVector* platform_handles) { |
| 351 SerializedDataPipeConsumerDispatcher* s = | 351 SerializedDataPipeConsumerDispatcher* s = |
| 352 static_cast<SerializedDataPipeConsumerDispatcher*>(destination); | 352 static_cast<SerializedDataPipeConsumerDispatcher*>(destination); |
| 353 s->validated_options = validated_options(); | 353 s->validated_options = validated_options(); |
| 354 void* destination_for_endpoint = static_cast<char*>(destination) + | 354 void* destination_for_endpoint = static_cast<char*>(destination) + |
| 355 sizeof(SerializedDataPipeConsumerDispatcher); | 355 sizeof(SerializedDataPipeConsumerDispatcher); |
| 356 | 356 |
| 357 size_t old_num_bytes = current_num_bytes_; | 357 size_t old_num_bytes = current_num_bytes_; |
| 358 MessageInTransitQueue message_queue; | 358 MessageInTransitQueue message_queue; |
| 359 ConvertDataToMessages(&message_queue); | 359 ConvertDataToMessages(buffer_.get(), &start_index_, ¤t_num_bytes_, |
| 360 &message_queue); |
| 361 start_index_ = 0; |
| 362 current_num_bytes_ = 0; |
| 360 | 363 |
| 361 if (!producer_open()) { | 364 if (!producer_open()) { |
| 362 // Case 1: The producer is closed. | 365 // Case 1: The producer is closed. |
| 363 channel->SerializeEndpointWithClosedPeer(destination_for_endpoint, | 366 channel->SerializeEndpointWithClosedPeer(destination_for_endpoint, |
| 364 &message_queue); | 367 &message_queue); |
| 365 owner()->ConsumerCloseNoLock(); | |
| 366 *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + | 368 *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + |
| 367 channel->GetSerializedEndpointSize(); | 369 channel->GetSerializedEndpointSize(); |
| 368 return true; | 370 return true; |
| 369 } | 371 } |
| 370 | 372 |
| 371 // Case 2: The producer isn't closed. We'll replace ourselves with a | 373 // Case 2: The producer isn't closed. We'll replace ourselves with a |
| 372 // |RemoteConsumerDataPipeImpl|. | 374 // |RemoteConsumerDataPipeImpl|. |
| 373 | 375 |
| 374 // Note: We don't use |port|. | 376 // Note: We don't use |port|. |
| 375 scoped_refptr<ChannelEndpoint> channel_endpoint = | 377 scoped_refptr<ChannelEndpoint> channel_endpoint = |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 432 return current_num_bytes_; | 434 return current_num_bytes_; |
| 433 } | 435 } |
| 434 | 436 |
| 435 void LocalDataPipeImpl::MarkDataAsConsumed(size_t num_bytes) { | 437 void LocalDataPipeImpl::MarkDataAsConsumed(size_t num_bytes) { |
| 436 DCHECK_LE(num_bytes, current_num_bytes_); | 438 DCHECK_LE(num_bytes, current_num_bytes_); |
| 437 start_index_ += num_bytes; | 439 start_index_ += num_bytes; |
| 438 start_index_ %= capacity_num_bytes(); | 440 start_index_ %= capacity_num_bytes(); |
| 439 current_num_bytes_ -= num_bytes; | 441 current_num_bytes_ -= num_bytes; |
| 440 } | 442 } |
| 441 | 443 |
| 442 void LocalDataPipeImpl::ConvertDataToMessages( | |
| 443 MessageInTransitQueue* message_queue) { | |
| 444 // The maximum amount of data to send per message (make it a multiple of the | |
| 445 // element size. | |
| 446 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | |
| 447 max_message_num_bytes -= max_message_num_bytes % element_num_bytes(); | |
| 448 DCHECK_GT(max_message_num_bytes, 0u); | |
| 449 | |
| 450 while (current_num_bytes_ > 0) { | |
| 451 size_t message_num_bytes = | |
| 452 std::min(max_message_num_bytes, GetMaxNumBytesToRead()); | |
| 453 | |
| 454 // Note: |message_num_bytes| fits in a |uint32_t| since the capacity does. | |
| 455 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
| 456 MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData, | |
| 457 static_cast<uint32_t>(message_num_bytes), | |
| 458 buffer_.get() + start_index_)); | |
| 459 message_queue->AddMessage(message.Pass()); | |
| 460 | |
| 461 MarkDataAsConsumed(message_num_bytes); | |
| 462 } | |
| 463 } | |
| 464 | |
| 465 } // namespace system | 444 } // namespace system |
| 466 } // namespace mojo | 445 } // namespace mojo |
| OLD | NEW |