Chromium Code Reviews| Index: mojo/edk/system/message_pipe_dispatcher.cc |
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
| index 999869a172eed7539377902a893a3e5f963b423f..e9baf645a131af851b3f6b439fd477f2985ae086 100644 |
| --- a/mojo/edk/system/message_pipe_dispatcher.cc |
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc |
| @@ -14,6 +14,7 @@ |
| #include "mojo/edk/system/core.h" |
| #include "mojo/edk/system/message_for_transit.h" |
| #include "mojo/edk/system/node_controller.h" |
| +#include "mojo/edk/system/ports/message_filter.h" |
| #include "mojo/edk/system/ports_message.h" |
| #include "mojo/edk/system/request_context.h" |
| @@ -59,6 +60,101 @@ class MessagePipeDispatcher::PortObserverThunk |
| DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); |
| }; |
| +// A MessageFilter used by ReadMessage to determine whether a message should |
| +// actually be consumed yet. |
| +class ReadMessageFilter : public ports::MessageFilter { |
| + public: |
| + // Creates a new ReadMessageFilter which captures and potentially modifies |
| + // various (unowned) local state within MessagePipeDispatcher::ReadMessage. |
| + ReadMessageFilter(bool read_any_size, |
| + bool may_discard, |
| + uint32_t* num_bytes, |
| + uint32_t* num_handles, |
| + bool* no_space, |
| + bool* invalid_message) |
| + : read_any_size_(read_any_size), |
| + may_discard_(may_discard), |
| + num_bytes_(num_bytes), |
| + num_handles_(num_handles), |
| + no_space_(no_space), |
| + invalid_message_(invalid_message) {} |
| + |
| + ~ReadMessageFilter() override {} |
| + |
| + // ports::MessageFilter: |
| + bool Match(const ports::Message& m) override { |
| + const PortsMessage& message = static_cast<const PortsMessage&>(m); |
| + if (message.num_payload_bytes() < sizeof(MessageHeader)) { |
| + *invalid_message_ = true; |
| + return true; |
| + } |
| + |
| + const MessageHeader* header = |
| + static_cast<const MessageHeader*>(message.payload_bytes()); |
| + if (header->header_size > message.num_payload_bytes()) { |
| + *invalid_message_ = true; |
| + return true; |
| + } |
| + |
| + uint32_t bytes_to_read = 0; |
| + uint32_t bytes_available = |
| + static_cast<uint32_t>(message.num_payload_bytes()) - |
| + header->header_size; |
| + if (num_bytes_) { |
| + bytes_to_read = std::min(*num_bytes_, bytes_available); |
| + *num_bytes_ = bytes_available; |
| + } |
| + |
| + uint32_t handles_to_read = 0; |
| + uint32_t handles_available = header->num_dispatchers; |
| + if (num_handles_) { |
| + handles_to_read = std::min(*num_handles_, handles_available); |
| + *num_handles_ = handles_available; |
| + } |
| + |
| + if (handles_to_read < handles_available || |
| + (!read_any_size_ && bytes_to_read < bytes_available)) { |
| + *no_space_ = true; |
| + return may_discard_; |
| + } |
| + |
| + return true; |
| + } |
| + |
| + private: |
| + const bool read_any_size_; |
| + const bool may_discard_; |
| + uint32_t* const num_bytes_; |
| + uint32_t* const num_handles_; |
| + bool* const no_space_; |
| + bool* invalid_message_; |
|
yzshen1
2016/11/03 00:11:27
nit: now that the other pointers are made const, m
Ken Rockot(use gerrit already)
2016/11/03 01:25:07
done
|
| + |
| + DISALLOW_COPY_AND_ASSIGN(ReadMessageFilter); |
| +}; |
| + |
| +#if DCHECK_IS_ON() |
| + |
| +// A MessageFilter which never matches a message. Used to peek at the size of |
| +// the next available message on a port, for debug logging only. |
| +class PeekSizeMessageFilter : public ports::MessageFilter { |
| + public: |
| + PeekSizeMessageFilter(size_t* size_storage) : size_storage_(size_storage) {} |
| + ~PeekSizeMessageFilter() override {} |
| + |
| + // ports::MessageFilter: |
| + bool Match(const ports::Message& message) override { |
| + *size_storage_ = message.num_payload_bytes(); |
| + return false; |
| + } |
| + |
| + private: |
| + size_t* const size_storage_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(PeekSizeMessageFilter); |
| +}; |
| + |
| +#endif // DCHECK_IS_ON() |
| + |
| MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, |
| const ports::PortRef& port, |
| uint64_t pipe_id, |
| @@ -186,50 +282,9 @@ MojoResult MessagePipeDispatcher::ReadMessage( |
| // This flag exists to support both new and old API behavior. |
| ports::ScopedMessage ports_message; |
| - int rv = node_controller_->node()->GetMessageIf( |
| - port_, |
| - [read_any_size, num_bytes, num_handles, &no_space, &may_discard, |
| - &invalid_message]( |
| - const ports::Message& next_message) { |
| - const PortsMessage& message = |
| - static_cast<const PortsMessage&>(next_message); |
| - if (message.num_payload_bytes() < sizeof(MessageHeader)) { |
| - invalid_message = true; |
| - return true; |
| - } |
| - |
| - const MessageHeader* header = |
| - static_cast<const MessageHeader*>(message.payload_bytes()); |
| - if (header->header_size > message.num_payload_bytes()) { |
| - invalid_message = true; |
| - return true; |
| - } |
| - |
| - uint32_t bytes_to_read = 0; |
| - uint32_t bytes_available = |
| - static_cast<uint32_t>(message.num_payload_bytes()) - |
| - header->header_size; |
| - if (num_bytes) { |
| - bytes_to_read = std::min(*num_bytes, bytes_available); |
| - *num_bytes = bytes_available; |
| - } |
| - |
| - uint32_t handles_to_read = 0; |
| - uint32_t handles_available = header->num_dispatchers; |
| - if (num_handles) { |
| - handles_to_read = std::min(*num_handles, handles_available); |
| - *num_handles = handles_available; |
| - } |
| - |
| - if (handles_to_read < handles_available || |
| - (!read_any_size && bytes_to_read < bytes_available)) { |
| - no_space = true; |
| - return may_discard; |
| - } |
| - |
| - return true; |
| - }, |
| - &ports_message); |
| + ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles, |
| + &no_space, &invalid_message); |
| + int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter); |
| if (invalid_message) |
| return MOJO_RESULT_UNKNOWN; |
| @@ -531,11 +586,8 @@ void MessagePipeDispatcher::OnPortStatusChanged() { |
| if (port_status.has_messages) { |
| ports::ScopedMessage unused; |
| size_t message_size = 0; |
| - node_controller_->node()->GetMessageIf( |
| - port_, [&message_size](const ports::Message& message) { |
| - message_size = message.num_payload_bytes(); |
| - return false; |
| - }, &unused); |
| + PeekSizeMessageFilter filter(&message_size); |
|
yzshen1
2016/11/03 00:11:27
Does it make sense to make message_size a member o
Ken Rockot(use gerrit already)
2016/11/03 01:25:07
done
|
| + node_controller_->node()->GetMessage(port_, &unused, &filter); |
| DVLOG(4) << "New message detected on message pipe " << pipe_id_ |
| << " endpoint " << endpoint_ << " [port=" << port_.name() |
| << "; size=" << message_size << "]"; |