Chromium Code Reviews| Index: mojo/edk/system/message_pipe_dispatcher.cc |
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
| index 6ebab7489a799f655ccadf5768ed5510073fba44..e61b31a3e0af24204643e4641ae2a0edb85f2f5e 100644 |
| --- a/mojo/edk/system/message_pipe_dispatcher.cc |
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc |
| @@ -25,10 +25,13 @@ namespace { |
| const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); |
| struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { |
| + bool transferable; |
| + bool write_error; |
| + uint64_t pipe_id; // If transferable is false. |
| + // The following members are only set if transferable is true. |
| // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP |
| // was closed. |
| size_t platform_handle_index; |
| - bool write_error; |
| size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) |
| uint32_t shared_memory_size; |
| @@ -90,7 +93,8 @@ MojoResult MessagePipeDispatcher::ValidateCreateOptions( |
| const MojoCreateMessagePipeOptions* in_options, |
| MojoCreateMessagePipeOptions* out_options) { |
| const MojoCreateMessagePipeOptionsFlags kKnownFlags = |
| - MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; |
| + MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE | |
| + MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE; |
| *out_options = kDefaultCreateOptions; |
| if (!in_options) |
| @@ -119,6 +123,7 @@ void MessagePipeDispatcher::Init( |
| char* serialized_write_buffer, size_t serialized_write_buffer_size, |
| std::vector<int>* serialized_read_fds, |
| std::vector<int>* serialized_write_fds) { |
| + CHECK(transferable_); |
| if (message_pipe.get().is_valid()) { |
| channel_ = RawChannel::Create(message_pipe.Pass()); |
| @@ -132,8 +137,14 @@ void MessagePipeDispatcher::Init( |
| } |
| } |
| +void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) { |
| + CHECK(!transferable_); |
| + pipe_id_ = pipe_id; |
| +} |
| + |
| void MessagePipeDispatcher::InitOnIO() { |
| base::AutoLock locker(lock()); |
| + CHECK(transferable_); |
| calling_init_ = true; |
| if (channel_) |
| channel_->Init(this); |
| @@ -142,10 +153,23 @@ void MessagePipeDispatcher::InitOnIO() { |
| void MessagePipeDispatcher::CloseOnIO() { |
| base::AutoLock locker(lock()); |
| - |
| - if (channel_) { |
| - channel_->Shutdown(); |
| - channel_ = nullptr; |
| + if (transferable_) { |
| + if (channel_) { |
| + channel_->Shutdown(); |
| + channel_ = nullptr; |
| + } |
| + } else { |
| + if (non_transferable_state_ == CONNECT_CALLED) { |
| + // We can't cancel the pending request yet, since the other side of the |
| + // message pipe would want to get pending outgoing messages (if any) or |
| + // at least know that this end was closed. So keep this object alive until |
| + // then. |
| + AddRef(); |
| + } else if (non_transferable_state_ == CONNECTED) { |
| + internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| + non_transferable_state_ = CLOSED; |
| + channel_ = nullptr; |
| + } |
| } |
| } |
| @@ -153,6 +177,31 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const { |
| return Type::MESSAGE_PIPE; |
| } |
| +void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
| + base::AutoLock locker(lock()); |
| + channel_ = channel; |
| + while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
| + channel_->WriteMessage( |
| + non_transferable_outgoing_message_queue_.GetMessage()); |
| + } |
| + |
| + if (is_closed()) { |
| + CHECK_EQ(non_transferable_state_, CONNECT_CALLED); |
| + // We kept this object alive until it's connected, we can release it now. |
| + // Since we're in a callback from the Broker, call it asynchronously. |
| + internal::g_io_thread_task_runner->PostTask( |
| + FROM_HERE, |
| + base::Bind(&Broker::CloseMessagePipe, |
| + base::Unretained(internal::g_broker), pipe_id_, |
| + base::Unretained(this))); |
| + non_transferable_state_ = CLOSED; |
| + channel_ = nullptr; |
| + base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); |
| + } else { |
| + non_transferable_state_ = CONNECTED; |
| + } |
| +} |
| + |
| #if defined(OS_WIN) |
| // TODO(jam): this is copied from RawChannelWin till I figure out what's the |
| // best way we want to share this. |
| @@ -186,6 +235,14 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( |
| const SerializedMessagePipeHandleDispatcher* serialization = |
| static_cast<const SerializedMessagePipeHandleDispatcher*>(source); |
| + |
| + scoped_refptr<MessagePipeDispatcher> rv( |
| + new MessagePipeDispatcher(serialization->transferable)); |
| + if (!rv->transferable_) { |
| + rv->InitNonTransferable(serialization->pipe_id); |
| + return rv; |
| + } |
| + |
| if (serialization->shared_memory_size != |
| (serialization->serialized_read_buffer_size + |
| serialization->serialized_write_buffer_size + |
| @@ -233,8 +290,6 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( |
| } |
| } |
| - scoped_refptr<MessagePipeDispatcher> rv( |
| - Create(MessagePipeDispatcher::kDefaultCreateOptions)); |
| rv->write_error_ = serialization->write_error; |
| std::vector<int> serialized_read_fds; |
| @@ -333,14 +388,17 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( |
| return rv; |
| } |
| -MessagePipeDispatcher::MessagePipeDispatcher() |
| +MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
| : channel_(nullptr), |
| - serialized_(false), |
| serialized_read_fds_length_(0u), |
| serialized_write_fds_length_(0u), |
| serialized_message_fds_length_(0u), |
| + pipe_id_(0), |
| + non_transferable_state_(WAITING_FOR_READ_OR_WRITE), |
| + serialized_(false), |
| calling_init_(false), |
| - write_error_(false) { |
| + write_error_(false), |
| + transferable_(transferable) { |
| } |
| MessagePipeDispatcher::~MessagePipeDispatcher() { |
| @@ -369,6 +427,15 @@ void MessagePipeDispatcher::CloseImplNoLock() { |
| } |
| void MessagePipeDispatcher::SerializeInternal() { |
| + serialized_ = true; |
| + if (!transferable_) { |
| + CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| + << "Non transferable message pipe being sent after read/write. " |
| + << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " |
| + << "the pipe can be sent after it's read or written."; |
| + return; |
| + } |
| + |
| // We need to stop watching handle immediately, even though not on IO thread, |
| // so that other messages aren't read after this. |
| std::vector<int> serialized_read_fds, serialized_write_fds; |
| @@ -385,8 +452,6 @@ void MessagePipeDispatcher::SerializeInternal() { |
| serialized_write_fds.end()); |
| serialized_write_fds_length_ = serialized_write_fds.size(); |
| channel_ = nullptr; |
| - if (write_error) |
| - write_error = true; |
| } else { |
| // It's valid that the other side wrote some data and closed its end. |
| } |
| @@ -453,8 +518,6 @@ void MessagePipeDispatcher::SerializeInternal() { |
| for (size_t i = 0; i < dispatchers.size(); ++i) |
| dispatchers[i]->TransportEnded(); |
| } |
| - |
| - serialized_ = true; |
| } |
| scoped_refptr<Dispatcher> |
| @@ -466,18 +529,24 @@ MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
| // TODO(vtl): Currently, there are no options, so we just use |
|
yzshen1
2015/12/03 23:37:50
I think this comment is not useful anymore.
jam
2015/12/04 05:06:47
Done.
|
| // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options |
| // too. |
| - scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions); |
| - rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); |
| - serialized_message_queue_.swap(rv->serialized_message_queue_); |
| - serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
| - serialized_write_buffer_.swap(rv->serialized_write_buffer_); |
| - serialized_fds_.swap(rv->serialized_fds_); |
| - rv->serialized_read_fds_length_ = serialized_read_fds_length_; |
| - rv->serialized_write_fds_length_ = serialized_write_fds_length_; |
| - rv->serialized_message_fds_length_ = serialized_message_fds_length_; |
| + scoped_refptr<MessagePipeDispatcher> rv( |
| + new MessagePipeDispatcher(transferable_)); |
| rv->serialized_ = true; |
| - rv->write_error_ = write_error_; |
| - return scoped_refptr<Dispatcher>(rv.get()); |
| + if (transferable_) { |
| + rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); |
| + serialized_message_queue_.swap(rv->serialized_message_queue_); |
| + serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
| + serialized_write_buffer_.swap(rv->serialized_write_buffer_); |
| + serialized_fds_.swap(rv->serialized_fds_); |
| + rv->serialized_read_fds_length_ = serialized_read_fds_length_; |
| + rv->serialized_write_fds_length_ = serialized_write_fds_length_; |
| + rv->serialized_message_fds_length_ = serialized_message_fds_length_; |
| + rv->write_error_ = write_error_; |
| + } else { |
| + rv->pipe_id_ = pipe_id_; |
| + rv->non_transferable_state_ = non_transferable_state_; |
| + } |
| + return rv; |
| } |
| MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
| @@ -485,15 +554,17 @@ MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
| uint32_t num_bytes, |
| std::vector<DispatcherTransport>* transports, |
| MojoWriteMessageFlags flags) { |
| + lock().AssertAcquired(); |
| DCHECK(!transports || |
| (transports->size() > 0 && |
| transports->size() <= GetConfiguration().max_message_num_handles)); |
| - lock().AssertAcquired(); |
| - |
| - if (!channel_ || write_error_) |
| + if (write_error_ || |
| + (transferable_ && !channel_) || |
| + (!transferable_ && non_transferable_state_ == CLOSED)) { |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| + } |
| if (num_bytes > GetConfiguration().max_message_num_bytes) |
| return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| @@ -506,7 +577,17 @@ MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
| } |
| message->SerializeAndCloseDispatchers(); |
| - channel_->WriteMessage(message.Pass()); |
| + if (!transferable_) |
| + message->set_route_id(pipe_id_); |
| + if (!transferable_ && |
| + (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || |
| + non_transferable_state_ == CONNECT_CALLED)) { |
| + if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| + RequestNontransferableChannel(); |
| + non_transferable_outgoing_message_queue_.AddMessage(message.Pass()); |
| + } else { |
| + channel_->WriteMessage(message.Pass()); |
| + } |
| return MOJO_RESULT_OK; |
| } |
| @@ -518,8 +599,14 @@ MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( |
| uint32_t* num_dispatchers, |
| MojoReadMessageFlags flags) { |
| lock().AssertAcquired(); |
| - if (channel_) |
| + if (channel_) { |
| channel_->EnsureLazyInitialized(); |
| + } else if (!transferable_ && |
| + non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| + RequestNontransferableChannel(); |
| + return MOJO_RESULT_SHOULD_WAIT; |
| + } |
| + |
| DCHECK(!dispatchers || dispatchers->empty()); |
| const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; |
| @@ -583,14 +670,21 @@ HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() |
| HandleSignalsState rv; |
| if (!message_queue_.IsEmpty()) |
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| - if (channel_ || !message_queue_.IsEmpty()) |
| + if (!message_queue_.IsEmpty() || |
| + (transferable_ && channel_) || |
| + (!transferable_ && non_transferable_state_ != CLOSED)) |
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| - if (channel_ && !write_error_) { |
| + if (!write_error_ && |
| + ((transferable_ && channel_) || |
| + (!transferable_ && non_transferable_state_ != CLOSED))) { |
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| } |
| - if (!channel_ || write_error_) |
| + if (write_error_ || |
| + (transferable_ && !channel_) || |
| + (!transferable_ && non_transferable_state_ == CLOSED)) { |
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| + } |
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| return rv; |
| } |
| @@ -601,8 +695,13 @@ MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( |
| uintptr_t context, |
| HandleSignalsState* signals_state) { |
| lock().AssertAcquired(); |
| - if (channel_) |
| + if (channel_) { |
| channel_->EnsureLazyInitialized(); |
| + } else if (!transferable_ && |
| + non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| + RequestNontransferableChannel(); |
| + } |
| + |
| HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
| if (state.satisfies(signals)) { |
| if (signals_state) |
| @@ -653,6 +752,8 @@ bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( |
| CloseImplNoLock(); |
| SerializedMessagePipeHandleDispatcher* serialization = |
| static_cast<SerializedMessagePipeHandleDispatcher*>(destination); |
| + serialization->transferable = transferable_; |
| + serialization->pipe_id = pipe_id_; |
| if (serialized_platform_handle_.is_valid()) { |
| serialization->platform_handle_index = platform_handles->size(); |
| platform_handles->push_back(serialized_platform_handle_.release()); |
| @@ -796,7 +897,18 @@ void MessagePipeDispatcher::OnError(Error error) { |
| // called, that is safe since this class always does a PostTask to the IO |
| // thread to self destruct. |
| if (channel_ && error != ERROR_WRITE) { |
| - channel_->Shutdown(); |
| + if (transferable_) { |
| + channel_->Shutdown(); |
| + } else { |
| + CHECK_NE(non_transferable_state_, CLOSED); |
| + // Since we're in a callback from the Broker, call it asynchronously. |
| + internal::g_io_thread_task_runner->PostTask( |
| + FROM_HERE, |
| + base::Bind(&Broker::CloseMessagePipe, |
| + base::Unretained(internal::g_broker), pipe_id_, |
| + base::Unretained(this))); |
| + non_transferable_state_ = CLOSED; |
| + } |
| channel_ = nullptr; |
| } |
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| @@ -849,5 +961,19 @@ MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
| return MOJO_RESULT_OK; |
| } |
| +void MessagePipeDispatcher::RequestNontransferableChannel() { |
| + lock().AssertAcquired(); |
| + CHECK(!transferable_); |
| + CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); |
| + non_transferable_state_ = CONNECT_CALLED; |
| + |
| + // PostTask since the broker can call us back synchronously. |
| + internal::g_io_thread_task_runner->PostTask( |
| + FROM_HERE, |
| + base::Bind(&Broker::ConnectMessagePipe, |
| + base::Unretained(internal::g_broker), pipe_id_, |
| + base::Unretained(this))); |
| +} |
| + |
| } // namespace edk |
| } // namespace mojo |