| Index: mojo/edk/system/message_pipe_dispatcher.cc
|
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
|
| index 687710cf94e78aed7d2fafa2a2971b8e92c749ee..d26c912c6e3e59e3975c2a87103f3bcbbecec118 100644
|
| --- a/mojo/edk/system/message_pipe_dispatcher.cc
|
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc
|
| @@ -4,1095 +4,611 @@
|
|
|
| #include "mojo/edk/system/message_pipe_dispatcher.h"
|
|
|
| -#include <stddef.h>
|
| -#include <stdint.h>
|
| -
|
| #include <limits>
|
| -#include <utility>
|
| -#include <vector>
|
|
|
| -#include "base/bind.h"
|
| -#include "base/debug/stack_trace.h"
|
| -#include "base/logging.h"
|
| -#include "base/message_loop/message_loop.h"
|
| +#include "base/macros.h"
|
| +#include "base/memory/ref_counted.h"
|
| +#include "base/memory/scoped_ptr.h"
|
| #include "mojo/edk/embedder/embedder_internal.h"
|
| -#include "mojo/edk/embedder/platform_handle_utils.h"
|
| -#include "mojo/edk/embedder/platform_shared_buffer.h"
|
| -#include "mojo/edk/embedder/platform_support.h"
|
| -#include "mojo/edk/system/broker.h"
|
| -#include "mojo/edk/system/configuration.h"
|
| -#include "mojo/edk/system/message_in_transit.h"
|
| -#include "mojo/edk/system/options_validation.h"
|
| -#include "mojo/edk/system/transport_data.h"
|
| +#include "mojo/edk/system/core.h"
|
| +#include "mojo/edk/system/node_controller.h"
|
| +#include "mojo/edk/system/ports_message.h"
|
| +#include "mojo/public/c/system/macros.h"
|
|
|
| namespace mojo {
|
| namespace edk {
|
|
|
| namespace {
|
|
|
| -const uint32_t kInvalidMessagePipeHandleIndex = static_cast<uint32_t>(-1);
|
| -
|
| -struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher {
|
| - MOJO_ALIGNAS(1) bool transferable;
|
| - MOJO_ALIGNAS(1) bool write_error;
|
| - MOJO_ALIGNAS(8) uint64_t pipe_id; // If transferable is false.
|
| - // The following members are only set if transferable is true.
|
| - // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP
|
| - // was closed.
|
| - MOJO_ALIGNAS(4) uint32_t platform_handle_index;
|
| -
|
| - MOJO_ALIGNAS(4) uint32_t
|
| - shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.)
|
| - MOJO_ALIGNAS(4) uint32_t shared_memory_size;
|
| -
|
| - MOJO_ALIGNAS(4) uint32_t serialized_read_buffer_size;
|
| - MOJO_ALIGNAS(4) uint32_t serialized_write_buffer_size;
|
| - MOJO_ALIGNAS(4) uint32_t serialized_message_queue_size;
|
| -
|
| - // These are the FDs required as part of serializing channel_ and
|
| - // message_queue_. This is only used on POSIX.
|
| - MOJO_ALIGNAS(4)
|
| - uint32_t serialized_fds_index; // (Or |kInvalidMessagePipeHandleIndex|.)
|
| - MOJO_ALIGNAS(4) uint32_t serialized_read_fds_length;
|
| - MOJO_ALIGNAS(4) uint32_t serialized_write_fds_length;
|
| - MOJO_ALIGNAS(4) uint32_t serialized_message_fds_length;
|
| +// Header attached to every message sent over a message pipe.
|
| +struct MOJO_ALIGNAS(8) MessageHeader {
|
| + // The number of serialized dispatchers included in this header.
|
| + uint32_t num_dispatchers;
|
| +
|
| + // Total size of the header, including serialized dispatcher data.
|
| + uint32_t header_size;
|
| };
|
|
|
| -char* SerializeBuffer(char* start, std::vector<char>* buffer) {
|
| - if (buffer->size())
|
| - memcpy(start, &(*buffer)[0], buffer->size());
|
| - return start + buffer->size();
|
| -}
|
| +// Header for each dispatcher, immediately following the message header.
|
| +struct MOJO_ALIGNAS(8) DispatcherHeader {
|
| + // The type of the dispatcher, correpsonding to the Dispatcher::Type enum.
|
| + int32_t type;
|
|
|
| -bool GetHandle(size_t index,
|
| - PlatformHandleVector* platform_handles,
|
| - ScopedPlatformHandle* handle) {
|
| - if (index == kInvalidMessagePipeHandleIndex)
|
| - return true;
|
| + // The size of the serialized dispatcher, not including this header.
|
| + uint32_t num_bytes;
|
|
|
| - if (!platform_handles || index >= platform_handles->size()) {
|
| - LOG(ERROR)
|
| - << "Invalid serialized message pipe dispatcher (missing handles)";
|
| - return false;
|
| - }
|
| + // The number of ports needed to deserialize this dispatcher.
|
| + uint32_t num_ports;
|
|
|
| - // We take ownership of the handle, so we have to invalidate the one in
|
| - // |platform_handles|.
|
| - handle->reset((*platform_handles)[index]);
|
| - (*platform_handles)[index] = PlatformHandle();
|
| - return true;
|
| -}
|
| + // The number of platform handles needed to deserialize this dispatcher.
|
| + uint32_t num_platform_handles;
|
| +};
|
|
|
| -#if defined(OS_POSIX)
|
| -void ClosePlatformHandles(std::vector<int>* fds) {
|
| - for (size_t i = 0; i < fds->size(); ++i)
|
| - PlatformHandle((*fds)[i]).CloseIfNecessary();
|
| -}
|
| -#endif
|
| +struct MOJO_ALIGNAS(8) SerializedState {
|
| + uint64_t pipe_id;
|
| + int8_t endpoint;
|
| +};
|
|
|
| } // namespace
|
|
|
| -// MessagePipeDispatcher -------------------------------------------------------
|
| +// A PortObserver which forwards to a MessagePipeDispatcher. This owns a
|
| +// reference to the MPD to ensure it lives as long as the observed port.
|
| +class MessagePipeDispatcher::PortObserverThunk
|
| + : public NodeController::PortObserver {
|
| + public:
|
| + explicit PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)
|
| + : dispatcher_(dispatcher) {}
|
|
|
| -const MojoCreateMessagePipeOptions
|
| - MessagePipeDispatcher::kDefaultCreateOptions = {
|
| - static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
|
| - MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
|
| + private:
|
| + ~PortObserverThunk() override {}
|
|
|
| -MojoResult MessagePipeDispatcher::ValidateCreateOptions(
|
| - const MojoCreateMessagePipeOptions* in_options,
|
| - MojoCreateMessagePipeOptions* out_options) {
|
| - const MojoCreateMessagePipeOptionsFlags kKnownFlags =
|
| - MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE |
|
| - MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE;
|
| -
|
| - *out_options = kDefaultCreateOptions;
|
| - if (!in_options)
|
| - return MOJO_RESULT_OK;
|
| -
|
| - UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
|
| - if (!reader.is_valid())
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| + // NodeController::PortObserver:
|
| + void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
|
|
|
| - if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
|
| - return MOJO_RESULT_OK;
|
| - if ((reader.options().flags & ~kKnownFlags))
|
| - return MOJO_RESULT_UNIMPLEMENTED;
|
| - out_options->flags = reader.options().flags;
|
| + scoped_refptr<MessagePipeDispatcher> dispatcher_;
|
|
|
| - // Checks for fields beyond |flags|:
|
| -
|
| - // (Nothing here yet.)
|
| -
|
| - return MOJO_RESULT_OK;
|
| -}
|
| -
|
| -void MessagePipeDispatcher::Init(
|
| - ScopedPlatformHandle message_pipe,
|
| - char* serialized_read_buffer, size_t serialized_read_buffer_size,
|
| - char* serialized_write_buffer, size_t serialized_write_buffer_size,
|
| - std::vector<int>* serialized_read_fds,
|
| - std::vector<int>* serialized_write_fds) {
|
| - CHECK(transferable_);
|
| - if (message_pipe.get().is_valid()) {
|
| - channel_ = RawChannel::Create(std::move(message_pipe));
|
| -
|
| - // TODO(jam): It's probably cleaner to pass this in Init call.
|
| - channel_->SetSerializedData(
|
| - serialized_read_buffer, serialized_read_buffer_size,
|
| - serialized_write_buffer, serialized_write_buffer_size,
|
| - serialized_read_fds, serialized_write_fds);
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this));
|
| - }
|
| -}
|
| -
|
| -void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) {
|
| - CHECK(!transferable_);
|
| - pipe_id_ = pipe_id;
|
| -}
|
| -
|
| -void MessagePipeDispatcher::InitOnIO() {
|
| - base::AutoLock locker(lock());
|
| - CHECK(transferable_);
|
| - calling_init_ = true;
|
| - if (channel_)
|
| - channel_->Init(this);
|
| - calling_init_ = false;
|
| -}
|
| -
|
| -void MessagePipeDispatcher::CloseOnIOAndRelease() {
|
| - {
|
| - base::AutoLock locker(lock());
|
| - CloseOnIO();
|
| - }
|
| - Release(); // To match CloseImplNoLock.
|
| -}
|
| + DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
|
| +};
|
|
|
| -void MessagePipeDispatcher::CloseOnIO() {
|
| - lock().AssertAcquired();
|
| -
|
| - if (channel_) {
|
| - // If we closed the channel now then in-flight message pipes wouldn't get
|
| - // closed, and their other side wouldn't get a connection error notification
|
| - // which could lead to hangs or leaks. So we ask the other side of this
|
| - // message pipe to close, which ensures that we have dispatched all
|
| - // in-flight message pipes.
|
| - DCHECK(!close_requested_);
|
| - close_requested_ = true;
|
| - AddRef();
|
| - scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| - MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
|
| - if (!transferable_)
|
| - message->set_route_id(pipe_id_);
|
| - channel_->WriteMessage(std::move(message));
|
| - return;
|
| - }
|
| +MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
|
| + const ports::PortRef& port,
|
| + uint64_t pipe_id,
|
| + int endpoint)
|
| + : node_controller_(node_controller),
|
| + port_(port),
|
| + pipe_id_(pipe_id),
|
| + endpoint_(endpoint) {
|
| + DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
|
| + << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
|
|
|
| - if (!transferable_ &&
|
| - (non_transferable_state_ == CONNECT_CALLED ||
|
| - non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) {
|
| - if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
|
| - RequestNontransferableChannel();
|
| -
|
| - // We can't cancel the pending request yet, since the other side of the
|
| - // message pipe would want to get pending outgoing messages (if any) or at
|
| - // least know that this end was closed. So keep this object alive until
|
| - // then.
|
| - non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE;
|
| - AddRef();
|
| - }
|
| + node_controller_->SetPortObserver(
|
| + port_,
|
| + make_scoped_refptr(new PortObserverThunk(this)));
|
| }
|
|
|
| Dispatcher::Type MessagePipeDispatcher::GetType() const {
|
| return Type::MESSAGE_PIPE;
|
| }
|
|
|
| -void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
|
| - DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
|
| - base::AutoLock locker(lock());
|
| - channel_ = channel;
|
| - while (!non_transferable_outgoing_message_queue_.IsEmpty()) {
|
| - channel_->WriteMessage(
|
| - non_transferable_outgoing_message_queue_.GetMessage());
|
| - }
|
| -
|
| - if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) {
|
| - non_transferable_state_ = CONNECTED;
|
| - // We kept this object alive until it's connected, we can close it now.
|
| - CloseOnIO();
|
| - // Balance the AddRef in CloseOnIO.
|
| - Release();
|
| - return;
|
| - }
|
| -
|
| - non_transferable_state_ = CONNECTED;
|
| -}
|
| -
|
| -#if defined(OS_WIN)
|
| -// TODO(jam): this is copied from RawChannelWin till I figure out what's the
|
| -// best way we want to share this.
|
| -// Since this is used for serialization of messages read/written to a MP that
|
| -// aren't consumed by Mojo primitives yet, there could be an unbounded number of
|
| -// them when a MP is being sent. As a result, even for POSIX we will probably
|
| -// want to send the handles to the shell process and exchange them for tokens
|
| -// (since we can be sure that the shell will respond to our IPCs, compared to
|
| -// the other end where we're sending the MP to, which may not be reading...).
|
| -ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
|
| - size_t num_platform_handles,
|
| - const void* platform_handle_table) {
|
| - ScopedPlatformHandleVectorPtr rv(new PlatformHandleVector());
|
| - rv->resize(num_platform_handles);
|
| -
|
| - const uint64_t* tokens =
|
| - static_cast<const uint64_t*>(platform_handle_table);
|
| - internal::g_broker->TokenToHandle(tokens, num_platform_handles, &rv->at(0));
|
| - return rv.Pass();
|
| +MojoResult MessagePipeDispatcher::Close() {
|
| + base::AutoLock lock(signal_lock_);
|
| + DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
|
| + << " [port=" << port_.name() << "]";
|
| + return CloseNoLock();
|
| }
|
| -#endif
|
| -
|
| -scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
|
| - const void* source,
|
| - size_t size,
|
| - PlatformHandleVector* platform_handles) {
|
| - if (size != sizeof(SerializedMessagePipeHandleDispatcher)) {
|
| - LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)";
|
| - return nullptr;
|
| - }
|
| -
|
| - const SerializedMessagePipeHandleDispatcher* serialization =
|
| - static_cast<const SerializedMessagePipeHandleDispatcher*>(source);
|
| -
|
| - scoped_refptr<MessagePipeDispatcher> rv(
|
| - new MessagePipeDispatcher(serialization->transferable));
|
| - if (!rv->transferable_) {
|
| - rv->InitNonTransferable(serialization->pipe_id);
|
| - return rv;
|
| - }
|
|
|
| - if (serialization->shared_memory_size !=
|
| - (serialization->serialized_read_buffer_size +
|
| - serialization->serialized_write_buffer_size +
|
| - serialization->serialized_message_queue_size)) {
|
| - LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)";
|
| - return nullptr;
|
| - }
|
| +MojoResult MessagePipeDispatcher::WriteMessage(
|
| + const void* bytes,
|
| + uint32_t num_bytes,
|
| + const DispatcherInTransit* dispatchers,
|
| + uint32_t num_dispatchers,
|
| + MojoWriteMessageFlags flags) {
|
|
|
| - ScopedPlatformHandle platform_handle, shared_memory_handle;
|
| - if (!GetHandle(serialization->platform_handle_index,
|
| - platform_handles, &platform_handle) ||
|
| - !GetHandle(serialization->shared_memory_handle_index,
|
| - platform_handles, &shared_memory_handle)) {
|
| - return nullptr;
|
| - }
|
| + {
|
| + base::AutoLock lock(signal_lock_);
|
| + if (port_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + }
|
| +
|
| + // A structure for retaining information about every Dispatcher we're about
|
| + // to send. This information is collected by calling StartSerialize() on
|
| + // each dispatcher in sequence.
|
| + struct DispatcherInfo {
|
| + uint32_t num_bytes;
|
| + uint32_t num_ports;
|
| + uint32_t num_handles;
|
| + };
|
| +
|
| + // This is only the base header size. It will grow as we accumulate the
|
| + // size of serialized state for each dispatcher.
|
| + size_t header_size = sizeof(MessageHeader) +
|
| + num_dispatchers * sizeof(DispatcherHeader);
|
| +
|
| + size_t num_ports = 0;
|
| + size_t num_handles = 0;
|
| +
|
| + std::vector<DispatcherInfo> dispatcher_info(num_dispatchers);
|
| + for (size_t i = 0; i < num_dispatchers; ++i) {
|
| + Dispatcher* d = dispatchers[i].dispatcher.get();
|
| + d->StartSerialize(&dispatcher_info[i].num_bytes,
|
| + &dispatcher_info[i].num_ports,
|
| + &dispatcher_info[i].num_handles);
|
| + header_size += dispatcher_info[i].num_bytes;
|
| + num_ports += dispatcher_info[i].num_ports;
|
| + num_handles += dispatcher_info[i].num_handles;
|
| + }
|
| +
|
| + // We now have enough information to fully allocate the message storage.
|
| + scoped_ptr<PortsMessage> message = PortsMessage::NewUserMessage(
|
| + header_size + num_bytes, num_ports, num_handles);
|
| + DCHECK(message);
|
| +
|
| + // Populate the message header with information about serialized dispatchers.
|
| + //
|
| + // The front of the message is always a MessageHeader followed by a
|
| + // DispatcherHeader for each dispatcher to be sent.
|
| + MessageHeader* header =
|
| + static_cast<MessageHeader*>(message->mutable_payload_bytes());
|
| + DispatcherHeader* dispatcher_headers =
|
| + reinterpret_cast<DispatcherHeader*>(header + 1);
|
| +
|
| + // Serialized dispatcher state immediately follows the series of
|
| + // DispatcherHeaders.
|
| + char* dispatcher_data =
|
| + reinterpret_cast<char*>(dispatcher_headers + num_dispatchers);
|
| +
|
| + header->num_dispatchers = num_dispatchers;
|
| +
|
| + // |header_size| is the total number of bytes preceding the message payload,
|
| + // including all dispatcher headers and serialized dispatcher state.
|
| + DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max());
|
| + header->header_size = static_cast<uint32_t>(header_size);
|
| +
|
| + bool cancel_transit = false;
|
| + if (num_dispatchers > 0) {
|
| + ScopedPlatformHandleVectorPtr handles(
|
| + new PlatformHandleVector(num_handles));
|
| + size_t port_index = 0;
|
| + size_t handle_index = 0;
|
| + for (size_t i = 0; i < num_dispatchers; ++i) {
|
| + Dispatcher* d = dispatchers[i].dispatcher.get();
|
| + DispatcherHeader* dh = &dispatcher_headers[i];
|
| + const DispatcherInfo& info = dispatcher_info[i];
|
| +
|
| + // Fill in the header for this dispatcher.
|
| + dh->type = static_cast<int32_t>(d->GetType());
|
| + dh->num_bytes = info.num_bytes;
|
| + dh->num_ports = info.num_ports;
|
| + dh->num_platform_handles = info.num_handles;
|
| +
|
| + // Fill in serialized state, ports, and platform handles. We'll cancel
|
| + // the send if the dispatcher implementation rejects for some reason.
|
| + if (!d->EndSerialize(static_cast<void*>(dispatcher_data),
|
| + message->mutable_ports() + port_index,
|
| + handles->data() + handle_index)) {
|
| + cancel_transit = true;
|
| + break;
|
| + }
|
|
|
| - char* serialized_read_buffer = nullptr;
|
| - size_t serialized_read_buffer_size = 0;
|
| - char* serialized_write_buffer = nullptr;
|
| - size_t serialized_write_buffer_size = 0;
|
| - char* message_queue_data = nullptr;
|
| - size_t message_queue_size = 0;
|
| - scoped_refptr<PlatformSharedBuffer> shared_buffer;
|
| - scoped_ptr<PlatformSharedBufferMapping> mapping;
|
| - if (shared_memory_handle.is_valid()) {
|
| - shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
|
| - serialization->shared_memory_size, std::move(shared_memory_handle));
|
| - mapping = shared_buffer->Map(0, serialization->shared_memory_size);
|
| - char* buffer = static_cast<char*>(mapping->GetBase());
|
| - if (serialization->serialized_read_buffer_size) {
|
| - serialized_read_buffer = buffer;
|
| - serialized_read_buffer_size = serialization->serialized_read_buffer_size;
|
| - buffer += serialized_read_buffer_size;
|
| - }
|
| - if (serialization->serialized_write_buffer_size) {
|
| - serialized_write_buffer = buffer;
|
| - serialized_write_buffer_size =
|
| - serialization->serialized_write_buffer_size;
|
| - buffer += serialized_write_buffer_size;
|
| + dispatcher_data += info.num_bytes;
|
| + port_index += info.num_ports;
|
| + handle_index += info.num_handles;
|
| }
|
| - if (serialization->serialized_message_queue_size) {
|
| - message_queue_data = buffer;
|
| - message_queue_size = serialization->serialized_message_queue_size;
|
| - buffer += message_queue_size;
|
| - }
|
| - }
|
|
|
| - rv->write_error_ = serialization->write_error;
|
| -
|
| - std::vector<int> serialized_read_fds;
|
| - std::vector<int> serialized_write_fds;
|
| -#if defined(OS_POSIX)
|
| - std::vector<int> serialized_fds;
|
| - size_t serialized_fds_index = 0;
|
| -
|
| - size_t total_fd_count = serialization->serialized_read_fds_length +
|
| - serialization->serialized_write_fds_length +
|
| - serialization->serialized_message_fds_length;
|
| - for (size_t i = 0; i < total_fd_count; ++i) {
|
| - ScopedPlatformHandle handle;
|
| - if (!GetHandle(serialization->serialized_fds_index + i, platform_handles,
|
| - &handle)) {
|
| - ClosePlatformHandles(&serialized_fds);
|
| - return nullptr;
|
| + if (!cancel_transit) {
|
| + // Take ownership of all the handles and move them into message storage.
|
| + message->SetHandles(std::move(handles));
|
| + } else {
|
| + // Release any platform handles we've accumulated. Their dispatchers
|
| + // retain ownership when transit is canceled, so these are not actually
|
| + // leaking.
|
| + handles->clear();
|
| }
|
| - serialized_fds.push_back(handle.release().handle);
|
| }
|
|
|
| - serialized_read_fds.assign(
|
| - serialized_fds.begin(),
|
| - serialized_fds.begin() + serialization->serialized_read_fds_length);
|
| - serialized_fds_index += serialization->serialized_read_fds_length;
|
| - serialized_write_fds.assign(
|
| - serialized_fds.begin() + serialized_fds_index,
|
| - serialized_fds.begin() + serialized_fds_index +
|
| - serialization->serialized_write_fds_length);
|
| - serialized_fds_index += serialization->serialized_write_fds_length;
|
| -#endif
|
| -
|
| - while (message_queue_size) {
|
| - size_t message_size;
|
| - if (!MessageInTransit::GetNextMessageSize(
|
| - message_queue_data, message_queue_size, &message_size)) {
|
| - NOTREACHED() << "Couldn't read message size from serialized data.";
|
| - return nullptr;
|
| - }
|
| - if (message_size > message_queue_size) {
|
| - NOTREACHED() << "Invalid serialized message size.";
|
| - return nullptr;
|
| - }
|
| - MessageInTransit::View message_view(message_size, message_queue_data);
|
| - message_queue_size -= message_size;
|
| - message_queue_data += message_size;
|
| -
|
| - // TODO(jam): Copied below from RawChannelWin. See commment above
|
| - // GetReadPlatformHandles.
|
| - ScopedPlatformHandleVectorPtr temp_platform_handles;
|
| - if (message_view.transport_data_buffer()) {
|
| - size_t num_platform_handles;
|
| - const void* platform_handle_table;
|
| - TransportData::GetPlatformHandleTable(
|
| - message_view.transport_data_buffer(), &num_platform_handles,
|
| - &platform_handle_table);
|
| -
|
| - if (num_platform_handles > 0) {
|
| -#if defined(OS_WIN)
|
| - temp_platform_handles =
|
| - GetReadPlatformHandles(num_platform_handles,
|
| - platform_handle_table).Pass();
|
| - if (!temp_platform_handles) {
|
| - LOG(ERROR) << "Invalid number of platform handles received";
|
| - return nullptr;
|
| - }
|
| -#else
|
| - temp_platform_handles.reset(new PlatformHandleVector());
|
| - for (size_t i = 0; i < num_platform_handles; ++i)
|
| - temp_platform_handles->push_back(
|
| - PlatformHandle(serialized_fds[serialized_fds_index++]));
|
| -#endif
|
| + MojoResult result = MOJO_RESULT_OK;
|
| + if (!cancel_transit) {
|
| + // Copy the message body.
|
| + void* message_body = static_cast<void*>(
|
| + static_cast<char*>(message->mutable_payload_bytes()) + header_size);
|
| + memcpy(message_body, bytes, num_bytes);
|
| +
|
| + int rv = node_controller_->SendMessage(port_, &message);
|
| +
|
| + DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
|
| + << " [port=" << port_.name() << "; rv=" << rv
|
| + << "; num_bytes=" << num_bytes << "]";
|
| +
|
| + if (rv != ports::OK) {
|
| + if (rv == ports::ERROR_PORT_UNKNOWN ||
|
| + rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
|
| + rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
|
| + result = MOJO_RESULT_INVALID_ARGUMENT;
|
| + } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
|
| + base::AutoLock lock(signal_lock_);
|
| + awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + result = MOJO_RESULT_FAILED_PRECONDITION;
|
| + } else {
|
| + NOTREACHED();
|
| + result = MOJO_RESULT_UNKNOWN;
|
| }
|
| - }
|
| -
|
| - // TODO(jam): Copied below from RawChannelWin. See commment above
|
| - // GetReadPlatformHandles.
|
| - scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
|
| - if (message_view.transport_data_buffer_size() > 0) {
|
| - DCHECK(message_view.transport_data_buffer());
|
| - message->SetDispatchers(TransportData::DeserializeDispatchers(
|
| - message_view.transport_data_buffer(),
|
| - message_view.transport_data_buffer_size(),
|
| - std::move(temp_platform_handles)));
|
| - }
|
| -
|
| - rv->message_queue_.AddMessage(std::move(message));
|
| - }
|
| -
|
| - rv->Init(std::move(platform_handle), serialized_read_buffer,
|
| - serialized_read_buffer_size, serialized_write_buffer,
|
| - serialized_write_buffer_size, &serialized_read_fds,
|
| - &serialized_write_fds);
|
| -
|
| - if (message_queue_size) { // Should be empty by now.
|
| - LOG(ERROR) << "Invalid queued messages";
|
| - return nullptr;
|
| - }
|
| -
|
| - return rv;
|
| -}
|
| -
|
| -MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
|
| - : channel_(nullptr),
|
| - serialized_read_fds_length_(0u),
|
| - serialized_write_fds_length_(0u),
|
| - serialized_message_fds_length_(0u),
|
| - pipe_id_(0),
|
| - non_transferable_state_(WAITING_FOR_READ_OR_WRITE),
|
| - serialized_(false),
|
| - calling_init_(false),
|
| - write_error_(false),
|
| - transferable_(transferable),
|
| - close_requested_(false) {
|
| -}
|
| -
|
| -MessagePipeDispatcher::~MessagePipeDispatcher() {
|
| - // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The
|
| - // exception is if they posted a task to run CloseOnIO but the IO thread shut
|
| - // down and so when it was deleting pending tasks it caused the last reference
|
| - // to destruct this object. In that case, safe to destroy the channel.
|
| - if (channel_ &&
|
| - internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) {
|
| - if (transferable_) {
|
| - channel_->Shutdown();
|
| + cancel_transit = true;
|
| } else {
|
| - internal::g_broker->CloseMessagePipe(pipe_id_, this);
|
| + DCHECK(!message);
|
| }
|
| - } else {
|
| - DCHECK(!channel_);
|
| - }
|
| -#if defined(OS_POSIX)
|
| - ClosePlatformHandles(&serialized_fds_);
|
| -#endif
|
| -}
|
| -
|
| -void MessagePipeDispatcher::CancelAllAwakablesNoLock() {
|
| - lock().AssertAcquired();
|
| - awakable_list_.CancelAll();
|
| -}
|
| -
|
| -void MessagePipeDispatcher::CloseImplNoLock() {
|
| - lock().AssertAcquired();
|
| - // This early circuit fixes leak in unit tests. There's nothing to do in the
|
| - // posted task.
|
| - if (!transferable_ && non_transferable_state_ == CLOSED)
|
| - return;
|
| -
|
| - // We take a manual refcount because at shutdown, the task below might not get
|
| - // a chance to execute. If that happens, the RawChannel will still call our
|
| - // OnError method because it always runs (since it watches thread
|
| - // destruction). So to avoid UAF, manually add a reference and only release it
|
| - // if the task runs.
|
| - AddRef();
|
| - if (!internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&MessagePipeDispatcher::CloseOnIOAndRelease, this))) {
|
| - // Avoid a shutdown leak in unittests. If the thread is shutting down,
|
| - // we can't connect to the other end to let it know that we're closed either
|
| - // way.
|
| - if (!transferable_ && non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
|
| - Release();
|
| }
|
| -}
|
|
|
| -void MessagePipeDispatcher::SerializeInternal() {
|
| - serialized_ = true;
|
| - if (!transferable_) {
|
| - CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
|
| - << "Non transferable message pipe being sent after read/write/waited. "
|
| - << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if "
|
| - << "the pipe can be sent after it's read or written. This message pipe "
|
| - << "was previously bound at:\n"
|
| - << non_transferable_bound_stack_->ToString();
|
| -
|
| - non_transferable_state_ = SERIALISED;
|
| - return;
|
| + if (cancel_transit) {
|
| + // We ended up not sending the message. Release all the platform handles.
|
| + // Their dipatchers retain ownership when transit is canceled, so these are
|
| + // not actually leaking.
|
| + DCHECK(message);
|
| + Channel::MessagePtr m = message->TakeChannelMessage();
|
| + ScopedPlatformHandleVectorPtr handles = m->TakeHandles();
|
| + if (handles)
|
| + handles->clear();
|
| }
|
|
|
| - // We need to stop watching handle immediately, even though not on IO thread,
|
| - // so that other messages aren't read after this.
|
| - std::vector<int> serialized_read_fds, serialized_write_fds;
|
| - if (channel_) {
|
| - bool write_error = false;
|
| -
|
| - serialized_platform_handle_ = channel_->ReleaseHandle(
|
| - &serialized_read_buffer_, &serialized_write_buffer_,
|
| - &serialized_read_fds, &serialized_write_fds, &write_error);
|
| - serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(),
|
| - serialized_read_fds.end());
|
| - serialized_read_fds_length_ = serialized_read_fds.size();
|
| - serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(),
|
| - serialized_write_fds.end());
|
| - serialized_write_fds_length_ = serialized_write_fds.size();
|
| - channel_ = nullptr;
|
| - } else {
|
| - // It's valid that the other side wrote some data and closed its end.
|
| - }
|
| + return result;
|
| +}
|
|
|
| - DCHECK(serialized_message_queue_.empty());
|
| - while (!message_queue_.IsEmpty()) {
|
| - scoped_ptr<MessageInTransit> message = message_queue_.GetMessage();
|
| -
|
| - // When MojoWriteMessage is called, the MessageInTransit doesn't have
|
| - // dispatchers set and CreateEquivaent... is called since the dispatchers
|
| - // can be referenced by others. here dispatchers aren't referenced by
|
| - // others, but rawchannel can still call to them. so since we dont call
|
| - // createequiv, manually call TransportStarted and TransportEnd.
|
| - DispatcherVector dispatchers;
|
| - if (message->has_dispatchers())
|
| - dispatchers = *message->dispatchers();
|
| - for (size_t i = 0; i < dispatchers.size(); ++i)
|
| - dispatchers[i]->TransportStarted();
|
| -
|
| - // TODO(jam): this handling for dispatchers only works on windows where we
|
| - // send transportdata as bytes instead of as parameters to sendmsg.
|
| - message->SerializeAndCloseDispatchers();
|
| - // cont'd below
|
| -
|
| - size_t main_buffer_size = message->main_buffer_size();
|
| - size_t transport_data_buffer_size = message->transport_data() ?
|
| - message->transport_data()->buffer_size() : 0;
|
| -
|
| - serialized_message_queue_.insert(
|
| - serialized_message_queue_.end(),
|
| - static_cast<const char*>(message->main_buffer()),
|
| - static_cast<const char*>(message->main_buffer()) + main_buffer_size);
|
| -
|
| - // cont'd
|
| - if (transport_data_buffer_size != 0) {
|
| - // TODO(jam): copied from RawChannelWin::WriteNoLock(
|
| - PlatformHandleVector* all_platform_handles =
|
| - message->transport_data()->platform_handles();
|
| - if (all_platform_handles) {
|
| -#if defined(OS_WIN)
|
| - uint64_t* tokens = reinterpret_cast<uint64_t*>(
|
| - static_cast<char*>(message->transport_data()->buffer()) +
|
| - message->transport_data()->platform_handle_table_offset());
|
| - internal::g_broker->HandleToToken(
|
| - &all_platform_handles->at(0), all_platform_handles->size(), tokens);
|
| - for (size_t i = 0; i < all_platform_handles->size(); i++)
|
| - all_platform_handles->at(i) = PlatformHandle();
|
| -#else
|
| - for (size_t i = 0; i < all_platform_handles->size(); i++) {
|
| - serialized_fds_.push_back(all_platform_handles->at(i).handle);
|
| - serialized_message_fds_length_++;
|
| - all_platform_handles->at(i) = PlatformHandle();
|
| +MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
|
| + uint32_t* num_bytes,
|
| + MojoHandle* handles,
|
| + uint32_t* num_handles,
|
| + MojoReadMessageFlags flags) {
|
| + {
|
| + base::AutoLock lock(signal_lock_);
|
| + // We can't read from a port that's closed or in transit!
|
| + if (port_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + }
|
| +
|
| + bool no_space = false;
|
| + bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD;
|
| +
|
| + // Ensure the provided buffers are large enough to hold the next message.
|
| + // GetMessageIf provides an atomic way to test the next message without
|
| + // committing to removing it from the port's underlying message queue until
|
| + // we are sure we can consume it.
|
| +
|
| + ports::ScopedMessage ports_message;
|
| + int rv = node_controller_->node()->GetMessageIf(
|
| + port_,
|
| + [num_bytes, num_handles, &no_space, &may_discard](
|
| + const ports::Message& next_message) {
|
| + const PortsMessage& message =
|
| + static_cast<const PortsMessage&>(next_message);
|
| + DCHECK_GE(message.num_payload_bytes(), sizeof(MessageHeader));
|
| +
|
| + const MessageHeader* header =
|
| + static_cast<const MessageHeader*>(message.payload_bytes());
|
| + DCHECK_LE(header->header_size, message.num_payload_bytes());
|
| +
|
| + uint32_t bytes_to_read = 0;
|
| + uint32_t bytes_available =
|
| + static_cast<uint32_t>(message.num_payload_bytes()) -
|
| + header->header_size;
|
| + if (num_bytes) {
|
| + bytes_to_read = std::min(*num_bytes, bytes_available);
|
| + *num_bytes = bytes_available;
|
| }
|
| -#endif
|
| - }
|
| -
|
| - serialized_message_queue_.insert(
|
| - serialized_message_queue_.end(),
|
| - static_cast<const char*>(message->transport_data()->buffer()),
|
| - static_cast<const char*>(message->transport_data()->buffer()) +
|
| - transport_data_buffer_size);
|
| - }
|
|
|
| - for (size_t i = 0; i < dispatchers.size(); ++i)
|
| - dispatchers[i]->TransportEnded();
|
| - }
|
| -}
|
| + uint32_t handles_to_read = 0;
|
| + uint32_t handles_available = header->num_dispatchers;
|
| + if (num_handles) {
|
| + handles_to_read = std::min(*num_handles, handles_available);
|
| + *num_handles = handles_available;
|
| + }
|
|
|
| -scoped_refptr<Dispatcher>
|
| -MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
|
| - lock().AssertAcquired();
|
| -
|
| - SerializeInternal();
|
| -
|
| - scoped_refptr<MessagePipeDispatcher> rv(
|
| - new MessagePipeDispatcher(transferable_));
|
| - rv->serialized_ = true;
|
| - if (transferable_) {
|
| - rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
|
| - serialized_message_queue_.swap(rv->serialized_message_queue_);
|
| - serialized_read_buffer_.swap(rv->serialized_read_buffer_);
|
| - serialized_write_buffer_.swap(rv->serialized_write_buffer_);
|
| - serialized_fds_.swap(rv->serialized_fds_);
|
| - rv->serialized_read_fds_length_ = serialized_read_fds_length_;
|
| - rv->serialized_write_fds_length_ = serialized_write_fds_length_;
|
| - rv->serialized_message_fds_length_ = serialized_message_fds_length_;
|
| - rv->write_error_ = write_error_;
|
| - } else {
|
| - rv->pipe_id_ = pipe_id_;
|
| - rv->non_transferable_state_ = non_transferable_state_;
|
| - }
|
| - return rv;
|
| -}
|
| + if (bytes_to_read < bytes_available ||
|
| + handles_to_read < handles_available) {
|
| + no_space = true;
|
| + return may_discard;
|
| + }
|
|
|
| -MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
|
| - const void* bytes,
|
| - uint32_t num_bytes,
|
| - std::vector<DispatcherTransport>* transports,
|
| - MojoWriteMessageFlags flags) {
|
| - lock().AssertAcquired();
|
| + return true;
|
| + },
|
| + &ports_message);
|
|
|
| - DCHECK(!transports ||
|
| - (transports->size() > 0 &&
|
| - transports->size() <= GetConfiguration().max_message_num_handles));
|
| + if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) {
|
| + if (rv == ports::ERROR_PORT_UNKNOWN ||
|
| + rv == ports::ERROR_PORT_STATE_UNEXPECTED)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| - if (write_error_ ||
|
| - (transferable_ && !channel_) ||
|
| - (!transferable_ && non_transferable_state_ == CLOSED)) {
|
| - return MOJO_RESULT_FAILED_PRECONDITION;
|
| + NOTREACHED();
|
| + return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here?
|
| }
|
|
|
| - if (num_bytes > GetConfiguration().max_message_num_bytes)
|
| + if (no_space) {
|
| + // Either |*num_bytes| or |*num_handles| wasn't sufficient to hold this
|
| + // message's data. The message will still be in queue unless
|
| + // MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
|
| return MOJO_RESULT_RESOURCE_EXHAUSTED;
|
| - scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| - MessageInTransit::Type::MESSAGE, num_bytes, bytes));
|
| - if (transports) {
|
| - MojoResult result = AttachTransportsNoLock(message.get(), transports);
|
| - if (result != MOJO_RESULT_OK)
|
| - return result;
|
| - }
|
| -
|
| - message->SerializeAndCloseDispatchers();
|
| - if (!transferable_)
|
| - message->set_route_id(pipe_id_);
|
| - if (!transferable_ &&
|
| - (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE ||
|
| - non_transferable_state_ == CONNECT_CALLED)) {
|
| - if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
|
| - RequestNontransferableChannel();
|
| - non_transferable_outgoing_message_queue_.AddMessage(std::move(message));
|
| - } else {
|
| - channel_->WriteMessage(std::move(message));
|
| }
|
|
|
| - return MOJO_RESULT_OK;
|
| -}
|
| + if (!ports_message) {
|
| + // No message was available in queue.
|
|
|
| -MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
|
| - void* bytes,
|
| - uint32_t* num_bytes,
|
| - DispatcherVector* dispatchers,
|
| - uint32_t* num_dispatchers,
|
| - MojoReadMessageFlags flags) {
|
| - lock().AssertAcquired();
|
| - if (transferable_ && channel_) {
|
| - channel_->EnsureLazyInitialized();
|
| - } else if (!transferable_) {
|
| - if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
|
| - RequestNontransferableChannel();
|
| - return MOJO_RESULT_SHOULD_WAIT;
|
| - } else if (non_transferable_state_ == CONNECT_CALLED) {
|
| + if (rv == ports::OK)
|
| return MOJO_RESULT_SHOULD_WAIT;
|
| - }
|
| - }
|
|
|
| - DCHECK(!dispatchers || dispatchers->empty());
|
| -
|
| - const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes;
|
| - const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
|
| -
|
| - if (message_queue_.IsEmpty())
|
| - return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION;
|
| -
|
| - // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
|
| - // and release the lock immediately.
|
| - bool enough_space = true;
|
| - MessageInTransit* message = message_queue_.PeekMessage();
|
| - if (num_bytes)
|
| - *num_bytes = message->num_bytes();
|
| - if (message->num_bytes() <= max_bytes)
|
| - memcpy(bytes, message->bytes(), message->num_bytes());
|
| - else
|
| - enough_space = false;
|
| -
|
| - if (DispatcherVector* queued_dispatchers = message->dispatchers()) {
|
| - if (num_dispatchers)
|
| - *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
|
| - if (enough_space) {
|
| - if (queued_dispatchers->empty()) {
|
| - // Nothing to do.
|
| - } else if (queued_dispatchers->size() <= max_num_dispatchers) {
|
| - DCHECK(dispatchers);
|
| - dispatchers->swap(*queued_dispatchers);
|
| - } else {
|
| - enough_space = false;
|
| - }
|
| - }
|
| - } else {
|
| - if (num_dispatchers)
|
| - *num_dispatchers = 0;
|
| + // Peer is closed and there are no more messages to read.
|
| + DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
|
| + base::AutoLock lock(signal_lock_);
|
| + awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + return MOJO_RESULT_FAILED_PRECONDITION;
|
| }
|
|
|
| - message = nullptr;
|
| -
|
| - if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
|
| - message_queue_.DiscardMessage();
|
| -
|
| - // Now it's empty, thus no longer readable.
|
| - if (message_queue_.IsEmpty()) {
|
| - // It's currently not possible to wait for non-readability, but we should
|
| - // do the state change anyway.
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| + // Alright! We have a message and the caller has provided sufficient storage
|
| + // in which to receive it.
|
| +
|
| + scoped_ptr<PortsMessage> message(
|
| + static_cast<PortsMessage*>(ports_message.release()));
|
| +
|
| + const MessageHeader* header =
|
| + static_cast<const MessageHeader*>(message->payload_bytes());
|
| + const DispatcherHeader* dispatcher_headers =
|
| + reinterpret_cast<const DispatcherHeader*>(header + 1);
|
| +
|
| + const char* dispatcher_data = reinterpret_cast<const char*>(
|
| + dispatcher_headers + header->num_dispatchers);
|
| +
|
| + // Deserialize dispatchers.
|
| + if (header->num_dispatchers > 0) {
|
| + CHECK(handles);
|
| + std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers);
|
| + size_t data_payload_index = sizeof(MessageHeader) +
|
| + header->num_dispatchers * sizeof(DispatcherHeader);
|
| + size_t port_index = 0;
|
| + size_t platform_handle_index = 0;
|
| + for (size_t i = 0; i < header->num_dispatchers; ++i) {
|
| + const DispatcherHeader& dh = dispatcher_headers[i];
|
| + Type type = static_cast<Type>(dh.type);
|
| +
|
| + DCHECK_GE(message->num_payload_bytes(),
|
| + data_payload_index + dh.num_bytes);
|
| + DCHECK_GE(message->num_ports(),
|
| + port_index + dh.num_ports);
|
| + DCHECK_GE(message->num_handles(),
|
| + platform_handle_index + dh.num_platform_handles);
|
| +
|
| + PlatformHandle* out_handles =
|
| + message->num_handles() ? message->handles() + platform_handle_index
|
| + : nullptr;
|
| + dispatchers[i].dispatcher = Dispatcher::Deserialize(
|
| + type, dispatcher_data, dh.num_bytes, message->ports() + port_index,
|
| + dh.num_ports, out_handles, dh.num_platform_handles);
|
| + if (!dispatchers[i].dispatcher)
|
| + return MOJO_RESULT_UNKNOWN;
|
| +
|
| + dispatcher_data += dh.num_bytes;
|
| + data_payload_index += dh.num_bytes;
|
| + port_index += dh.num_ports;
|
| + platform_handle_index += dh.num_platform_handles;
|
| }
|
| +
|
| + if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers,
|
| + handles))
|
| + return MOJO_RESULT_UNKNOWN;
|
| }
|
|
|
| - if (!enough_space)
|
| - return MOJO_RESULT_RESOURCE_EXHAUSTED;
|
| + // Copy message bytes.
|
| + DCHECK_GE(message->num_payload_bytes(), header->header_size);
|
| + const char* payload = reinterpret_cast<const char*>(message->payload_bytes());
|
| + memcpy(bytes, payload + header->header_size,
|
| + message->num_payload_bytes() - header->header_size);
|
|
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| -HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
|
| - const {
|
| - lock().AssertAcquired();
|
| -
|
| - HandleSignalsState rv;
|
| - if (!message_queue_.IsEmpty())
|
| - rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| - if (!message_queue_.IsEmpty() ||
|
| - (transferable_ && channel_) ||
|
| - (!transferable_ && non_transferable_state_ != CLOSED))
|
| - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| - if (!write_error_ &&
|
| - ((transferable_ && channel_) ||
|
| - (!transferable_ && non_transferable_state_ != CLOSED))) {
|
| - rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| - }
|
| - if (write_error_ ||
|
| - (transferable_ && !channel_) ||
|
| - (!transferable_ &&
|
| - ((non_transferable_state_ == CLOSED) || is_closed()))) {
|
| - rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| - }
|
| - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| - return rv;
|
| +HandleSignalsState
|
| +MessagePipeDispatcher::GetHandleSignalsState() const {
|
| + base::AutoLock lock(signal_lock_);
|
| + return GetHandleSignalsStateNoLock();
|
| }
|
|
|
| -MojoResult MessagePipeDispatcher::AddAwakableImplNoLock(
|
| +MojoResult MessagePipeDispatcher::AddAwakable(
|
| Awakable* awakable,
|
| MojoHandleSignals signals,
|
| uintptr_t context,
|
| HandleSignalsState* signals_state) {
|
| - lock().AssertAcquired();
|
| - if (transferable_ && channel_) {
|
| - channel_->EnsureLazyInitialized();
|
| - } else if (!transferable_ &&
|
| - non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
|
| - RequestNontransferableChannel();
|
| + base::AutoLock lock(signal_lock_);
|
| +
|
| + if (port_closed_ || in_transit_) {
|
| + if (signals_state)
|
| + *signals_state = HandleSignalsState();
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| }
|
|
|
| - HandleSignalsState state = GetHandleSignalsStateImplNoLock();
|
| + HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| +
|
| + DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint "
|
| + << endpoint_ << " [awakable=" << awakable << "; port="
|
| + << port_.name() << "; signals=" << signals << "; satisfied="
|
| + << state.satisfied_signals << "; satisfiable="
|
| + << state.satisfiable_signals << "]";
|
| +
|
| if (state.satisfies(signals)) {
|
| if (signals_state)
|
| *signals_state = state;
|
| + DVLOG(2) << "Signals already set for " << port_.name();
|
| return MOJO_RESULT_ALREADY_EXISTS;
|
| }
|
| if (!state.can_satisfy(signals)) {
|
| if (signals_state)
|
| *signals_state = state;
|
| + DVLOG(2) << "Signals impossible to satisfy for " << port_.name();
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
| }
|
|
|
| - awakable_list_.Add(awakable, signals, context);
|
| + DVLOG(2) << "Adding awakable to pipe " << pipe_id_ << " endpoint "
|
| + << endpoint_ << " [awakable=" << awakable << "; port="
|
| + << port_.name() << "; signals=" << signals << "]";
|
| +
|
| + awakables_.Add(awakable, signals, context);
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| -void MessagePipeDispatcher::RemoveAwakableImplNoLock(
|
| - Awakable* awakable,
|
| - HandleSignalsState* signals_state) {
|
| - lock().AssertAcquired();
|
| +void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable,
|
| + HandleSignalsState* signals_state) {
|
| + base::AutoLock lock(signal_lock_);
|
| + if (port_closed_ || in_transit_) {
|
| + if (signals_state)
|
| + *signals_state = HandleSignalsState();
|
| + } else if (signals_state) {
|
| + *signals_state = GetHandleSignalsStateNoLock();
|
| + }
|
| +
|
| + DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint "
|
| + << endpoint_ << " [awakable=" << awakable << "; port="
|
| + << port_.name() << "]";
|
|
|
| - awakable_list_.Remove(awakable);
|
| - if (signals_state)
|
| - *signals_state = GetHandleSignalsStateImplNoLock();
|
| + awakables_.Remove(awakable);
|
| }
|
|
|
| -void MessagePipeDispatcher::StartSerializeImplNoLock(
|
| - size_t* max_size,
|
| - size_t* max_platform_handles) {
|
| - if (!serialized_)
|
| - SerializeInternal();
|
| -
|
| - *max_platform_handles = 0;
|
| - if (serialized_platform_handle_.is_valid())
|
| - (*max_platform_handles)++;
|
| - if (!serialized_read_buffer_.empty() ||
|
| - !serialized_write_buffer_.empty() ||
|
| - !serialized_message_queue_.empty())
|
| - (*max_platform_handles)++;
|
| - *max_platform_handles += serialized_fds_.size();
|
| - *max_size = sizeof(SerializedMessagePipeHandleDispatcher);
|
| +void MessagePipeDispatcher::StartSerialize(uint32_t* num_bytes,
|
| + uint32_t* num_ports,
|
| + uint32_t* num_handles) {
|
| + *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
|
| + *num_ports = 1;
|
| + *num_handles = 0;
|
| }
|
|
|
| -bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
|
| - void* destination,
|
| - size_t* actual_size,
|
| - PlatformHandleVector* platform_handles) {
|
| - CloseImplNoLock();
|
| - SerializedMessagePipeHandleDispatcher* serialization =
|
| - static_cast<SerializedMessagePipeHandleDispatcher*>(destination);
|
| - serialization->transferable = transferable_;
|
| - serialization->pipe_id = pipe_id_;
|
| - if (serialized_platform_handle_.is_valid()) {
|
| - DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max());
|
| - serialization->platform_handle_index =
|
| - static_cast<uint32_t>(platform_handles->size());
|
| - platform_handles->push_back(serialized_platform_handle_.release());
|
| - } else {
|
| - serialization->platform_handle_index = kInvalidMessagePipeHandleIndex;
|
| - }
|
| +bool MessagePipeDispatcher::EndSerialize(void* destination,
|
| + ports::PortName* ports,
|
| + PlatformHandle* handles) {
|
| + SerializedState* state = static_cast<SerializedState*>(destination);
|
| + state->pipe_id = pipe_id_;
|
| + state->endpoint = static_cast<int8_t>(endpoint_);
|
| + ports[0] = port_.name();
|
| + return true;
|
| +}
|
|
|
| - serialization->write_error = write_error_;
|
| - DCHECK(serialized_read_buffer_.size() < std::numeric_limits<uint32_t>::max());
|
| - serialization->serialized_read_buffer_size =
|
| - static_cast<uint32_t>(serialized_read_buffer_.size());
|
| - DCHECK(serialized_write_buffer_.size() <
|
| - std::numeric_limits<uint32_t>::max());
|
| - serialization->serialized_write_buffer_size =
|
| - static_cast<uint32_t>(serialized_write_buffer_.size());
|
| - DCHECK(serialized_message_queue_.size() <
|
| - std::numeric_limits<uint32_t>::max());
|
| - serialization->serialized_message_queue_size =
|
| - static_cast<uint32_t>(serialized_message_queue_.size());
|
| -
|
| - serialization->shared_memory_size = static_cast<uint32_t>(
|
| - serialization->serialized_read_buffer_size +
|
| - serialization->serialized_write_buffer_size +
|
| - serialization->serialized_message_queue_size);
|
| - if (serialization->shared_memory_size) {
|
| - scoped_refptr<PlatformSharedBuffer> shared_buffer(
|
| - internal::g_platform_support->CreateSharedBuffer(
|
| - serialization->shared_memory_size));
|
| - scoped_ptr<PlatformSharedBufferMapping> mapping(
|
| - shared_buffer->Map(0, serialization->shared_memory_size));
|
| - char* start = static_cast<char*>(mapping->GetBase());
|
| - start = SerializeBuffer(start, &serialized_read_buffer_);
|
| - start = SerializeBuffer(start, &serialized_write_buffer_);
|
| - start = SerializeBuffer(start, &serialized_message_queue_);
|
| -
|
| - DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max());
|
| - serialization->shared_memory_handle_index =
|
| - static_cast<uint32_t>(platform_handles->size());
|
| - platform_handles->push_back(shared_buffer->PassPlatformHandle().release());
|
| - } else {
|
| - serialization->shared_memory_handle_index = kInvalidMessagePipeHandleIndex;
|
| - }
|
| +bool MessagePipeDispatcher::BeginTransit() {
|
| + base::AutoLock lock(signal_lock_);
|
| + if (in_transit_)
|
| + return false;
|
| + in_transit_ = true;
|
| + return in_transit_;
|
| +}
|
|
|
| - DCHECK(serialized_read_fds_length_ < std::numeric_limits<uint32_t>::max());
|
| - serialization->serialized_read_fds_length =
|
| - static_cast<uint32_t>(serialized_read_fds_length_);
|
| - DCHECK(serialized_write_fds_length_ < std::numeric_limits<uint32_t>::max());
|
| - serialization->serialized_write_fds_length =
|
| - static_cast<uint32_t>(serialized_write_fds_length_);
|
| - DCHECK(serialized_message_fds_length_ < std::numeric_limits<uint32_t>::max());
|
| - serialization->serialized_message_fds_length =
|
| - static_cast<uint32_t>(serialized_message_fds_length_);
|
| - if (serialized_fds_.empty()) {
|
| - serialization->serialized_fds_index = kInvalidMessagePipeHandleIndex;
|
| - } else {
|
| -#if defined(OS_POSIX)
|
| - DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max());
|
| - serialization->serialized_fds_index =
|
| - static_cast<uint32_t>(platform_handles->size());
|
| - for (size_t i = 0; i < serialized_fds_.size(); ++i)
|
| - platform_handles->push_back(PlatformHandle(serialized_fds_[i]));
|
| - serialized_fds_.clear();
|
| -#endif
|
| - }
|
| +void MessagePipeDispatcher::CompleteTransitAndClose() {
|
| + node_controller_->SetPortObserver(port_, nullptr);
|
|
|
| - *actual_size = sizeof(SerializedMessagePipeHandleDispatcher);
|
| - return true;
|
| + base::AutoLock lock(signal_lock_);
|
| + in_transit_ = false;
|
| + port_transferred_ = true;
|
| + CloseNoLock();
|
| }
|
|
|
| -void MessagePipeDispatcher::TransportStarted() {
|
| - started_transport_.Acquire();
|
| +void MessagePipeDispatcher::CancelTransit() {
|
| + base::AutoLock lock(signal_lock_);
|
| + in_transit_ = false;
|
| +
|
| + // Something may have happened while we were waiting for potential transit.
|
| + awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| }
|
|
|
| -void MessagePipeDispatcher::TransportEnded() {
|
| - started_transport_.Release();
|
| +// static
|
| +scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize(
|
| + const void* data,
|
| + size_t num_bytes,
|
| + const ports::PortName* ports,
|
| + size_t num_ports,
|
| + PlatformHandle* handles,
|
| + size_t num_handles) {
|
| + if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState))
|
| + return nullptr;
|
| +
|
| + const SerializedState* state = static_cast<const SerializedState*>(data);
|
|
|
| - base::AutoLock locker(lock());
|
| + ports::PortRef port;
|
| + CHECK_EQ(
|
| + ports::OK,
|
| + internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port));
|
|
|
| - // If transporting of MPD failed, we might have got more data and didn't
|
| - // awake for.
|
| - // TODO(jam): should we care about only alerting if it was empty before
|
| - // TransportStarted?
|
| - if (!message_queue_.IsEmpty())
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| + return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port,
|
| + state->pipe_id, state->endpoint);
|
| }
|
|
|
| -void MessagePipeDispatcher::OnReadMessage(
|
| - const MessageInTransit::View& message_view,
|
| - ScopedPlatformHandleVectorPtr platform_handles) {
|
| - scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
|
| - if (message_view.transport_data_buffer_size() > 0) {
|
| - DCHECK(message_view.transport_data_buffer());
|
| - message->SetDispatchers(TransportData::DeserializeDispatchers(
|
| - message_view.transport_data_buffer(),
|
| - message_view.transport_data_buffer_size(),
|
| - std::move(platform_handles)));
|
| - }
|
| -
|
| - bool call_release = false;
|
| - if (started_transport_.Try()) {
|
| - // We're not in the middle of being sent.
|
| +MessagePipeDispatcher::~MessagePipeDispatcher() {
|
| + DCHECK(port_closed_ && !in_transit_);
|
| +}
|
|
|
| - // Can get synchronously called back in Init if there was initial data.
|
| - scoped_ptr<base::AutoLock> locker;
|
| - if (!calling_init_)
|
| - locker.reset(new base::AutoLock(lock()));
|
| +MojoResult MessagePipeDispatcher::CloseNoLock() {
|
| + signal_lock_.AssertAcquired();
|
| + if (port_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| - if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
|
| - if (transferable_) {
|
| - channel_->Shutdown();
|
| - } else {
|
| - internal::g_broker->CloseMessagePipe(pipe_id_, this);
|
| - non_transferable_state_ = CLOSED;
|
| - }
|
| - channel_ = nullptr;
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| - if (close_requested_) {
|
| - // We requested the other side to close the connection while they also
|
| - // did the same. We must balance out the AddRef in CloseOnIO to ensure
|
| - // this object isn't leaked.
|
| - call_release = true;
|
| - }
|
| - } else {
|
| - bool was_empty = message_queue_.IsEmpty();
|
| - message_queue_.AddMessage(std::move(message));
|
| - if (was_empty)
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| - }
|
| + port_closed_ = true;
|
| + awakables_.CancelAll();
|
|
|
| - started_transport_.Release();
|
| - } else {
|
| - if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
|
| - // We got a request to shutdown the channel but this object is already
|
| - // calling into channel to serialize it. Since all the other side cares
|
| - // about is flushing pending messages, we bounce the quit back to it.
|
| - scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| - MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
|
| - channel_->WriteMessage(std::move(message));
|
| - } else {
|
| - // If RawChannel is calling OnRead, that means it has its read_lock_
|
| - // acquired. That means StartSerialize can't be accessing message queue as
|
| - // it waits on ReleaseHandle first which acquires read_lock_.
|
| - message_queue_.AddMessage(std::move(message));
|
| - }
|
| + if (!port_transferred_) {
|
| + base::AutoUnlock unlock(signal_lock_);
|
| + node_controller_->ClosePort(port_);
|
| }
|
|
|
| - if (call_release)
|
| - Release();
|
| + return MOJO_RESULT_OK;
|
| }
|
|
|
| -void MessagePipeDispatcher::OnError(Error error) {
|
| - // If there's a read error, then the other side of the pipe is closed. By
|
| - // definition, we can't write since there's no one to read it. And we can't
|
| - // read anymore, since we just got a read erorr. So we close the pipe.
|
| - // If there's a write error, then we stop writing. But we keep the pipe open
|
| - // until we finish reading everything in it. This is because it's valid for
|
| - // one endpoint to write some data and close their pipe immediately. Even
|
| - // though the other end can't write anymore, it should still get all the data.
|
| - switch (error) {
|
| - case ERROR_READ_SHUTDOWN:
|
| - // The other side was cleanly closed, so this isn't actually an error.
|
| - DVLOG(1) << "MessagePipeDispatcher read error (shutdown)";
|
| - break;
|
| - case ERROR_READ_BROKEN:
|
| - DVLOG(1) << "MessagePipeDispatcher read error (connection broken)";
|
| - break;
|
| - case ERROR_READ_BAD_MESSAGE:
|
| - // Receiving a bad message means either a bug, data corruption, or
|
| - // malicious attack (probably due to some other bug).
|
| - LOG(ERROR) << "MessagePipeDispatcher read error (received bad message)";
|
| - break;
|
| - case ERROR_READ_UNKNOWN:
|
| - LOG(ERROR) << "MessagePipeDispatcher read error (unknown)";
|
| - break;
|
| - case ERROR_WRITE:
|
| - // Write errors are slightly notable: they probably shouldn't happen under
|
| - // normal operation (but maybe the other side crashed).
|
| - LOG(WARNING) << "MessagePipeDispatcher write error";
|
| - DCHECK_EQ(write_error_, false) << "Should only get one write error.";
|
| - write_error_ = true;
|
| - break;
|
| +HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
|
| + HandleSignalsState rv;
|
| +
|
| + ports::PortStatus port_status;
|
| + if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) {
|
| + CHECK(in_transit_ || port_transferred_ || port_closed_);
|
| + return HandleSignalsState();
|
| }
|
|
|
| - bool call_release = false;
|
| - if (started_transport_.Try()) {
|
| - base::AutoLock locker(lock());
|
| - if (channel_ && error != ERROR_WRITE) {
|
| - if (transferable_) {
|
| - channel_->Shutdown();
|
| - } else {
|
| - CHECK_NE(non_transferable_state_, CLOSED);
|
| - internal::g_broker->CloseMessagePipe(pipe_id_, this);
|
| - non_transferable_state_ = CLOSED;
|
| - }
|
| - channel_ = nullptr;
|
| - if (close_requested_) {
|
| - // Balance AddRef in CloseOnIO.
|
| - call_release = true;
|
| - }
|
| - } else if (!channel_ && !transferable_ &&
|
| - non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) {
|
| - // Balance AddRef in CloseOnIO.
|
| - call_release = true;
|
| - }
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| - started_transport_.Release();
|
| + if (port_status.has_messages) {
|
| + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| + }
|
| + if (port_status.receiving_messages)
|
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| + if (!port_status.peer_closed) {
|
| + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| } else {
|
| - // We must be waiting to call ReleaseHandle. It will call Shutdown.
|
| + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| }
|
| -
|
| - if (call_release)
|
| - Release();
|
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| + return rv;
|
| }
|
|
|
| -MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
|
| - MessageInTransit* message,
|
| - std::vector<DispatcherTransport>* transports) {
|
| - DCHECK(!message->has_dispatchers());
|
| -
|
| - // You're not allowed to send either handle to a message pipe over the message
|
| - // pipe, so check for this. (The case of trying to write a handle to itself is
|
| - // taken care of by |Core|. That case kind of makes sense, but leads to
|
| - // complications if, e.g., both sides try to do the same thing with their
|
| - // respective handles simultaneously. The other case, of trying to write the
|
| - // peer handle to a handle, doesn't make sense -- since no handle will be
|
| - // available to read the message from.)
|
| - for (size_t i = 0; i < transports->size(); i++) {
|
| - if (!(*transports)[i].is_valid())
|
| - continue;
|
| - if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) {
|
| - MessagePipeDispatcher* mp =
|
| - static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher());
|
| - if (transferable_ && mp->transferable_ &&
|
| - channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) {
|
| - // The other case should have been disallowed by |Core|. (Note: |port|
|
| - // is the peer port of the handle given to |WriteMessage()|.)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| - } else if (!transferable_ && !mp->transferable_ &&
|
| - pipe_id_ == mp->pipe_id_) {
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| - }
|
| - }
|
| - }
|
| +void MessagePipeDispatcher::OnPortStatusChanged() {
|
| + base::AutoLock lock(signal_lock_);
|
|
|
| - // Clone the dispatchers and attach them to the message. (This must be done as
|
| - // a separate loop, since we want to leave the dispatchers alone on failure.)
|
| - scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
|
| - dispatchers->reserve(transports->size());
|
| - for (size_t i = 0; i < transports->size(); i++) {
|
| - if ((*transports)[i].is_valid()) {
|
| - dispatchers->push_back(
|
| - (*transports)[i].CreateEquivalentDispatcherAndClose());
|
| - } else {
|
| - LOG(WARNING) << "Enqueueing null dispatcher";
|
| - dispatchers->push_back(nullptr);
|
| - }
|
| - }
|
| - message->SetDispatchers(std::move(dispatchers));
|
| - return MOJO_RESULT_OK;
|
| -}
|
| + // We stop observing our port as soon as it's transferred, but this can race
|
| + // with events which are raised right before that happens. This is fine to
|
| + // ignore.
|
| + if (port_transferred_)
|
| + return;
|
|
|
| -void MessagePipeDispatcher::RequestNontransferableChannel() {
|
| - lock().AssertAcquired();
|
| - CHECK(!transferable_);
|
| - CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE);
|
| - non_transferable_state_ = CONNECT_CALLED;
|
| -#if !defined(OFFICIAL_BUILD)
|
| - non_transferable_bound_stack_.reset(new base::debug::StackTrace);
|
| +#if !defined(NDEBUG)
|
| + ports::PortStatus port_status;
|
| + node_controller_->node()->GetStatus(port_, &port_status);
|
| + if (port_status.has_messages) {
|
| + ports::ScopedMessage unused;
|
| + size_t message_size = 0;
|
| + node_controller_->node()->GetMessageIf(
|
| + port_, [&message_size](const ports::Message& message) {
|
| + message_size = message.num_payload_bytes();
|
| + return false;
|
| + }, &unused);
|
| + DVLOG(1) << "New message detected on message pipe " << pipe_id_
|
| + << " endpoint " << endpoint_ << " [port=" << port_.name()
|
| + << "; size=" << message_size << "]";
|
| + }
|
| + if (port_status.peer_closed) {
|
| + DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_
|
| + << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
|
| + }
|
| #endif
|
|
|
| - // PostTask since the broker can call us back synchronously.
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&Broker::ConnectMessagePipe,
|
| - base::Unretained(internal::g_broker), pipe_id_,
|
| - base::Unretained(this)));
|
| + awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| }
|
|
|
| } // namespace edk
|
|
|