Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(86)

Side by Side Diff: mojo/public/cpp/bindings/lib/router.cc

Issue 1832193002: Mojo C++ bindings: refactor SyncHandleWatcher. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/router.h" 5 #include "mojo/public/cpp/bindings/lib/router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
117 filters_(std::move(filters)), 117 filters_(std::move(filters)),
118 connector_(std::move(message_pipe), Connector::SINGLE_THREADED_SEND), 118 connector_(std::move(message_pipe), Connector::SINGLE_THREADED_SEND),
119 incoming_receiver_(nullptr), 119 incoming_receiver_(nullptr),
120 next_request_id_(0), 120 next_request_id_(0),
121 testing_mode_(false), 121 testing_mode_(false),
122 pending_task_for_messages_(false), 122 pending_task_for_messages_(false),
123 encountered_error_(false), 123 encountered_error_(false),
124 weak_factory_(this) { 124 weak_factory_(this) {
125 filters_.SetSink(&thunk_); 125 filters_.SetSink(&thunk_);
126 if (expects_sync_requests) 126 if (expects_sync_requests)
127 connector_.RegisterSyncHandleWatch(); 127 connector_.AllowWokenUpBySyncWatchOnSameThread();
128 connector_.set_incoming_receiver(filters_.GetHead()); 128 connector_.set_incoming_receiver(filters_.GetHead());
129 connector_.set_connection_error_handler([this]() { OnConnectionError(); }); 129 connector_.set_connection_error_handler([this]() { OnConnectionError(); });
130 } 130 }
131 131
132 Router::~Router() {} 132 Router::~Router() {}
133 133
134 bool Router::Accept(Message* message) { 134 bool Router::Accept(Message* message) {
135 DCHECK(thread_checker_.CalledOnValidThread()); 135 DCHECK(thread_checker_.CalledOnValidThread());
136 DCHECK(!message->has_flag(kMessageExpectsResponse)); 136 DCHECK(!message->has_flag(kMessageExpectsResponse));
137 return connector_.Accept(message); 137 return connector_.Accept(message);
(...skipping 11 matching lines...) Expand all
149 message->set_request_id(request_id); 149 message->set_request_id(request_id);
150 if (!connector_.Accept(message)) 150 if (!connector_.Accept(message))
151 return false; 151 return false;
152 152
153 if (!message->has_flag(kMessageIsSync)) { 153 if (!message->has_flag(kMessageIsSync)) {
154 // We assume ownership of |responder|. 154 // We assume ownership of |responder|.
155 async_responders_[request_id] = make_scoped_ptr(responder); 155 async_responders_[request_id] = make_scoped_ptr(responder);
156 return true; 156 return true;
157 } 157 }
158 158
159 if (!connector_.RegisterSyncHandleWatch())
160 return false;
161
162 bool response_received = false; 159 bool response_received = false;
163 scoped_ptr<MessageReceiver> sync_responder(responder); 160 scoped_ptr<MessageReceiver> sync_responder(responder);
164 sync_responses_.insert(std::make_pair( 161 sync_responses_.insert(std::make_pair(
165 request_id, make_scoped_ptr(new SyncResponseInfo(&response_received)))); 162 request_id, make_scoped_ptr(new SyncResponseInfo(&response_received))));
166 163
167 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); 164 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
168 bool result = connector_.RunSyncHandleWatch(&response_received); 165 bool result = connector_.SyncWatch(&response_received);
169 // Make sure that this instance hasn't been destroyed. 166 // Make sure that this instance hasn't been destroyed.
170 if (weak_self) { 167 if (weak_self) {
171 DCHECK(ContainsKey(sync_responses_, request_id)); 168 DCHECK(ContainsKey(sync_responses_, request_id));
172 auto iter = sync_responses_.find(request_id); 169 auto iter = sync_responses_.find(request_id);
173 DCHECK_EQ(&response_received, iter->second->response_received); 170 DCHECK_EQ(&response_received, iter->second->response_received);
174 if (result && response_received) { 171 if (result && response_received) {
175 scoped_ptr<Message> response = std::move(iter->second->response); 172 scoped_ptr<Message> response = std::move(iter->second->response);
176 ignore_result(sync_responder->Accept(response.get())); 173 ignore_result(sync_responder->Accept(response.get()));
177 } 174 }
178 sync_responses_.erase(iter); 175 sync_responses_.erase(iter);
179
180 connector_.UnregisterSyncHandleWatch();
181 } 176 }
182 177
183 // Return true means that we take ownership of |responder|. 178 // Return true means that we take ownership of |responder|.
184 return true; 179 return true;
185 } 180 }
186 181
187 void Router::EnableTestingMode() { 182 void Router::EnableTestingMode() {
188 DCHECK(thread_checker_.CalledOnValidThread()); 183 DCHECK(thread_checker_.CalledOnValidThread());
189 testing_mode_ = true; 184 testing_mode_ = true;
190 connector_.set_enforce_errors_from_incoming_receiver(false); 185 connector_.set_enforce_errors_from_incoming_receiver(false);
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
279 return responder->Accept(message); 274 return responder->Accept(message);
280 } else { 275 } else {
281 if (!incoming_receiver_) 276 if (!incoming_receiver_)
282 return false; 277 return false;
283 278
284 return incoming_receiver_->Accept(message); 279 return incoming_receiver_->Accept(message);
285 } 280 }
286 } 281 }
287 282
288 void Router::OnConnectionError() { 283 void Router::OnConnectionError() {
289 DCHECK(!encountered_error_); 284 DCHECK(!encountered_error_);
Ken Rockot(use gerrit already) 2016/03/26 03:09:26 If this DCHECK is appropriate then the function sh
yzshen1 2016/03/26 06:33:17 Oops! The DCHECK should be removed. Thanks!
290 285
286 if (encountered_error_)
287 return;
288
291 if (!pending_messages_.empty()) { 289 if (!pending_messages_.empty()) {
292 // After all the pending messages are processed, we will check whether an 290 // After all the pending messages are processed, we will check whether an
293 // error has been encountered and run the user's connection error handler 291 // error has been encountered and run the user's connection error handler
294 // if necessary. 292 // if necessary.
295 DCHECK(pending_task_for_messages_); 293 DCHECK(pending_task_for_messages_);
296 return; 294 return;
297 } 295 }
298 296
297 if (connector_.during_sync_handle_watcher_callback()) {
298 // We don't want the error handler to reenter an ongoing sync call.
299 base::MessageLoop::current()->PostTask(
300 FROM_HERE,
301 base::Bind(&Router::OnConnectionError, weak_factory_.GetWeakPtr()));
302 return;
303 }
304
299 encountered_error_ = true; 305 encountered_error_ = true;
300 error_handler_.Run(); 306 error_handler_.Run();
301 } 307 }
302 308
303 // ---------------------------------------------------------------------------- 309 // ----------------------------------------------------------------------------
304 310
305 } // namespace internal 311 } // namespace internal
306 } // namespace mojo 312 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698