Index: ipc/ipc_channel_reader.cc |
diff --git a/ipc/ipc_channel_reader.cc b/ipc/ipc_channel_reader.cc |
index f41a0d7737e4656cd279db0de8ef83c2b9e9fbaa..634f15a76f0e057ad910a3c11575c67cf60f1d58 100644 |
--- a/ipc/ipc_channel_reader.cc |
+++ b/ipc/ipc_channel_reader.cc |
@@ -4,8 +4,12 @@ |
#include "ipc/ipc_channel_reader.h" |
+#include <algorithm> |
+ |
#include "ipc/ipc_listener.h" |
#include "ipc/ipc_logging.h" |
+#include "ipc/ipc_message.h" |
+#include "ipc/ipc_message_attachment_set.h" |
#include "ipc/ipc_message_macros.h" |
namespace IPC { |
@@ -16,26 +20,35 @@ ChannelReader::ChannelReader(Listener* listener) : listener_(listener) { |
} |
ChannelReader::~ChannelReader() { |
+ if (!blocked_ids_.empty()) |
+ StopObservingAttachmentBroker(); |
} |
-bool ChannelReader::ProcessIncomingMessages() { |
+ChannelReader::DispatchState ChannelReader::ProcessIncomingMessages() { |
while (true) { |
int bytes_read = 0; |
ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, |
&bytes_read); |
if (read_state == READ_FAILED) |
- return false; |
+ return DISPATCH_ERROR; |
if (read_state == READ_PENDING) |
- return true; |
+ return DISPATCH_FINISHED; |
DCHECK(bytes_read > 0); |
- if (!DispatchInputData(input_buf_, bytes_read)) |
- return false; |
+ if (!TranslateInputData(input_buf_, bytes_read)) |
+ return DISPATCH_ERROR; |
+ |
+ DispatchState state = DispatchMessages(); |
+ if (state != DISPATCH_FINISHED) |
+ return state; |
} |
} |
-bool ChannelReader::AsyncReadComplete(int bytes_read) { |
- return DispatchInputData(input_buf_, bytes_read); |
+ChannelReader::DispatchState ChannelReader::AsyncReadComplete(int bytes_read) { |
+ if (!TranslateInputData(input_buf_, bytes_read)) |
+ return DISPATCH_ERROR; |
+ |
+ return DispatchMessages(); |
} |
bool ChannelReader::IsInternalMessage(const Message& m) { |
@@ -49,8 +62,8 @@ bool ChannelReader::IsHelloMessage(const Message& m) { |
m.type() == Channel::HELLO_MESSAGE_TYPE; |
} |
-bool ChannelReader::DispatchInputData(const char* input_data, |
- int input_data_len) { |
+bool ChannelReader::TranslateInputData(const char* input_data, |
+ int input_data_len) { |
const char* p; |
const char* end; |
@@ -75,27 +88,35 @@ bool ChannelReader::DispatchInputData(const char* input_data, |
const char* message_tail = Message::FindNext(p, end); |
if (message_tail) { |
int len = static_cast<int>(message_tail - p); |
- Message m(p, len); |
- if (!WillDispatchInputMessage(&m)) |
+ |
+ Message translated_message(p, len); |
+ if (!GetNonBrokeredAttachments(&translated_message)) |
return false; |
-#ifdef IPC_MESSAGE_LOG_ENABLED |
- std::string name; |
- Logging::GetInstance()->GetMessageText(m.type(), &name, &m, NULL); |
- TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", |
- "name", name); |
-#else |
- TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", |
- "class", IPC_MESSAGE_ID_CLASS(m.type()), |
- "line", IPC_MESSAGE_ID_LINE(m.type())); |
-#endif |
- m.TraceMessageEnd(); |
- if (IsInternalMessage(m)) |
- HandleInternalMessage(m); |
- else |
- listener_->OnMessageReceived(m); |
- if (m.dispatch_error()) |
- listener_->OnBadMessageReceived(m); |
+ // If there are no queued messages, attempt to immediately dispatch the |
+ // newly translated message. |
+ if (queued_messages_.empty()) { |
+ DCHECK(blocked_ids_.empty()); |
+ AttachmentIdSet blocked_ids = |
+ GetBrokeredAttachments(&translated_message); |
+ |
+ if (blocked_ids.empty()) { |
+ // Dispatch the message and continue the loop. |
+ DispatchMessage(&translated_message); |
+ p = message_tail; |
+ continue; |
+ } |
+ |
+ // While we could theoretically updated blocked_ids_ and call |
Tom Sepez
2015/07/27 15:21:47
nit: I didn't see the code to which this comment a
erikchen
2015/07/27 16:23:48
This comment explains the lack of additional code
erikchen
2015/07/27 17:15:38
I started rewriting this comment several times in
|
+ // StartObservingAttachmentBroker(), doing so adds unnecessary |
+ // complexity. This attempt to dispatch |translated_message| is an |
+ // optimization, and TranslateInputData() is guaranteed to be followed |
+ // by DispatchMessages() which will do the same thing. |
+ } |
+ |
+ // Make a deep copy of |translated_message| to add to the queue. |
+ scoped_ptr<Message> m(new Message(translated_message)); |
+ queued_messages_.push_back(m.release()); |
p = message_tail; |
} else { |
// Last message is partial. |
@@ -111,6 +132,98 @@ bool ChannelReader::DispatchInputData(const char* input_data, |
return true; |
} |
+ChannelReader::DispatchState ChannelReader::DispatchMessages() { |
+ while (!queued_messages_.empty()) { |
+ if (!blocked_ids_.empty()) |
+ return DISPATCH_WAITING_ON_BROKER; |
+ |
+ Message* m = queued_messages_.front(); |
+ |
+ AttachmentIdSet blocked_ids = GetBrokeredAttachments(m); |
+ if (!blocked_ids.empty()) { |
+ blocked_ids_.swap(blocked_ids); |
+ StartObservingAttachmentBroker(); |
+ return DISPATCH_WAITING_ON_BROKER; |
+ } |
+ |
+ DispatchMessage(m); |
+ queued_messages_.erase(queued_messages_.begin()); |
+ } |
+ return DISPATCH_FINISHED; |
+} |
+ |
+void ChannelReader::DispatchMessage(Message* m) { |
+#ifdef IPC_MESSAGE_LOG_ENABLED |
+ std::string name; |
+ Logging::GetInstance()->GetMessageText(m->type(), &name, m, NULL); |
+ TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", "name", |
+ name); |
+#else |
+ TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", "class", |
+ IPC_MESSAGE_ID_CLASS(m->type()), "line", |
+ IPC_MESSAGE_ID_LINE(m->type())); |
+#endif |
+ m->TraceMessageEnd(); |
+ if (IsInternalMessage(*m)) |
+ HandleInternalMessage(*m); |
+ else |
+ listener_->OnMessageReceived(*m); |
+ if (m->dispatch_error()) |
+ listener_->OnBadMessageReceived(*m); |
+} |
+ |
+ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments( |
+ Message* msg) { |
+ std::set<BrokerableAttachment::AttachmentId> blocked_ids; |
+ |
+#if USE_ATTACHMENT_BROKER |
+ MessageAttachmentSet* set = msg->attachment_set(); |
+ for (const scoped_refptr<BrokerableAttachment>& attachment : |
+ set->GetBrokerableAttachmentsForUpdating()) { |
+ if (attachment->NeedsBrokering()) { |
+ AttachmentBroker* broker = GetAttachmentBroker(); |
+ scoped_refptr<BrokerableAttachment> brokered_attachment; |
+ bool result = broker->GetAttachmentWithId(attachment->GetIdentifier(), |
+ &brokered_attachment); |
+ if (!result) { |
+ blocked_ids.insert(attachment->GetIdentifier()); |
+ continue; |
+ } |
+ |
+ attachment->PopulateWithAttachment(brokered_attachment.get()); |
+ } |
+ } |
+#endif // USE_ATTACHMENT_BROKER |
+ |
+ return blocked_ids; |
+} |
+ |
+void ChannelReader::ReceivedBrokerableAttachmentWithId( |
+ const BrokerableAttachment::AttachmentId& id) { |
+ if (blocked_ids_.empty()) |
+ return; |
+ |
+ auto it = find(blocked_ids_.begin(), blocked_ids_.end(), id); |
+ if (it != blocked_ids_.end()) |
+ blocked_ids_.erase(it); |
+ |
+ if (blocked_ids_.empty()) { |
+ StopObservingAttachmentBroker(); |
+ DispatchMessages(); |
+ } |
+} |
+ |
+void ChannelReader::StartObservingAttachmentBroker() { |
+#if USE_ATTACHMENT_BROKER |
+ GetAttachmentBroker()->AddObserver(this); |
+#endif // USE_ATTACHMENT_BROKER |
+} |
+ |
+void ChannelReader::StopObservingAttachmentBroker() { |
+#if USE_ATTACHMENT_BROKER |
+ GetAttachmentBroker()->RemoveObserver(this); |
+#endif // USE_ATTACHMENT_BROKER |
+} |
} // namespace internal |
} // namespace IPC |