| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/data_pipe.h" | 5 #include "mojo/edk/system/data_pipe.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include <algorithm> | 9 #include <algorithm> |
| 10 #include <limits> | 10 #include <limits> |
| 11 #include <memory> | 11 #include <memory> |
| 12 #include <utility> | 12 #include <utility> |
| 13 | 13 |
| 14 #include "base/logging.h" | 14 #include "base/logging.h" |
| 15 #include "base/memory/aligned_memory.h" | 15 #include "base/memory/aligned_memory.h" |
| 16 #include "mojo/edk/system/awakable_list.h" | 16 #include "mojo/edk/system/awakable_list.h" |
| 17 #include "mojo/edk/system/channel.h" | 17 #include "mojo/edk/system/channel.h" |
| 18 #include "mojo/edk/system/configuration.h" | 18 #include "mojo/edk/system/configuration.h" |
| 19 #include "mojo/edk/system/data_pipe_impl.h" | 19 #include "mojo/edk/system/data_pipe_impl.h" |
| 20 #include "mojo/edk/system/incoming_endpoint.h" | 20 #include "mojo/edk/system/incoming_endpoint.h" |
| 21 #include "mojo/edk/system/local_data_pipe_impl.h" | 21 #include "mojo/edk/system/local_data_pipe_impl.h" |
| 22 #include "mojo/edk/system/memory.h" | 22 #include "mojo/edk/system/memory.h" |
| 23 #include "mojo/edk/system/options_validation.h" | 23 #include "mojo/edk/system/options_validation.h" |
| 24 #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" | 24 #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" |
| 25 #include "mojo/edk/system/remote_producer_data_pipe_impl.h" | 25 #include "mojo/edk/system/remote_producer_data_pipe_impl.h" |
| 26 #include "mojo/edk/util/make_unique.h" |
| 26 | 27 |
| 27 namespace mojo { | 28 namespace mojo { |
| 28 namespace system { | 29 namespace system { |
| 29 | 30 |
| 30 // static | 31 // static |
| 31 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { | 32 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { |
| 32 MojoCreateDataPipeOptions result = { | 33 MojoCreateDataPipeOptions result = { |
| 33 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), | 34 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), |
| 34 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, | 35 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, |
| 35 1u, | 36 1u, |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 89 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 90 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| 90 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; | 91 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; |
| 91 | 92 |
| 92 return MOJO_RESULT_OK; | 93 return MOJO_RESULT_OK; |
| 93 } | 94 } |
| 94 | 95 |
| 95 // static | 96 // static |
| 96 DataPipe* DataPipe::CreateLocal( | 97 DataPipe* DataPipe::CreateLocal( |
| 97 const MojoCreateDataPipeOptions& validated_options) { | 98 const MojoCreateDataPipeOptions& validated_options) { |
| 98 return new DataPipe(true, true, validated_options, | 99 return new DataPipe(true, true, validated_options, |
| 99 make_scoped_ptr(new LocalDataPipeImpl())); | 100 util::MakeUnique<LocalDataPipeImpl>()); |
| 100 } | 101 } |
| 101 | 102 |
| 102 // static | 103 // static |
| 103 DataPipe* DataPipe::CreateRemoteProducerFromExisting( | 104 DataPipe* DataPipe::CreateRemoteProducerFromExisting( |
| 104 const MojoCreateDataPipeOptions& validated_options, | 105 const MojoCreateDataPipeOptions& validated_options, |
| 105 MessageInTransitQueue* message_queue, | 106 MessageInTransitQueue* message_queue, |
| 106 ChannelEndpoint* channel_endpoint) { | 107 ChannelEndpoint* channel_endpoint) { |
| 107 std::unique_ptr<char, base::AlignedFreeDeleter> buffer; | 108 std::unique_ptr<char, base::AlignedFreeDeleter> buffer; |
| 108 size_t buffer_num_bytes = 0; | 109 size_t buffer_num_bytes = 0; |
| 109 if (!RemoteProducerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( | 110 if (!RemoteProducerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( |
| 110 validated_options, message_queue, &buffer, &buffer_num_bytes)) | 111 validated_options, message_queue, &buffer, &buffer_num_bytes)) |
| 111 return nullptr; | 112 return nullptr; |
| 112 | 113 |
| 113 // Important: This is called under |IncomingEndpoint|'s (which is a | 114 // Important: This is called under |IncomingEndpoint|'s (which is a |
| 114 // |ChannelEndpointClient|) lock, in particular from | 115 // |ChannelEndpointClient|) lock, in particular from |
| 115 // |IncomingEndpoint::ConvertToDataPipeConsumer()|. Before releasing that | 116 // |IncomingEndpoint::ConvertToDataPipeConsumer()|. Before releasing that |
| 116 // lock, it will reset its |endpoint_| member, which makes any later or | 117 // lock, it will reset its |endpoint_| member, which makes any later or |
| 117 // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will | 118 // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will |
| 118 // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| | 119 // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| |
| 119 // is called. | 120 // is called. |
| 120 DataPipe* data_pipe = new DataPipe( | 121 DataPipe* data_pipe = new DataPipe( |
| 121 false, true, validated_options, | 122 false, true, validated_options, |
| 122 make_scoped_ptr(new RemoteProducerDataPipeImpl( | 123 util::MakeUnique<RemoteProducerDataPipeImpl>( |
| 123 channel_endpoint, std::move(buffer), 0, buffer_num_bytes))); | 124 channel_endpoint, std::move(buffer), 0, buffer_num_bytes)); |
| 124 if (channel_endpoint) { | 125 if (channel_endpoint) { |
| 125 if (!channel_endpoint->ReplaceClient(data_pipe, 0)) | 126 if (!channel_endpoint->ReplaceClient(data_pipe, 0)) |
| 126 data_pipe->OnDetachFromChannel(0); | 127 data_pipe->OnDetachFromChannel(0); |
| 127 } else { | 128 } else { |
| 128 data_pipe->SetProducerClosed(); | 129 data_pipe->SetProducerClosed(); |
| 129 } | 130 } |
| 130 return data_pipe; | 131 return data_pipe; |
| 131 } | 132 } |
| 132 | 133 |
| 133 // static | 134 // static |
| 134 DataPipe* DataPipe::CreateRemoteConsumerFromExisting( | 135 DataPipe* DataPipe::CreateRemoteConsumerFromExisting( |
| 135 const MojoCreateDataPipeOptions& validated_options, | 136 const MojoCreateDataPipeOptions& validated_options, |
| 136 size_t consumer_num_bytes, | 137 size_t consumer_num_bytes, |
| 137 MessageInTransitQueue* message_queue, | 138 MessageInTransitQueue* message_queue, |
| 138 ChannelEndpoint* channel_endpoint) { | 139 ChannelEndpoint* channel_endpoint) { |
| 139 if (!RemoteConsumerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( | 140 if (!RemoteConsumerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( |
| 140 validated_options, &consumer_num_bytes, message_queue)) | 141 validated_options, &consumer_num_bytes, message_queue)) |
| 141 return nullptr; | 142 return nullptr; |
| 142 | 143 |
| 143 // Important: This is called under |IncomingEndpoint|'s (which is a | 144 // Important: This is called under |IncomingEndpoint|'s (which is a |
| 144 // |ChannelEndpointClient|) lock, in particular from | 145 // |ChannelEndpointClient|) lock, in particular from |
| 145 // |IncomingEndpoint::ConvertToDataPipeProducer()|. Before releasing that | 146 // |IncomingEndpoint::ConvertToDataPipeProducer()|. Before releasing that |
| 146 // lock, it will reset its |endpoint_| member, which makes any later or | 147 // lock, it will reset its |endpoint_| member, which makes any later or |
| 147 // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will | 148 // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will |
| 148 // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| | 149 // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| |
| 149 // is called. | 150 // is called. |
| 150 DataPipe* data_pipe = | 151 DataPipe* data_pipe = |
| 151 new DataPipe(true, false, validated_options, | 152 new DataPipe(true, false, validated_options, |
| 152 make_scoped_ptr(new RemoteConsumerDataPipeImpl( | 153 util::MakeUnique<RemoteConsumerDataPipeImpl>( |
| 153 channel_endpoint, consumer_num_bytes))); | 154 channel_endpoint, consumer_num_bytes)); |
| 154 if (channel_endpoint) { | 155 if (channel_endpoint) { |
| 155 if (!channel_endpoint->ReplaceClient(data_pipe, 0)) | 156 if (!channel_endpoint->ReplaceClient(data_pipe, 0)) |
| 156 data_pipe->OnDetachFromChannel(0); | 157 data_pipe->OnDetachFromChannel(0); |
| 157 } else { | 158 } else { |
| 158 data_pipe->SetConsumerClosed(); | 159 data_pipe->SetConsumerClosed(); |
| 159 } | 160 } |
| 160 return data_pipe; | 161 return data_pipe; |
| 161 } | 162 } |
| 162 | 163 |
| 163 // static | 164 // static |
| (...skipping 24 matching lines...) Expand all Loading... |
| 188 return false; | 189 return false; |
| 189 } | 190 } |
| 190 | 191 |
| 191 if (!consumer_open) { | 192 if (!consumer_open) { |
| 192 if (s->consumer_num_bytes != static_cast<size_t>(-1)) { | 193 if (s->consumer_num_bytes != static_cast<size_t>(-1)) { |
| 193 LOG(ERROR) | 194 LOG(ERROR) |
| 194 << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; | 195 << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; |
| 195 return false; | 196 return false; |
| 196 } | 197 } |
| 197 | 198 |
| 198 *data_pipe = new DataPipe( | 199 *data_pipe = |
| 199 true, false, revalidated_options, | 200 new DataPipe(true, false, revalidated_options, |
| 200 make_scoped_ptr(new RemoteConsumerDataPipeImpl(nullptr, 0))); | 201 util::MakeUnique<RemoteConsumerDataPipeImpl>(nullptr, 0)); |
| 201 (*data_pipe)->SetConsumerClosed(); | 202 (*data_pipe)->SetConsumerClosed(); |
| 202 | 203 |
| 203 return true; | 204 return true; |
| 204 } | 205 } |
| 205 | 206 |
| 206 if (s->consumer_num_bytes > revalidated_options.capacity_num_bytes || | 207 if (s->consumer_num_bytes > revalidated_options.capacity_num_bytes || |
| 207 s->consumer_num_bytes % revalidated_options.element_num_bytes != 0) { | 208 s->consumer_num_bytes % revalidated_options.element_num_bytes != 0) { |
| 208 LOG(ERROR) | 209 LOG(ERROR) |
| 209 << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; | 210 << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; |
| 210 return false; | 211 return false; |
| (...skipping 449 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 660 } | 661 } |
| 661 | 662 |
| 662 bool DataPipe::ConsumerIsBusy() const { | 663 bool DataPipe::ConsumerIsBusy() const { |
| 663 MutexLocker locker(&mutex_); | 664 MutexLocker locker(&mutex_); |
| 664 return consumer_in_two_phase_read_no_lock(); | 665 return consumer_in_two_phase_read_no_lock(); |
| 665 } | 666 } |
| 666 | 667 |
| 667 DataPipe::DataPipe(bool has_local_producer, | 668 DataPipe::DataPipe(bool has_local_producer, |
| 668 bool has_local_consumer, | 669 bool has_local_consumer, |
| 669 const MojoCreateDataPipeOptions& validated_options, | 670 const MojoCreateDataPipeOptions& validated_options, |
| 670 scoped_ptr<DataPipeImpl> impl) | 671 std::unique_ptr<DataPipeImpl> impl) |
| 671 : validated_options_(validated_options), | 672 : validated_options_(validated_options), |
| 672 producer_open_(true), | 673 producer_open_(true), |
| 673 consumer_open_(true), | 674 consumer_open_(true), |
| 674 producer_awakable_list_(has_local_producer ? new AwakableList() | 675 producer_awakable_list_(has_local_producer ? new AwakableList() |
| 675 : nullptr), | 676 : nullptr), |
| 676 consumer_awakable_list_(has_local_consumer ? new AwakableList() | 677 consumer_awakable_list_(has_local_consumer ? new AwakableList() |
| 677 : nullptr), | 678 : nullptr), |
| 678 producer_two_phase_max_num_bytes_written_(0), | 679 producer_two_phase_max_num_bytes_written_(0), |
| 679 consumer_two_phase_max_num_bytes_read_(0), | 680 consumer_two_phase_max_num_bytes_read_(0), |
| 680 impl_(impl.Pass()) { | 681 impl_(std::move(impl)) { |
| 681 impl_->set_owner(this); | 682 impl_->set_owner(this); |
| 682 | 683 |
| 683 #if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) | 684 #if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
| 684 // Check that the passed in options actually are validated. | 685 // Check that the passed in options actually are validated. |
| 685 MojoCreateDataPipeOptions unused = {}; | 686 MojoCreateDataPipeOptions unused = {}; |
| 686 DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused), | 687 DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused), |
| 687 MOJO_RESULT_OK); | 688 MOJO_RESULT_OK); |
| 688 #endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) | 689 #endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
| 689 } | 690 } |
| 690 | 691 |
| 691 DataPipe::~DataPipe() { | 692 DataPipe::~DataPipe() { |
| 692 DCHECK(!producer_open_); | 693 DCHECK(!producer_open_); |
| 693 DCHECK(!consumer_open_); | 694 DCHECK(!consumer_open_); |
| 694 DCHECK(!producer_awakable_list_); | 695 DCHECK(!producer_awakable_list_); |
| 695 DCHECK(!consumer_awakable_list_); | 696 DCHECK(!consumer_awakable_list_); |
| 696 } | 697 } |
| 697 | 698 |
| 698 scoped_ptr<DataPipeImpl> DataPipe::ReplaceImplNoLock( | 699 std::unique_ptr<DataPipeImpl> DataPipe::ReplaceImplNoLock( |
| 699 scoped_ptr<DataPipeImpl> new_impl) { | 700 std::unique_ptr<DataPipeImpl> new_impl) { |
| 700 mutex_.AssertHeld(); | 701 mutex_.AssertHeld(); |
| 701 DCHECK(new_impl); | 702 DCHECK(new_impl); |
| 702 | 703 |
| 703 impl_->set_owner(nullptr); | 704 impl_->set_owner(nullptr); |
| 704 scoped_ptr<DataPipeImpl> rv(impl_.Pass()); | 705 std::unique_ptr<DataPipeImpl> rv(std::move(impl_)); |
| 705 impl_ = new_impl.Pass(); | 706 impl_ = std::move(new_impl); |
| 706 impl_->set_owner(this); | 707 impl_->set_owner(this); |
| 707 return rv.Pass(); | 708 return rv; |
| 708 } | 709 } |
| 709 | 710 |
| 710 void DataPipe::SetProducerClosedNoLock() { | 711 void DataPipe::SetProducerClosedNoLock() { |
| 711 mutex_.AssertHeld(); | 712 mutex_.AssertHeld(); |
| 712 DCHECK(!has_local_producer_no_lock()); | 713 DCHECK(!has_local_producer_no_lock()); |
| 713 DCHECK(producer_open_); | 714 DCHECK(producer_open_); |
| 714 producer_open_ = false; | 715 producer_open_ = false; |
| 715 } | 716 } |
| 716 | 717 |
| 717 void DataPipe::SetConsumerClosedNoLock() { | 718 void DataPipe::SetConsumerClosedNoLock() { |
| (...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 818 SetProducerClosedNoLock(); | 819 SetProducerClosedNoLock(); |
| 819 } | 820 } |
| 820 | 821 |
| 821 void DataPipe::SetConsumerClosed() { | 822 void DataPipe::SetConsumerClosed() { |
| 822 MutexLocker locker(&mutex_); | 823 MutexLocker locker(&mutex_); |
| 823 SetConsumerClosedNoLock(); | 824 SetConsumerClosedNoLock(); |
| 824 } | 825 } |
| 825 | 826 |
| 826 } // namespace system | 827 } // namespace system |
| 827 } // namespace mojo | 828 } // namespace mojo |
| OLD | NEW |