Chromium Code Reviews| Index: ipc/ipc_channel_reader.cc |
| diff --git a/ipc/ipc_channel_reader.cc b/ipc/ipc_channel_reader.cc |
| index f41a0d7737e4656cd279db0de8ef83c2b9e9fbaa..21217f36fdfedc44dd7ca2cf78148cd2ec28c356 100644 |
| --- a/ipc/ipc_channel_reader.cc |
| +++ b/ipc/ipc_channel_reader.cc |
| @@ -6,6 +6,8 @@ |
| #include "ipc/ipc_listener.h" |
| #include "ipc/ipc_logging.h" |
| +#include "ipc/ipc_message.h" |
| +#include "ipc/ipc_message_attachment_set.h" |
| #include "ipc/ipc_message_macros.h" |
| namespace IPC { |
| @@ -16,26 +18,35 @@ ChannelReader::ChannelReader(Listener* listener) : listener_(listener) { |
| } |
| ChannelReader::~ChannelReader() { |
| + if (!blocked_ids_.empty()) |
| + StopObservingAttachmentBroker(); |
| } |
| -bool ChannelReader::ProcessIncomingMessages() { |
| +ChannelReader::DispatchState ChannelReader::ProcessIncomingMessages() { |
| while (true) { |
| int bytes_read = 0; |
| ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, |
| &bytes_read); |
| if (read_state == READ_FAILED) |
| - return false; |
| + return DISPATCH_ERROR; |
| if (read_state == READ_PENDING) |
| - return true; |
| + return DISPATCH_FINISHED; |
| DCHECK(bytes_read > 0); |
| - if (!DispatchInputData(input_buf_, bytes_read)) |
| - return false; |
| + if (!TranslateInputData(input_buf_, bytes_read)) |
| + return DISPATCH_ERROR; |
| + |
| + DispatchState state = DispatchMessages(); |
| + if (state != DISPATCH_FINISHED) |
| + return state; |
| } |
| } |
| -bool ChannelReader::AsyncReadComplete(int bytes_read) { |
| - return DispatchInputData(input_buf_, bytes_read); |
| +ChannelReader::DispatchState ChannelReader::AsyncReadComplete(int bytes_read) { |
| + if (!TranslateInputData(input_buf_, bytes_read)) |
| + return DISPATCH_ERROR; |
| + |
| + return DispatchMessages(); |
| } |
| bool ChannelReader::IsInternalMessage(const Message& m) { |
| @@ -49,8 +60,8 @@ bool ChannelReader::IsHelloMessage(const Message& m) { |
| m.type() == Channel::HELLO_MESSAGE_TYPE; |
| } |
| -bool ChannelReader::DispatchInputData(const char* input_data, |
| - int input_data_len) { |
| +bool ChannelReader::TranslateInputData(const char* input_data, |
| + int input_data_len) { |
| const char* p; |
| const char* end; |
| @@ -75,27 +86,11 @@ bool ChannelReader::DispatchInputData(const char* input_data, |
| const char* message_tail = Message::FindNext(p, end); |
| if (message_tail) { |
| int len = static_cast<int>(message_tail - p); |
| - Message m(p, len); |
| - if (!WillDispatchInputMessage(&m)) |
| + Message* m = new Message(p, len); |
| + if (!GetNonBrokeredAttachments(m)) |
|
Tom Sepez
2015/07/13 18:28:47
Do we leak |m| here if we return early?
erikchen
2015/07/17 18:44:01
Yup. That's what I deserve for not using scoped_pt
|
| return false; |
| -#ifdef IPC_MESSAGE_LOG_ENABLED |
| - std::string name; |
| - Logging::GetInstance()->GetMessageText(m.type(), &name, &m, NULL); |
| - TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", |
| - "name", name); |
| -#else |
| - TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", |
| - "class", IPC_MESSAGE_ID_CLASS(m.type()), |
| - "line", IPC_MESSAGE_ID_LINE(m.type())); |
| -#endif |
| - m.TraceMessageEnd(); |
| - if (IsInternalMessage(m)) |
| - HandleInternalMessage(m); |
| - else |
| - listener_->OnMessageReceived(m); |
| - if (m.dispatch_error()) |
| - listener_->OnBadMessageReceived(m); |
| + queued_messages_.push_back(m); |
| p = message_tail; |
| } else { |
| // Last message is partial. |
| @@ -111,6 +106,96 @@ bool ChannelReader::DispatchInputData(const char* input_data, |
| return true; |
| } |
| +ChannelReader::DispatchState ChannelReader::DispatchMessages() { |
| + while (!queued_messages_.empty()) { |
| + if (!blocked_ids_.empty()) |
| + return DISPATCH_WAITING_ON_BROKER; |
| + |
| + Message* m = queued_messages_.front(); |
| + |
| + AttachmentIdSet blocked_ids = GetBrokeredAttachments(m); |
| + if (!blocked_ids.empty()) { |
| + blocked_ids_.swap(blocked_ids); |
| + StartObservingAttachmentBroker(); |
| + return DISPATCH_WAITING_ON_BROKER; |
| + } |
| + |
| + DispatchMessage(m); |
| + queued_messages_.erase(queued_messages_.begin()); |
| + } |
| + return DISPATCH_FINISHED; |
| +} |
| + |
| +void ChannelReader::DispatchMessage(Message* m) { |
| +#ifdef IPC_MESSAGE_LOG_ENABLED |
| + std::string name; |
| + Logging::GetInstance()->GetMessageText(m->type(), &name, m, NULL); |
| + TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", "name", |
| + name); |
| +#else |
| + TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", "class", |
| + IPC_MESSAGE_ID_CLASS(m->type()), "line", |
| + IPC_MESSAGE_ID_LINE(m->type())); |
| +#endif |
| + m->TraceMessageEnd(); |
| + if (IsInternalMessage(*m)) |
| + HandleInternalMessage(*m); |
| + else |
| + listener_->OnMessageReceived(*m); |
| + if (m->dispatch_error()) |
| + listener_->OnBadMessageReceived(*m); |
| +} |
| + |
| +ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments( |
| + Message* msg) { |
| + std::set<BrokerableAttachment::AttachmentId> blocked_ids; |
| + |
| + MessageAttachmentSet* set = msg->attachment_set(); |
| + for (const scoped_refptr<BrokerableAttachment>& attachment : |
| + set->GetBrokerableAttachmentsForUpdating()) { |
| + if (attachment->NeedsBrokering()) { |
| + AttachmentBroker* broker = GetAttachmentBroker(); |
| + scoped_refptr<BrokerableAttachment> brokered_attachment; |
| + bool result = broker->GetAttachmentWithId(attachment->GetIdentifier(), |
| + &brokered_attachment); |
| + if (!result) { |
| + blocked_ids.insert(attachment->GetIdentifier()); |
| + continue; |
| + } |
| + |
| + attachment->PopulateWithAttachment(brokered_attachment.get()); |
| + } |
| + } |
| + |
| + return blocked_ids; |
| +} |
| + |
| +void ChannelReader::ReceivedBrokerableAttachmentWithId( |
| + const BrokerableAttachment::AttachmentId& id) { |
| + if (blocked_ids_.empty()) |
| + return; |
| + |
| + for (AttachmentIdSet::iterator it = blocked_ids_.begin(); |
|
Tom Sepez
2015/07/13 18:28:47
nit: find()
erikchen
2015/07/17 18:44:01
Done.
|
| + it != blocked_ids_.end(); ++it) { |
| + if (*it == id) { |
| + blocked_ids_.erase(it); |
| + break; |
| + } |
| + } |
| + |
| + if (blocked_ids_.empty()) { |
| + StopObservingAttachmentBroker(); |
| + DispatchMessages(); |
| + } |
| +} |
| + |
| +void ChannelReader::StartObservingAttachmentBroker() { |
| + GetAttachmentBroker()->AddObserver(this); |
| +} |
| + |
| +void ChannelReader::StopObservingAttachmentBroker() { |
| + GetAttachmentBroker()->RemoveObserver(this); |
| +} |
| } // namespace internal |
| } // namespace IPC |