Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(695)

Unified Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1488853002: Add multiplexing of message pipes in the new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: tsepez review comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/message_pipe_perftest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/message_pipe_dispatcher.cc
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
index 6ebab7489a799f655ccadf5768ed5510073fba44..55e03fa236d09d2e8f28ab75813c656607617245 100644
--- a/mojo/edk/system/message_pipe_dispatcher.cc
+++ b/mojo/edk/system/message_pipe_dispatcher.cc
@@ -25,10 +25,13 @@ namespace {
const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1);
struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher {
+ bool transferable;
+ bool write_error;
+ uint64_t pipe_id; // If transferable is false.
+ // The following members are only set if transferable is true.
// Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP
// was closed.
size_t platform_handle_index;
- bool write_error;
size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.)
uint32_t shared_memory_size;
@@ -90,7 +93,8 @@ MojoResult MessagePipeDispatcher::ValidateCreateOptions(
const MojoCreateMessagePipeOptions* in_options,
MojoCreateMessagePipeOptions* out_options) {
const MojoCreateMessagePipeOptionsFlags kKnownFlags =
- MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE |
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE;
*out_options = kDefaultCreateOptions;
if (!in_options)
@@ -119,6 +123,7 @@ void MessagePipeDispatcher::Init(
char* serialized_write_buffer, size_t serialized_write_buffer_size,
std::vector<int>* serialized_read_fds,
std::vector<int>* serialized_write_fds) {
+ CHECK(transferable_);
if (message_pipe.get().is_valid()) {
channel_ = RawChannel::Create(message_pipe.Pass());
@@ -132,8 +137,14 @@ void MessagePipeDispatcher::Init(
}
}
+void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) {
+ CHECK(!transferable_);
+ pipe_id_ = pipe_id;
+}
+
void MessagePipeDispatcher::InitOnIO() {
base::AutoLock locker(lock());
+ CHECK(transferable_);
calling_init_ = true;
if (channel_)
channel_->Init(this);
@@ -142,10 +153,28 @@ void MessagePipeDispatcher::InitOnIO() {
void MessagePipeDispatcher::CloseOnIO() {
base::AutoLock locker(lock());
-
- if (channel_) {
- channel_->Shutdown();
- channel_ = nullptr;
+ if (transferable_) {
+ if (channel_) {
+ channel_->Shutdown();
+ channel_ = nullptr;
+ }
+ } else {
+ if (non_transferable_state_ == CONNECT_CALLED ||
+ non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
+ if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
+ RequestNontransferableChannel();
+
+ // We can't cancel the pending request yet, since the other side of the
+ // message pipe would want to get pending outgoing messages (if any) or
+ // at least know that this end was closed. So keep this object alive until
+ // then.
+ non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE;
+ AddRef();
+ } else if (non_transferable_state_ == CONNECTED) {
+ internal::g_broker->CloseMessagePipe(pipe_id_, this);
+ non_transferable_state_ = CLOSED;
+ channel_ = nullptr;
+ }
}
}
@@ -153,6 +182,30 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const {
return Type::MESSAGE_PIPE;
}
+void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
+ base::AutoLock locker(lock());
+ channel_ = channel;
+ while (!non_transferable_outgoing_message_queue_.IsEmpty()) {
+ channel_->WriteMessage(
+ non_transferable_outgoing_message_queue_.GetMessage());
+ }
+
+ if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) {
+ // We kept this object alive until it's connected, we can release it now.
+ // Since we're in a callback from the Broker, call it asynchronously.
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&Broker::CloseMessagePipe,
+ base::Unretained(internal::g_broker), pipe_id_,
+ base::Unretained(this)));
+ non_transferable_state_ = CLOSED;
+ channel_ = nullptr;
+ base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this);
+ } else {
+ non_transferable_state_ = CONNECTED;
+ }
+}
+
#if defined(OS_WIN)
// TODO(jam): this is copied from RawChannelWin till I figure out what's the
// best way we want to share this.
@@ -186,6 +239,14 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
const SerializedMessagePipeHandleDispatcher* serialization =
static_cast<const SerializedMessagePipeHandleDispatcher*>(source);
+
+ scoped_refptr<MessagePipeDispatcher> rv(
+ new MessagePipeDispatcher(serialization->transferable));
+ if (!rv->transferable_) {
+ rv->InitNonTransferable(serialization->pipe_id);
+ return rv;
+ }
+
if (serialization->shared_memory_size !=
(serialization->serialized_read_buffer_size +
serialization->serialized_write_buffer_size +
@@ -233,8 +294,6 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
}
}
- scoped_refptr<MessagePipeDispatcher> rv(
- Create(MessagePipeDispatcher::kDefaultCreateOptions));
rv->write_error_ = serialization->write_error;
std::vector<int> serialized_read_fds;
@@ -269,8 +328,15 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
while (message_queue_size) {
size_t message_size;
- CHECK(MessageInTransit::GetNextMessageSize(
- message_queue_data, message_queue_size, &message_size));
+ if (!MessageInTransit::GetNextMessageSize(
+ message_queue_data, message_queue_size, &message_size)) {
+ NOTREACHED() << "Couldn't read message size from serialized data.";
+ return nullptr;
+ }
+ if (message_size > message_queue_size) {
+ NOTREACHED() << "Invalid serialized message size.";
+ return nullptr;
+ }
MessageInTransit::View message_view(message_size, message_queue_data);
message_queue_size -= message_size;
message_queue_data += message_size;
@@ -333,14 +399,17 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
return rv;
}
-MessagePipeDispatcher::MessagePipeDispatcher()
+MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
: channel_(nullptr),
- serialized_(false),
serialized_read_fds_length_(0u),
serialized_write_fds_length_(0u),
serialized_message_fds_length_(0u),
+ pipe_id_(0),
+ non_transferable_state_(WAITING_FOR_READ_OR_WRITE),
+ serialized_(false),
calling_init_(false),
- write_error_(false) {
+ write_error_(false),
+ transferable_(transferable) {
}
MessagePipeDispatcher::~MessagePipeDispatcher() {
@@ -369,6 +438,16 @@ void MessagePipeDispatcher::CloseImplNoLock() {
}
void MessagePipeDispatcher::SerializeInternal() {
+ serialized_ = true;
+ if (!transferable_) {
+ CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
+ << "Non transferable message pipe being sent after read/write. "
+ << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if "
+ << "the pipe can be sent after it's read or written.";
+ non_transferable_state_ = SERIALISED;
+ return;
+ }
+
// We need to stop watching handle immediately, even though not on IO thread,
// so that other messages aren't read after this.
std::vector<int> serialized_read_fds, serialized_write_fds;
@@ -385,8 +464,6 @@ void MessagePipeDispatcher::SerializeInternal() {
serialized_write_fds.end());
serialized_write_fds_length_ = serialized_write_fds.size();
channel_ = nullptr;
- if (write_error)
- write_error = true;
} else {
// It's valid that the other side wrote some data and closed its end.
}
@@ -441,20 +518,18 @@ void MessagePipeDispatcher::SerializeInternal() {
all_platform_handles->at(i) = PlatformHandle();
}
#endif
+ }
serialized_message_queue_.insert(
serialized_message_queue_.end(),
static_cast<const char*>(message->transport_data()->buffer()),
static_cast<const char*>(message->transport_data()->buffer()) +
transport_data_buffer_size);
- }
}
for (size_t i = 0; i < dispatchers.size(); ++i)
dispatchers[i]->TransportEnded();
}
-
- serialized_ = true;
}
scoped_refptr<Dispatcher>
@@ -463,21 +538,24 @@ MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
SerializeInternal();
- // TODO(vtl): Currently, there are no options, so we just use
- // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
- // too.
- scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions);
- rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
- serialized_message_queue_.swap(rv->serialized_message_queue_);
- serialized_read_buffer_.swap(rv->serialized_read_buffer_);
- serialized_write_buffer_.swap(rv->serialized_write_buffer_);
- serialized_fds_.swap(rv->serialized_fds_);
- rv->serialized_read_fds_length_ = serialized_read_fds_length_;
- rv->serialized_write_fds_length_ = serialized_write_fds_length_;
- rv->serialized_message_fds_length_ = serialized_message_fds_length_;
+ scoped_refptr<MessagePipeDispatcher> rv(
+ new MessagePipeDispatcher(transferable_));
rv->serialized_ = true;
- rv->write_error_ = write_error_;
- return scoped_refptr<Dispatcher>(rv.get());
+ if (transferable_) {
+ rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
+ serialized_message_queue_.swap(rv->serialized_message_queue_);
+ serialized_read_buffer_.swap(rv->serialized_read_buffer_);
+ serialized_write_buffer_.swap(rv->serialized_write_buffer_);
+ serialized_fds_.swap(rv->serialized_fds_);
+ rv->serialized_read_fds_length_ = serialized_read_fds_length_;
+ rv->serialized_write_fds_length_ = serialized_write_fds_length_;
+ rv->serialized_message_fds_length_ = serialized_message_fds_length_;
+ rv->write_error_ = write_error_;
+ } else {
+ rv->pipe_id_ = pipe_id_;
+ rv->non_transferable_state_ = non_transferable_state_;
+ }
+ return rv;
}
MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
@@ -485,15 +563,17 @@ MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
uint32_t num_bytes,
std::vector<DispatcherTransport>* transports,
MojoWriteMessageFlags flags) {
+ lock().AssertAcquired();
DCHECK(!transports ||
(transports->size() > 0 &&
transports->size() <= GetConfiguration().max_message_num_handles));
- lock().AssertAcquired();
-
- if (!channel_ || write_error_)
+ if (write_error_ ||
+ (transferable_ && !channel_) ||
+ (!transferable_ && non_transferable_state_ == CLOSED)) {
return MOJO_RESULT_FAILED_PRECONDITION;
+ }
if (num_bytes > GetConfiguration().max_message_num_bytes)
return MOJO_RESULT_RESOURCE_EXHAUSTED;
@@ -506,7 +586,17 @@ MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
}
message->SerializeAndCloseDispatchers();
- channel_->WriteMessage(message.Pass());
+ if (!transferable_)
+ message->set_route_id(pipe_id_);
+ if (!transferable_ &&
+ (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE ||
+ non_transferable_state_ == CONNECT_CALLED)) {
+ if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
+ RequestNontransferableChannel();
+ non_transferable_outgoing_message_queue_.AddMessage(message.Pass());
+ } else {
+ channel_->WriteMessage(message.Pass());
+ }
return MOJO_RESULT_OK;
}
@@ -518,8 +608,17 @@ MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
uint32_t* num_dispatchers,
MojoReadMessageFlags flags) {
lock().AssertAcquired();
- if (channel_)
+ if (channel_) {
channel_->EnsureLazyInitialized();
+ } else if (!transferable_) {
+ if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
+ RequestNontransferableChannel();
+ return MOJO_RESULT_SHOULD_WAIT;
+ } else if (non_transferable_state_ == CONNECT_CALLED) {
+ return MOJO_RESULT_SHOULD_WAIT;
+ }
+ }
+
DCHECK(!dispatchers || dispatchers->empty());
const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes;
@@ -583,14 +682,22 @@ HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
HandleSignalsState rv;
if (!message_queue_.IsEmpty())
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
- if (channel_ || !message_queue_.IsEmpty())
+ if (!message_queue_.IsEmpty() ||
+ (transferable_ && channel_) ||
+ (!transferable_ && non_transferable_state_ != CLOSED))
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
- if (channel_ && !write_error_) {
+ if (!write_error_ &&
+ ((transferable_ && channel_) ||
+ (!transferable_ && non_transferable_state_ != CLOSED))) {
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
}
- if (!channel_ || write_error_)
+ if (write_error_ ||
+ (transferable_ && !channel_) ||
+ (!transferable_ &&
+ ((non_transferable_state_ == CLOSED) || is_closed()))) {
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
+ }
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
return rv;
}
@@ -601,8 +708,13 @@ MojoResult MessagePipeDispatcher::AddAwakableImplNoLock(
uintptr_t context,
HandleSignalsState* signals_state) {
lock().AssertAcquired();
- if (channel_)
+ if (channel_) {
channel_->EnsureLazyInitialized();
+ } else if (!transferable_ &&
+ non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
+ RequestNontransferableChannel();
+ }
+
HandleSignalsState state = GetHandleSignalsStateImplNoLock();
if (state.satisfies(signals)) {
if (signals_state)
@@ -653,6 +765,8 @@ bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
CloseImplNoLock();
SerializedMessagePipeHandleDispatcher* serialization =
static_cast<SerializedMessagePipeHandleDispatcher*>(destination);
+ serialization->transferable = transferable_;
+ serialization->pipe_id = pipe_id_;
if (serialized_platform_handle_.is_valid()) {
serialization->platform_handle_index = platform_handles->size();
platform_handles->push_back(serialized_platform_handle_.release());
@@ -796,7 +910,18 @@ void MessagePipeDispatcher::OnError(Error error) {
// called, that is safe since this class always does a PostTask to the IO
// thread to self destruct.
if (channel_ && error != ERROR_WRITE) {
- channel_->Shutdown();
+ if (transferable_) {
+ channel_->Shutdown();
+ } else {
+ CHECK_NE(non_transferable_state_, CLOSED);
+ // Since we're in a callback from the Broker, call it asynchronously.
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&Broker::CloseMessagePipe,
+ base::Unretained(internal::g_broker), pipe_id_,
+ base::Unretained(this)));
+ non_transferable_state_ = CLOSED;
+ }
channel_ = nullptr;
}
awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
@@ -824,10 +949,14 @@ MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) {
MessagePipeDispatcher* mp =
static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher());
- if (channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) {
+ if (transferable_ && mp->transferable_ &&
+ channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) {
// The other case should have been disallowed by |Core|. (Note: |port|
// is the peer port of the handle given to |WriteMessage()|.)
return MOJO_RESULT_INVALID_ARGUMENT;
+ } else if (!transferable_ && !mp->transferable_ &&
+ pipe_id_ == mp->pipe_id_) {
+ return MOJO_RESULT_INVALID_ARGUMENT;
}
}
}
@@ -849,5 +978,19 @@ MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
return MOJO_RESULT_OK;
}
+void MessagePipeDispatcher::RequestNontransferableChannel() {
+ lock().AssertAcquired();
+ CHECK(!transferable_);
+ CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE);
+ non_transferable_state_ = CONNECT_CALLED;
+
+ // PostTask since the broker can call us back synchronously.
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&Broker::ConnectMessagePipe,
+ base::Unretained(internal::g_broker), pipe_id_,
+ base::Unretained(this)));
+}
+
} // namespace edk
} // namespace mojo
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/message_pipe_perftest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698