Index: mojo/edk/system/message_pipe_dispatcher.cc |
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
index 999869a172eed7539377902a893a3e5f963b423f..0fe2c8b7687ea6ac449f21189cc15fc4228ecad8 100644 |
--- a/mojo/edk/system/message_pipe_dispatcher.cc |
+++ b/mojo/edk/system/message_pipe_dispatcher.cc |
@@ -14,6 +14,7 @@ |
#include "mojo/edk/system/core.h" |
#include "mojo/edk/system/message_for_transit.h" |
#include "mojo/edk/system/node_controller.h" |
+#include "mojo/edk/system/ports/message_filter.h" |
#include "mojo/edk/system/ports_message.h" |
#include "mojo/edk/system/request_context.h" |
@@ -59,6 +60,103 @@ class MessagePipeDispatcher::PortObserverThunk |
DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); |
}; |
+// A MessageFilter used by ReadMessage to determine whether a message should |
+// actually be consumed yet. |
+class ReadMessageFilter : public ports::MessageFilter { |
+ public: |
+ // Creates a new ReadMessageFilter which captures and potentially modifies |
+ // various (unowned) local state within MessagePipeDispatcher::ReadMessage. |
+ ReadMessageFilter(bool read_any_size, |
+ bool may_discard, |
+ uint32_t* num_bytes, |
+ uint32_t* num_handles, |
+ bool* no_space, |
+ bool* invalid_message) |
+ : read_any_size_(read_any_size), |
+ may_discard_(may_discard), |
+ num_bytes_(num_bytes), |
+ num_handles_(num_handles), |
+ no_space_(no_space), |
+ invalid_message_(invalid_message) {} |
+ |
+ ~ReadMessageFilter() override {} |
+ |
+ // ports::MessageFilter: |
+ bool Match(const ports::Message& m) override { |
+ const PortsMessage& message = static_cast<const PortsMessage&>(m); |
+ if (message.num_payload_bytes() < sizeof(MessageHeader)) { |
+ *invalid_message_ = true; |
+ return true; |
+ } |
+ |
+ const MessageHeader* header = |
+ static_cast<const MessageHeader*>(message.payload_bytes()); |
+ if (header->header_size > message.num_payload_bytes()) { |
+ *invalid_message_ = true; |
+ return true; |
+ } |
+ |
+ uint32_t bytes_to_read = 0; |
+ uint32_t bytes_available = |
+ static_cast<uint32_t>(message.num_payload_bytes()) - |
+ header->header_size; |
+ if (num_bytes_) { |
+ bytes_to_read = std::min(*num_bytes_, bytes_available); |
+ *num_bytes_ = bytes_available; |
+ } |
+ |
+ uint32_t handles_to_read = 0; |
+ uint32_t handles_available = header->num_dispatchers; |
+ if (num_handles_) { |
+ handles_to_read = std::min(*num_handles_, handles_available); |
+ *num_handles_ = handles_available; |
+ } |
+ |
+ if (handles_to_read < handles_available || |
+ (!read_any_size_ && bytes_to_read < bytes_available)) { |
+ *no_space_ = true; |
+ return may_discard_; |
+ } |
+ |
+ return true; |
+ } |
+ |
+ private: |
+ const bool read_any_size_; |
+ const bool may_discard_; |
+ uint32_t* const num_bytes_; |
+ uint32_t* const num_handles_; |
+ bool* const no_space_; |
+ bool* const invalid_message_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(ReadMessageFilter); |
+}; |
+ |
+#if DCHECK_IS_ON() |
+ |
+// A MessageFilter which never matches a message. Used to peek at the size of |
+// the next available message on a port, for debug logging only. |
+class PeekSizeMessageFilter : public ports::MessageFilter { |
+ public: |
+ PeekSizeMessageFilter() {} |
+ ~PeekSizeMessageFilter() override {} |
+ |
+ // ports::MessageFilter: |
+ bool Match(const ports::Message& message) override { |
+ message_size_ = message.num_payload_bytes(); |
+ return false; |
+ } |
+ |
+ size_t message_size() const { return message_size_; } |
+ |
+ private: |
+ size_t message_size_ = 0; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(PeekSizeMessageFilter); |
+}; |
+ |
+#endif // DCHECK_IS_ON() |
+ |
MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, |
const ports::PortRef& port, |
uint64_t pipe_id, |
@@ -186,50 +284,9 @@ MojoResult MessagePipeDispatcher::ReadMessage( |
// This flag exists to support both new and old API behavior. |
ports::ScopedMessage ports_message; |
- int rv = node_controller_->node()->GetMessageIf( |
- port_, |
- [read_any_size, num_bytes, num_handles, &no_space, &may_discard, |
- &invalid_message]( |
- const ports::Message& next_message) { |
- const PortsMessage& message = |
- static_cast<const PortsMessage&>(next_message); |
- if (message.num_payload_bytes() < sizeof(MessageHeader)) { |
- invalid_message = true; |
- return true; |
- } |
- |
- const MessageHeader* header = |
- static_cast<const MessageHeader*>(message.payload_bytes()); |
- if (header->header_size > message.num_payload_bytes()) { |
- invalid_message = true; |
- return true; |
- } |
- |
- uint32_t bytes_to_read = 0; |
- uint32_t bytes_available = |
- static_cast<uint32_t>(message.num_payload_bytes()) - |
- header->header_size; |
- if (num_bytes) { |
- bytes_to_read = std::min(*num_bytes, bytes_available); |
- *num_bytes = bytes_available; |
- } |
- |
- uint32_t handles_to_read = 0; |
- uint32_t handles_available = header->num_dispatchers; |
- if (num_handles) { |
- handles_to_read = std::min(*num_handles, handles_available); |
- *num_handles = handles_available; |
- } |
- |
- if (handles_to_read < handles_available || |
- (!read_any_size && bytes_to_read < bytes_available)) { |
- no_space = true; |
- return may_discard; |
- } |
- |
- return true; |
- }, |
- &ports_message); |
+ ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles, |
+ &no_space, &invalid_message); |
+ int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter); |
if (invalid_message) |
return MOJO_RESULT_UNKNOWN; |
@@ -530,15 +587,11 @@ void MessagePipeDispatcher::OnPortStatusChanged() { |
if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) { |
if (port_status.has_messages) { |
ports::ScopedMessage unused; |
- size_t message_size = 0; |
- node_controller_->node()->GetMessageIf( |
- port_, [&message_size](const ports::Message& message) { |
- message_size = message.num_payload_bytes(); |
- return false; |
- }, &unused); |
+ PeekSizeMessageFilter filter; |
+ node_controller_->node()->GetMessage(port_, &unused, &filter); |
DVLOG(4) << "New message detected on message pipe " << pipe_id_ |
<< " endpoint " << endpoint_ << " [port=" << port_.name() |
- << "; size=" << message_size << "]"; |
+ << "; size=" << filter.message_size() << "]"; |
} |
if (port_status.peer_closed) { |
DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ |