| Index: ipc/ipc_channel_reader.cc
|
| diff --git a/ipc/ipc_channel_reader.cc b/ipc/ipc_channel_reader.cc
|
| index f41a0d7737e4656cd279db0de8ef83c2b9e9fbaa..79b79c23e536450402806c289e691ecafe64b3be 100644
|
| --- a/ipc/ipc_channel_reader.cc
|
| +++ b/ipc/ipc_channel_reader.cc
|
| @@ -4,8 +4,12 @@
|
|
|
| #include "ipc/ipc_channel_reader.h"
|
|
|
| +#include <algorithm>
|
| +
|
| #include "ipc/ipc_listener.h"
|
| #include "ipc/ipc_logging.h"
|
| +#include "ipc/ipc_message.h"
|
| +#include "ipc/ipc_message_attachment_set.h"
|
| #include "ipc/ipc_message_macros.h"
|
|
|
| namespace IPC {
|
| @@ -16,26 +20,35 @@ ChannelReader::ChannelReader(Listener* listener) : listener_(listener) {
|
| }
|
|
|
| ChannelReader::~ChannelReader() {
|
| + if (!blocked_ids_.empty())
|
| + StopObservingAttachmentBroker();
|
| }
|
|
|
| -bool ChannelReader::ProcessIncomingMessages() {
|
| +ChannelReader::DispatchState ChannelReader::ProcessIncomingMessages() {
|
| while (true) {
|
| int bytes_read = 0;
|
| ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
|
| &bytes_read);
|
| if (read_state == READ_FAILED)
|
| - return false;
|
| + return DISPATCH_ERROR;
|
| if (read_state == READ_PENDING)
|
| - return true;
|
| + return DISPATCH_FINISHED;
|
|
|
| DCHECK(bytes_read > 0);
|
| - if (!DispatchInputData(input_buf_, bytes_read))
|
| - return false;
|
| + if (!TranslateInputData(input_buf_, bytes_read))
|
| + return DISPATCH_ERROR;
|
| +
|
| + DispatchState state = DispatchMessages();
|
| + if (state != DISPATCH_FINISHED)
|
| + return state;
|
| }
|
| }
|
|
|
| -bool ChannelReader::AsyncReadComplete(int bytes_read) {
|
| - return DispatchInputData(input_buf_, bytes_read);
|
| +ChannelReader::DispatchState ChannelReader::AsyncReadComplete(int bytes_read) {
|
| + if (!TranslateInputData(input_buf_, bytes_read))
|
| + return DISPATCH_ERROR;
|
| +
|
| + return DispatchMessages();
|
| }
|
|
|
| bool ChannelReader::IsInternalMessage(const Message& m) {
|
| @@ -49,8 +62,8 @@ bool ChannelReader::IsHelloMessage(const Message& m) {
|
| m.type() == Channel::HELLO_MESSAGE_TYPE;
|
| }
|
|
|
| -bool ChannelReader::DispatchInputData(const char* input_data,
|
| - int input_data_len) {
|
| +bool ChannelReader::TranslateInputData(const char* input_data,
|
| + int input_data_len) {
|
| const char* p;
|
| const char* end;
|
|
|
| @@ -75,27 +88,32 @@ bool ChannelReader::DispatchInputData(const char* input_data,
|
| const char* message_tail = Message::FindNext(p, end);
|
| if (message_tail) {
|
| int len = static_cast<int>(message_tail - p);
|
| - Message m(p, len);
|
| - if (!WillDispatchInputMessage(&m))
|
| +
|
| + Message translated_message(p, len);
|
| + if (!GetNonBrokeredAttachments(&translated_message))
|
| return false;
|
|
|
| -#ifdef IPC_MESSAGE_LOG_ENABLED
|
| - std::string name;
|
| - Logging::GetInstance()->GetMessageText(m.type(), &name, &m, NULL);
|
| - TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData",
|
| - "name", name);
|
| -#else
|
| - TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData",
|
| - "class", IPC_MESSAGE_ID_CLASS(m.type()),
|
| - "line", IPC_MESSAGE_ID_LINE(m.type()));
|
| -#endif
|
| - m.TraceMessageEnd();
|
| - if (IsInternalMessage(m))
|
| - HandleInternalMessage(m);
|
| - else
|
| - listener_->OnMessageReceived(m);
|
| - if (m.dispatch_error())
|
| - listener_->OnBadMessageReceived(m);
|
| + // If there are no queued messages, attempt to immediately dispatch the
|
| + // newly translated message.
|
| + if (queued_messages_.empty()) {
|
| + DCHECK(blocked_ids_.empty());
|
| + AttachmentIdSet blocked_ids =
|
| + GetBrokeredAttachments(&translated_message);
|
| +
|
| + if (blocked_ids.empty()) {
|
| + // Dispatch the message and continue the loop.
|
| + DispatchMessage(&translated_message);
|
| + p = message_tail;
|
| + continue;
|
| + }
|
| +
|
| + blocked_ids_.swap(blocked_ids);
|
| + StartObservingAttachmentBroker();
|
| + }
|
| +
|
| + // Make a deep copy of |translated_message| to add to the queue.
|
| + scoped_ptr<Message> m(new Message(translated_message));
|
| + queued_messages_.push_back(m.release());
|
| p = message_tail;
|
| } else {
|
| // Last message is partial.
|
| @@ -111,6 +129,98 @@ bool ChannelReader::DispatchInputData(const char* input_data,
|
| return true;
|
| }
|
|
|
| +ChannelReader::DispatchState ChannelReader::DispatchMessages() {
|
| + while (!queued_messages_.empty()) {
|
| + if (!blocked_ids_.empty())
|
| + return DISPATCH_WAITING_ON_BROKER;
|
| +
|
| + Message* m = queued_messages_.front();
|
| +
|
| + AttachmentIdSet blocked_ids = GetBrokeredAttachments(m);
|
| + if (!blocked_ids.empty()) {
|
| + blocked_ids_.swap(blocked_ids);
|
| + StartObservingAttachmentBroker();
|
| + return DISPATCH_WAITING_ON_BROKER;
|
| + }
|
| +
|
| + DispatchMessage(m);
|
| + queued_messages_.erase(queued_messages_.begin());
|
| + }
|
| + return DISPATCH_FINISHED;
|
| +}
|
| +
|
| +void ChannelReader::DispatchMessage(Message* m) {
|
| +#ifdef IPC_MESSAGE_LOG_ENABLED
|
| + std::string name;
|
| + Logging::GetInstance()->GetMessageText(m->type(), &name, m, NULL);
|
| + TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", "name",
|
| + name);
|
| +#else
|
| + TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", "class",
|
| + IPC_MESSAGE_ID_CLASS(m->type()), "line",
|
| + IPC_MESSAGE_ID_LINE(m->type()));
|
| +#endif
|
| + m->TraceMessageEnd();
|
| + if (IsInternalMessage(*m))
|
| + HandleInternalMessage(*m);
|
| + else
|
| + listener_->OnMessageReceived(*m);
|
| + if (m->dispatch_error())
|
| + listener_->OnBadMessageReceived(*m);
|
| +}
|
| +
|
| +ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments(
|
| + Message* msg) {
|
| + std::set<BrokerableAttachment::AttachmentId> blocked_ids;
|
| +
|
| +#if USE_ATTACHMENT_BROKER
|
| + MessageAttachmentSet* set = msg->attachment_set();
|
| + for (const scoped_refptr<BrokerableAttachment>& attachment :
|
| + set->GetBrokerableAttachmentsForUpdating()) {
|
| + if (attachment->NeedsBrokering()) {
|
| + AttachmentBroker* broker = GetAttachmentBroker();
|
| + scoped_refptr<BrokerableAttachment> brokered_attachment;
|
| + bool result = broker->GetAttachmentWithId(attachment->GetIdentifier(),
|
| + &brokered_attachment);
|
| + if (!result) {
|
| + blocked_ids.insert(attachment->GetIdentifier());
|
| + continue;
|
| + }
|
| +
|
| + attachment->PopulateWithAttachment(brokered_attachment.get());
|
| + }
|
| + }
|
| +#endif // USE_ATTACHMENT_BROKER
|
| +
|
| + return blocked_ids;
|
| +}
|
| +
|
| +void ChannelReader::ReceivedBrokerableAttachmentWithId(
|
| + const BrokerableAttachment::AttachmentId& id) {
|
| + if (blocked_ids_.empty())
|
| + return;
|
| +
|
| + auto it = find(blocked_ids_.begin(), blocked_ids_.end(), id);
|
| + if (it != blocked_ids_.end())
|
| + blocked_ids_.erase(it);
|
| +
|
| + if (blocked_ids_.empty()) {
|
| + StopObservingAttachmentBroker();
|
| + DispatchMessages();
|
| + }
|
| +}
|
| +
|
| +void ChannelReader::StartObservingAttachmentBroker() {
|
| +#if USE_ATTACHMENT_BROKER
|
| + GetAttachmentBroker()->AddObserver(this);
|
| +#endif // USE_ATTACHMENT_BROKER
|
| +}
|
| +
|
| +void ChannelReader::StopObservingAttachmentBroker() {
|
| +#if USE_ATTACHMENT_BROKER
|
| + GetAttachmentBroker()->RemoveObserver(this);
|
| +#endif // USE_ATTACHMENT_BROKER
|
| +}
|
|
|
| } // namespace internal
|
| } // namespace IPC
|
|
|