OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_channel_reader.h" | 5 #include "ipc/ipc_channel_reader.h" |
6 | 6 |
7 #include <algorithm> | |
8 | |
7 #include "ipc/ipc_listener.h" | 9 #include "ipc/ipc_listener.h" |
8 #include "ipc/ipc_logging.h" | 10 #include "ipc/ipc_logging.h" |
11 #include "ipc/ipc_message.h" | |
12 #include "ipc/ipc_message_attachment_set.h" | |
9 #include "ipc/ipc_message_macros.h" | 13 #include "ipc/ipc_message_macros.h" |
10 | 14 |
11 namespace IPC { | 15 namespace IPC { |
12 namespace internal { | 16 namespace internal { |
13 | 17 |
14 ChannelReader::ChannelReader(Listener* listener) : listener_(listener) { | 18 ChannelReader::ChannelReader(Listener* listener) : listener_(listener) { |
15 memset(input_buf_, 0, sizeof(input_buf_)); | 19 memset(input_buf_, 0, sizeof(input_buf_)); |
16 } | 20 } |
17 | 21 |
18 ChannelReader::~ChannelReader() { | 22 ChannelReader::~ChannelReader() { |
23 if (!blocked_ids_.empty()) | |
24 StopObservingAttachmentBroker(); | |
19 } | 25 } |
20 | 26 |
21 bool ChannelReader::ProcessIncomingMessages() { | 27 ChannelReader::DispatchState ChannelReader::ProcessIncomingMessages() { |
22 while (true) { | 28 while (true) { |
23 int bytes_read = 0; | 29 int bytes_read = 0; |
24 ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, | 30 ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, |
25 &bytes_read); | 31 &bytes_read); |
26 if (read_state == READ_FAILED) | 32 if (read_state == READ_FAILED) |
27 return false; | 33 return DISPATCH_ERROR; |
28 if (read_state == READ_PENDING) | 34 if (read_state == READ_PENDING) |
29 return true; | 35 return DISPATCH_FINISHED; |
30 | 36 |
31 DCHECK(bytes_read > 0); | 37 DCHECK(bytes_read > 0); |
32 if (!DispatchInputData(input_buf_, bytes_read)) | 38 if (!TranslateInputData(input_buf_, bytes_read)) |
33 return false; | 39 return DISPATCH_ERROR; |
40 | |
41 DispatchState state = DispatchMessages(); | |
42 if (state != DISPATCH_FINISHED) | |
43 return state; | |
34 } | 44 } |
35 } | 45 } |
36 | 46 |
37 bool ChannelReader::AsyncReadComplete(int bytes_read) { | 47 ChannelReader::DispatchState ChannelReader::AsyncReadComplete(int bytes_read) { |
38 return DispatchInputData(input_buf_, bytes_read); | 48 if (!TranslateInputData(input_buf_, bytes_read)) |
49 return DISPATCH_ERROR; | |
50 | |
51 return DispatchMessages(); | |
39 } | 52 } |
40 | 53 |
41 bool ChannelReader::IsInternalMessage(const Message& m) { | 54 bool ChannelReader::IsInternalMessage(const Message& m) { |
42 return m.routing_id() == MSG_ROUTING_NONE && | 55 return m.routing_id() == MSG_ROUTING_NONE && |
43 m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE && | 56 m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE && |
44 m.type() <= Channel::HELLO_MESSAGE_TYPE; | 57 m.type() <= Channel::HELLO_MESSAGE_TYPE; |
45 } | 58 } |
46 | 59 |
47 bool ChannelReader::IsHelloMessage(const Message& m) { | 60 bool ChannelReader::IsHelloMessage(const Message& m) { |
48 return m.routing_id() == MSG_ROUTING_NONE && | 61 return m.routing_id() == MSG_ROUTING_NONE && |
49 m.type() == Channel::HELLO_MESSAGE_TYPE; | 62 m.type() == Channel::HELLO_MESSAGE_TYPE; |
50 } | 63 } |
51 | 64 |
52 bool ChannelReader::DispatchInputData(const char* input_data, | 65 bool ChannelReader::TranslateInputData(const char* input_data, |
53 int input_data_len) { | 66 int input_data_len) { |
54 const char* p; | 67 const char* p; |
55 const char* end; | 68 const char* end; |
56 | 69 |
57 // Possibly combine with the overflow buffer to make a larger buffer. | 70 // Possibly combine with the overflow buffer to make a larger buffer. |
58 if (input_overflow_buf_.empty()) { | 71 if (input_overflow_buf_.empty()) { |
59 p = input_data; | 72 p = input_data; |
60 end = input_data + input_data_len; | 73 end = input_data + input_data_len; |
61 } else { | 74 } else { |
62 if (input_overflow_buf_.size() + input_data_len > | 75 if (input_overflow_buf_.size() + input_data_len > |
63 Channel::kMaximumMessageSize) { | 76 Channel::kMaximumMessageSize) { |
64 input_overflow_buf_.clear(); | 77 input_overflow_buf_.clear(); |
65 LOG(ERROR) << "IPC message is too big"; | 78 LOG(ERROR) << "IPC message is too big"; |
66 return false; | 79 return false; |
67 } | 80 } |
68 input_overflow_buf_.append(input_data, input_data_len); | 81 input_overflow_buf_.append(input_data, input_data_len); |
69 p = input_overflow_buf_.data(); | 82 p = input_overflow_buf_.data(); |
70 end = p + input_overflow_buf_.size(); | 83 end = p + input_overflow_buf_.size(); |
71 } | 84 } |
72 | 85 |
73 // Dispatch all complete messages in the data buffer. | 86 // Dispatch all complete messages in the data buffer. |
74 while (p < end) { | 87 while (p < end) { |
75 const char* message_tail = Message::FindNext(p, end); | 88 const char* message_tail = Message::FindNext(p, end); |
76 if (message_tail) { | 89 if (message_tail) { |
77 int len = static_cast<int>(message_tail - p); | 90 int len = static_cast<int>(message_tail - p); |
78 Message m(p, len); | 91 |
79 if (!WillDispatchInputMessage(&m)) | 92 Message translated_message(p, len); |
93 if (!GetNonBrokeredAttachments(&translated_message)) | |
80 return false; | 94 return false; |
81 | 95 |
82 #ifdef IPC_MESSAGE_LOG_ENABLED | 96 // If there are no queued messages, attempt to immediately dispatch the |
83 std::string name; | 97 // newly translated message. |
84 Logging::GetInstance()->GetMessageText(m.type(), &name, &m, NULL); | 98 if (queued_messages_.empty()) { |
85 TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", | 99 DCHECK(blocked_ids_.empty()); |
86 "name", name); | 100 AttachmentIdSet blocked_ids = |
87 #else | 101 GetBrokeredAttachments(&translated_message); |
88 TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", | 102 |
89 "class", IPC_MESSAGE_ID_CLASS(m.type()), | 103 if (blocked_ids.empty()) { |
90 "line", IPC_MESSAGE_ID_LINE(m.type())); | 104 // Dispatch the message and continue the loop. |
91 #endif | 105 DispatchMessage(&translated_message); |
92 m.TraceMessageEnd(); | 106 p = message_tail; |
93 if (IsInternalMessage(m)) | 107 continue; |
94 HandleInternalMessage(m); | 108 } |
95 else | 109 |
96 listener_->OnMessageReceived(m); | 110 // While we could theoretically updated blocked_ids_ and call |
Tom Sepez
2015/07/27 15:21:47
nit: I didn't see the code to which this comment a
erikchen
2015/07/27 16:23:48
This comment explains the lack of additional code
erikchen
2015/07/27 17:15:38
I started rewriting this comment several times in
| |
97 if (m.dispatch_error()) | 111 // StartObservingAttachmentBroker(), doing so adds unnecessary |
98 listener_->OnBadMessageReceived(m); | 112 // complexity. This attempt to dispatch |translated_message| is an |
113 // optimization, and TranslateInputData() is guaranteed to be followed | |
114 // by DispatchMessages() which will do the same thing. | |
115 } | |
116 | |
117 // Make a deep copy of |translated_message| to add to the queue. | |
118 scoped_ptr<Message> m(new Message(translated_message)); | |
119 queued_messages_.push_back(m.release()); | |
99 p = message_tail; | 120 p = message_tail; |
100 } else { | 121 } else { |
101 // Last message is partial. | 122 // Last message is partial. |
102 break; | 123 break; |
103 } | 124 } |
104 } | 125 } |
105 | 126 |
106 // Save any partial data in the overflow buffer. | 127 // Save any partial data in the overflow buffer. |
107 input_overflow_buf_.assign(p, end - p); | 128 input_overflow_buf_.assign(p, end - p); |
108 | 129 |
109 if (input_overflow_buf_.empty() && !DidEmptyInputBuffers()) | 130 if (input_overflow_buf_.empty() && !DidEmptyInputBuffers()) |
110 return false; | 131 return false; |
111 return true; | 132 return true; |
112 } | 133 } |
113 | 134 |
135 ChannelReader::DispatchState ChannelReader::DispatchMessages() { | |
136 while (!queued_messages_.empty()) { | |
137 if (!blocked_ids_.empty()) | |
138 return DISPATCH_WAITING_ON_BROKER; | |
139 | |
140 Message* m = queued_messages_.front(); | |
141 | |
142 AttachmentIdSet blocked_ids = GetBrokeredAttachments(m); | |
143 if (!blocked_ids.empty()) { | |
144 blocked_ids_.swap(blocked_ids); | |
145 StartObservingAttachmentBroker(); | |
146 return DISPATCH_WAITING_ON_BROKER; | |
147 } | |
148 | |
149 DispatchMessage(m); | |
150 queued_messages_.erase(queued_messages_.begin()); | |
151 } | |
152 return DISPATCH_FINISHED; | |
153 } | |
154 | |
155 void ChannelReader::DispatchMessage(Message* m) { | |
156 #ifdef IPC_MESSAGE_LOG_ENABLED | |
157 std::string name; | |
158 Logging::GetInstance()->GetMessageText(m->type(), &name, m, NULL); | |
159 TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", "name", | |
160 name); | |
161 #else | |
162 TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", "class", | |
163 IPC_MESSAGE_ID_CLASS(m->type()), "line", | |
164 IPC_MESSAGE_ID_LINE(m->type())); | |
165 #endif | |
166 m->TraceMessageEnd(); | |
167 if (IsInternalMessage(*m)) | |
168 HandleInternalMessage(*m); | |
169 else | |
170 listener_->OnMessageReceived(*m); | |
171 if (m->dispatch_error()) | |
172 listener_->OnBadMessageReceived(*m); | |
173 } | |
174 | |
175 ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments( | |
176 Message* msg) { | |
177 std::set<BrokerableAttachment::AttachmentId> blocked_ids; | |
178 | |
179 #if USE_ATTACHMENT_BROKER | |
180 MessageAttachmentSet* set = msg->attachment_set(); | |
181 for (const scoped_refptr<BrokerableAttachment>& attachment : | |
182 set->GetBrokerableAttachmentsForUpdating()) { | |
183 if (attachment->NeedsBrokering()) { | |
184 AttachmentBroker* broker = GetAttachmentBroker(); | |
185 scoped_refptr<BrokerableAttachment> brokered_attachment; | |
186 bool result = broker->GetAttachmentWithId(attachment->GetIdentifier(), | |
187 &brokered_attachment); | |
188 if (!result) { | |
189 blocked_ids.insert(attachment->GetIdentifier()); | |
190 continue; | |
191 } | |
192 | |
193 attachment->PopulateWithAttachment(brokered_attachment.get()); | |
194 } | |
195 } | |
196 #endif // USE_ATTACHMENT_BROKER | |
197 | |
198 return blocked_ids; | |
199 } | |
200 | |
201 void ChannelReader::ReceivedBrokerableAttachmentWithId( | |
202 const BrokerableAttachment::AttachmentId& id) { | |
203 if (blocked_ids_.empty()) | |
204 return; | |
205 | |
206 auto it = find(blocked_ids_.begin(), blocked_ids_.end(), id); | |
207 if (it != blocked_ids_.end()) | |
208 blocked_ids_.erase(it); | |
209 | |
210 if (blocked_ids_.empty()) { | |
211 StopObservingAttachmentBroker(); | |
212 DispatchMessages(); | |
213 } | |
214 } | |
215 | |
216 void ChannelReader::StartObservingAttachmentBroker() { | |
217 #if USE_ATTACHMENT_BROKER | |
218 GetAttachmentBroker()->AddObserver(this); | |
219 #endif // USE_ATTACHMENT_BROKER | |
220 } | |
221 | |
222 void ChannelReader::StopObservingAttachmentBroker() { | |
223 #if USE_ATTACHMENT_BROKER | |
224 GetAttachmentBroker()->RemoveObserver(this); | |
225 #endif // USE_ATTACHMENT_BROKER | |
226 } | |
114 | 227 |
115 } // namespace internal | 228 } // namespace internal |
116 } // namespace IPC | 229 } // namespace IPC |
OLD | NEW |