Index: ipc/ipc_channel_reader.cc |
diff --git a/ipc/ipc_channel_reader.cc b/ipc/ipc_channel_reader.cc |
index f41a0d7737e4656cd279db0de8ef83c2b9e9fbaa..21217f36fdfedc44dd7ca2cf78148cd2ec28c356 100644 |
--- a/ipc/ipc_channel_reader.cc |
+++ b/ipc/ipc_channel_reader.cc |
@@ -6,6 +6,8 @@ |
#include "ipc/ipc_listener.h" |
#include "ipc/ipc_logging.h" |
+#include "ipc/ipc_message.h" |
+#include "ipc/ipc_message_attachment_set.h" |
#include "ipc/ipc_message_macros.h" |
namespace IPC { |
@@ -16,26 +18,35 @@ ChannelReader::ChannelReader(Listener* listener) : listener_(listener) { |
} |
ChannelReader::~ChannelReader() { |
+ if (!blocked_ids_.empty()) |
+ StopObservingAttachmentBroker(); |
} |
-bool ChannelReader::ProcessIncomingMessages() { |
+ChannelReader::DispatchState ChannelReader::ProcessIncomingMessages() { |
while (true) { |
int bytes_read = 0; |
ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, |
&bytes_read); |
if (read_state == READ_FAILED) |
- return false; |
+ return DISPATCH_ERROR; |
if (read_state == READ_PENDING) |
- return true; |
+ return DISPATCH_FINISHED; |
DCHECK(bytes_read > 0); |
- if (!DispatchInputData(input_buf_, bytes_read)) |
- return false; |
+ if (!TranslateInputData(input_buf_, bytes_read)) |
+ return DISPATCH_ERROR; |
+ |
+ DispatchState state = DispatchMessages(); |
+ if (state != DISPATCH_FINISHED) |
+ return state; |
} |
} |
-bool ChannelReader::AsyncReadComplete(int bytes_read) { |
- return DispatchInputData(input_buf_, bytes_read); |
+ChannelReader::DispatchState ChannelReader::AsyncReadComplete(int bytes_read) { |
+ if (!TranslateInputData(input_buf_, bytes_read)) |
+ return DISPATCH_ERROR; |
+ |
+ return DispatchMessages(); |
} |
bool ChannelReader::IsInternalMessage(const Message& m) { |
@@ -49,8 +60,8 @@ bool ChannelReader::IsHelloMessage(const Message& m) { |
m.type() == Channel::HELLO_MESSAGE_TYPE; |
} |
-bool ChannelReader::DispatchInputData(const char* input_data, |
- int input_data_len) { |
+bool ChannelReader::TranslateInputData(const char* input_data, |
+ int input_data_len) { |
const char* p; |
const char* end; |
@@ -75,27 +86,11 @@ bool ChannelReader::DispatchInputData(const char* input_data, |
const char* message_tail = Message::FindNext(p, end); |
if (message_tail) { |
int len = static_cast<int>(message_tail - p); |
- Message m(p, len); |
- if (!WillDispatchInputMessage(&m)) |
+ Message* m = new Message(p, len); |
+ if (!GetNonBrokeredAttachments(m)) |
Tom Sepez
2015/07/13 18:28:47
Do we leak |m| here if we return early?
erikchen
2015/07/17 18:44:01
Yup. That's what I deserve for not using scoped_pt
|
return false; |
-#ifdef IPC_MESSAGE_LOG_ENABLED |
- std::string name; |
- Logging::GetInstance()->GetMessageText(m.type(), &name, &m, NULL); |
- TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", |
- "name", name); |
-#else |
- TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", |
- "class", IPC_MESSAGE_ID_CLASS(m.type()), |
- "line", IPC_MESSAGE_ID_LINE(m.type())); |
-#endif |
- m.TraceMessageEnd(); |
- if (IsInternalMessage(m)) |
- HandleInternalMessage(m); |
- else |
- listener_->OnMessageReceived(m); |
- if (m.dispatch_error()) |
- listener_->OnBadMessageReceived(m); |
+ queued_messages_.push_back(m); |
p = message_tail; |
} else { |
// Last message is partial. |
@@ -111,6 +106,96 @@ bool ChannelReader::DispatchInputData(const char* input_data, |
return true; |
} |
+ChannelReader::DispatchState ChannelReader::DispatchMessages() { |
+ while (!queued_messages_.empty()) { |
+ if (!blocked_ids_.empty()) |
+ return DISPATCH_WAITING_ON_BROKER; |
+ |
+ Message* m = queued_messages_.front(); |
+ |
+ AttachmentIdSet blocked_ids = GetBrokeredAttachments(m); |
+ if (!blocked_ids.empty()) { |
+ blocked_ids_.swap(blocked_ids); |
+ StartObservingAttachmentBroker(); |
+ return DISPATCH_WAITING_ON_BROKER; |
+ } |
+ |
+ DispatchMessage(m); |
+ queued_messages_.erase(queued_messages_.begin()); |
+ } |
+ return DISPATCH_FINISHED; |
+} |
+ |
+void ChannelReader::DispatchMessage(Message* m) { |
+#ifdef IPC_MESSAGE_LOG_ENABLED |
+ std::string name; |
+ Logging::GetInstance()->GetMessageText(m->type(), &name, m, NULL); |
+ TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", "name", |
+ name); |
+#else |
+ TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", "class", |
+ IPC_MESSAGE_ID_CLASS(m->type()), "line", |
+ IPC_MESSAGE_ID_LINE(m->type())); |
+#endif |
+ m->TraceMessageEnd(); |
+ if (IsInternalMessage(*m)) |
+ HandleInternalMessage(*m); |
+ else |
+ listener_->OnMessageReceived(*m); |
+ if (m->dispatch_error()) |
+ listener_->OnBadMessageReceived(*m); |
+} |
+ |
+ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments( |
+ Message* msg) { |
+ std::set<BrokerableAttachment::AttachmentId> blocked_ids; |
+ |
+ MessageAttachmentSet* set = msg->attachment_set(); |
+ for (const scoped_refptr<BrokerableAttachment>& attachment : |
+ set->GetBrokerableAttachmentsForUpdating()) { |
+ if (attachment->NeedsBrokering()) { |
+ AttachmentBroker* broker = GetAttachmentBroker(); |
+ scoped_refptr<BrokerableAttachment> brokered_attachment; |
+ bool result = broker->GetAttachmentWithId(attachment->GetIdentifier(), |
+ &brokered_attachment); |
+ if (!result) { |
+ blocked_ids.insert(attachment->GetIdentifier()); |
+ continue; |
+ } |
+ |
+ attachment->PopulateWithAttachment(brokered_attachment.get()); |
+ } |
+ } |
+ |
+ return blocked_ids; |
+} |
+ |
+void ChannelReader::ReceivedBrokerableAttachmentWithId( |
+ const BrokerableAttachment::AttachmentId& id) { |
+ if (blocked_ids_.empty()) |
+ return; |
+ |
+ for (AttachmentIdSet::iterator it = blocked_ids_.begin(); |
Tom Sepez
2015/07/13 18:28:47
nit: find()
erikchen
2015/07/17 18:44:01
Done.
|
+ it != blocked_ids_.end(); ++it) { |
+ if (*it == id) { |
+ blocked_ids_.erase(it); |
+ break; |
+ } |
+ } |
+ |
+ if (blocked_ids_.empty()) { |
+ StopObservingAttachmentBroker(); |
+ DispatchMessages(); |
+ } |
+} |
+ |
+void ChannelReader::StartObservingAttachmentBroker() { |
+ GetAttachmentBroker()->AddObserver(this); |
+} |
+ |
+void ChannelReader::StopObservingAttachmentBroker() { |
+ GetAttachmentBroker()->RemoveObserver(this); |
+} |
} // namespace internal |
} // namespace IPC |