Index: mojo/edk/system/message_pipe_dispatcher.cc |
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
index 6ebab7489a799f655ccadf5768ed5510073fba44..e61b31a3e0af24204643e4641ae2a0edb85f2f5e 100644 |
--- a/mojo/edk/system/message_pipe_dispatcher.cc |
+++ b/mojo/edk/system/message_pipe_dispatcher.cc |
@@ -25,10 +25,13 @@ namespace { |
const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); |
struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { |
+ bool transferable; |
+ bool write_error; |
+ uint64_t pipe_id; // If transferable is false. |
+ // The following members are only set if transferable is true. |
// Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP |
// was closed. |
size_t platform_handle_index; |
- bool write_error; |
size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) |
uint32_t shared_memory_size; |
@@ -90,7 +93,8 @@ MojoResult MessagePipeDispatcher::ValidateCreateOptions( |
const MojoCreateMessagePipeOptions* in_options, |
MojoCreateMessagePipeOptions* out_options) { |
const MojoCreateMessagePipeOptionsFlags kKnownFlags = |
- MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; |
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE | |
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE; |
*out_options = kDefaultCreateOptions; |
if (!in_options) |
@@ -119,6 +123,7 @@ void MessagePipeDispatcher::Init( |
char* serialized_write_buffer, size_t serialized_write_buffer_size, |
std::vector<int>* serialized_read_fds, |
std::vector<int>* serialized_write_fds) { |
+ CHECK(transferable_); |
if (message_pipe.get().is_valid()) { |
channel_ = RawChannel::Create(message_pipe.Pass()); |
@@ -132,8 +137,14 @@ void MessagePipeDispatcher::Init( |
} |
} |
+void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) { |
+ CHECK(!transferable_); |
+ pipe_id_ = pipe_id; |
+} |
+ |
void MessagePipeDispatcher::InitOnIO() { |
base::AutoLock locker(lock()); |
+ CHECK(transferable_); |
calling_init_ = true; |
if (channel_) |
channel_->Init(this); |
@@ -142,10 +153,23 @@ void MessagePipeDispatcher::InitOnIO() { |
void MessagePipeDispatcher::CloseOnIO() { |
base::AutoLock locker(lock()); |
- |
- if (channel_) { |
- channel_->Shutdown(); |
- channel_ = nullptr; |
+ if (transferable_) { |
+ if (channel_) { |
+ channel_->Shutdown(); |
+ channel_ = nullptr; |
+ } |
+ } else { |
+ if (non_transferable_state_ == CONNECT_CALLED) { |
+ // We can't cancel the pending request yet, since the other side of the |
+ // message pipe would want to get pending outgoing messages (if any) or |
+ // at least know that this end was closed. So keep this object alive until |
+ // then. |
+ AddRef(); |
+ } else if (non_transferable_state_ == CONNECTED) { |
+ internal::g_broker->CloseMessagePipe(pipe_id_, this); |
+ non_transferable_state_ = CLOSED; |
+ channel_ = nullptr; |
+ } |
} |
} |
@@ -153,6 +177,31 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const { |
return Type::MESSAGE_PIPE; |
} |
+void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
+ base::AutoLock locker(lock()); |
+ channel_ = channel; |
+ while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
+ channel_->WriteMessage( |
+ non_transferable_outgoing_message_queue_.GetMessage()); |
+ } |
+ |
+ if (is_closed()) { |
+ CHECK_EQ(non_transferable_state_, CONNECT_CALLED); |
+ // We kept this object alive until it's connected, we can release it now. |
+ // Since we're in a callback from the Broker, call it asynchronously. |
+ internal::g_io_thread_task_runner->PostTask( |
+ FROM_HERE, |
+ base::Bind(&Broker::CloseMessagePipe, |
+ base::Unretained(internal::g_broker), pipe_id_, |
+ base::Unretained(this))); |
+ non_transferable_state_ = CLOSED; |
+ channel_ = nullptr; |
+ base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); |
+ } else { |
+ non_transferable_state_ = CONNECTED; |
+ } |
+} |
+ |
#if defined(OS_WIN) |
// TODO(jam): this is copied from RawChannelWin till I figure out what's the |
// best way we want to share this. |
@@ -186,6 +235,14 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( |
const SerializedMessagePipeHandleDispatcher* serialization = |
static_cast<const SerializedMessagePipeHandleDispatcher*>(source); |
+ |
+ scoped_refptr<MessagePipeDispatcher> rv( |
+ new MessagePipeDispatcher(serialization->transferable)); |
+ if (!rv->transferable_) { |
+ rv->InitNonTransferable(serialization->pipe_id); |
+ return rv; |
+ } |
+ |
if (serialization->shared_memory_size != |
(serialization->serialized_read_buffer_size + |
serialization->serialized_write_buffer_size + |
@@ -233,8 +290,6 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( |
} |
} |
- scoped_refptr<MessagePipeDispatcher> rv( |
- Create(MessagePipeDispatcher::kDefaultCreateOptions)); |
rv->write_error_ = serialization->write_error; |
std::vector<int> serialized_read_fds; |
@@ -333,14 +388,17 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( |
return rv; |
} |
-MessagePipeDispatcher::MessagePipeDispatcher() |
+MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
: channel_(nullptr), |
- serialized_(false), |
serialized_read_fds_length_(0u), |
serialized_write_fds_length_(0u), |
serialized_message_fds_length_(0u), |
+ pipe_id_(0), |
+ non_transferable_state_(WAITING_FOR_READ_OR_WRITE), |
+ serialized_(false), |
calling_init_(false), |
- write_error_(false) { |
+ write_error_(false), |
+ transferable_(transferable) { |
} |
MessagePipeDispatcher::~MessagePipeDispatcher() { |
@@ -369,6 +427,15 @@ void MessagePipeDispatcher::CloseImplNoLock() { |
} |
void MessagePipeDispatcher::SerializeInternal() { |
+ serialized_ = true; |
+ if (!transferable_) { |
+ CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
+ << "Non transferable message pipe being sent after read/write. " |
+ << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " |
+ << "the pipe can be sent after it's read or written."; |
+ return; |
+ } |
+ |
// We need to stop watching handle immediately, even though not on IO thread, |
// so that other messages aren't read after this. |
std::vector<int> serialized_read_fds, serialized_write_fds; |
@@ -385,8 +452,6 @@ void MessagePipeDispatcher::SerializeInternal() { |
serialized_write_fds.end()); |
serialized_write_fds_length_ = serialized_write_fds.size(); |
channel_ = nullptr; |
- if (write_error) |
- write_error = true; |
} else { |
// It's valid that the other side wrote some data and closed its end. |
} |
@@ -453,8 +518,6 @@ void MessagePipeDispatcher::SerializeInternal() { |
for (size_t i = 0; i < dispatchers.size(); ++i) |
dispatchers[i]->TransportEnded(); |
} |
- |
- serialized_ = true; |
} |
scoped_refptr<Dispatcher> |
@@ -466,18 +529,24 @@ MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
// TODO(vtl): Currently, there are no options, so we just use |
yzshen1
2015/12/03 23:37:50
I think this comment is not useful anymore.
jam
2015/12/04 05:06:47
Done.
|
// |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options |
// too. |
- scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions); |
- rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); |
- serialized_message_queue_.swap(rv->serialized_message_queue_); |
- serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
- serialized_write_buffer_.swap(rv->serialized_write_buffer_); |
- serialized_fds_.swap(rv->serialized_fds_); |
- rv->serialized_read_fds_length_ = serialized_read_fds_length_; |
- rv->serialized_write_fds_length_ = serialized_write_fds_length_; |
- rv->serialized_message_fds_length_ = serialized_message_fds_length_; |
+ scoped_refptr<MessagePipeDispatcher> rv( |
+ new MessagePipeDispatcher(transferable_)); |
rv->serialized_ = true; |
- rv->write_error_ = write_error_; |
- return scoped_refptr<Dispatcher>(rv.get()); |
+ if (transferable_) { |
+ rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); |
+ serialized_message_queue_.swap(rv->serialized_message_queue_); |
+ serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
+ serialized_write_buffer_.swap(rv->serialized_write_buffer_); |
+ serialized_fds_.swap(rv->serialized_fds_); |
+ rv->serialized_read_fds_length_ = serialized_read_fds_length_; |
+ rv->serialized_write_fds_length_ = serialized_write_fds_length_; |
+ rv->serialized_message_fds_length_ = serialized_message_fds_length_; |
+ rv->write_error_ = write_error_; |
+ } else { |
+ rv->pipe_id_ = pipe_id_; |
+ rv->non_transferable_state_ = non_transferable_state_; |
+ } |
+ return rv; |
} |
MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
@@ -485,15 +554,17 @@ MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
uint32_t num_bytes, |
std::vector<DispatcherTransport>* transports, |
MojoWriteMessageFlags flags) { |
+ lock().AssertAcquired(); |
DCHECK(!transports || |
(transports->size() > 0 && |
transports->size() <= GetConfiguration().max_message_num_handles)); |
- lock().AssertAcquired(); |
- |
- if (!channel_ || write_error_) |
+ if (write_error_ || |
+ (transferable_ && !channel_) || |
+ (!transferable_ && non_transferable_state_ == CLOSED)) { |
return MOJO_RESULT_FAILED_PRECONDITION; |
+ } |
if (num_bytes > GetConfiguration().max_message_num_bytes) |
return MOJO_RESULT_RESOURCE_EXHAUSTED; |
@@ -506,7 +577,17 @@ MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
} |
message->SerializeAndCloseDispatchers(); |
- channel_->WriteMessage(message.Pass()); |
+ if (!transferable_) |
+ message->set_route_id(pipe_id_); |
+ if (!transferable_ && |
+ (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || |
+ non_transferable_state_ == CONNECT_CALLED)) { |
+ if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
+ RequestNontransferableChannel(); |
+ non_transferable_outgoing_message_queue_.AddMessage(message.Pass()); |
+ } else { |
+ channel_->WriteMessage(message.Pass()); |
+ } |
return MOJO_RESULT_OK; |
} |
@@ -518,8 +599,14 @@ MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( |
uint32_t* num_dispatchers, |
MojoReadMessageFlags flags) { |
lock().AssertAcquired(); |
- if (channel_) |
+ if (channel_) { |
channel_->EnsureLazyInitialized(); |
+ } else if (!transferable_ && |
+ non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
+ RequestNontransferableChannel(); |
+ return MOJO_RESULT_SHOULD_WAIT; |
+ } |
+ |
DCHECK(!dispatchers || dispatchers->empty()); |
const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; |
@@ -583,14 +670,21 @@ HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() |
HandleSignalsState rv; |
if (!message_queue_.IsEmpty()) |
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
- if (channel_ || !message_queue_.IsEmpty()) |
+ if (!message_queue_.IsEmpty() || |
+ (transferable_ && channel_) || |
+ (!transferable_ && non_transferable_state_ != CLOSED)) |
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
- if (channel_ && !write_error_) { |
+ if (!write_error_ && |
+ ((transferable_ && channel_) || |
+ (!transferable_ && non_transferable_state_ != CLOSED))) { |
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
} |
- if (!channel_ || write_error_) |
+ if (write_error_ || |
+ (transferable_ && !channel_) || |
+ (!transferable_ && non_transferable_state_ == CLOSED)) { |
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
+ } |
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
return rv; |
} |
@@ -601,8 +695,13 @@ MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( |
uintptr_t context, |
HandleSignalsState* signals_state) { |
lock().AssertAcquired(); |
- if (channel_) |
+ if (channel_) { |
channel_->EnsureLazyInitialized(); |
+ } else if (!transferable_ && |
+ non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
+ RequestNontransferableChannel(); |
+ } |
+ |
HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
if (state.satisfies(signals)) { |
if (signals_state) |
@@ -653,6 +752,8 @@ bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( |
CloseImplNoLock(); |
SerializedMessagePipeHandleDispatcher* serialization = |
static_cast<SerializedMessagePipeHandleDispatcher*>(destination); |
+ serialization->transferable = transferable_; |
+ serialization->pipe_id = pipe_id_; |
if (serialized_platform_handle_.is_valid()) { |
serialization->platform_handle_index = platform_handles->size(); |
platform_handles->push_back(serialized_platform_handle_.release()); |
@@ -796,7 +897,18 @@ void MessagePipeDispatcher::OnError(Error error) { |
// called, that is safe since this class always does a PostTask to the IO |
// thread to self destruct. |
if (channel_ && error != ERROR_WRITE) { |
- channel_->Shutdown(); |
+ if (transferable_) { |
+ channel_->Shutdown(); |
+ } else { |
+ CHECK_NE(non_transferable_state_, CLOSED); |
+ // Since we're in a callback from the Broker, call it asynchronously. |
+ internal::g_io_thread_task_runner->PostTask( |
+ FROM_HERE, |
+ base::Bind(&Broker::CloseMessagePipe, |
+ base::Unretained(internal::g_broker), pipe_id_, |
+ base::Unretained(this))); |
+ non_transferable_state_ = CLOSED; |
+ } |
channel_ = nullptr; |
} |
awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
@@ -849,5 +961,19 @@ MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
return MOJO_RESULT_OK; |
} |
+void MessagePipeDispatcher::RequestNontransferableChannel() { |
+ lock().AssertAcquired(); |
+ CHECK(!transferable_); |
+ CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); |
+ non_transferable_state_ = CONNECT_CALLED; |
+ |
+ // PostTask since the broker can call us back synchronously. |
+ internal::g_io_thread_task_runner->PostTask( |
+ FROM_HERE, |
+ base::Bind(&Broker::ConnectMessagePipe, |
+ base::Unretained(internal::g_broker), pipe_id_, |
+ base::Unretained(this))); |
+} |
+ |
} // namespace edk |
} // namespace mojo |