| Index: mojo/edk/system/message_pipe_dispatcher.cc
|
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
|
| index 999869a172eed7539377902a893a3e5f963b423f..0fe2c8b7687ea6ac449f21189cc15fc4228ecad8 100644
|
| --- a/mojo/edk/system/message_pipe_dispatcher.cc
|
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc
|
| @@ -14,6 +14,7 @@
|
| #include "mojo/edk/system/core.h"
|
| #include "mojo/edk/system/message_for_transit.h"
|
| #include "mojo/edk/system/node_controller.h"
|
| +#include "mojo/edk/system/ports/message_filter.h"
|
| #include "mojo/edk/system/ports_message.h"
|
| #include "mojo/edk/system/request_context.h"
|
|
|
| @@ -59,6 +60,103 @@ class MessagePipeDispatcher::PortObserverThunk
|
| DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
|
| };
|
|
|
| +// A MessageFilter used by ReadMessage to determine whether a message should
|
| +// actually be consumed yet.
|
| +class ReadMessageFilter : public ports::MessageFilter {
|
| + public:
|
| + // Creates a new ReadMessageFilter which captures and potentially modifies
|
| + // various (unowned) local state within MessagePipeDispatcher::ReadMessage.
|
| + ReadMessageFilter(bool read_any_size,
|
| + bool may_discard,
|
| + uint32_t* num_bytes,
|
| + uint32_t* num_handles,
|
| + bool* no_space,
|
| + bool* invalid_message)
|
| + : read_any_size_(read_any_size),
|
| + may_discard_(may_discard),
|
| + num_bytes_(num_bytes),
|
| + num_handles_(num_handles),
|
| + no_space_(no_space),
|
| + invalid_message_(invalid_message) {}
|
| +
|
| + ~ReadMessageFilter() override {}
|
| +
|
| + // ports::MessageFilter:
|
| + bool Match(const ports::Message& m) override {
|
| + const PortsMessage& message = static_cast<const PortsMessage&>(m);
|
| + if (message.num_payload_bytes() < sizeof(MessageHeader)) {
|
| + *invalid_message_ = true;
|
| + return true;
|
| + }
|
| +
|
| + const MessageHeader* header =
|
| + static_cast<const MessageHeader*>(message.payload_bytes());
|
| + if (header->header_size > message.num_payload_bytes()) {
|
| + *invalid_message_ = true;
|
| + return true;
|
| + }
|
| +
|
| + uint32_t bytes_to_read = 0;
|
| + uint32_t bytes_available =
|
| + static_cast<uint32_t>(message.num_payload_bytes()) -
|
| + header->header_size;
|
| + if (num_bytes_) {
|
| + bytes_to_read = std::min(*num_bytes_, bytes_available);
|
| + *num_bytes_ = bytes_available;
|
| + }
|
| +
|
| + uint32_t handles_to_read = 0;
|
| + uint32_t handles_available = header->num_dispatchers;
|
| + if (num_handles_) {
|
| + handles_to_read = std::min(*num_handles_, handles_available);
|
| + *num_handles_ = handles_available;
|
| + }
|
| +
|
| + if (handles_to_read < handles_available ||
|
| + (!read_any_size_ && bytes_to_read < bytes_available)) {
|
| + *no_space_ = true;
|
| + return may_discard_;
|
| + }
|
| +
|
| + return true;
|
| + }
|
| +
|
| + private:
|
| + const bool read_any_size_;
|
| + const bool may_discard_;
|
| + uint32_t* const num_bytes_;
|
| + uint32_t* const num_handles_;
|
| + bool* const no_space_;
|
| + bool* const invalid_message_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(ReadMessageFilter);
|
| +};
|
| +
|
| +#if DCHECK_IS_ON()
|
| +
|
| +// A MessageFilter which never matches a message. Used to peek at the size of
|
| +// the next available message on a port, for debug logging only.
|
| +class PeekSizeMessageFilter : public ports::MessageFilter {
|
| + public:
|
| + PeekSizeMessageFilter() {}
|
| + ~PeekSizeMessageFilter() override {}
|
| +
|
| + // ports::MessageFilter:
|
| + bool Match(const ports::Message& message) override {
|
| + message_size_ = message.num_payload_bytes();
|
| + return false;
|
| + }
|
| +
|
| + size_t message_size() const { return message_size_; }
|
| +
|
| + private:
|
| + size_t message_size_ = 0;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(PeekSizeMessageFilter);
|
| +};
|
| +
|
| +#endif // DCHECK_IS_ON()
|
| +
|
| MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
|
| const ports::PortRef& port,
|
| uint64_t pipe_id,
|
| @@ -186,50 +284,9 @@ MojoResult MessagePipeDispatcher::ReadMessage(
|
| // This flag exists to support both new and old API behavior.
|
|
|
| ports::ScopedMessage ports_message;
|
| - int rv = node_controller_->node()->GetMessageIf(
|
| - port_,
|
| - [read_any_size, num_bytes, num_handles, &no_space, &may_discard,
|
| - &invalid_message](
|
| - const ports::Message& next_message) {
|
| - const PortsMessage& message =
|
| - static_cast<const PortsMessage&>(next_message);
|
| - if (message.num_payload_bytes() < sizeof(MessageHeader)) {
|
| - invalid_message = true;
|
| - return true;
|
| - }
|
| -
|
| - const MessageHeader* header =
|
| - static_cast<const MessageHeader*>(message.payload_bytes());
|
| - if (header->header_size > message.num_payload_bytes()) {
|
| - invalid_message = true;
|
| - return true;
|
| - }
|
| -
|
| - uint32_t bytes_to_read = 0;
|
| - uint32_t bytes_available =
|
| - static_cast<uint32_t>(message.num_payload_bytes()) -
|
| - header->header_size;
|
| - if (num_bytes) {
|
| - bytes_to_read = std::min(*num_bytes, bytes_available);
|
| - *num_bytes = bytes_available;
|
| - }
|
| -
|
| - uint32_t handles_to_read = 0;
|
| - uint32_t handles_available = header->num_dispatchers;
|
| - if (num_handles) {
|
| - handles_to_read = std::min(*num_handles, handles_available);
|
| - *num_handles = handles_available;
|
| - }
|
| -
|
| - if (handles_to_read < handles_available ||
|
| - (!read_any_size && bytes_to_read < bytes_available)) {
|
| - no_space = true;
|
| - return may_discard;
|
| - }
|
| -
|
| - return true;
|
| - },
|
| - &ports_message);
|
| + ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles,
|
| + &no_space, &invalid_message);
|
| + int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter);
|
|
|
| if (invalid_message)
|
| return MOJO_RESULT_UNKNOWN;
|
| @@ -530,15 +587,11 @@ void MessagePipeDispatcher::OnPortStatusChanged() {
|
| if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) {
|
| if (port_status.has_messages) {
|
| ports::ScopedMessage unused;
|
| - size_t message_size = 0;
|
| - node_controller_->node()->GetMessageIf(
|
| - port_, [&message_size](const ports::Message& message) {
|
| - message_size = message.num_payload_bytes();
|
| - return false;
|
| - }, &unused);
|
| + PeekSizeMessageFilter filter;
|
| + node_controller_->node()->GetMessage(port_, &unused, &filter);
|
| DVLOG(4) << "New message detected on message pipe " << pipe_id_
|
| << " endpoint " << endpoint_ << " [port=" << port_.name()
|
| - << "; size=" << message_size << "]";
|
| + << "; size=" << filter.message_size() << "]";
|
| }
|
| if (port_status.peer_closed) {
|
| DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_
|
|
|