Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(169)

Side by Side Diff: ipc/ipc_channel_reader.cc

Issue 1206093002: Update ChannelReader to use AttachmentBroker. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@attachment_broker3_listener
Patch Set: Add an optimization to immediately dispatch messages after translation. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_channel_reader.h ('k') | ipc/ipc_channel_reader_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_channel_reader.h" 5 #include "ipc/ipc_channel_reader.h"
6 6
7 #include <algorithm>
8
7 #include "ipc/ipc_listener.h" 9 #include "ipc/ipc_listener.h"
8 #include "ipc/ipc_logging.h" 10 #include "ipc/ipc_logging.h"
11 #include "ipc/ipc_message.h"
12 #include "ipc/ipc_message_attachment_set.h"
9 #include "ipc/ipc_message_macros.h" 13 #include "ipc/ipc_message_macros.h"
10 14
11 namespace IPC { 15 namespace IPC {
12 namespace internal { 16 namespace internal {
13 17
14 ChannelReader::ChannelReader(Listener* listener) : listener_(listener) { 18 ChannelReader::ChannelReader(Listener* listener) : listener_(listener) {
15 memset(input_buf_, 0, sizeof(input_buf_)); 19 memset(input_buf_, 0, sizeof(input_buf_));
16 } 20 }
17 21
18 ChannelReader::~ChannelReader() { 22 ChannelReader::~ChannelReader() {
23 if (!blocked_ids_.empty())
24 StopObservingAttachmentBroker();
19 } 25 }
20 26
21 bool ChannelReader::ProcessIncomingMessages() { 27 ChannelReader::DispatchState ChannelReader::ProcessIncomingMessages() {
22 while (true) { 28 while (true) {
23 int bytes_read = 0; 29 int bytes_read = 0;
24 ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, 30 ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
25 &bytes_read); 31 &bytes_read);
26 if (read_state == READ_FAILED) 32 if (read_state == READ_FAILED)
27 return false; 33 return DISPATCH_ERROR;
28 if (read_state == READ_PENDING) 34 if (read_state == READ_PENDING)
29 return true; 35 return DISPATCH_FINISHED;
30 36
31 DCHECK(bytes_read > 0); 37 DCHECK(bytes_read > 0);
32 if (!DispatchInputData(input_buf_, bytes_read)) 38 if (!TranslateInputData(input_buf_, bytes_read))
33 return false; 39 return DISPATCH_ERROR;
40
41 DispatchState state = DispatchMessages();
42 if (state != DISPATCH_FINISHED)
43 return state;
34 } 44 }
35 } 45 }
36 46
37 bool ChannelReader::AsyncReadComplete(int bytes_read) { 47 ChannelReader::DispatchState ChannelReader::AsyncReadComplete(int bytes_read) {
38 return DispatchInputData(input_buf_, bytes_read); 48 if (!TranslateInputData(input_buf_, bytes_read))
49 return DISPATCH_ERROR;
50
51 return DispatchMessages();
39 } 52 }
40 53
41 bool ChannelReader::IsInternalMessage(const Message& m) { 54 bool ChannelReader::IsInternalMessage(const Message& m) {
42 return m.routing_id() == MSG_ROUTING_NONE && 55 return m.routing_id() == MSG_ROUTING_NONE &&
43 m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE && 56 m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE &&
44 m.type() <= Channel::HELLO_MESSAGE_TYPE; 57 m.type() <= Channel::HELLO_MESSAGE_TYPE;
45 } 58 }
46 59
47 bool ChannelReader::IsHelloMessage(const Message& m) { 60 bool ChannelReader::IsHelloMessage(const Message& m) {
48 return m.routing_id() == MSG_ROUTING_NONE && 61 return m.routing_id() == MSG_ROUTING_NONE &&
49 m.type() == Channel::HELLO_MESSAGE_TYPE; 62 m.type() == Channel::HELLO_MESSAGE_TYPE;
50 } 63 }
51 64
52 bool ChannelReader::DispatchInputData(const char* input_data, 65 bool ChannelReader::TranslateInputData(const char* input_data,
53 int input_data_len) { 66 int input_data_len) {
54 const char* p; 67 const char* p;
55 const char* end; 68 const char* end;
56 69
57 // Possibly combine with the overflow buffer to make a larger buffer. 70 // Possibly combine with the overflow buffer to make a larger buffer.
58 if (input_overflow_buf_.empty()) { 71 if (input_overflow_buf_.empty()) {
59 p = input_data; 72 p = input_data;
60 end = input_data + input_data_len; 73 end = input_data + input_data_len;
61 } else { 74 } else {
62 if (input_overflow_buf_.size() + input_data_len > 75 if (input_overflow_buf_.size() + input_data_len >
63 Channel::kMaximumMessageSize) { 76 Channel::kMaximumMessageSize) {
64 input_overflow_buf_.clear(); 77 input_overflow_buf_.clear();
65 LOG(ERROR) << "IPC message is too big"; 78 LOG(ERROR) << "IPC message is too big";
66 return false; 79 return false;
67 } 80 }
68 input_overflow_buf_.append(input_data, input_data_len); 81 input_overflow_buf_.append(input_data, input_data_len);
69 p = input_overflow_buf_.data(); 82 p = input_overflow_buf_.data();
70 end = p + input_overflow_buf_.size(); 83 end = p + input_overflow_buf_.size();
71 } 84 }
72 85
73 // Dispatch all complete messages in the data buffer. 86 // Dispatch all complete messages in the data buffer.
74 while (p < end) { 87 while (p < end) {
75 const char* message_tail = Message::FindNext(p, end); 88 const char* message_tail = Message::FindNext(p, end);
76 if (message_tail) { 89 if (message_tail) {
77 int len = static_cast<int>(message_tail - p); 90 int len = static_cast<int>(message_tail - p);
78 Message m(p, len); 91
79 if (!WillDispatchInputMessage(&m)) 92 Message translated_message(p, len);
93 if (!GetNonBrokeredAttachments(&translated_message))
80 return false; 94 return false;
81 95
82 #ifdef IPC_MESSAGE_LOG_ENABLED 96 // If there are no queued messages, attempt to immediately dispatch the
83 std::string name; 97 // newly translated message.
84 Logging::GetInstance()->GetMessageText(m.type(), &name, &m, NULL); 98 if (queued_messages_.empty()) {
85 TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", 99 DCHECK(blocked_ids_.empty());
86 "name", name); 100 AttachmentIdSet blocked_ids =
87 #else 101 GetBrokeredAttachments(&translated_message);
88 TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", 102
89 "class", IPC_MESSAGE_ID_CLASS(m.type()), 103 if (blocked_ids.empty()) {
90 "line", IPC_MESSAGE_ID_LINE(m.type())); 104 // Dispatch the message and continue the loop.
91 #endif 105 DispatchMessage(&translated_message);
92 m.TraceMessageEnd(); 106 p = message_tail;
93 if (IsInternalMessage(m)) 107 continue;
94 HandleInternalMessage(m); 108 }
95 else 109
96 listener_->OnMessageReceived(m); 110 // While we could theoretically updated blocked_ids_ and call
Tom Sepez 2015/07/27 15:21:47 nit: I didn't see the code to which this comment a
erikchen 2015/07/27 16:23:48 This comment explains the lack of additional code
erikchen 2015/07/27 17:15:38 I started rewriting this comment several times in
97 if (m.dispatch_error()) 111 // StartObservingAttachmentBroker(), doing so adds unnecessary
98 listener_->OnBadMessageReceived(m); 112 // complexity. This attempt to dispatch |translated_message| is an
113 // optimization, and TranslateInputData() is guaranteed to be followed
114 // by DispatchMessages() which will do the same thing.
115 }
116
117 // Make a deep copy of |translated_message| to add to the queue.
118 scoped_ptr<Message> m(new Message(translated_message));
119 queued_messages_.push_back(m.release());
99 p = message_tail; 120 p = message_tail;
100 } else { 121 } else {
101 // Last message is partial. 122 // Last message is partial.
102 break; 123 break;
103 } 124 }
104 } 125 }
105 126
106 // Save any partial data in the overflow buffer. 127 // Save any partial data in the overflow buffer.
107 input_overflow_buf_.assign(p, end - p); 128 input_overflow_buf_.assign(p, end - p);
108 129
109 if (input_overflow_buf_.empty() && !DidEmptyInputBuffers()) 130 if (input_overflow_buf_.empty() && !DidEmptyInputBuffers())
110 return false; 131 return false;
111 return true; 132 return true;
112 } 133 }
113 134
135 ChannelReader::DispatchState ChannelReader::DispatchMessages() {
136 while (!queued_messages_.empty()) {
137 if (!blocked_ids_.empty())
138 return DISPATCH_WAITING_ON_BROKER;
139
140 Message* m = queued_messages_.front();
141
142 AttachmentIdSet blocked_ids = GetBrokeredAttachments(m);
143 if (!blocked_ids.empty()) {
144 blocked_ids_.swap(blocked_ids);
145 StartObservingAttachmentBroker();
146 return DISPATCH_WAITING_ON_BROKER;
147 }
148
149 DispatchMessage(m);
150 queued_messages_.erase(queued_messages_.begin());
151 }
152 return DISPATCH_FINISHED;
153 }
154
155 void ChannelReader::DispatchMessage(Message* m) {
156 #ifdef IPC_MESSAGE_LOG_ENABLED
157 std::string name;
158 Logging::GetInstance()->GetMessageText(m->type(), &name, m, NULL);
159 TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", "name",
160 name);
161 #else
162 TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", "class",
163 IPC_MESSAGE_ID_CLASS(m->type()), "line",
164 IPC_MESSAGE_ID_LINE(m->type()));
165 #endif
166 m->TraceMessageEnd();
167 if (IsInternalMessage(*m))
168 HandleInternalMessage(*m);
169 else
170 listener_->OnMessageReceived(*m);
171 if (m->dispatch_error())
172 listener_->OnBadMessageReceived(*m);
173 }
174
175 ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments(
176 Message* msg) {
177 std::set<BrokerableAttachment::AttachmentId> blocked_ids;
178
179 #if USE_ATTACHMENT_BROKER
180 MessageAttachmentSet* set = msg->attachment_set();
181 for (const scoped_refptr<BrokerableAttachment>& attachment :
182 set->GetBrokerableAttachmentsForUpdating()) {
183 if (attachment->NeedsBrokering()) {
184 AttachmentBroker* broker = GetAttachmentBroker();
185 scoped_refptr<BrokerableAttachment> brokered_attachment;
186 bool result = broker->GetAttachmentWithId(attachment->GetIdentifier(),
187 &brokered_attachment);
188 if (!result) {
189 blocked_ids.insert(attachment->GetIdentifier());
190 continue;
191 }
192
193 attachment->PopulateWithAttachment(brokered_attachment.get());
194 }
195 }
196 #endif // USE_ATTACHMENT_BROKER
197
198 return blocked_ids;
199 }
200
201 void ChannelReader::ReceivedBrokerableAttachmentWithId(
202 const BrokerableAttachment::AttachmentId& id) {
203 if (blocked_ids_.empty())
204 return;
205
206 auto it = find(blocked_ids_.begin(), blocked_ids_.end(), id);
207 if (it != blocked_ids_.end())
208 blocked_ids_.erase(it);
209
210 if (blocked_ids_.empty()) {
211 StopObservingAttachmentBroker();
212 DispatchMessages();
213 }
214 }
215
216 void ChannelReader::StartObservingAttachmentBroker() {
217 #if USE_ATTACHMENT_BROKER
218 GetAttachmentBroker()->AddObserver(this);
219 #endif // USE_ATTACHMENT_BROKER
220 }
221
222 void ChannelReader::StopObservingAttachmentBroker() {
223 #if USE_ATTACHMENT_BROKER
224 GetAttachmentBroker()->RemoveObserver(this);
225 #endif // USE_ATTACHMENT_BROKER
226 }
114 227
115 } // namespace internal 228 } // namespace internal
116 } // namespace IPC 229 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_channel_reader.h ('k') | ipc/ipc_channel_reader_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698