OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_channel_reader.h" | 5 #include "ipc/ipc_channel_reader.h" |
6 | 6 |
| 7 #include <algorithm> |
| 8 |
7 #include "ipc/ipc_listener.h" | 9 #include "ipc/ipc_listener.h" |
8 #include "ipc/ipc_logging.h" | 10 #include "ipc/ipc_logging.h" |
| 11 #include "ipc/ipc_message.h" |
| 12 #include "ipc/ipc_message_attachment_set.h" |
9 #include "ipc/ipc_message_macros.h" | 13 #include "ipc/ipc_message_macros.h" |
10 | 14 |
11 namespace IPC { | 15 namespace IPC { |
12 namespace internal { | 16 namespace internal { |
13 | 17 |
14 ChannelReader::ChannelReader(Listener* listener) : listener_(listener) { | 18 ChannelReader::ChannelReader(Listener* listener) : listener_(listener) { |
15 memset(input_buf_, 0, sizeof(input_buf_)); | 19 memset(input_buf_, 0, sizeof(input_buf_)); |
16 } | 20 } |
17 | 21 |
18 ChannelReader::~ChannelReader() { | 22 ChannelReader::~ChannelReader() { |
| 23 if (!blocked_ids_.empty()) |
| 24 StopObservingAttachmentBroker(); |
19 } | 25 } |
20 | 26 |
21 bool ChannelReader::ProcessIncomingMessages() { | 27 ChannelReader::DispatchState ChannelReader::ProcessIncomingMessages() { |
22 while (true) { | 28 while (true) { |
23 int bytes_read = 0; | 29 int bytes_read = 0; |
24 ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, | 30 ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, |
25 &bytes_read); | 31 &bytes_read); |
26 if (read_state == READ_FAILED) | 32 if (read_state == READ_FAILED) |
27 return false; | 33 return DISPATCH_ERROR; |
28 if (read_state == READ_PENDING) | 34 if (read_state == READ_PENDING) |
29 return true; | 35 return DISPATCH_FINISHED; |
30 | 36 |
31 DCHECK(bytes_read > 0); | 37 DCHECK(bytes_read > 0); |
32 if (!DispatchInputData(input_buf_, bytes_read)) | 38 if (!TranslateInputData(input_buf_, bytes_read)) |
33 return false; | 39 return DISPATCH_ERROR; |
| 40 |
| 41 DispatchState state = DispatchMessages(); |
| 42 if (state != DISPATCH_FINISHED) |
| 43 return state; |
34 } | 44 } |
35 } | 45 } |
36 | 46 |
37 bool ChannelReader::AsyncReadComplete(int bytes_read) { | 47 ChannelReader::DispatchState ChannelReader::AsyncReadComplete(int bytes_read) { |
38 return DispatchInputData(input_buf_, bytes_read); | 48 if (!TranslateInputData(input_buf_, bytes_read)) |
| 49 return DISPATCH_ERROR; |
| 50 |
| 51 return DispatchMessages(); |
39 } | 52 } |
40 | 53 |
41 bool ChannelReader::IsInternalMessage(const Message& m) { | 54 bool ChannelReader::IsInternalMessage(const Message& m) { |
42 return m.routing_id() == MSG_ROUTING_NONE && | 55 return m.routing_id() == MSG_ROUTING_NONE && |
43 m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE && | 56 m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE && |
44 m.type() <= Channel::HELLO_MESSAGE_TYPE; | 57 m.type() <= Channel::HELLO_MESSAGE_TYPE; |
45 } | 58 } |
46 | 59 |
47 bool ChannelReader::IsHelloMessage(const Message& m) { | 60 bool ChannelReader::IsHelloMessage(const Message& m) { |
48 return m.routing_id() == MSG_ROUTING_NONE && | 61 return m.routing_id() == MSG_ROUTING_NONE && |
49 m.type() == Channel::HELLO_MESSAGE_TYPE; | 62 m.type() == Channel::HELLO_MESSAGE_TYPE; |
50 } | 63 } |
51 | 64 |
52 bool ChannelReader::DispatchInputData(const char* input_data, | 65 bool ChannelReader::TranslateInputData(const char* input_data, |
53 int input_data_len) { | 66 int input_data_len) { |
54 const char* p; | 67 const char* p; |
55 const char* end; | 68 const char* end; |
56 | 69 |
57 // Possibly combine with the overflow buffer to make a larger buffer. | 70 // Possibly combine with the overflow buffer to make a larger buffer. |
58 if (input_overflow_buf_.empty()) { | 71 if (input_overflow_buf_.empty()) { |
59 p = input_data; | 72 p = input_data; |
60 end = input_data + input_data_len; | 73 end = input_data + input_data_len; |
61 } else { | 74 } else { |
62 if (input_overflow_buf_.size() + input_data_len > | 75 if (input_overflow_buf_.size() + input_data_len > |
63 Channel::kMaximumMessageSize) { | 76 Channel::kMaximumMessageSize) { |
64 input_overflow_buf_.clear(); | 77 input_overflow_buf_.clear(); |
65 LOG(ERROR) << "IPC message is too big"; | 78 LOG(ERROR) << "IPC message is too big"; |
66 return false; | 79 return false; |
67 } | 80 } |
68 input_overflow_buf_.append(input_data, input_data_len); | 81 input_overflow_buf_.append(input_data, input_data_len); |
69 p = input_overflow_buf_.data(); | 82 p = input_overflow_buf_.data(); |
70 end = p + input_overflow_buf_.size(); | 83 end = p + input_overflow_buf_.size(); |
71 } | 84 } |
72 | 85 |
73 // Dispatch all complete messages in the data buffer. | 86 // Dispatch all complete messages in the data buffer. |
74 while (p < end) { | 87 while (p < end) { |
75 const char* message_tail = Message::FindNext(p, end); | 88 const char* message_tail = Message::FindNext(p, end); |
76 if (message_tail) { | 89 if (message_tail) { |
77 int len = static_cast<int>(message_tail - p); | 90 int len = static_cast<int>(message_tail - p); |
78 Message m(p, len); | 91 |
79 if (!WillDispatchInputMessage(&m)) | 92 Message translated_message(p, len); |
| 93 if (!GetNonBrokeredAttachments(&translated_message)) |
80 return false; | 94 return false; |
81 | 95 |
82 #ifdef IPC_MESSAGE_LOG_ENABLED | 96 // If there are no queued messages, attempt to immediately dispatch the |
83 std::string name; | 97 // newly translated message. |
84 Logging::GetInstance()->GetMessageText(m.type(), &name, &m, NULL); | 98 if (queued_messages_.empty()) { |
85 TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", | 99 DCHECK(blocked_ids_.empty()); |
86 "name", name); | 100 AttachmentIdSet blocked_ids = |
87 #else | 101 GetBrokeredAttachments(&translated_message); |
88 TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", | 102 |
89 "class", IPC_MESSAGE_ID_CLASS(m.type()), | 103 if (blocked_ids.empty()) { |
90 "line", IPC_MESSAGE_ID_LINE(m.type())); | 104 // Dispatch the message and continue the loop. |
91 #endif | 105 DispatchMessage(&translated_message); |
92 m.TraceMessageEnd(); | 106 p = message_tail; |
93 if (IsInternalMessage(m)) | 107 continue; |
94 HandleInternalMessage(m); | 108 } |
95 else | 109 |
96 listener_->OnMessageReceived(m); | 110 blocked_ids_.swap(blocked_ids); |
97 if (m.dispatch_error()) | 111 StartObservingAttachmentBroker(); |
98 listener_->OnBadMessageReceived(m); | 112 } |
| 113 |
| 114 // Make a deep copy of |translated_message| to add to the queue. |
| 115 scoped_ptr<Message> m(new Message(translated_message)); |
| 116 queued_messages_.push_back(m.release()); |
99 p = message_tail; | 117 p = message_tail; |
100 } else { | 118 } else { |
101 // Last message is partial. | 119 // Last message is partial. |
102 break; | 120 break; |
103 } | 121 } |
104 } | 122 } |
105 | 123 |
106 // Save any partial data in the overflow buffer. | 124 // Save any partial data in the overflow buffer. |
107 input_overflow_buf_.assign(p, end - p); | 125 input_overflow_buf_.assign(p, end - p); |
108 | 126 |
109 if (input_overflow_buf_.empty() && !DidEmptyInputBuffers()) | 127 if (input_overflow_buf_.empty() && !DidEmptyInputBuffers()) |
110 return false; | 128 return false; |
111 return true; | 129 return true; |
112 } | 130 } |
113 | 131 |
| 132 ChannelReader::DispatchState ChannelReader::DispatchMessages() { |
| 133 while (!queued_messages_.empty()) { |
| 134 if (!blocked_ids_.empty()) |
| 135 return DISPATCH_WAITING_ON_BROKER; |
| 136 |
| 137 Message* m = queued_messages_.front(); |
| 138 |
| 139 AttachmentIdSet blocked_ids = GetBrokeredAttachments(m); |
| 140 if (!blocked_ids.empty()) { |
| 141 blocked_ids_.swap(blocked_ids); |
| 142 StartObservingAttachmentBroker(); |
| 143 return DISPATCH_WAITING_ON_BROKER; |
| 144 } |
| 145 |
| 146 DispatchMessage(m); |
| 147 queued_messages_.erase(queued_messages_.begin()); |
| 148 } |
| 149 return DISPATCH_FINISHED; |
| 150 } |
| 151 |
| 152 void ChannelReader::DispatchMessage(Message* m) { |
| 153 #ifdef IPC_MESSAGE_LOG_ENABLED |
| 154 std::string name; |
| 155 Logging::GetInstance()->GetMessageText(m->type(), &name, m, NULL); |
| 156 TRACE_EVENT1("ipc,toplevel", "ChannelReader::DispatchInputData", "name", |
| 157 name); |
| 158 #else |
| 159 TRACE_EVENT2("ipc,toplevel", "ChannelReader::DispatchInputData", "class", |
| 160 IPC_MESSAGE_ID_CLASS(m->type()), "line", |
| 161 IPC_MESSAGE_ID_LINE(m->type())); |
| 162 #endif |
| 163 m->TraceMessageEnd(); |
| 164 if (IsInternalMessage(*m)) |
| 165 HandleInternalMessage(*m); |
| 166 else |
| 167 listener_->OnMessageReceived(*m); |
| 168 if (m->dispatch_error()) |
| 169 listener_->OnBadMessageReceived(*m); |
| 170 } |
| 171 |
| 172 ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments( |
| 173 Message* msg) { |
| 174 std::set<BrokerableAttachment::AttachmentId> blocked_ids; |
| 175 |
| 176 #if USE_ATTACHMENT_BROKER |
| 177 MessageAttachmentSet* set = msg->attachment_set(); |
| 178 for (const scoped_refptr<BrokerableAttachment>& attachment : |
| 179 set->GetBrokerableAttachmentsForUpdating()) { |
| 180 if (attachment->NeedsBrokering()) { |
| 181 AttachmentBroker* broker = GetAttachmentBroker(); |
| 182 scoped_refptr<BrokerableAttachment> brokered_attachment; |
| 183 bool result = broker->GetAttachmentWithId(attachment->GetIdentifier(), |
| 184 &brokered_attachment); |
| 185 if (!result) { |
| 186 blocked_ids.insert(attachment->GetIdentifier()); |
| 187 continue; |
| 188 } |
| 189 |
| 190 attachment->PopulateWithAttachment(brokered_attachment.get()); |
| 191 } |
| 192 } |
| 193 #endif // USE_ATTACHMENT_BROKER |
| 194 |
| 195 return blocked_ids; |
| 196 } |
| 197 |
| 198 void ChannelReader::ReceivedBrokerableAttachmentWithId( |
| 199 const BrokerableAttachment::AttachmentId& id) { |
| 200 if (blocked_ids_.empty()) |
| 201 return; |
| 202 |
| 203 auto it = find(blocked_ids_.begin(), blocked_ids_.end(), id); |
| 204 if (it != blocked_ids_.end()) |
| 205 blocked_ids_.erase(it); |
| 206 |
| 207 if (blocked_ids_.empty()) { |
| 208 StopObservingAttachmentBroker(); |
| 209 DispatchMessages(); |
| 210 } |
| 211 } |
| 212 |
| 213 void ChannelReader::StartObservingAttachmentBroker() { |
| 214 #if USE_ATTACHMENT_BROKER |
| 215 GetAttachmentBroker()->AddObserver(this); |
| 216 #endif // USE_ATTACHMENT_BROKER |
| 217 } |
| 218 |
| 219 void ChannelReader::StopObservingAttachmentBroker() { |
| 220 #if USE_ATTACHMENT_BROKER |
| 221 GetAttachmentBroker()->RemoveObserver(this); |
| 222 #endif // USE_ATTACHMENT_BROKER |
| 223 } |
114 | 224 |
115 } // namespace internal | 225 } // namespace internal |
116 } // namespace IPC | 226 } // namespace IPC |
OLD | NEW |