Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(302)

Unified Diff: ipc/ipc_channel_reader.cc

Issue 1206093002: Update ChannelReader to use AttachmentBroker. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@attachment_broker3_listener
Patch Set: Add an optimization to immediately dispatch messages after translation. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « ipc/ipc_channel_reader.h ('k') | ipc/ipc_channel_reader_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: ipc/ipc_channel_reader.cc
diff --git a/ipc/ipc_channel_reader.cc b/ipc/ipc_channel_reader.cc
index f41a0d7737e4656cd279db0de8ef83c2b9e9fbaa..634f15a76f0e057ad910a3c11575c67cf60f1d58 100644
--- a/ipc/ipc_channel_reader.cc
+++ b/ipc/ipc_channel_reader.cc
@@ -4,8 +4,12 @@
#include "ipc/ipc_channel_reader.h"
+#include <algorithm>
+
#include "ipc/ipc_listener.h"
#include "ipc/ipc_logging.h"
+#include "ipc/ipc_message.h"
+#include "ipc/ipc_message_attachment_set.h"
#include "ipc/ipc_message_macros.h"
namespace IPC {
@@ -16,26 +20,35 @@ ChannelReader::ChannelReader(Listener* listener) : listener_(listener) {
}
ChannelReader::~ChannelReader() {
+ if (!blocked_ids_.empty())
+ StopObservingAttachmentBroker();
}
-bool ChannelReader::ProcessIncomingMessages() {
+ChannelReader::DispatchState ChannelReader::ProcessIncomingMessages() {
while (true) {
int bytes_read = 0;
ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
&bytes_read);
if (read_state == READ_FAILED)
- return false;
+ return DISPATCH_ERROR;
if (read_state == READ_PENDING)
- return true;
+ return DISPATCH_FINISHED;
DCHECK(bytes_read > 0);
- if (!DispatchInputData(input_buf_, bytes_read))
- return false;
+ if (!TranslateInputData(input_buf_, bytes_read))
+ return DISPATCH_ERROR;
+
+ DispatchState state = DispatchMessages();
+ if (state != DISPATCH_FINISHED)
+ return state;
}
}
-bool ChannelReader::AsyncReadComplete(int bytes_read) {
- return DispatchInputData(input_buf_, bytes_read);
+ChannelReader::DispatchState ChannelReader::AsyncReadComplete(int bytes_read) {
+ if (!TranslateInputData(input_buf_, bytes_read))
+ return DISPATCH_ERROR;
+
+ return DispatchMessages();
}
bool ChannelReader::IsInternalMessage(const Message& m) {
@@ -49,8 +62,8 @@ bool ChannelReader::IsHelloMessage(const Message& m) {
m.type() == Channel::HELLO_MESSAGE_TYPE;
}
-bool ChannelReader::DispatchInputData(const char* input_data,
- int input_data_len) {
+bool ChannelReader::TranslateInputData(const char* input_data,
+ int input_data_len) {
const char* p;
const char* end;
@@ -75,27 +88,35 @@ bool ChannelReader::DispatchInputData(const char* input_data,
const char* message_tail = Message::FindNext(p, end);
if (message_tail) {
int len = static_cast<int>(message_tail - p);
- Message m(p, len);
- if (!WillDispatchInputMessage(&m))
+
+ Message translated_message(p, len);
+ if (!GetNonBrokeredAttachments(&translated_message))
return false;
-#ifdef IPC_MESSAGE_LOG_ENABLED
- std::string name;
- Logging::GetInstance()->GetMessageText(m.type(), &name, &m, NULL);
- TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData",
- "name", name);
-#else
- TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData",
- "class", IPC_MESSAGE_ID_CLASS(m.type()),
- "line", IPC_MESSAGE_ID_LINE(m.type()));
-#endif
- m.TraceMessageEnd();
- if (IsInternalMessage(m))
- HandleInternalMessage(m);
- else
- listener_->OnMessageReceived(m);
- if (m.dispatch_error())
- listener_->OnBadMessageReceived(m);
+ // If there are no queued messages, attempt to immediately dispatch the
+ // newly translated message.
+ if (queued_messages_.empty()) {
+ DCHECK(blocked_ids_.empty());
+ AttachmentIdSet blocked_ids =
+ GetBrokeredAttachments(&translated_message);
+
+ if (blocked_ids.empty()) {
+ // Dispatch the message and continue the loop.
+ DispatchMessage(&translated_message);
+ p = message_tail;
+ continue;
+ }
+
+ // While we could theoretically updated blocked_ids_ and call
Tom Sepez 2015/07/27 15:21:47 nit: I didn't see the code to which this comment a
erikchen 2015/07/27 16:23:48 This comment explains the lack of additional code
erikchen 2015/07/27 17:15:38 I started rewriting this comment several times in
+ // StartObservingAttachmentBroker(), doing so adds unnecessary
+ // complexity. This attempt to dispatch |translated_message| is an
+ // optimization, and TranslateInputData() is guaranteed to be followed
+ // by DispatchMessages() which will do the same thing.
+ }
+
+ // Make a deep copy of |translated_message| to add to the queue.
+ scoped_ptr<Message> m(new Message(translated_message));
+ queued_messages_.push_back(m.release());
p = message_tail;
} else {
// Last message is partial.
@@ -111,6 +132,98 @@ bool ChannelReader::DispatchInputData(const char* input_data,
return true;
}
+ChannelReader::DispatchState ChannelReader::DispatchMessages() {
+ while (!queued_messages_.empty()) {
+ if (!blocked_ids_.empty())
+ return DISPATCH_WAITING_ON_BROKER;
+
+ Message* m = queued_messages_.front();
+
+ AttachmentIdSet blocked_ids = GetBrokeredAttachments(m);
+ if (!blocked_ids.empty()) {
+ blocked_ids_.swap(blocked_ids);
+ StartObservingAttachmentBroker();
+ return DISPATCH_WAITING_ON_BROKER;
+ }
+
+ DispatchMessage(m);
+ queued_messages_.erase(queued_messages_.begin());
+ }
+ return DISPATCH_FINISHED;
+}
+
+void ChannelReader::DispatchMessage(Message* m) {
+#ifdef IPC_MESSAGE_LOG_ENABLED
+ std::string name;
+ Logging::GetInstance()->GetMessageText(m->type(), &name, m, NULL);
+ TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", "name",
+ name);
+#else
+ TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", "class",
+ IPC_MESSAGE_ID_CLASS(m->type()), "line",
+ IPC_MESSAGE_ID_LINE(m->type()));
+#endif
+ m->TraceMessageEnd();
+ if (IsInternalMessage(*m))
+ HandleInternalMessage(*m);
+ else
+ listener_->OnMessageReceived(*m);
+ if (m->dispatch_error())
+ listener_->OnBadMessageReceived(*m);
+}
+
+ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments(
+ Message* msg) {
+ std::set<BrokerableAttachment::AttachmentId> blocked_ids;
+
+#if USE_ATTACHMENT_BROKER
+ MessageAttachmentSet* set = msg->attachment_set();
+ for (const scoped_refptr<BrokerableAttachment>& attachment :
+ set->GetBrokerableAttachmentsForUpdating()) {
+ if (attachment->NeedsBrokering()) {
+ AttachmentBroker* broker = GetAttachmentBroker();
+ scoped_refptr<BrokerableAttachment> brokered_attachment;
+ bool result = broker->GetAttachmentWithId(attachment->GetIdentifier(),
+ &brokered_attachment);
+ if (!result) {
+ blocked_ids.insert(attachment->GetIdentifier());
+ continue;
+ }
+
+ attachment->PopulateWithAttachment(brokered_attachment.get());
+ }
+ }
+#endif // USE_ATTACHMENT_BROKER
+
+ return blocked_ids;
+}
+
+void ChannelReader::ReceivedBrokerableAttachmentWithId(
+ const BrokerableAttachment::AttachmentId& id) {
+ if (blocked_ids_.empty())
+ return;
+
+ auto it = find(blocked_ids_.begin(), blocked_ids_.end(), id);
+ if (it != blocked_ids_.end())
+ blocked_ids_.erase(it);
+
+ if (blocked_ids_.empty()) {
+ StopObservingAttachmentBroker();
+ DispatchMessages();
+ }
+}
+
+void ChannelReader::StartObservingAttachmentBroker() {
+#if USE_ATTACHMENT_BROKER
+ GetAttachmentBroker()->AddObserver(this);
+#endif // USE_ATTACHMENT_BROKER
+}
+
+void ChannelReader::StopObservingAttachmentBroker() {
+#if USE_ATTACHMENT_BROKER
+ GetAttachmentBroker()->RemoveObserver(this);
+#endif // USE_ATTACHMENT_BROKER
+}
} // namespace internal
} // namespace IPC
« no previous file with comments | « ipc/ipc_channel_reader.h ('k') | ipc/ipc_channel_reader_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698