Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Unified Diff: third_party/mojo/src/mojo/edk/system/data_pipe.cc

Issue 975973002: Update mojo sdk to rev f68e697e389943cd9bf9652397312280e96b127a (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: shake fist at msvc Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/mojo/src/mojo/edk/system/data_pipe.cc
diff --git a/third_party/mojo/src/mojo/edk/system/data_pipe.cc b/third_party/mojo/src/mojo/edk/system/data_pipe.cc
index d5e9db95266b49f20ac2f308314258895f02b190..c46de2cc1e0ff5c3b23f94d0c0f8885273b4702d 100644
--- a/third_party/mojo/src/mojo/edk/system/data_pipe.cc
+++ b/third_party/mojo/src/mojo/edk/system/data_pipe.cc
@@ -10,12 +10,17 @@
#include <limits>
#include "base/logging.h"
+#include "base/memory/aligned_memory.h"
#include "mojo/edk/system/awakable_list.h"
+#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/configuration.h"
#include "mojo/edk/system/data_pipe_impl.h"
+#include "mojo/edk/system/incoming_endpoint.h"
#include "mojo/edk/system/local_data_pipe_impl.h"
#include "mojo/edk/system/memory.h"
#include "mojo/edk/system/options_validation.h"
+#include "mojo/edk/system/remote_consumer_data_pipe_impl.h"
+#include "mojo/edk/system/remote_producer_data_pipe_impl.h"
namespace mojo {
namespace system {
@@ -36,7 +41,7 @@ MojoResult DataPipe::ValidateCreateOptions(
UserPointer<const MojoCreateDataPipeOptions> in_options,
MojoCreateDataPipeOptions* out_options) {
const MojoCreateDataPipeOptionsFlags kKnownFlags =
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD;
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE;
*out_options = GetDefaultCreateOptions();
if (in_options.IsNull())
@@ -92,6 +97,170 @@ DataPipe* DataPipe::CreateLocal(
make_scoped_ptr(new LocalDataPipeImpl()));
}
+// static
+DataPipe* DataPipe::CreateRemoteProducerFromExisting(
+ const MojoCreateDataPipeOptions& validated_options,
+ MessageInTransitQueue* message_queue,
+ ChannelEndpoint* channel_endpoint) {
+ scoped_ptr<char, base::AlignedFreeDeleter> buffer;
+ size_t buffer_num_bytes = 0;
+ if (!RemoteProducerDataPipeImpl::ProcessMessagesFromIncomingEndpoint(
+ validated_options, message_queue, &buffer, &buffer_num_bytes))
+ return nullptr;
+
+ // Important: This is called under |IncomingEndpoint|'s (which is a
+ // |ChannelEndpointClient|) lock, in particular from
+ // |IncomingEndpoint::ConvertToDataPipeConsumer()|. Before releasing that
+ // lock, it will reset its |endpoint_| member, which makes any later or
+ // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will
+ // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()|
+ // is called.
+ DataPipe* data_pipe =
+ new DataPipe(false, true, validated_options,
+ make_scoped_ptr(new RemoteProducerDataPipeImpl(
+ channel_endpoint, buffer.Pass(), 0, buffer_num_bytes)));
+ if (channel_endpoint) {
+ if (!channel_endpoint->ReplaceClient(data_pipe, 0))
+ data_pipe->OnDetachFromChannel(0);
+ } else {
+ data_pipe->SetProducerClosed();
+ }
+ return data_pipe;
+}
+
+// static
+DataPipe* DataPipe::CreateRemoteConsumerFromExisting(
+ const MojoCreateDataPipeOptions& validated_options,
+ size_t consumer_num_bytes,
+ MessageInTransitQueue* message_queue,
+ ChannelEndpoint* channel_endpoint) {
+ if (!RemoteConsumerDataPipeImpl::ProcessMessagesFromIncomingEndpoint(
+ validated_options, &consumer_num_bytes, message_queue))
+ return nullptr;
+
+ // Important: This is called under |IncomingEndpoint|'s (which is a
+ // |ChannelEndpointClient|) lock, in particular from
+ // |IncomingEndpoint::ConvertToDataPipeProducer()|. Before releasing that
+ // lock, it will reset its |endpoint_| member, which makes any later or
+ // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will
+ // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()|
+ // is called.
+ DataPipe* data_pipe =
+ new DataPipe(true, false, validated_options,
+ make_scoped_ptr(new RemoteConsumerDataPipeImpl(
+ channel_endpoint, consumer_num_bytes)));
+ if (channel_endpoint) {
+ if (!channel_endpoint->ReplaceClient(data_pipe, 0))
+ data_pipe->OnDetachFromChannel(0);
+ } else {
+ data_pipe->SetConsumerClosed();
+ }
+ return data_pipe;
+}
+
+// static
+bool DataPipe::ProducerDeserialize(Channel* channel,
+ const void* source,
+ size_t size,
+ scoped_refptr<DataPipe>* data_pipe) {
+ DCHECK(!*data_pipe); // Not technically wrong, but unlikely.
+
+ bool consumer_open = false;
+ if (size == sizeof(SerializedDataPipeProducerDispatcher)) {
+ consumer_open = false;
+ } else if (size ==
+ sizeof(SerializedDataPipeProducerDispatcher) +
+ channel->GetSerializedEndpointSize()) {
+ consumer_open = true;
+ } else {
+ LOG(ERROR) << "Invalid serialized data pipe producer";
+ return false;
+ }
+
+ const SerializedDataPipeProducerDispatcher* s =
+ static_cast<const SerializedDataPipeProducerDispatcher*>(source);
+ MojoCreateDataPipeOptions revalidated_options = {};
+ if (ValidateCreateOptions(MakeUserPointer(&s->validated_options),
+ &revalidated_options) != MOJO_RESULT_OK) {
+ LOG(ERROR) << "Invalid serialized data pipe producer (bad options)";
+ return false;
+ }
+
+ if (!consumer_open) {
+ if (s->consumer_num_bytes != static_cast<size_t>(-1)) {
+ LOG(ERROR)
+ << "Invalid serialized data pipe producer (bad consumer_num_bytes)";
+ return false;
+ }
+
+ *data_pipe = new DataPipe(
+ true, false, revalidated_options,
+ make_scoped_ptr(new RemoteConsumerDataPipeImpl(nullptr, 0)));
+ (*data_pipe)->SetConsumerClosed();
+
+ return true;
+ }
+
+ if (s->consumer_num_bytes > revalidated_options.capacity_num_bytes ||
+ s->consumer_num_bytes % revalidated_options.element_num_bytes != 0) {
+ LOG(ERROR)
+ << "Invalid serialized data pipe producer (bad consumer_num_bytes)";
+ return false;
+ }
+
+ const void* endpoint_source = static_cast<const char*>(source) +
+ sizeof(SerializedDataPipeProducerDispatcher);
+ scoped_refptr<IncomingEndpoint> incoming_endpoint =
+ channel->DeserializeEndpoint(endpoint_source);
+ if (!incoming_endpoint)
+ return false;
+
+ *data_pipe = incoming_endpoint->ConvertToDataPipeProducer(
+ revalidated_options, s->consumer_num_bytes);
+ if (!*data_pipe)
+ return false;
+
+ return true;
+}
+
+// static
+bool DataPipe::ConsumerDeserialize(Channel* channel,
+ const void* source,
+ size_t size,
+ scoped_refptr<DataPipe>* data_pipe) {
+ DCHECK(!*data_pipe); // Not technically wrong, but unlikely.
+
+ if (size !=
+ sizeof(SerializedDataPipeConsumerDispatcher) +
+ channel->GetSerializedEndpointSize()) {
+ LOG(ERROR) << "Invalid serialized data pipe consumer";
+ return false;
+ }
+
+ const SerializedDataPipeConsumerDispatcher* s =
+ static_cast<const SerializedDataPipeConsumerDispatcher*>(source);
+ MojoCreateDataPipeOptions revalidated_options = {};
+ if (ValidateCreateOptions(MakeUserPointer(&s->validated_options),
+ &revalidated_options) != MOJO_RESULT_OK) {
+ LOG(ERROR) << "Invalid serialized data pipe consumer (bad options)";
+ return false;
+ }
+
+ const void* endpoint_source = static_cast<const char*>(source) +
+ sizeof(SerializedDataPipeConsumerDispatcher);
+ scoped_refptr<IncomingEndpoint> incoming_endpoint =
+ channel->DeserializeEndpoint(endpoint_source);
+ if (!incoming_endpoint)
+ return false;
+
+ *data_pipe =
+ incoming_endpoint->ConvertToDataPipeConsumer(revalidated_options);
+ if (!*data_pipe)
+ return false;
+
+ return true;
+}
+
void DataPipe::ProducerCancelAllAwakables() {
base::AutoLock locker(lock_);
DCHECK(has_local_producer_no_lock());
@@ -253,8 +422,24 @@ bool DataPipe::ProducerEndSerialize(
embedder::PlatformHandleVector* platform_handles) {
base::AutoLock locker(lock_);
DCHECK(has_local_producer_no_lock());
- return impl_->ProducerEndSerialize(channel, destination, actual_size,
- platform_handles);
+ // Warning: After |ProducerEndSerialize()|, quite probably |impl_| has
+ // changed.
+ bool rv = impl_->ProducerEndSerialize(channel, destination, actual_size,
+ platform_handles);
+
+ // TODO(vtl): The code below is similar to, but not quite the same as,
+ // |ProducerCloseNoLock()|.
+ DCHECK(has_local_producer_no_lock());
+ producer_awakable_list_->CancelAll();
+ producer_awakable_list_.reset();
+ // Not a bug, except possibly in "user" code.
+ DVLOG_IF(2, producer_in_two_phase_write_no_lock())
+ << "Producer transferred with active two-phase write";
+ producer_two_phase_max_num_bytes_written_ = 0;
+ if (!has_local_consumer_no_lock())
+ producer_open_ = false;
+
+ return rv;
}
bool DataPipe::ProducerIsBusy() const {
@@ -453,8 +638,23 @@ bool DataPipe::ConsumerEndSerialize(
embedder::PlatformHandleVector* platform_handles) {
base::AutoLock locker(lock_);
DCHECK(has_local_consumer_no_lock());
- return impl_->ConsumerEndSerialize(channel, destination, actual_size,
- platform_handles);
+ // Warning: After |ConsumerEndSerialize()|, quite probably |impl_| has
+ // changed.
+ bool rv = impl_->ConsumerEndSerialize(channel, destination, actual_size,
+ platform_handles);
+
+ // TODO(vtl): The code below is similar to, but not quite the same as,
+ // |ConsumerCloseNoLock()|.
+ consumer_awakable_list_->CancelAll();
+ consumer_awakable_list_.reset();
+ // Not a bug, except possibly in "user" code.
+ DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
+ << "Consumer transferred with active two-phase read";
+ consumer_two_phase_max_num_bytes_read_ = 0;
+ if (!has_local_producer_no_lock())
+ consumer_open_ = false;
+
+ return rv;
}
bool DataPipe::ConsumerIsBusy() const {
@@ -480,7 +680,7 @@ DataPipe::DataPipe(bool has_local_producer,
#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
// Check that the passed in options actually are validated.
- MojoCreateDataPipeOptions unused = {0};
+ MojoCreateDataPipeOptions unused = {};
DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
MOJO_RESULT_OK);
#endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
@@ -493,34 +693,106 @@ DataPipe::~DataPipe() {
DCHECK(!consumer_awakable_list_);
}
+scoped_ptr<DataPipeImpl> DataPipe::ReplaceImplNoLock(
+ scoped_ptr<DataPipeImpl> new_impl) {
+ lock_.AssertAcquired();
+ DCHECK(new_impl);
+
+ impl_->set_owner(nullptr);
+ scoped_ptr<DataPipeImpl> rv(impl_.Pass());
+ impl_ = new_impl.Pass();
+ impl_->set_owner(this);
+ return rv.Pass();
+}
+
+void DataPipe::SetProducerClosedNoLock() {
+ lock_.AssertAcquired();
+ DCHECK(!has_local_producer_no_lock());
+ DCHECK(producer_open_);
+ producer_open_ = false;
+}
+
+void DataPipe::SetConsumerClosedNoLock() {
+ lock_.AssertAcquired();
+ DCHECK(!has_local_consumer_no_lock());
+ DCHECK(consumer_open_);
+ consumer_open_ = false;
+}
+
void DataPipe::ProducerCloseNoLock() {
lock_.AssertAcquired();
DCHECK(producer_open_);
producer_open_ = false;
- DCHECK(has_local_producer_no_lock());
- producer_awakable_list_.reset();
- // Not a bug, except possibly in "user" code.
- DVLOG_IF(2, producer_in_two_phase_write_no_lock())
- << "Producer closed with active two-phase write";
- producer_two_phase_max_num_bytes_written_ = 0;
- impl_->ProducerClose();
- AwakeConsumerAwakablesForStateChangeNoLock(
- impl_->ConsumerGetHandleSignalsState());
+ if (has_local_producer_no_lock()) {
+ producer_awakable_list_.reset();
+ // Not a bug, except possibly in "user" code.
+ DVLOG_IF(2, producer_in_two_phase_write_no_lock())
+ << "Producer closed with active two-phase write";
+ producer_two_phase_max_num_bytes_written_ = 0;
+ impl_->ProducerClose();
+ AwakeConsumerAwakablesForStateChangeNoLock(
+ impl_->ConsumerGetHandleSignalsState());
+ }
}
void DataPipe::ConsumerCloseNoLock() {
lock_.AssertAcquired();
DCHECK(consumer_open_);
consumer_open_ = false;
- DCHECK(has_local_consumer_no_lock());
- consumer_awakable_list_.reset();
- // Not a bug, except possibly in "user" code.
- DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
- << "Consumer closed with active two-phase read";
- consumer_two_phase_max_num_bytes_read_ = 0;
- impl_->ConsumerClose();
- AwakeProducerAwakablesForStateChangeNoLock(
- impl_->ProducerGetHandleSignalsState());
+ if (has_local_consumer_no_lock()) {
+ consumer_awakable_list_.reset();
+ // Not a bug, except possibly in "user" code.
+ DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
+ << "Consumer closed with active two-phase read";
+ consumer_two_phase_max_num_bytes_read_ = 0;
+ impl_->ConsumerClose();
+ AwakeProducerAwakablesForStateChangeNoLock(
+ impl_->ProducerGetHandleSignalsState());
+ }
+}
+
+bool DataPipe::OnReadMessage(unsigned port, MessageInTransit* message) {
+ base::AutoLock locker(lock_);
+ DCHECK(!has_local_producer_no_lock() || !has_local_consumer_no_lock());
+
+ HandleSignalsState old_producer_state =
+ impl_->ProducerGetHandleSignalsState();
+ HandleSignalsState old_consumer_state =
+ impl_->ConsumerGetHandleSignalsState();
+
+ bool rv = impl_->OnReadMessage(port, message);
+
+ HandleSignalsState new_producer_state =
+ impl_->ProducerGetHandleSignalsState();
+ if (!new_producer_state.equals(old_producer_state))
+ AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
+ HandleSignalsState new_consumer_state =
+ impl_->ConsumerGetHandleSignalsState();
+ if (!new_consumer_state.equals(old_consumer_state))
+ AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
+
+ return rv;
+}
+
+void DataPipe::OnDetachFromChannel(unsigned port) {
+ base::AutoLock locker(lock_);
+ DCHECK(!has_local_producer_no_lock() || !has_local_consumer_no_lock());
+
+ HandleSignalsState old_producer_state =
+ impl_->ProducerGetHandleSignalsState();
+ HandleSignalsState old_consumer_state =
+ impl_->ConsumerGetHandleSignalsState();
+
+ impl_->OnDetachFromChannel(port);
+
+ HandleSignalsState new_producer_state =
+ impl_->ProducerGetHandleSignalsState();
+ if (!new_producer_state.equals(old_producer_state))
+ AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
+ HandleSignalsState new_consumer_state =
+ impl_->ConsumerGetHandleSignalsState();
+ if (!new_consumer_state.equals(old_consumer_state))
+ AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
}
void DataPipe::AwakeProducerAwakablesForStateChangeNoLock(
@@ -539,5 +811,15 @@ void DataPipe::AwakeConsumerAwakablesForStateChangeNoLock(
consumer_awakable_list_->AwakeForStateChange(new_consumer_state);
}
+void DataPipe::SetProducerClosed() {
+ base::AutoLock locker(lock_);
+ SetProducerClosedNoLock();
+}
+
+void DataPipe::SetConsumerClosed() {
+ base::AutoLock locker(lock_);
+ SetConsumerClosedNoLock();
+}
+
} // namespace system
} // namespace mojo
« no previous file with comments | « third_party/mojo/src/mojo/edk/system/data_pipe.h ('k') | third_party/mojo/src/mojo/edk/system/data_pipe_consumer_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698