| Index: mojo/edk/system/message_pipe_dispatcher.cc
|
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
|
| index 6ebab7489a799f655ccadf5768ed5510073fba44..045162f1cb63ae4dcc52e20d42f39150023f5721 100644
|
| --- a/mojo/edk/system/message_pipe_dispatcher.cc
|
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc
|
| @@ -25,10 +25,13 @@ namespace {
|
| const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1);
|
|
|
| struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher {
|
| + bool transferable;
|
| + bool write_error;
|
| + uint64_t pipe_id; // If transferable is false.
|
| + // The following members are only set if transferable is true.
|
| // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP
|
| // was closed.
|
| size_t platform_handle_index;
|
| - bool write_error;
|
|
|
| size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.)
|
| uint32_t shared_memory_size;
|
| @@ -90,7 +93,8 @@ MojoResult MessagePipeDispatcher::ValidateCreateOptions(
|
| const MojoCreateMessagePipeOptions* in_options,
|
| MojoCreateMessagePipeOptions* out_options) {
|
| const MojoCreateMessagePipeOptionsFlags kKnownFlags =
|
| - MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
|
| + MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE |
|
| + MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE;
|
|
|
| *out_options = kDefaultCreateOptions;
|
| if (!in_options)
|
| @@ -119,6 +123,7 @@ void MessagePipeDispatcher::Init(
|
| char* serialized_write_buffer, size_t serialized_write_buffer_size,
|
| std::vector<int>* serialized_read_fds,
|
| std::vector<int>* serialized_write_fds) {
|
| + CHECK(transferable_);
|
| if (message_pipe.get().is_valid()) {
|
| channel_ = RawChannel::Create(message_pipe.Pass());
|
|
|
| @@ -132,8 +137,14 @@ void MessagePipeDispatcher::Init(
|
| }
|
| }
|
|
|
| +void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) {
|
| + CHECK(!transferable_);
|
| + pipe_id_ = pipe_id;
|
| +}
|
| +
|
| void MessagePipeDispatcher::InitOnIO() {
|
| base::AutoLock locker(lock());
|
| + CHECK(transferable_);
|
| calling_init_ = true;
|
| if (channel_)
|
| channel_->Init(this);
|
| @@ -142,10 +153,23 @@ void MessagePipeDispatcher::InitOnIO() {
|
|
|
| void MessagePipeDispatcher::CloseOnIO() {
|
| base::AutoLock locker(lock());
|
| -
|
| - if (channel_) {
|
| - channel_->Shutdown();
|
| - channel_ = nullptr;
|
| + if (transferable_) {
|
| + if (channel_) {
|
| + channel_->Shutdown();
|
| + channel_ = nullptr;
|
| + }
|
| + } else {
|
| + if (non_transferable_state_ == CONNECT_CALLED) {
|
| + // We can't cancel the pending request yet, since the other side of the
|
| + // message pipe would want to get pending outgoing messages (if any) or
|
| + // at least know that this end was closed. So keep this object alive until
|
| + // then.
|
| + AddRef();
|
| + } else if (non_transferable_state_ == CONNECTED) {
|
| + internal::g_broker->CloseMessagePipe(pipe_id_, this);
|
| + non_transferable_state_ = CLOSED;
|
| + channel_ = nullptr;
|
| + }
|
| }
|
| }
|
|
|
| @@ -153,6 +177,31 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const {
|
| return Type::MESSAGE_PIPE;
|
| }
|
|
|
| +void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
|
| + base::AutoLock locker(lock());
|
| + channel_ = channel;
|
| + while (!non_transferable_outgoing_message_queue_.IsEmpty()) {
|
| + channel_->WriteMessage(
|
| + non_transferable_outgoing_message_queue_.GetMessage());
|
| + }
|
| +
|
| + if (is_closed()) {
|
| + CHECK_EQ(non_transferable_state_, CONNECT_CALLED);
|
| + // We kept this object alive until it's connected, we can release it now.
|
| + // Since we're in a callback from the Broker, call it asynchronously.
|
| + internal::g_io_thread_task_runner->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&Broker::CloseMessagePipe,
|
| + base::Unretained(internal::g_broker), pipe_id_,
|
| + base::Unretained(this)));
|
| + non_transferable_state_ = CLOSED;
|
| + channel_ = nullptr;
|
| + base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this);
|
| + } else {
|
| + non_transferable_state_ = CONNECTED;
|
| + }
|
| +}
|
| +
|
| #if defined(OS_WIN)
|
| // TODO(jam): this is copied from RawChannelWin till I figure out what's the
|
| // best way we want to share this.
|
| @@ -186,6 +235,14 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
|
|
|
| const SerializedMessagePipeHandleDispatcher* serialization =
|
| static_cast<const SerializedMessagePipeHandleDispatcher*>(source);
|
| +
|
| + scoped_refptr<MessagePipeDispatcher> rv(
|
| + new MessagePipeDispatcher(serialization->transferable));
|
| + if (!rv->transferable_) {
|
| + rv->InitNonTransferable(serialization->pipe_id);
|
| + return rv;
|
| + }
|
| +
|
| if (serialization->shared_memory_size !=
|
| (serialization->serialized_read_buffer_size +
|
| serialization->serialized_write_buffer_size +
|
| @@ -233,8 +290,6 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
|
| }
|
| }
|
|
|
| - scoped_refptr<MessagePipeDispatcher> rv(
|
| - Create(MessagePipeDispatcher::kDefaultCreateOptions));
|
| rv->write_error_ = serialization->write_error;
|
|
|
| std::vector<int> serialized_read_fds;
|
| @@ -269,8 +324,15 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
|
|
|
| while (message_queue_size) {
|
| size_t message_size;
|
| - CHECK(MessageInTransit::GetNextMessageSize(
|
| - message_queue_data, message_queue_size, &message_size));
|
| + if (!MessageInTransit::GetNextMessageSize(
|
| + message_queue_data, message_queue_size, &message_size)) {
|
| + NOTREACHED() << "Couldn't read message size from serialized data.";
|
| + return nullptr;
|
| + }
|
| + if (message_size > message_queue_size) {
|
| + NOTREACHED() << "Invalid serialized message size.";
|
| + return nullptr;
|
| + }
|
| MessageInTransit::View message_view(message_size, message_queue_data);
|
| message_queue_size -= message_size;
|
| message_queue_data += message_size;
|
| @@ -333,14 +395,17 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
|
| return rv;
|
| }
|
|
|
| -MessagePipeDispatcher::MessagePipeDispatcher()
|
| +MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
|
| : channel_(nullptr),
|
| - serialized_(false),
|
| serialized_read_fds_length_(0u),
|
| serialized_write_fds_length_(0u),
|
| serialized_message_fds_length_(0u),
|
| + pipe_id_(0),
|
| + non_transferable_state_(WAITING_FOR_READ_OR_WRITE),
|
| + serialized_(false),
|
| calling_init_(false),
|
| - write_error_(false) {
|
| + write_error_(false),
|
| + transferable_(transferable) {
|
| }
|
|
|
| MessagePipeDispatcher::~MessagePipeDispatcher() {
|
| @@ -369,6 +434,15 @@ void MessagePipeDispatcher::CloseImplNoLock() {
|
| }
|
|
|
| void MessagePipeDispatcher::SerializeInternal() {
|
| + serialized_ = true;
|
| + if (!transferable_) {
|
| + CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
|
| + << "Non transferable message pipe being sent after read/write. "
|
| + << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if "
|
| + << "the pipe can be sent after it's read or written.";
|
| + return;
|
| + }
|
| +
|
| // We need to stop watching handle immediately, even though not on IO thread,
|
| // so that other messages aren't read after this.
|
| std::vector<int> serialized_read_fds, serialized_write_fds;
|
| @@ -385,8 +459,6 @@ void MessagePipeDispatcher::SerializeInternal() {
|
| serialized_write_fds.end());
|
| serialized_write_fds_length_ = serialized_write_fds.size();
|
| channel_ = nullptr;
|
| - if (write_error)
|
| - write_error = true;
|
| } else {
|
| // It's valid that the other side wrote some data and closed its end.
|
| }
|
| @@ -441,20 +513,18 @@ void MessagePipeDispatcher::SerializeInternal() {
|
| all_platform_handles->at(i) = PlatformHandle();
|
| }
|
| #endif
|
| + }
|
|
|
| serialized_message_queue_.insert(
|
| serialized_message_queue_.end(),
|
| static_cast<const char*>(message->transport_data()->buffer()),
|
| static_cast<const char*>(message->transport_data()->buffer()) +
|
| transport_data_buffer_size);
|
| - }
|
| }
|
|
|
| for (size_t i = 0; i < dispatchers.size(); ++i)
|
| dispatchers[i]->TransportEnded();
|
| }
|
| -
|
| - serialized_ = true;
|
| }
|
|
|
| scoped_refptr<Dispatcher>
|
| @@ -463,21 +533,24 @@ MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
|
|
|
| SerializeInternal();
|
|
|
| - // TODO(vtl): Currently, there are no options, so we just use
|
| - // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
|
| - // too.
|
| - scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions);
|
| - rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
|
| - serialized_message_queue_.swap(rv->serialized_message_queue_);
|
| - serialized_read_buffer_.swap(rv->serialized_read_buffer_);
|
| - serialized_write_buffer_.swap(rv->serialized_write_buffer_);
|
| - serialized_fds_.swap(rv->serialized_fds_);
|
| - rv->serialized_read_fds_length_ = serialized_read_fds_length_;
|
| - rv->serialized_write_fds_length_ = serialized_write_fds_length_;
|
| - rv->serialized_message_fds_length_ = serialized_message_fds_length_;
|
| + scoped_refptr<MessagePipeDispatcher> rv(
|
| + new MessagePipeDispatcher(transferable_));
|
| rv->serialized_ = true;
|
| - rv->write_error_ = write_error_;
|
| - return scoped_refptr<Dispatcher>(rv.get());
|
| + if (transferable_) {
|
| + rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
|
| + serialized_message_queue_.swap(rv->serialized_message_queue_);
|
| + serialized_read_buffer_.swap(rv->serialized_read_buffer_);
|
| + serialized_write_buffer_.swap(rv->serialized_write_buffer_);
|
| + serialized_fds_.swap(rv->serialized_fds_);
|
| + rv->serialized_read_fds_length_ = serialized_read_fds_length_;
|
| + rv->serialized_write_fds_length_ = serialized_write_fds_length_;
|
| + rv->serialized_message_fds_length_ = serialized_message_fds_length_;
|
| + rv->write_error_ = write_error_;
|
| + } else {
|
| + rv->pipe_id_ = pipe_id_;
|
| + rv->non_transferable_state_ = non_transferable_state_;
|
| + }
|
| + return rv;
|
| }
|
|
|
| MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
|
| @@ -485,15 +558,17 @@ MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
|
| uint32_t num_bytes,
|
| std::vector<DispatcherTransport>* transports,
|
| MojoWriteMessageFlags flags) {
|
| + lock().AssertAcquired();
|
|
|
| DCHECK(!transports ||
|
| (transports->size() > 0 &&
|
| transports->size() <= GetConfiguration().max_message_num_handles));
|
|
|
| - lock().AssertAcquired();
|
| -
|
| - if (!channel_ || write_error_)
|
| + if (write_error_ ||
|
| + (transferable_ && !channel_) ||
|
| + (!transferable_ && non_transferable_state_ == CLOSED)) {
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
| + }
|
|
|
| if (num_bytes > GetConfiguration().max_message_num_bytes)
|
| return MOJO_RESULT_RESOURCE_EXHAUSTED;
|
| @@ -506,7 +581,17 @@ MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
|
| }
|
|
|
| message->SerializeAndCloseDispatchers();
|
| - channel_->WriteMessage(message.Pass());
|
| + if (!transferable_)
|
| + message->set_route_id(pipe_id_);
|
| + if (!transferable_ &&
|
| + (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE ||
|
| + non_transferable_state_ == CONNECT_CALLED)) {
|
| + if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
|
| + RequestNontransferableChannel();
|
| + non_transferable_outgoing_message_queue_.AddMessage(message.Pass());
|
| + } else {
|
| + channel_->WriteMessage(message.Pass());
|
| + }
|
|
|
| return MOJO_RESULT_OK;
|
| }
|
| @@ -518,8 +603,17 @@ MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
|
| uint32_t* num_dispatchers,
|
| MojoReadMessageFlags flags) {
|
| lock().AssertAcquired();
|
| - if (channel_)
|
| + if (channel_) {
|
| channel_->EnsureLazyInitialized();
|
| + } else if (!transferable_) {
|
| + if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
|
| + RequestNontransferableChannel();
|
| + return MOJO_RESULT_SHOULD_WAIT;
|
| + } else if (non_transferable_state_ == CONNECT_CALLED) {
|
| + return MOJO_RESULT_SHOULD_WAIT;
|
| + }
|
| + }
|
| +
|
| DCHECK(!dispatchers || dispatchers->empty());
|
|
|
| const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes;
|
| @@ -583,14 +677,21 @@ HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
|
| HandleSignalsState rv;
|
| if (!message_queue_.IsEmpty())
|
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| - if (channel_ || !message_queue_.IsEmpty())
|
| + if (!message_queue_.IsEmpty() ||
|
| + (transferable_ && channel_) ||
|
| + (!transferable_ && non_transferable_state_ != CLOSED))
|
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| - if (channel_ && !write_error_) {
|
| + if (!write_error_ &&
|
| + ((transferable_ && channel_) ||
|
| + (!transferable_ && non_transferable_state_ != CLOSED))) {
|
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| }
|
| - if (!channel_ || write_error_)
|
| + if (write_error_ ||
|
| + (transferable_ && !channel_) ||
|
| + (!transferable_ && non_transferable_state_ == CLOSED)) {
|
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| + }
|
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| return rv;
|
| }
|
| @@ -601,8 +702,13 @@ MojoResult MessagePipeDispatcher::AddAwakableImplNoLock(
|
| uintptr_t context,
|
| HandleSignalsState* signals_state) {
|
| lock().AssertAcquired();
|
| - if (channel_)
|
| + if (channel_) {
|
| channel_->EnsureLazyInitialized();
|
| + } else if (!transferable_ &&
|
| + non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
|
| + RequestNontransferableChannel();
|
| + }
|
| +
|
| HandleSignalsState state = GetHandleSignalsStateImplNoLock();
|
| if (state.satisfies(signals)) {
|
| if (signals_state)
|
| @@ -653,6 +759,8 @@ bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
|
| CloseImplNoLock();
|
| SerializedMessagePipeHandleDispatcher* serialization =
|
| static_cast<SerializedMessagePipeHandleDispatcher*>(destination);
|
| + serialization->transferable = transferable_;
|
| + serialization->pipe_id = pipe_id_;
|
| if (serialized_platform_handle_.is_valid()) {
|
| serialization->platform_handle_index = platform_handles->size();
|
| platform_handles->push_back(serialized_platform_handle_.release());
|
| @@ -796,7 +904,18 @@ void MessagePipeDispatcher::OnError(Error error) {
|
| // called, that is safe since this class always does a PostTask to the IO
|
| // thread to self destruct.
|
| if (channel_ && error != ERROR_WRITE) {
|
| - channel_->Shutdown();
|
| + if (transferable_) {
|
| + channel_->Shutdown();
|
| + } else {
|
| + CHECK_NE(non_transferable_state_, CLOSED);
|
| + // Since we're in a callback from the Broker, call it asynchronously.
|
| + internal::g_io_thread_task_runner->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&Broker::CloseMessagePipe,
|
| + base::Unretained(internal::g_broker), pipe_id_,
|
| + base::Unretained(this)));
|
| + non_transferable_state_ = CLOSED;
|
| + }
|
| channel_ = nullptr;
|
| }
|
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| @@ -824,10 +943,14 @@ MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
|
| if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) {
|
| MessagePipeDispatcher* mp =
|
| static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher());
|
| - if (channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) {
|
| + if (transferable_ && mp->transferable_ &&
|
| + channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) {
|
| // The other case should have been disallowed by |Core|. (Note: |port|
|
| // is the peer port of the handle given to |WriteMessage()|.)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| + } else if (!transferable_ && !mp->transferable_ &&
|
| + pipe_id_ == mp->pipe_id_) {
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| }
|
| }
|
| }
|
| @@ -849,5 +972,19 @@ MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| +void MessagePipeDispatcher::RequestNontransferableChannel() {
|
| + lock().AssertAcquired();
|
| + CHECK(!transferable_);
|
| + CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE);
|
| + non_transferable_state_ = CONNECT_CALLED;
|
| +
|
| + // PostTask since the broker can call us back synchronously.
|
| + internal::g_io_thread_task_runner->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&Broker::ConnectMessagePipe,
|
| + base::Unretained(internal::g_broker), pipe_id_,
|
| + base::Unretained(this)));
|
| +}
|
| +
|
| } // namespace edk
|
| } // namespace mojo
|
|
|