Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(192)

Side by Side Diff: ipc/ipc_channel_reader.cc

Issue 1206093002: Update ChannelReader to use AttachmentBroker. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@attachment_broker3_listener
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_channel_reader.h" 5 #include "ipc/ipc_channel_reader.h"
6 6
7 #include "ipc/ipc_listener.h" 7 #include "ipc/ipc_listener.h"
8 #include "ipc/ipc_logging.h" 8 #include "ipc/ipc_logging.h"
9 #include "ipc/ipc_message.h"
10 #include "ipc/ipc_message_attachment_set.h"
9 #include "ipc/ipc_message_macros.h" 11 #include "ipc/ipc_message_macros.h"
10 12
11 namespace IPC { 13 namespace IPC {
12 namespace internal { 14 namespace internal {
13 15
14 ChannelReader::ChannelReader(Listener* listener) : listener_(listener) { 16 ChannelReader::ChannelReader(Listener* listener) : listener_(listener) {
15 memset(input_buf_, 0, sizeof(input_buf_)); 17 memset(input_buf_, 0, sizeof(input_buf_));
16 } 18 }
17 19
18 ChannelReader::~ChannelReader() { 20 ChannelReader::~ChannelReader() {
21 if (!blocked_ids_.empty())
22 StopObservingAttachmentBroker();
19 } 23 }
20 24
21 bool ChannelReader::ProcessIncomingMessages() { 25 ChannelReader::DispatchState ChannelReader::ProcessIncomingMessages() {
22 while (true) { 26 while (true) {
23 int bytes_read = 0; 27 int bytes_read = 0;
24 ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, 28 ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
25 &bytes_read); 29 &bytes_read);
26 if (read_state == READ_FAILED) 30 if (read_state == READ_FAILED)
27 return false; 31 return DISPATCH_ERROR;
28 if (read_state == READ_PENDING) 32 if (read_state == READ_PENDING)
29 return true; 33 return DISPATCH_FINISHED;
30 34
31 DCHECK(bytes_read > 0); 35 DCHECK(bytes_read > 0);
32 if (!DispatchInputData(input_buf_, bytes_read)) 36 if (!TranslateInputData(input_buf_, bytes_read))
33 return false; 37 return DISPATCH_ERROR;
38
39 DispatchState state = DispatchMessages();
40 if (state != DISPATCH_FINISHED)
41 return state;
34 } 42 }
35 } 43 }
36 44
37 bool ChannelReader::AsyncReadComplete(int bytes_read) { 45 ChannelReader::DispatchState ChannelReader::AsyncReadComplete(int bytes_read) {
38 return DispatchInputData(input_buf_, bytes_read); 46 if (!TranslateInputData(input_buf_, bytes_read))
47 return DISPATCH_ERROR;
48
49 return DispatchMessages();
39 } 50 }
40 51
41 bool ChannelReader::IsInternalMessage(const Message& m) { 52 bool ChannelReader::IsInternalMessage(const Message& m) {
42 return m.routing_id() == MSG_ROUTING_NONE && 53 return m.routing_id() == MSG_ROUTING_NONE &&
43 m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE && 54 m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE &&
44 m.type() <= Channel::HELLO_MESSAGE_TYPE; 55 m.type() <= Channel::HELLO_MESSAGE_TYPE;
45 } 56 }
46 57
47 bool ChannelReader::IsHelloMessage(const Message& m) { 58 bool ChannelReader::IsHelloMessage(const Message& m) {
48 return m.routing_id() == MSG_ROUTING_NONE && 59 return m.routing_id() == MSG_ROUTING_NONE &&
49 m.type() == Channel::HELLO_MESSAGE_TYPE; 60 m.type() == Channel::HELLO_MESSAGE_TYPE;
50 } 61 }
51 62
52 bool ChannelReader::DispatchInputData(const char* input_data, 63 bool ChannelReader::TranslateInputData(const char* input_data,
53 int input_data_len) { 64 int input_data_len) {
54 const char* p; 65 const char* p;
55 const char* end; 66 const char* end;
56 67
57 // Possibly combine with the overflow buffer to make a larger buffer. 68 // Possibly combine with the overflow buffer to make a larger buffer.
58 if (input_overflow_buf_.empty()) { 69 if (input_overflow_buf_.empty()) {
59 p = input_data; 70 p = input_data;
60 end = input_data + input_data_len; 71 end = input_data + input_data_len;
61 } else { 72 } else {
62 if (input_overflow_buf_.size() + input_data_len > 73 if (input_overflow_buf_.size() + input_data_len >
63 Channel::kMaximumMessageSize) { 74 Channel::kMaximumMessageSize) {
64 input_overflow_buf_.clear(); 75 input_overflow_buf_.clear();
65 LOG(ERROR) << "IPC message is too big"; 76 LOG(ERROR) << "IPC message is too big";
66 return false; 77 return false;
67 } 78 }
68 input_overflow_buf_.append(input_data, input_data_len); 79 input_overflow_buf_.append(input_data, input_data_len);
69 p = input_overflow_buf_.data(); 80 p = input_overflow_buf_.data();
70 end = p + input_overflow_buf_.size(); 81 end = p + input_overflow_buf_.size();
71 } 82 }
72 83
73 // Dispatch all complete messages in the data buffer. 84 // Dispatch all complete messages in the data buffer.
74 while (p < end) { 85 while (p < end) {
75 const char* message_tail = Message::FindNext(p, end); 86 const char* message_tail = Message::FindNext(p, end);
76 if (message_tail) { 87 if (message_tail) {
77 int len = static_cast<int>(message_tail - p); 88 int len = static_cast<int>(message_tail - p);
78 Message m(p, len); 89 Message* m = new Message(p, len);
79 if (!WillDispatchInputMessage(&m)) 90 if (!GetNonBrokeredAttachments(m))
Tom Sepez 2015/07/13 18:28:47 Do we leak |m| here if we return early?
erikchen 2015/07/17 18:44:01 Yup. That's what I deserve for not using scoped_pt
80 return false; 91 return false;
81 92
82 #ifdef IPC_MESSAGE_LOG_ENABLED 93 queued_messages_.push_back(m);
83 std::string name;
84 Logging::GetInstance()->GetMessageText(m.type(), &name, &m, NULL);
85 TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData",
86 "name", name);
87 #else
88 TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData",
89 "class", IPC_MESSAGE_ID_CLASS(m.type()),
90 "line", IPC_MESSAGE_ID_LINE(m.type()));
91 #endif
92 m.TraceMessageEnd();
93 if (IsInternalMessage(m))
94 HandleInternalMessage(m);
95 else
96 listener_->OnMessageReceived(m);
97 if (m.dispatch_error())
98 listener_->OnBadMessageReceived(m);
99 p = message_tail; 94 p = message_tail;
100 } else { 95 } else {
101 // Last message is partial. 96 // Last message is partial.
102 break; 97 break;
103 } 98 }
104 } 99 }
105 100
106 // Save any partial data in the overflow buffer. 101 // Save any partial data in the overflow buffer.
107 input_overflow_buf_.assign(p, end - p); 102 input_overflow_buf_.assign(p, end - p);
108 103
109 if (input_overflow_buf_.empty() && !DidEmptyInputBuffers()) 104 if (input_overflow_buf_.empty() && !DidEmptyInputBuffers())
110 return false; 105 return false;
111 return true; 106 return true;
112 } 107 }
113 108
109 ChannelReader::DispatchState ChannelReader::DispatchMessages() {
110 while (!queued_messages_.empty()) {
111 if (!blocked_ids_.empty())
112 return DISPATCH_WAITING_ON_BROKER;
113
114 Message* m = queued_messages_.front();
115
116 AttachmentIdSet blocked_ids = GetBrokeredAttachments(m);
117 if (!blocked_ids.empty()) {
118 blocked_ids_.swap(blocked_ids);
119 StartObservingAttachmentBroker();
120 return DISPATCH_WAITING_ON_BROKER;
121 }
122
123 DispatchMessage(m);
124 queued_messages_.erase(queued_messages_.begin());
125 }
126 return DISPATCH_FINISHED;
127 }
128
129 void ChannelReader::DispatchMessage(Message* m) {
130 #ifdef IPC_MESSAGE_LOG_ENABLED
131 std::string name;
132 Logging::GetInstance()->GetMessageText(m->type(), &name, m, NULL);
133 TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", "name",
134 name);
135 #else
136 TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", "class",
137 IPC_MESSAGE_ID_CLASS(m->type()), "line",
138 IPC_MESSAGE_ID_LINE(m->type()));
139 #endif
140 m->TraceMessageEnd();
141 if (IsInternalMessage(*m))
142 HandleInternalMessage(*m);
143 else
144 listener_->OnMessageReceived(*m);
145 if (m->dispatch_error())
146 listener_->OnBadMessageReceived(*m);
147 }
148
149 ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments(
150 Message* msg) {
151 std::set<BrokerableAttachment::AttachmentId> blocked_ids;
152
153 MessageAttachmentSet* set = msg->attachment_set();
154 for (const scoped_refptr<BrokerableAttachment>& attachment :
155 set->GetBrokerableAttachmentsForUpdating()) {
156 if (attachment->NeedsBrokering()) {
157 AttachmentBroker* broker = GetAttachmentBroker();
158 scoped_refptr<BrokerableAttachment> brokered_attachment;
159 bool result = broker->GetAttachmentWithId(attachment->GetIdentifier(),
160 &brokered_attachment);
161 if (!result) {
162 blocked_ids.insert(attachment->GetIdentifier());
163 continue;
164 }
165
166 attachment->PopulateWithAttachment(brokered_attachment.get());
167 }
168 }
169
170 return blocked_ids;
171 }
172
173 void ChannelReader::ReceivedBrokerableAttachmentWithId(
174 const BrokerableAttachment::AttachmentId& id) {
175 if (blocked_ids_.empty())
176 return;
177
178 for (AttachmentIdSet::iterator it = blocked_ids_.begin();
Tom Sepez 2015/07/13 18:28:47 nit: find()
erikchen 2015/07/17 18:44:01 Done.
179 it != blocked_ids_.end(); ++it) {
180 if (*it == id) {
181 blocked_ids_.erase(it);
182 break;
183 }
184 }
185
186 if (blocked_ids_.empty()) {
187 StopObservingAttachmentBroker();
188 DispatchMessages();
189 }
190 }
191
192 void ChannelReader::StartObservingAttachmentBroker() {
193 GetAttachmentBroker()->AddObserver(this);
194 }
195
196 void ChannelReader::StopObservingAttachmentBroker() {
197 GetAttachmentBroker()->RemoveObserver(this);
198 }
114 199
115 } // namespace internal 200 } // namespace internal
116 } // namespace IPC 201 } // namespace IPC
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698