OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/router.h" | 5 #include "mojo/public/cpp/bindings/lib/router.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
(...skipping 23 matching lines...) Expand all Loading... |
34 : router_(router), | 34 : router_(router), |
35 accept_was_invoked_(false), | 35 accept_was_invoked_(false), |
36 task_runner_(std::move(runner)) {} | 36 task_runner_(std::move(runner)) {} |
37 ~ResponderThunk() override { | 37 ~ResponderThunk() override { |
38 if (!accept_was_invoked_) { | 38 if (!accept_was_invoked_) { |
39 // The Mojo application handled a message that was expecting a response | 39 // The Mojo application handled a message that was expecting a response |
40 // but did not send a response. | 40 // but did not send a response. |
41 // We raise an error to signal the calling application that an error | 41 // We raise an error to signal the calling application that an error |
42 // condition occurred. Without this the calling application would have no | 42 // condition occurred. Without this the calling application would have no |
43 // way of knowing it should stop waiting for a response. | 43 // way of knowing it should stop waiting for a response. |
| 44 Result error(Result::Type::RESPONSE_DROPPED); |
44 if (task_runner_->RunsTasksOnCurrentThread()) { | 45 if (task_runner_->RunsTasksOnCurrentThread()) { |
45 // Please note that even if this code is run from a different task | 46 // Please note that even if this code is run from a different task |
46 // runner on the same thread as |task_runner_|, it is okay to directly | 47 // runner on the same thread as |task_runner_|, it is okay to directly |
47 // call Router::RaiseError(), because it will raise error from the | 48 // call Router::RaiseError(), because it will raise error from the |
48 // correct task runner asynchronously. | 49 // correct task runner asynchronously. |
49 if (router_) | 50 if (router_) |
50 router_->RaiseError(); | 51 router_->RaiseError(std::move(error)); |
51 } else { | 52 } else { |
52 task_runner_->PostTask(FROM_HERE, | 53 task_runner_->PostTask(FROM_HERE, |
53 base::Bind(&Router::RaiseError, router_)); | 54 base::Bind(&Router::RaiseError, router_, |
| 55 base::Passed(&error))); |
54 } | 56 } |
55 } | 57 } |
56 } | 58 } |
57 | 59 |
58 // MessageReceiver implementation: | 60 // MessageReceiver implementation: |
59 bool Accept(Message* message) override { | 61 Result Accept(Message* message) override { |
60 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 62 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
61 accept_was_invoked_ = true; | 63 accept_was_invoked_ = true; |
62 DCHECK(message->has_flag(kMessageIsResponse)); | 64 DCHECK(message->has_flag(kMessageIsResponse)); |
63 | 65 |
64 bool result = false; | 66 if (!router_) |
| 67 return Result(Result::Type::RESPONSE_DROPPED); |
65 | 68 |
66 if (router_) | 69 return router_->Accept(message); |
67 result = router_->Accept(message); | |
68 | |
69 return result; | |
70 } | 70 } |
71 | 71 |
72 // MessageReceiverWithStatus implementation: | 72 // MessageReceiverWithStatus implementation: |
73 bool IsValid() override { | 73 bool IsValid() override { |
74 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 74 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
75 return router_ && !router_->encountered_error() && router_->is_valid(); | 75 return router_ && !router_->encountered_error() && router_->is_valid(); |
76 } | 76 } |
77 | 77 |
78 void DCheckInvalid(const std::string& message) override { | 78 void DCheckInvalid(const std::string& message) override { |
79 if (task_runner_->RunsTasksOnCurrentThread()) { | 79 if (task_runner_->RunsTasksOnCurrentThread()) { |
(...skipping 21 matching lines...) Expand all Loading... |
101 | 101 |
102 // ---------------------------------------------------------------------------- | 102 // ---------------------------------------------------------------------------- |
103 | 103 |
104 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) | 104 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) |
105 : router_(router) { | 105 : router_(router) { |
106 } | 106 } |
107 | 107 |
108 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { | 108 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { |
109 } | 109 } |
110 | 110 |
111 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { | 111 MessageReceiver::Result Router::HandleIncomingMessageThunk::Accept( |
| 112 Message* message) { |
112 return router_->HandleIncomingMessage(message); | 113 return router_->HandleIncomingMessage(message); |
113 } | 114 } |
114 | 115 |
115 // ---------------------------------------------------------------------------- | 116 // ---------------------------------------------------------------------------- |
116 | 117 |
117 Router::Router(ScopedMessagePipeHandle message_pipe, | 118 Router::Router(ScopedMessagePipeHandle message_pipe, |
118 FilterChain filters, | 119 FilterChain filters, |
119 bool expects_sync_requests, | 120 bool expects_sync_requests, |
120 scoped_refptr<base::SingleThreadTaskRunner> runner) | 121 scoped_refptr<base::SingleThreadTaskRunner> runner) |
121 : thunk_(this), | 122 : thunk_(this), |
122 filters_(std::move(filters)), | 123 filters_(std::move(filters)), |
123 connector_(std::move(message_pipe), | 124 connector_(std::move(message_pipe), |
124 Connector::SINGLE_THREADED_SEND, | 125 Connector::SINGLE_THREADED_SEND, |
125 std::move(runner)), | 126 std::move(runner)), |
126 incoming_receiver_(nullptr), | 127 incoming_receiver_(nullptr), |
127 next_request_id_(0), | 128 next_request_id_(0), |
128 testing_mode_(false), | 129 testing_mode_(false), |
129 pending_task_for_messages_(false), | 130 pending_task_for_messages_(false), |
130 encountered_error_(false), | 131 encountered_error_(false), |
131 weak_factory_(this) { | 132 weak_factory_(this) { |
132 filters_.SetSink(&thunk_); | 133 filters_.SetSink(&thunk_); |
133 if (expects_sync_requests) | 134 if (expects_sync_requests) |
134 connector_.AllowWokenUpBySyncWatchOnSameThread(); | 135 connector_.AllowWokenUpBySyncWatchOnSameThread(); |
135 connector_.set_incoming_receiver(filters_.GetHead()); | 136 connector_.set_incoming_receiver(filters_.GetHead()); |
136 connector_.set_connection_error_handler([this]() { OnConnectionError(); }); | 137 connector_.set_connection_error_handler([this]() { OnConnectionError(); }); |
137 } | 138 } |
138 | 139 |
139 Router::~Router() {} | 140 Router::~Router() {} |
140 | 141 |
141 bool Router::Accept(Message* message) { | 142 MessageReceiver::Result Router::Accept(Message* message) { |
142 DCHECK(thread_checker_.CalledOnValidThread()); | 143 DCHECK(thread_checker_.CalledOnValidThread()); |
143 DCHECK(!message->has_flag(kMessageExpectsResponse)); | 144 DCHECK(!message->has_flag(kMessageExpectsResponse)); |
144 return connector_.Accept(message); | 145 return connector_.Accept(message); |
145 } | 146 } |
146 | 147 |
147 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { | 148 MessageReceiver::Result Router::AcceptWithResponder( |
| 149 Message* message, |
| 150 MessageReceiver* responder) { |
148 DCHECK(thread_checker_.CalledOnValidThread()); | 151 DCHECK(thread_checker_.CalledOnValidThread()); |
149 DCHECK(message->has_flag(kMessageExpectsResponse)); | 152 DCHECK(message->has_flag(kMessageExpectsResponse)); |
150 | 153 |
151 // Reserve 0 in case we want it to convey special meaning in the future. | 154 // Reserve 0 in case we want it to convey special meaning in the future. |
152 uint64_t request_id = next_request_id_++; | 155 uint64_t request_id = next_request_id_++; |
153 if (request_id == 0) | 156 if (request_id == 0) |
154 request_id = next_request_id_++; | 157 request_id = next_request_id_++; |
155 | 158 |
156 bool is_sync = message->has_flag(kMessageIsSync); | 159 bool is_sync = message->has_flag(kMessageIsSync); |
157 message->set_request_id(request_id); | 160 message->set_request_id(request_id); |
158 if (!connector_.Accept(message)) | 161 if (!connector_.Accept(message).Succeeded()) |
159 return false; | 162 return Result(Result::Type::SEND_FAILED); |
160 | 163 |
161 if (!is_sync) { | 164 if (!is_sync) { |
162 // We assume ownership of |responder|. | 165 // We assume ownership of |responder|. |
163 async_responders_[request_id] = base::WrapUnique(responder); | 166 async_responders_[request_id] = base::WrapUnique(responder); |
164 return true; | 167 return Result::ForSuccess(); |
165 } | 168 } |
166 | 169 |
167 bool response_received = false; | 170 bool response_received = false; |
168 std::unique_ptr<MessageReceiver> sync_responder(responder); | 171 std::unique_ptr<MessageReceiver> sync_responder(responder); |
169 sync_responses_.insert(std::make_pair( | 172 sync_responses_.insert(std::make_pair( |
170 request_id, base::WrapUnique(new SyncResponseInfo(&response_received)))); | 173 request_id, base::WrapUnique(new SyncResponseInfo(&response_received)))); |
171 | 174 |
172 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); | 175 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); |
173 connector_.SyncWatch(&response_received); | 176 connector_.SyncWatch(&response_received); |
174 // Make sure that this instance hasn't been destroyed. | 177 // Make sure that this instance hasn't been destroyed. |
175 if (weak_self) { | 178 if (weak_self) { |
176 DCHECK(ContainsKey(sync_responses_, request_id)); | 179 DCHECK(ContainsKey(sync_responses_, request_id)); |
177 auto iter = sync_responses_.find(request_id); | 180 auto iter = sync_responses_.find(request_id); |
178 DCHECK_EQ(&response_received, iter->second->response_received); | 181 DCHECK_EQ(&response_received, iter->second->response_received); |
179 if (response_received) { | 182 if (response_received) { |
180 std::unique_ptr<Message> response = std::move(iter->second->response); | 183 std::unique_ptr<Message> response = std::move(iter->second->response); |
181 ignore_result(sync_responder->Accept(response.get())); | 184 ignore_result(sync_responder->Accept(response.get())); |
182 } | 185 } |
183 sync_responses_.erase(iter); | 186 sync_responses_.erase(iter); |
184 } | 187 } |
185 | 188 |
186 // Return true means that we take ownership of |responder|. | 189 // Success means that we take ownership of |responder|. |
187 return true; | 190 return Result::ForSuccess(); |
188 } | 191 } |
189 | 192 |
190 void Router::EnableTestingMode() { | 193 void Router::EnableTestingMode() { |
191 DCHECK(thread_checker_.CalledOnValidThread()); | 194 DCHECK(thread_checker_.CalledOnValidThread()); |
192 testing_mode_ = true; | 195 testing_mode_ = true; |
193 connector_.set_enforce_errors_from_incoming_receiver(false); | 196 connector_.set_enforce_errors_from_incoming_receiver(false); |
194 } | 197 } |
195 | 198 |
196 bool Router::HandleIncomingMessage(Message* message) { | 199 MessageReceiver::Result Router::HandleIncomingMessage(Message* message) { |
197 DCHECK(thread_checker_.CalledOnValidThread()); | 200 DCHECK(thread_checker_.CalledOnValidThread()); |
198 | 201 |
199 const bool during_sync_call = | 202 const bool during_sync_call = |
200 connector_.during_sync_handle_watcher_callback(); | 203 connector_.during_sync_handle_watcher_callback(); |
201 if (!message->has_flag(kMessageIsSync) && | 204 if (!message->has_flag(kMessageIsSync) && |
202 (during_sync_call || !pending_messages_.empty())) { | 205 (during_sync_call || !pending_messages_.empty())) { |
203 std::unique_ptr<Message> pending_message(new Message); | 206 std::unique_ptr<Message> pending_message(new Message); |
204 message->MoveTo(pending_message.get()); | 207 message->MoveTo(pending_message.get()); |
205 pending_messages_.push(std::move(pending_message)); | 208 pending_messages_.push(std::move(pending_message)); |
206 | 209 |
207 if (!pending_task_for_messages_) { | 210 if (!pending_task_for_messages_) { |
208 pending_task_for_messages_ = true; | 211 pending_task_for_messages_ = true; |
209 connector_.task_runner()->PostTask( | 212 connector_.task_runner()->PostTask( |
210 FROM_HERE, base::Bind(&Router::HandleQueuedMessages, | 213 FROM_HERE, base::Bind(&Router::HandleQueuedMessages, |
211 weak_factory_.GetWeakPtr())); | 214 weak_factory_.GetWeakPtr())); |
212 } | 215 } |
213 | 216 |
214 return true; | 217 return Result::ForSuccess(); |
215 } | 218 } |
216 | 219 |
217 return HandleMessageInternal(message); | 220 return HandleMessageInternal(message); |
218 } | 221 } |
219 | 222 |
220 void Router::HandleQueuedMessages() { | 223 void Router::HandleQueuedMessages() { |
221 DCHECK(thread_checker_.CalledOnValidThread()); | 224 DCHECK(thread_checker_.CalledOnValidThread()); |
222 DCHECK(pending_task_for_messages_); | 225 DCHECK(pending_task_for_messages_); |
223 | 226 |
224 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); | 227 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); |
225 while (!pending_messages_.empty()) { | 228 while (!pending_messages_.empty()) { |
226 std::unique_ptr<Message> message(std::move(pending_messages_.front())); | 229 std::unique_ptr<Message> message(std::move(pending_messages_.front())); |
227 pending_messages_.pop(); | 230 pending_messages_.pop(); |
228 | 231 |
229 bool result = HandleMessageInternal(message.get()); | 232 Result result = HandleMessageInternal(message.get()); |
230 if (!weak_self) | 233 if (!weak_self) |
231 return; | 234 return; |
232 | 235 |
233 if (!result && !testing_mode_) { | 236 if (!result.Succeeded() && !testing_mode_) { |
234 connector_.RaiseError(); | 237 connector_.RaiseError(std::move(result)); |
235 break; | 238 break; |
236 } | 239 } |
237 } | 240 } |
238 | 241 |
239 pending_task_for_messages_ = false; | 242 pending_task_for_messages_ = false; |
240 | 243 |
241 // We may have already seen a connection error from the connector, but | 244 // We may have already seen a connection error from the connector, but |
242 // haven't notified the user because we want to process all the queued | 245 // haven't notified the user because we want to process all the queued |
243 // messages first. We should do it now. | 246 // messages first. We should do it now. |
244 if (connector_.encountered_error() && !encountered_error_) | 247 if (connector_.encountered_error() && !encountered_error_) |
245 OnConnectionError(); | 248 OnConnectionError(); |
246 } | 249 } |
247 | 250 |
248 bool Router::HandleMessageInternal(Message* message) { | 251 MessageReceiver::Result Router::HandleMessageInternal(Message* message) { |
249 if (message->has_flag(kMessageExpectsResponse)) { | 252 if (message->has_flag(kMessageExpectsResponse)) { |
250 if (!incoming_receiver_) | 253 if (!incoming_receiver_) |
251 return false; | 254 return Result(Result::Type::REQUEST_DROPPED); |
252 | 255 |
253 MessageReceiverWithStatus* responder = new ResponderThunk( | 256 MessageReceiverWithStatus* responder = new ResponderThunk( |
254 weak_factory_.GetWeakPtr(), connector_.task_runner()); | 257 weak_factory_.GetWeakPtr(), connector_.task_runner()); |
255 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); | 258 Result result = incoming_receiver_->AcceptWithResponder(message, responder); |
256 if (!ok) | 259 if (!result.Succeeded()) |
257 delete responder; | 260 delete responder; |
258 return ok; | 261 return result; |
259 | 262 |
260 } else if (message->has_flag(kMessageIsResponse)) { | 263 } else if (message->has_flag(kMessageIsResponse)) { |
261 uint64_t request_id = message->request_id(); | 264 uint64_t request_id = message->request_id(); |
262 | 265 |
263 if (message->has_flag(kMessageIsSync)) { | 266 if (message->has_flag(kMessageIsSync)) { |
264 auto it = sync_responses_.find(request_id); | 267 auto it = sync_responses_.find(request_id); |
265 if (it == sync_responses_.end()) { | 268 if (it == sync_responses_.end()) { |
266 DCHECK(testing_mode_); | 269 DCHECK(testing_mode_); |
267 return false; | 270 return Result::ForUnexpectedResponse(interface_name_, message); |
268 } | 271 } |
269 it->second->response.reset(new Message()); | 272 it->second->response.reset(new Message()); |
270 message->MoveTo(it->second->response.get()); | 273 message->MoveTo(it->second->response.get()); |
271 *it->second->response_received = true; | 274 *it->second->response_received = true; |
272 return true; | 275 return Result::ForSuccess(); |
273 } | 276 } |
274 | 277 |
275 auto it = async_responders_.find(request_id); | 278 auto it = async_responders_.find(request_id); |
276 if (it == async_responders_.end()) { | 279 if (it == async_responders_.end()) { |
277 DCHECK(testing_mode_); | 280 DCHECK(testing_mode_); |
278 return false; | 281 return Result::ForUnexpectedResponse(interface_name_, message); |
279 } | 282 } |
280 std::unique_ptr<MessageReceiver> responder = std::move(it->second); | 283 std::unique_ptr<MessageReceiver> responder = std::move(it->second); |
281 async_responders_.erase(it); | 284 async_responders_.erase(it); |
282 return responder->Accept(message); | 285 return responder->Accept(message); |
283 } else { | 286 } else { |
284 if (!incoming_receiver_) | 287 if (!incoming_receiver_) |
285 return false; | 288 return Result(Result::Type::REQUEST_DROPPED); |
286 | 289 |
287 return incoming_receiver_->Accept(message); | 290 return incoming_receiver_->Accept(message); |
288 } | 291 } |
289 } | 292 } |
290 | 293 |
291 void Router::OnConnectionError() { | 294 void Router::OnConnectionError() { |
292 if (encountered_error_) | 295 if (encountered_error_) |
293 return; | 296 return; |
294 | 297 |
295 if (!pending_messages_.empty()) { | 298 if (!pending_messages_.empty()) { |
(...skipping 13 matching lines...) Expand all Loading... |
309 } | 312 } |
310 | 313 |
311 encountered_error_ = true; | 314 encountered_error_ = true; |
312 error_handler_.Run(); | 315 error_handler_.Run(); |
313 } | 316 } |
314 | 317 |
315 // ---------------------------------------------------------------------------- | 318 // ---------------------------------------------------------------------------- |
316 | 319 |
317 } // namespace internal | 320 } // namespace internal |
318 } // namespace mojo | 321 } // namespace mojo |
OLD | NEW |