Index: third_party/mojo/src/mojo/edk/system/data_pipe.cc |
diff --git a/third_party/mojo/src/mojo/edk/system/data_pipe.cc b/third_party/mojo/src/mojo/edk/system/data_pipe.cc |
index d5e9db95266b49f20ac2f308314258895f02b190..c46de2cc1e0ff5c3b23f94d0c0f8885273b4702d 100644 |
--- a/third_party/mojo/src/mojo/edk/system/data_pipe.cc |
+++ b/third_party/mojo/src/mojo/edk/system/data_pipe.cc |
@@ -10,12 +10,17 @@ |
#include <limits> |
#include "base/logging.h" |
+#include "base/memory/aligned_memory.h" |
#include "mojo/edk/system/awakable_list.h" |
+#include "mojo/edk/system/channel.h" |
#include "mojo/edk/system/configuration.h" |
#include "mojo/edk/system/data_pipe_impl.h" |
+#include "mojo/edk/system/incoming_endpoint.h" |
#include "mojo/edk/system/local_data_pipe_impl.h" |
#include "mojo/edk/system/memory.h" |
#include "mojo/edk/system/options_validation.h" |
+#include "mojo/edk/system/remote_consumer_data_pipe_impl.h" |
+#include "mojo/edk/system/remote_producer_data_pipe_impl.h" |
namespace mojo { |
namespace system { |
@@ -36,7 +41,7 @@ MojoResult DataPipe::ValidateCreateOptions( |
UserPointer<const MojoCreateDataPipeOptions> in_options, |
MojoCreateDataPipeOptions* out_options) { |
const MojoCreateDataPipeOptionsFlags kKnownFlags = |
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD; |
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; |
*out_options = GetDefaultCreateOptions(); |
if (in_options.IsNull()) |
@@ -92,6 +97,170 @@ DataPipe* DataPipe::CreateLocal( |
make_scoped_ptr(new LocalDataPipeImpl())); |
} |
+// static |
+DataPipe* DataPipe::CreateRemoteProducerFromExisting( |
+ const MojoCreateDataPipeOptions& validated_options, |
+ MessageInTransitQueue* message_queue, |
+ ChannelEndpoint* channel_endpoint) { |
+ scoped_ptr<char, base::AlignedFreeDeleter> buffer; |
+ size_t buffer_num_bytes = 0; |
+ if (!RemoteProducerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( |
+ validated_options, message_queue, &buffer, &buffer_num_bytes)) |
+ return nullptr; |
+ |
+ // Important: This is called under |IncomingEndpoint|'s (which is a |
+ // |ChannelEndpointClient|) lock, in particular from |
+ // |IncomingEndpoint::ConvertToDataPipeConsumer()|. Before releasing that |
+ // lock, it will reset its |endpoint_| member, which makes any later or |
+ // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will |
+ // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| |
+ // is called. |
+ DataPipe* data_pipe = |
+ new DataPipe(false, true, validated_options, |
+ make_scoped_ptr(new RemoteProducerDataPipeImpl( |
+ channel_endpoint, buffer.Pass(), 0, buffer_num_bytes))); |
+ if (channel_endpoint) { |
+ if (!channel_endpoint->ReplaceClient(data_pipe, 0)) |
+ data_pipe->OnDetachFromChannel(0); |
+ } else { |
+ data_pipe->SetProducerClosed(); |
+ } |
+ return data_pipe; |
+} |
+ |
+// static |
+DataPipe* DataPipe::CreateRemoteConsumerFromExisting( |
+ const MojoCreateDataPipeOptions& validated_options, |
+ size_t consumer_num_bytes, |
+ MessageInTransitQueue* message_queue, |
+ ChannelEndpoint* channel_endpoint) { |
+ if (!RemoteConsumerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( |
+ validated_options, &consumer_num_bytes, message_queue)) |
+ return nullptr; |
+ |
+ // Important: This is called under |IncomingEndpoint|'s (which is a |
+ // |ChannelEndpointClient|) lock, in particular from |
+ // |IncomingEndpoint::ConvertToDataPipeProducer()|. Before releasing that |
+ // lock, it will reset its |endpoint_| member, which makes any later or |
+ // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will |
+ // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| |
+ // is called. |
+ DataPipe* data_pipe = |
+ new DataPipe(true, false, validated_options, |
+ make_scoped_ptr(new RemoteConsumerDataPipeImpl( |
+ channel_endpoint, consumer_num_bytes))); |
+ if (channel_endpoint) { |
+ if (!channel_endpoint->ReplaceClient(data_pipe, 0)) |
+ data_pipe->OnDetachFromChannel(0); |
+ } else { |
+ data_pipe->SetConsumerClosed(); |
+ } |
+ return data_pipe; |
+} |
+ |
+// static |
+bool DataPipe::ProducerDeserialize(Channel* channel, |
+ const void* source, |
+ size_t size, |
+ scoped_refptr<DataPipe>* data_pipe) { |
+ DCHECK(!*data_pipe); // Not technically wrong, but unlikely. |
+ |
+ bool consumer_open = false; |
+ if (size == sizeof(SerializedDataPipeProducerDispatcher)) { |
+ consumer_open = false; |
+ } else if (size == |
+ sizeof(SerializedDataPipeProducerDispatcher) + |
+ channel->GetSerializedEndpointSize()) { |
+ consumer_open = true; |
+ } else { |
+ LOG(ERROR) << "Invalid serialized data pipe producer"; |
+ return false; |
+ } |
+ |
+ const SerializedDataPipeProducerDispatcher* s = |
+ static_cast<const SerializedDataPipeProducerDispatcher*>(source); |
+ MojoCreateDataPipeOptions revalidated_options = {}; |
+ if (ValidateCreateOptions(MakeUserPointer(&s->validated_options), |
+ &revalidated_options) != MOJO_RESULT_OK) { |
+ LOG(ERROR) << "Invalid serialized data pipe producer (bad options)"; |
+ return false; |
+ } |
+ |
+ if (!consumer_open) { |
+ if (s->consumer_num_bytes != static_cast<size_t>(-1)) { |
+ LOG(ERROR) |
+ << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; |
+ return false; |
+ } |
+ |
+ *data_pipe = new DataPipe( |
+ true, false, revalidated_options, |
+ make_scoped_ptr(new RemoteConsumerDataPipeImpl(nullptr, 0))); |
+ (*data_pipe)->SetConsumerClosed(); |
+ |
+ return true; |
+ } |
+ |
+ if (s->consumer_num_bytes > revalidated_options.capacity_num_bytes || |
+ s->consumer_num_bytes % revalidated_options.element_num_bytes != 0) { |
+ LOG(ERROR) |
+ << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; |
+ return false; |
+ } |
+ |
+ const void* endpoint_source = static_cast<const char*>(source) + |
+ sizeof(SerializedDataPipeProducerDispatcher); |
+ scoped_refptr<IncomingEndpoint> incoming_endpoint = |
+ channel->DeserializeEndpoint(endpoint_source); |
+ if (!incoming_endpoint) |
+ return false; |
+ |
+ *data_pipe = incoming_endpoint->ConvertToDataPipeProducer( |
+ revalidated_options, s->consumer_num_bytes); |
+ if (!*data_pipe) |
+ return false; |
+ |
+ return true; |
+} |
+ |
+// static |
+bool DataPipe::ConsumerDeserialize(Channel* channel, |
+ const void* source, |
+ size_t size, |
+ scoped_refptr<DataPipe>* data_pipe) { |
+ DCHECK(!*data_pipe); // Not technically wrong, but unlikely. |
+ |
+ if (size != |
+ sizeof(SerializedDataPipeConsumerDispatcher) + |
+ channel->GetSerializedEndpointSize()) { |
+ LOG(ERROR) << "Invalid serialized data pipe consumer"; |
+ return false; |
+ } |
+ |
+ const SerializedDataPipeConsumerDispatcher* s = |
+ static_cast<const SerializedDataPipeConsumerDispatcher*>(source); |
+ MojoCreateDataPipeOptions revalidated_options = {}; |
+ if (ValidateCreateOptions(MakeUserPointer(&s->validated_options), |
+ &revalidated_options) != MOJO_RESULT_OK) { |
+ LOG(ERROR) << "Invalid serialized data pipe consumer (bad options)"; |
+ return false; |
+ } |
+ |
+ const void* endpoint_source = static_cast<const char*>(source) + |
+ sizeof(SerializedDataPipeConsumerDispatcher); |
+ scoped_refptr<IncomingEndpoint> incoming_endpoint = |
+ channel->DeserializeEndpoint(endpoint_source); |
+ if (!incoming_endpoint) |
+ return false; |
+ |
+ *data_pipe = |
+ incoming_endpoint->ConvertToDataPipeConsumer(revalidated_options); |
+ if (!*data_pipe) |
+ return false; |
+ |
+ return true; |
+} |
+ |
void DataPipe::ProducerCancelAllAwakables() { |
base::AutoLock locker(lock_); |
DCHECK(has_local_producer_no_lock()); |
@@ -253,8 +422,24 @@ bool DataPipe::ProducerEndSerialize( |
embedder::PlatformHandleVector* platform_handles) { |
base::AutoLock locker(lock_); |
DCHECK(has_local_producer_no_lock()); |
- return impl_->ProducerEndSerialize(channel, destination, actual_size, |
- platform_handles); |
+ // Warning: After |ProducerEndSerialize()|, quite probably |impl_| has |
+ // changed. |
+ bool rv = impl_->ProducerEndSerialize(channel, destination, actual_size, |
+ platform_handles); |
+ |
+ // TODO(vtl): The code below is similar to, but not quite the same as, |
+ // |ProducerCloseNoLock()|. |
+ DCHECK(has_local_producer_no_lock()); |
+ producer_awakable_list_->CancelAll(); |
+ producer_awakable_list_.reset(); |
+ // Not a bug, except possibly in "user" code. |
+ DVLOG_IF(2, producer_in_two_phase_write_no_lock()) |
+ << "Producer transferred with active two-phase write"; |
+ producer_two_phase_max_num_bytes_written_ = 0; |
+ if (!has_local_consumer_no_lock()) |
+ producer_open_ = false; |
+ |
+ return rv; |
} |
bool DataPipe::ProducerIsBusy() const { |
@@ -453,8 +638,23 @@ bool DataPipe::ConsumerEndSerialize( |
embedder::PlatformHandleVector* platform_handles) { |
base::AutoLock locker(lock_); |
DCHECK(has_local_consumer_no_lock()); |
- return impl_->ConsumerEndSerialize(channel, destination, actual_size, |
- platform_handles); |
+ // Warning: After |ConsumerEndSerialize()|, quite probably |impl_| has |
+ // changed. |
+ bool rv = impl_->ConsumerEndSerialize(channel, destination, actual_size, |
+ platform_handles); |
+ |
+ // TODO(vtl): The code below is similar to, but not quite the same as, |
+ // |ConsumerCloseNoLock()|. |
+ consumer_awakable_list_->CancelAll(); |
+ consumer_awakable_list_.reset(); |
+ // Not a bug, except possibly in "user" code. |
+ DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) |
+ << "Consumer transferred with active two-phase read"; |
+ consumer_two_phase_max_num_bytes_read_ = 0; |
+ if (!has_local_producer_no_lock()) |
+ consumer_open_ = false; |
+ |
+ return rv; |
} |
bool DataPipe::ConsumerIsBusy() const { |
@@ -480,7 +680,7 @@ DataPipe::DataPipe(bool has_local_producer, |
#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
// Check that the passed in options actually are validated. |
- MojoCreateDataPipeOptions unused = {0}; |
+ MojoCreateDataPipeOptions unused = {}; |
DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused), |
MOJO_RESULT_OK); |
#endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
@@ -493,34 +693,106 @@ DataPipe::~DataPipe() { |
DCHECK(!consumer_awakable_list_); |
} |
+scoped_ptr<DataPipeImpl> DataPipe::ReplaceImplNoLock( |
+ scoped_ptr<DataPipeImpl> new_impl) { |
+ lock_.AssertAcquired(); |
+ DCHECK(new_impl); |
+ |
+ impl_->set_owner(nullptr); |
+ scoped_ptr<DataPipeImpl> rv(impl_.Pass()); |
+ impl_ = new_impl.Pass(); |
+ impl_->set_owner(this); |
+ return rv.Pass(); |
+} |
+ |
+void DataPipe::SetProducerClosedNoLock() { |
+ lock_.AssertAcquired(); |
+ DCHECK(!has_local_producer_no_lock()); |
+ DCHECK(producer_open_); |
+ producer_open_ = false; |
+} |
+ |
+void DataPipe::SetConsumerClosedNoLock() { |
+ lock_.AssertAcquired(); |
+ DCHECK(!has_local_consumer_no_lock()); |
+ DCHECK(consumer_open_); |
+ consumer_open_ = false; |
+} |
+ |
void DataPipe::ProducerCloseNoLock() { |
lock_.AssertAcquired(); |
DCHECK(producer_open_); |
producer_open_ = false; |
- DCHECK(has_local_producer_no_lock()); |
- producer_awakable_list_.reset(); |
- // Not a bug, except possibly in "user" code. |
- DVLOG_IF(2, producer_in_two_phase_write_no_lock()) |
- << "Producer closed with active two-phase write"; |
- producer_two_phase_max_num_bytes_written_ = 0; |
- impl_->ProducerClose(); |
- AwakeConsumerAwakablesForStateChangeNoLock( |
- impl_->ConsumerGetHandleSignalsState()); |
+ if (has_local_producer_no_lock()) { |
+ producer_awakable_list_.reset(); |
+ // Not a bug, except possibly in "user" code. |
+ DVLOG_IF(2, producer_in_two_phase_write_no_lock()) |
+ << "Producer closed with active two-phase write"; |
+ producer_two_phase_max_num_bytes_written_ = 0; |
+ impl_->ProducerClose(); |
+ AwakeConsumerAwakablesForStateChangeNoLock( |
+ impl_->ConsumerGetHandleSignalsState()); |
+ } |
} |
void DataPipe::ConsumerCloseNoLock() { |
lock_.AssertAcquired(); |
DCHECK(consumer_open_); |
consumer_open_ = false; |
- DCHECK(has_local_consumer_no_lock()); |
- consumer_awakable_list_.reset(); |
- // Not a bug, except possibly in "user" code. |
- DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) |
- << "Consumer closed with active two-phase read"; |
- consumer_two_phase_max_num_bytes_read_ = 0; |
- impl_->ConsumerClose(); |
- AwakeProducerAwakablesForStateChangeNoLock( |
- impl_->ProducerGetHandleSignalsState()); |
+ if (has_local_consumer_no_lock()) { |
+ consumer_awakable_list_.reset(); |
+ // Not a bug, except possibly in "user" code. |
+ DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) |
+ << "Consumer closed with active two-phase read"; |
+ consumer_two_phase_max_num_bytes_read_ = 0; |
+ impl_->ConsumerClose(); |
+ AwakeProducerAwakablesForStateChangeNoLock( |
+ impl_->ProducerGetHandleSignalsState()); |
+ } |
+} |
+ |
+bool DataPipe::OnReadMessage(unsigned port, MessageInTransit* message) { |
+ base::AutoLock locker(lock_); |
+ DCHECK(!has_local_producer_no_lock() || !has_local_consumer_no_lock()); |
+ |
+ HandleSignalsState old_producer_state = |
+ impl_->ProducerGetHandleSignalsState(); |
+ HandleSignalsState old_consumer_state = |
+ impl_->ConsumerGetHandleSignalsState(); |
+ |
+ bool rv = impl_->OnReadMessage(port, message); |
+ |
+ HandleSignalsState new_producer_state = |
+ impl_->ProducerGetHandleSignalsState(); |
+ if (!new_producer_state.equals(old_producer_state)) |
+ AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
+ HandleSignalsState new_consumer_state = |
+ impl_->ConsumerGetHandleSignalsState(); |
+ if (!new_consumer_state.equals(old_consumer_state)) |
+ AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
+ |
+ return rv; |
+} |
+ |
+void DataPipe::OnDetachFromChannel(unsigned port) { |
+ base::AutoLock locker(lock_); |
+ DCHECK(!has_local_producer_no_lock() || !has_local_consumer_no_lock()); |
+ |
+ HandleSignalsState old_producer_state = |
+ impl_->ProducerGetHandleSignalsState(); |
+ HandleSignalsState old_consumer_state = |
+ impl_->ConsumerGetHandleSignalsState(); |
+ |
+ impl_->OnDetachFromChannel(port); |
+ |
+ HandleSignalsState new_producer_state = |
+ impl_->ProducerGetHandleSignalsState(); |
+ if (!new_producer_state.equals(old_producer_state)) |
+ AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
+ HandleSignalsState new_consumer_state = |
+ impl_->ConsumerGetHandleSignalsState(); |
+ if (!new_consumer_state.equals(old_consumer_state)) |
+ AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
} |
void DataPipe::AwakeProducerAwakablesForStateChangeNoLock( |
@@ -539,5 +811,15 @@ void DataPipe::AwakeConsumerAwakablesForStateChangeNoLock( |
consumer_awakable_list_->AwakeForStateChange(new_consumer_state); |
} |
+void DataPipe::SetProducerClosed() { |
+ base::AutoLock locker(lock_); |
+ SetProducerClosedNoLock(); |
+} |
+ |
+void DataPipe::SetConsumerClosed() { |
+ base::AutoLock locker(lock_); |
+ SetConsumerClosedNoLock(); |
+} |
+ |
} // namespace system |
} // namespace mojo |