Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(184)

Side by Side Diff: ipc/ipc_channel_reader.cc

Issue 1206093002: Update ChannelReader to use AttachmentBroker. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@attachment_broker3_listener
Patch Set: Comments from tsepez. Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_channel_reader.h ('k') | ipc/ipc_channel_reader_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_channel_reader.h" 5 #include "ipc/ipc_channel_reader.h"
6 6
7 #include <algorithm>
8
7 #include "ipc/ipc_listener.h" 9 #include "ipc/ipc_listener.h"
8 #include "ipc/ipc_logging.h" 10 #include "ipc/ipc_logging.h"
11 #include "ipc/ipc_message.h"
12 #include "ipc/ipc_message_attachment_set.h"
9 #include "ipc/ipc_message_macros.h" 13 #include "ipc/ipc_message_macros.h"
10 14
11 namespace IPC { 15 namespace IPC {
12 namespace internal { 16 namespace internal {
13 17
14 ChannelReader::ChannelReader(Listener* listener) : listener_(listener) { 18 ChannelReader::ChannelReader(Listener* listener) : listener_(listener) {
15 memset(input_buf_, 0, sizeof(input_buf_)); 19 memset(input_buf_, 0, sizeof(input_buf_));
16 } 20 }
17 21
18 ChannelReader::~ChannelReader() { 22 ChannelReader::~ChannelReader() {
23 if (!blocked_ids_.empty())
24 StopObservingAttachmentBroker();
19 } 25 }
20 26
21 bool ChannelReader::ProcessIncomingMessages() { 27 ChannelReader::DispatchState ChannelReader::ProcessIncomingMessages() {
22 while (true) { 28 while (true) {
23 int bytes_read = 0; 29 int bytes_read = 0;
24 ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, 30 ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
25 &bytes_read); 31 &bytes_read);
26 if (read_state == READ_FAILED) 32 if (read_state == READ_FAILED)
27 return false; 33 return DISPATCH_ERROR;
28 if (read_state == READ_PENDING) 34 if (read_state == READ_PENDING)
29 return true; 35 return DISPATCH_FINISHED;
30 36
31 DCHECK(bytes_read > 0); 37 DCHECK(bytes_read > 0);
32 if (!DispatchInputData(input_buf_, bytes_read)) 38 if (!TranslateInputData(input_buf_, bytes_read))
33 return false; 39 return DISPATCH_ERROR;
40
41 DispatchState state = DispatchMessages();
42 if (state != DISPATCH_FINISHED)
43 return state;
34 } 44 }
35 } 45 }
36 46
37 bool ChannelReader::AsyncReadComplete(int bytes_read) { 47 ChannelReader::DispatchState ChannelReader::AsyncReadComplete(int bytes_read) {
38 return DispatchInputData(input_buf_, bytes_read); 48 if (!TranslateInputData(input_buf_, bytes_read))
49 return DISPATCH_ERROR;
50
51 return DispatchMessages();
39 } 52 }
40 53
41 bool ChannelReader::IsInternalMessage(const Message& m) { 54 bool ChannelReader::IsInternalMessage(const Message& m) {
42 return m.routing_id() == MSG_ROUTING_NONE && 55 return m.routing_id() == MSG_ROUTING_NONE &&
43 m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE && 56 m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE &&
44 m.type() <= Channel::HELLO_MESSAGE_TYPE; 57 m.type() <= Channel::HELLO_MESSAGE_TYPE;
45 } 58 }
46 59
47 bool ChannelReader::IsHelloMessage(const Message& m) { 60 bool ChannelReader::IsHelloMessage(const Message& m) {
48 return m.routing_id() == MSG_ROUTING_NONE && 61 return m.routing_id() == MSG_ROUTING_NONE &&
49 m.type() == Channel::HELLO_MESSAGE_TYPE; 62 m.type() == Channel::HELLO_MESSAGE_TYPE;
50 } 63 }
51 64
52 bool ChannelReader::DispatchInputData(const char* input_data, 65 bool ChannelReader::TranslateInputData(const char* input_data,
53 int input_data_len) { 66 int input_data_len) {
54 const char* p; 67 const char* p;
55 const char* end; 68 const char* end;
56 69
57 // Possibly combine with the overflow buffer to make a larger buffer. 70 // Possibly combine with the overflow buffer to make a larger buffer.
58 if (input_overflow_buf_.empty()) { 71 if (input_overflow_buf_.empty()) {
59 p = input_data; 72 p = input_data;
60 end = input_data + input_data_len; 73 end = input_data + input_data_len;
61 } else { 74 } else {
62 if (input_overflow_buf_.size() + input_data_len > 75 if (input_overflow_buf_.size() + input_data_len >
63 Channel::kMaximumMessageSize) { 76 Channel::kMaximumMessageSize) {
64 input_overflow_buf_.clear(); 77 input_overflow_buf_.clear();
65 LOG(ERROR) << "IPC message is too big"; 78 LOG(ERROR) << "IPC message is too big";
66 return false; 79 return false;
67 } 80 }
68 input_overflow_buf_.append(input_data, input_data_len); 81 input_overflow_buf_.append(input_data, input_data_len);
69 p = input_overflow_buf_.data(); 82 p = input_overflow_buf_.data();
70 end = p + input_overflow_buf_.size(); 83 end = p + input_overflow_buf_.size();
71 } 84 }
72 85
73 // Dispatch all complete messages in the data buffer. 86 // Dispatch all complete messages in the data buffer.
74 while (p < end) { 87 while (p < end) {
75 const char* message_tail = Message::FindNext(p, end); 88 const char* message_tail = Message::FindNext(p, end);
76 if (message_tail) { 89 if (message_tail) {
77 int len = static_cast<int>(message_tail - p); 90 int len = static_cast<int>(message_tail - p);
78 Message m(p, len); 91
79 if (!WillDispatchInputMessage(&m)) 92 Message translated_message(p, len);
93 if (!GetNonBrokeredAttachments(&translated_message))
80 return false; 94 return false;
81 95
82 #ifdef IPC_MESSAGE_LOG_ENABLED 96 // If there are no queued messages, attempt to immediately dispatch the
83 std::string name; 97 // newly translated message.
84 Logging::GetInstance()->GetMessageText(m.type(), &name, &m, NULL); 98 if (queued_messages_.empty()) {
85 TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", 99 DCHECK(blocked_ids_.empty());
86 "name", name); 100 AttachmentIdSet blocked_ids =
87 #else 101 GetBrokeredAttachments(&translated_message);
88 TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", 102
89 "class", IPC_MESSAGE_ID_CLASS(m.type()), 103 if (blocked_ids.empty()) {
90 "line", IPC_MESSAGE_ID_LINE(m.type())); 104 // Dispatch the message and continue the loop.
91 #endif 105 DispatchMessage(&translated_message);
92 m.TraceMessageEnd(); 106 p = message_tail;
93 if (IsInternalMessage(m)) 107 continue;
94 HandleInternalMessage(m); 108 }
95 else 109
96 listener_->OnMessageReceived(m); 110 blocked_ids_.swap(blocked_ids);
97 if (m.dispatch_error()) 111 StartObservingAttachmentBroker();
98 listener_->OnBadMessageReceived(m); 112 }
113
114 // Make a deep copy of |translated_message| to add to the queue.
115 scoped_ptr<Message> m(new Message(translated_message));
116 queued_messages_.push_back(m.release());
99 p = message_tail; 117 p = message_tail;
100 } else { 118 } else {
101 // Last message is partial. 119 // Last message is partial.
102 break; 120 break;
103 } 121 }
104 } 122 }
105 123
106 // Save any partial data in the overflow buffer. 124 // Save any partial data in the overflow buffer.
107 input_overflow_buf_.assign(p, end - p); 125 input_overflow_buf_.assign(p, end - p);
108 126
109 if (input_overflow_buf_.empty() && !DidEmptyInputBuffers()) 127 if (input_overflow_buf_.empty() && !DidEmptyInputBuffers())
110 return false; 128 return false;
111 return true; 129 return true;
112 } 130 }
113 131
132 ChannelReader::DispatchState ChannelReader::DispatchMessages() {
133 while (!queued_messages_.empty()) {
134 if (!blocked_ids_.empty())
135 return DISPATCH_WAITING_ON_BROKER;
136
137 Message* m = queued_messages_.front();
138
139 AttachmentIdSet blocked_ids = GetBrokeredAttachments(m);
140 if (!blocked_ids.empty()) {
141 blocked_ids_.swap(blocked_ids);
142 StartObservingAttachmentBroker();
143 return DISPATCH_WAITING_ON_BROKER;
144 }
145
146 DispatchMessage(m);
147 queued_messages_.erase(queued_messages_.begin());
148 }
149 return DISPATCH_FINISHED;
150 }
151
152 void ChannelReader::DispatchMessage(Message* m) {
153 #ifdef IPC_MESSAGE_LOG_ENABLED
154 std::string name;
155 Logging::GetInstance()->GetMessageText(m->type(), &name, m, NULL);
156 TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", "name",
157 name);
158 #else
159 TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", "class",
160 IPC_MESSAGE_ID_CLASS(m->type()), "line",
161 IPC_MESSAGE_ID_LINE(m->type()));
162 #endif
163 m->TraceMessageEnd();
164 if (IsInternalMessage(*m))
165 HandleInternalMessage(*m);
166 else
167 listener_->OnMessageReceived(*m);
168 if (m->dispatch_error())
169 listener_->OnBadMessageReceived(*m);
170 }
171
172 ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments(
173 Message* msg) {
174 std::set<BrokerableAttachment::AttachmentId> blocked_ids;
175
176 #if USE_ATTACHMENT_BROKER
177 MessageAttachmentSet* set = msg->attachment_set();
178 for (const scoped_refptr<BrokerableAttachment>& attachment :
179 set->GetBrokerableAttachmentsForUpdating()) {
180 if (attachment->NeedsBrokering()) {
181 AttachmentBroker* broker = GetAttachmentBroker();
182 scoped_refptr<BrokerableAttachment> brokered_attachment;
183 bool result = broker->GetAttachmentWithId(attachment->GetIdentifier(),
184 &brokered_attachment);
185 if (!result) {
186 blocked_ids.insert(attachment->GetIdentifier());
187 continue;
188 }
189
190 attachment->PopulateWithAttachment(brokered_attachment.get());
191 }
192 }
193 #endif // USE_ATTACHMENT_BROKER
194
195 return blocked_ids;
196 }
197
198 void ChannelReader::ReceivedBrokerableAttachmentWithId(
199 const BrokerableAttachment::AttachmentId& id) {
200 if (blocked_ids_.empty())
201 return;
202
203 auto it = find(blocked_ids_.begin(), blocked_ids_.end(), id);
204 if (it != blocked_ids_.end())
205 blocked_ids_.erase(it);
206
207 if (blocked_ids_.empty()) {
208 StopObservingAttachmentBroker();
209 DispatchMessages();
210 }
211 }
212
213 void ChannelReader::StartObservingAttachmentBroker() {
214 #if USE_ATTACHMENT_BROKER
215 GetAttachmentBroker()->AddObserver(this);
216 #endif // USE_ATTACHMENT_BROKER
217 }
218
219 void ChannelReader::StopObservingAttachmentBroker() {
220 #if USE_ATTACHMENT_BROKER
221 GetAttachmentBroker()->RemoveObserver(this);
222 #endif // USE_ATTACHMENT_BROKER
223 }
114 224
115 } // namespace internal 225 } // namespace internal
116 } // namespace IPC 226 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_channel_reader.h ('k') | ipc/ipc_channel_reader_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698