OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/router.h" | 5 #include "mojo/public/cpp/bindings/lib/router.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 #include <utility> | 8 #include <utility> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
117 filters_(std::move(filters)), | 117 filters_(std::move(filters)), |
118 connector_(std::move(message_pipe), Connector::SINGLE_THREADED_SEND), | 118 connector_(std::move(message_pipe), Connector::SINGLE_THREADED_SEND), |
119 incoming_receiver_(nullptr), | 119 incoming_receiver_(nullptr), |
120 next_request_id_(0), | 120 next_request_id_(0), |
121 testing_mode_(false), | 121 testing_mode_(false), |
122 pending_task_for_messages_(false), | 122 pending_task_for_messages_(false), |
123 encountered_error_(false), | 123 encountered_error_(false), |
124 weak_factory_(this) { | 124 weak_factory_(this) { |
125 filters_.SetSink(&thunk_); | 125 filters_.SetSink(&thunk_); |
126 if (expects_sync_requests) | 126 if (expects_sync_requests) |
127 connector_.RegisterSyncHandleWatch(); | 127 connector_.AllowWokenUpBySyncWatchOnSameThread(); |
128 connector_.set_incoming_receiver(filters_.GetHead()); | 128 connector_.set_incoming_receiver(filters_.GetHead()); |
129 connector_.set_connection_error_handler([this]() { OnConnectionError(); }); | 129 connector_.set_connection_error_handler([this]() { OnConnectionError(); }); |
130 } | 130 } |
131 | 131 |
132 Router::~Router() {} | 132 Router::~Router() {} |
133 | 133 |
134 bool Router::Accept(Message* message) { | 134 bool Router::Accept(Message* message) { |
135 DCHECK(thread_checker_.CalledOnValidThread()); | 135 DCHECK(thread_checker_.CalledOnValidThread()); |
136 DCHECK(!message->has_flag(kMessageExpectsResponse)); | 136 DCHECK(!message->has_flag(kMessageExpectsResponse)); |
137 return connector_.Accept(message); | 137 return connector_.Accept(message); |
(...skipping 11 matching lines...) Expand all Loading... |
149 message->set_request_id(request_id); | 149 message->set_request_id(request_id); |
150 if (!connector_.Accept(message)) | 150 if (!connector_.Accept(message)) |
151 return false; | 151 return false; |
152 | 152 |
153 if (!message->has_flag(kMessageIsSync)) { | 153 if (!message->has_flag(kMessageIsSync)) { |
154 // We assume ownership of |responder|. | 154 // We assume ownership of |responder|. |
155 async_responders_[request_id] = make_scoped_ptr(responder); | 155 async_responders_[request_id] = make_scoped_ptr(responder); |
156 return true; | 156 return true; |
157 } | 157 } |
158 | 158 |
159 if (!connector_.RegisterSyncHandleWatch()) | |
160 return false; | |
161 | |
162 bool response_received = false; | 159 bool response_received = false; |
163 scoped_ptr<MessageReceiver> sync_responder(responder); | 160 scoped_ptr<MessageReceiver> sync_responder(responder); |
164 sync_responses_.insert(std::make_pair( | 161 sync_responses_.insert(std::make_pair( |
165 request_id, make_scoped_ptr(new SyncResponseInfo(&response_received)))); | 162 request_id, make_scoped_ptr(new SyncResponseInfo(&response_received)))); |
166 | 163 |
167 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); | 164 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); |
168 bool result = connector_.RunSyncHandleWatch(&response_received); | 165 bool result = connector_.SyncWatch(&response_received); |
169 // Make sure that this instance hasn't been destroyed. | 166 // Make sure that this instance hasn't been destroyed. |
170 if (weak_self) { | 167 if (weak_self) { |
171 DCHECK(ContainsKey(sync_responses_, request_id)); | 168 DCHECK(ContainsKey(sync_responses_, request_id)); |
172 auto iter = sync_responses_.find(request_id); | 169 auto iter = sync_responses_.find(request_id); |
173 DCHECK_EQ(&response_received, iter->second->response_received); | 170 DCHECK_EQ(&response_received, iter->second->response_received); |
174 if (result && response_received) { | 171 if (result && response_received) { |
175 scoped_ptr<Message> response = std::move(iter->second->response); | 172 scoped_ptr<Message> response = std::move(iter->second->response); |
176 ignore_result(sync_responder->Accept(response.get())); | 173 ignore_result(sync_responder->Accept(response.get())); |
177 } | 174 } |
178 sync_responses_.erase(iter); | 175 sync_responses_.erase(iter); |
179 | |
180 connector_.UnregisterSyncHandleWatch(); | |
181 } | 176 } |
182 | 177 |
183 // Return true means that we take ownership of |responder|. | 178 // Return true means that we take ownership of |responder|. |
184 return true; | 179 return true; |
185 } | 180 } |
186 | 181 |
187 void Router::EnableTestingMode() { | 182 void Router::EnableTestingMode() { |
188 DCHECK(thread_checker_.CalledOnValidThread()); | 183 DCHECK(thread_checker_.CalledOnValidThread()); |
189 testing_mode_ = true; | 184 testing_mode_ = true; |
190 connector_.set_enforce_errors_from_incoming_receiver(false); | 185 connector_.set_enforce_errors_from_incoming_receiver(false); |
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
279 return responder->Accept(message); | 274 return responder->Accept(message); |
280 } else { | 275 } else { |
281 if (!incoming_receiver_) | 276 if (!incoming_receiver_) |
282 return false; | 277 return false; |
283 | 278 |
284 return incoming_receiver_->Accept(message); | 279 return incoming_receiver_->Accept(message); |
285 } | 280 } |
286 } | 281 } |
287 | 282 |
288 void Router::OnConnectionError() { | 283 void Router::OnConnectionError() { |
289 DCHECK(!encountered_error_); | 284 if (encountered_error_) |
| 285 return; |
290 | 286 |
291 if (!pending_messages_.empty()) { | 287 if (!pending_messages_.empty()) { |
292 // After all the pending messages are processed, we will check whether an | 288 // After all the pending messages are processed, we will check whether an |
293 // error has been encountered and run the user's connection error handler | 289 // error has been encountered and run the user's connection error handler |
294 // if necessary. | 290 // if necessary. |
295 DCHECK(pending_task_for_messages_); | 291 DCHECK(pending_task_for_messages_); |
296 return; | 292 return; |
297 } | 293 } |
298 | 294 |
| 295 if (connector_.during_sync_handle_watcher_callback()) { |
| 296 // We don't want the error handler to reenter an ongoing sync call. |
| 297 base::MessageLoop::current()->PostTask( |
| 298 FROM_HERE, |
| 299 base::Bind(&Router::OnConnectionError, weak_factory_.GetWeakPtr())); |
| 300 return; |
| 301 } |
| 302 |
299 encountered_error_ = true; | 303 encountered_error_ = true; |
300 error_handler_.Run(); | 304 error_handler_.Run(); |
301 } | 305 } |
302 | 306 |
303 // ---------------------------------------------------------------------------- | 307 // ---------------------------------------------------------------------------- |
304 | 308 |
305 } // namespace internal | 309 } // namespace internal |
306 } // namespace mojo | 310 } // namespace mojo |
OLD | NEW |