Index: mojo/edk/system/message_pipe_dispatcher.cc |
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
index 999869a172eed7539377902a893a3e5f963b423f..e9baf645a131af851b3f6b439fd477f2985ae086 100644 |
--- a/mojo/edk/system/message_pipe_dispatcher.cc |
+++ b/mojo/edk/system/message_pipe_dispatcher.cc |
@@ -14,6 +14,7 @@ |
#include "mojo/edk/system/core.h" |
#include "mojo/edk/system/message_for_transit.h" |
#include "mojo/edk/system/node_controller.h" |
+#include "mojo/edk/system/ports/message_filter.h" |
#include "mojo/edk/system/ports_message.h" |
#include "mojo/edk/system/request_context.h" |
@@ -59,6 +60,101 @@ class MessagePipeDispatcher::PortObserverThunk |
DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); |
}; |
+// A MessageFilter used by ReadMessage to determine whether a message should |
+// actually be consumed yet. |
+class ReadMessageFilter : public ports::MessageFilter { |
+ public: |
+ // Creates a new ReadMessageFilter which captures and potentially modifies |
+ // various (unowned) local state within MessagePipeDispatcher::ReadMessage. |
+ ReadMessageFilter(bool read_any_size, |
+ bool may_discard, |
+ uint32_t* num_bytes, |
+ uint32_t* num_handles, |
+ bool* no_space, |
+ bool* invalid_message) |
+ : read_any_size_(read_any_size), |
+ may_discard_(may_discard), |
+ num_bytes_(num_bytes), |
+ num_handles_(num_handles), |
+ no_space_(no_space), |
+ invalid_message_(invalid_message) {} |
+ |
+ ~ReadMessageFilter() override {} |
+ |
+ // ports::MessageFilter: |
+ bool Match(const ports::Message& m) override { |
+ const PortsMessage& message = static_cast<const PortsMessage&>(m); |
+ if (message.num_payload_bytes() < sizeof(MessageHeader)) { |
+ *invalid_message_ = true; |
+ return true; |
+ } |
+ |
+ const MessageHeader* header = |
+ static_cast<const MessageHeader*>(message.payload_bytes()); |
+ if (header->header_size > message.num_payload_bytes()) { |
+ *invalid_message_ = true; |
+ return true; |
+ } |
+ |
+ uint32_t bytes_to_read = 0; |
+ uint32_t bytes_available = |
+ static_cast<uint32_t>(message.num_payload_bytes()) - |
+ header->header_size; |
+ if (num_bytes_) { |
+ bytes_to_read = std::min(*num_bytes_, bytes_available); |
+ *num_bytes_ = bytes_available; |
+ } |
+ |
+ uint32_t handles_to_read = 0; |
+ uint32_t handles_available = header->num_dispatchers; |
+ if (num_handles_) { |
+ handles_to_read = std::min(*num_handles_, handles_available); |
+ *num_handles_ = handles_available; |
+ } |
+ |
+ if (handles_to_read < handles_available || |
+ (!read_any_size_ && bytes_to_read < bytes_available)) { |
+ *no_space_ = true; |
+ return may_discard_; |
+ } |
+ |
+ return true; |
+ } |
+ |
+ private: |
+ const bool read_any_size_; |
+ const bool may_discard_; |
+ uint32_t* const num_bytes_; |
+ uint32_t* const num_handles_; |
+ bool* const no_space_; |
+ bool* invalid_message_; |
yzshen1
2016/11/03 00:11:27
nit: now that the other pointers are made const, m
Ken Rockot(use gerrit already)
2016/11/03 01:25:07
done
|
+ |
+ DISALLOW_COPY_AND_ASSIGN(ReadMessageFilter); |
+}; |
+ |
+#if DCHECK_IS_ON() |
+ |
+// A MessageFilter which never matches a message. Used to peek at the size of |
+// the next available message on a port, for debug logging only. |
+class PeekSizeMessageFilter : public ports::MessageFilter { |
+ public: |
+ PeekSizeMessageFilter(size_t* size_storage) : size_storage_(size_storage) {} |
+ ~PeekSizeMessageFilter() override {} |
+ |
+ // ports::MessageFilter: |
+ bool Match(const ports::Message& message) override { |
+ *size_storage_ = message.num_payload_bytes(); |
+ return false; |
+ } |
+ |
+ private: |
+ size_t* const size_storage_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(PeekSizeMessageFilter); |
+}; |
+ |
+#endif // DCHECK_IS_ON() |
+ |
MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, |
const ports::PortRef& port, |
uint64_t pipe_id, |
@@ -186,50 +282,9 @@ MojoResult MessagePipeDispatcher::ReadMessage( |
// This flag exists to support both new and old API behavior. |
ports::ScopedMessage ports_message; |
- int rv = node_controller_->node()->GetMessageIf( |
- port_, |
- [read_any_size, num_bytes, num_handles, &no_space, &may_discard, |
- &invalid_message]( |
- const ports::Message& next_message) { |
- const PortsMessage& message = |
- static_cast<const PortsMessage&>(next_message); |
- if (message.num_payload_bytes() < sizeof(MessageHeader)) { |
- invalid_message = true; |
- return true; |
- } |
- |
- const MessageHeader* header = |
- static_cast<const MessageHeader*>(message.payload_bytes()); |
- if (header->header_size > message.num_payload_bytes()) { |
- invalid_message = true; |
- return true; |
- } |
- |
- uint32_t bytes_to_read = 0; |
- uint32_t bytes_available = |
- static_cast<uint32_t>(message.num_payload_bytes()) - |
- header->header_size; |
- if (num_bytes) { |
- bytes_to_read = std::min(*num_bytes, bytes_available); |
- *num_bytes = bytes_available; |
- } |
- |
- uint32_t handles_to_read = 0; |
- uint32_t handles_available = header->num_dispatchers; |
- if (num_handles) { |
- handles_to_read = std::min(*num_handles, handles_available); |
- *num_handles = handles_available; |
- } |
- |
- if (handles_to_read < handles_available || |
- (!read_any_size && bytes_to_read < bytes_available)) { |
- no_space = true; |
- return may_discard; |
- } |
- |
- return true; |
- }, |
- &ports_message); |
+ ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles, |
+ &no_space, &invalid_message); |
+ int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter); |
if (invalid_message) |
return MOJO_RESULT_UNKNOWN; |
@@ -531,11 +586,8 @@ void MessagePipeDispatcher::OnPortStatusChanged() { |
if (port_status.has_messages) { |
ports::ScopedMessage unused; |
size_t message_size = 0; |
- node_controller_->node()->GetMessageIf( |
- port_, [&message_size](const ports::Message& message) { |
- message_size = message.num_payload_bytes(); |
- return false; |
- }, &unused); |
+ PeekSizeMessageFilter filter(&message_size); |
yzshen1
2016/11/03 00:11:27
Does it make sense to make message_size a member o
Ken Rockot(use gerrit already)
2016/11/03 01:25:07
done
|
+ node_controller_->node()->GetMessage(port_, &unused, &filter); |
DVLOG(4) << "New message detected on message pipe " << pipe_id_ |
<< " endpoint " << endpoint_ << " [port=" << port_.name() |
<< "; size=" << message_size << "]"; |