Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(367)

Side by Side Diff: mojo/public/cpp/bindings/lib/connector.cc

Issue 1723673002: Reland "Mojo C++ bindings: support sync methods - part 2" (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/connector.h" 5 #include "mojo/public/cpp/bindings/lib/connector.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h"
10 #include "base/logging.h" 11 #include "base/logging.h"
11 #include "base/macros.h" 12 #include "base/macros.h"
12 #include "base/synchronization/lock.h" 13 #include "base/synchronization/lock.h"
14 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h"
13 15
14 namespace mojo { 16 namespace mojo {
15 namespace internal { 17 namespace internal {
16 18
17 namespace { 19 namespace {
18 20
19 // Similar to base::AutoLock, except that it does nothing if |lock| passed into 21 // Similar to base::AutoLock, except that it does nothing if |lock| passed into
20 // the constructor is null. 22 // the constructor is null.
21 class MayAutoLock { 23 class MayAutoLock {
22 public: 24 public:
(...skipping 22 matching lines...) Expand all
45 ConnectorConfig config, 47 ConnectorConfig config,
46 const MojoAsyncWaiter* waiter) 48 const MojoAsyncWaiter* waiter)
47 : waiter_(waiter), 49 : waiter_(waiter),
48 message_pipe_(std::move(message_pipe)), 50 message_pipe_(std::move(message_pipe)),
49 incoming_receiver_(nullptr), 51 incoming_receiver_(nullptr),
50 async_wait_id_(0), 52 async_wait_id_(0),
51 error_(false), 53 error_(false),
52 drop_writes_(false), 54 drop_writes_(false),
53 enforce_errors_from_incoming_receiver_(true), 55 enforce_errors_from_incoming_receiver_(true),
54 paused_(false), 56 paused_(false),
55 destroyed_flag_(nullptr), 57 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr),
56 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr) { 58 register_sync_handle_watch_count_(0),
59 registered_with_sync_handle_watcher_(false),
60 sync_handle_watcher_callback_count_(0),
61 weak_factory_(this) {
57 // Even though we don't have an incoming receiver, we still want to monitor 62 // Even though we don't have an incoming receiver, we still want to monitor
58 // the message pipe to know if is closed or encounters an error. 63 // the message pipe to know if is closed or encounters an error.
59 WaitToReadMore(); 64 WaitToReadMore();
60 } 65 }
61 66
62 Connector::~Connector() { 67 Connector::~Connector() {
63 DCHECK(thread_checker_.CalledOnValidThread()); 68 DCHECK(thread_checker_.CalledOnValidThread());
64 69
65 if (destroyed_flag_)
66 *destroyed_flag_ = true;
67
68 CancelWait(); 70 CancelWait();
69 } 71 }
70 72
71 void Connector::CloseMessagePipe() { 73 void Connector::CloseMessagePipe() {
72 DCHECK(thread_checker_.CalledOnValidThread()); 74 DCHECK(thread_checker_.CalledOnValidThread());
73 75
74 CancelWait(); 76 CancelWait();
75 MayAutoLock locker(lock_.get()); 77 MayAutoLock locker(lock_.get());
76 Close(std::move(message_pipe_)); 78 Close(std::move(message_pipe_));
77 } 79 }
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
184 CHECK(false) << "Race condition or other bug detected"; 186 CHECK(false) << "Race condition or other bug detected";
185 return false; 187 return false;
186 default: 188 default:
187 // This particular write was rejected, presumably because of bad input. 189 // This particular write was rejected, presumably because of bad input.
188 // The pipe is not necessarily in a bad state. 190 // The pipe is not necessarily in a bad state.
189 return false; 191 return false;
190 } 192 }
191 return true; 193 return true;
192 } 194 }
193 195
196 bool Connector::RegisterSyncHandleWatch() {
197 DCHECK(thread_checker_.CalledOnValidThread());
198
199 if (error_)
200 return false;
201
202 register_sync_handle_watch_count_++;
203
204 if (!registered_with_sync_handle_watcher_ && !paused_) {
205 registered_with_sync_handle_watcher_ =
206 SyncHandleWatcher::current()->RegisterHandle(
207 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
208 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
209 base::Unretained(this)));
210 }
211 return true;
212 }
213
214 void Connector::UnregisterSyncHandleWatch() {
215 DCHECK(thread_checker_.CalledOnValidThread());
216
217 if (register_sync_handle_watch_count_ == 0) {
218 NOTREACHED();
219 return;
220 }
221
222 register_sync_handle_watch_count_--;
223 if (register_sync_handle_watch_count_ > 0)
224 return;
225
226 if (registered_with_sync_handle_watcher_) {
227 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get());
228 registered_with_sync_handle_watcher_ = false;
229 }
230 }
231
232 bool Connector::RunSyncHandleWatch(const bool* should_stop) {
233 DCHECK(thread_checker_.CalledOnValidThread());
234 DCHECK_GT(register_sync_handle_watch_count_, 0u);
235
236 if (error_)
237 return false;
238
239 ResumeIncomingMethodCallProcessing();
240
241 if (!should_stop_sync_handle_watch_)
242 should_stop_sync_handle_watch_ = new base::RefCountedData<bool>(false);
243
244 // This object may be destroyed during the WatchAllHandles() call. So we have
245 // to preserve the boolean that WatchAllHandles uses.
246 scoped_refptr<base::RefCountedData<bool>> preserver =
247 should_stop_sync_handle_watch_;
248 const bool* should_stop_array[] = {should_stop,
249 &should_stop_sync_handle_watch_->data};
250 return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2);
251 }
252
194 // static 253 // static
195 void Connector::CallOnHandleReady(void* closure, MojoResult result) { 254 void Connector::CallOnHandleReady(void* closure, MojoResult result) {
196 Connector* self = static_cast<Connector*>(closure); 255 Connector* self = static_cast<Connector*>(closure);
197 self->OnHandleReady(result); 256 CHECK(self->async_wait_id_ != 0);
257 self->async_wait_id_ = 0;
258 self->OnHandleReadyInternal(result);
198 } 259 }
199 260
200 void Connector::OnHandleReady(MojoResult result) { 261 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
262 base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr());
263
264 sync_handle_watcher_callback_count_++;
265 OnHandleReadyInternal(result);
266 // At this point, this object might have been deleted.
267 if (weak_self)
268 sync_handle_watcher_callback_count_--;
269 }
270
271 void Connector::OnHandleReadyInternal(MojoResult result) {
201 DCHECK(thread_checker_.CalledOnValidThread()); 272 DCHECK(thread_checker_.CalledOnValidThread());
202 273
203 CHECK(async_wait_id_ != 0);
204 async_wait_id_ = 0;
205 if (result != MOJO_RESULT_OK) { 274 if (result != MOJO_RESULT_OK) {
206 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); 275 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
207 return; 276 return;
208 } 277 }
209 ReadAllAvailableMessages(); 278 ReadAllAvailableMessages();
210 // At this point, this object might have been deleted. Return. 279 // At this point, this object might have been deleted. Return.
211 } 280 }
212 281
213 void Connector::WaitToReadMore() { 282 void Connector::WaitToReadMore() {
214 CHECK(!async_wait_id_); 283 CHECK(!async_wait_id_);
215 CHECK(!paused_); 284 CHECK(!paused_);
216 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(), 285 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(),
217 MOJO_HANDLE_SIGNAL_READABLE, 286 MOJO_HANDLE_SIGNAL_READABLE,
218 MOJO_DEADLINE_INDEFINITE, 287 MOJO_DEADLINE_INDEFINITE,
219 &Connector::CallOnHandleReady, 288 &Connector::CallOnHandleReady,
220 this); 289 this);
290
291 if (register_sync_handle_watch_count_ > 0 &&
292 !registered_with_sync_handle_watcher_) {
293 registered_with_sync_handle_watcher_ =
294 SyncHandleWatcher::current()->RegisterHandle(
295 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
296 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
297 base::Unretained(this)));
298 }
221 } 299 }
222 300
223 bool Connector::ReadSingleMessage(MojoResult* read_result) { 301 bool Connector::ReadSingleMessage(MojoResult* read_result) {
224 CHECK(!paused_); 302 CHECK(!paused_);
225 303
226 bool receiver_result = false; 304 bool receiver_result = false;
227 305
228 // Detect if |this| was destroyed during message dispatch. Allow for the 306 // Detect if |this| was destroyed during message dispatch. Allow for the
229 // possibility of re-entering ReadMore() through message dispatch. 307 // possibility of re-entering ReadMore() through message dispatch.
230 bool was_destroyed_during_dispatch = false; 308 base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr();
231 bool* previous_destroyed_flag = destroyed_flag_;
232 destroyed_flag_ = &was_destroyed_during_dispatch;
233 309
234 Message message; 310 Message message;
235 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); 311 const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
236 *read_result = rv; 312 *read_result = rv;
237 313
238 if (rv == MOJO_RESULT_OK) { 314 if (rv == MOJO_RESULT_OK) {
239 // Dispatching the message may spin in a nested message loop. To ensure we 315 // Dispatching the message may spin in a nested message loop. To ensure we
240 // continue dispatching messages when this happens start listening for 316 // continue dispatching messages when this happens start listening for
241 // messagse now. 317 // messagse now.
242 if (!async_wait_id_) { 318 if (!async_wait_id_) {
243 // TODO: Need to evaluate the perf impact of this. 319 // TODO: Need to evaluate the perf impact of this.
244 WaitToReadMore(); 320 WaitToReadMore();
245 } 321 }
246 receiver_result = 322 receiver_result =
247 incoming_receiver_ && incoming_receiver_->Accept(&message); 323 incoming_receiver_ && incoming_receiver_->Accept(&message);
248 } 324 }
249 325
250 if (was_destroyed_during_dispatch) { 326 if (!weak_self)
251 if (previous_destroyed_flag)
252 *previous_destroyed_flag = true; // Propagate flag.
253 return false; 327 return false;
254 }
255
256 destroyed_flag_ = previous_destroyed_flag;
257 328
258 if (rv == MOJO_RESULT_SHOULD_WAIT) 329 if (rv == MOJO_RESULT_SHOULD_WAIT)
259 return true; 330 return true;
260 331
261 if (rv != MOJO_RESULT_OK) { 332 if (rv != MOJO_RESULT_OK) {
262 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); 333 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
263 return false; 334 return false;
264 } 335 }
265 336
266 if (enforce_errors_from_incoming_receiver_ && !receiver_result) { 337 if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
(...skipping 21 matching lines...) Expand all
288 // because of thread hops. In that case, there'll still be an async 359 // because of thread hops. In that case, there'll still be an async
289 // waiter. 360 // waiter.
290 if (!async_wait_id_) 361 if (!async_wait_id_)
291 WaitToReadMore(); 362 WaitToReadMore();
292 break; 363 break;
293 } 364 }
294 } 365 }
295 } 366 }
296 367
297 void Connector::CancelWait() { 368 void Connector::CancelWait() {
298 if (!async_wait_id_) 369 if (async_wait_id_) {
299 return; 370 waiter_->CancelWait(async_wait_id_);
371 async_wait_id_ = 0;
372 }
300 373
301 waiter_->CancelWait(async_wait_id_); 374 if (registered_with_sync_handle_watcher_) {
302 async_wait_id_ = 0; 375 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get());
376 registered_with_sync_handle_watcher_ = false;
377 }
378
379 if (should_stop_sync_handle_watch_)
380 should_stop_sync_handle_watch_->data = true;
303 } 381 }
304 382
305 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { 383 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
306 if (error_ || !message_pipe_.is_valid()) 384 if (error_ || !message_pipe_.is_valid())
307 return; 385 return;
308 386
309 if (!force_pipe_reset && force_async_handler) 387 if (!force_pipe_reset && force_async_handler)
310 force_pipe_reset = true; 388 force_pipe_reset = true;
311 389
312 if (paused_) { 390 if (paused_) {
(...skipping 19 matching lines...) Expand all
332 if (!paused_) 410 if (!paused_)
333 WaitToReadMore(); 411 WaitToReadMore();
334 } else { 412 } else {
335 error_ = true; 413 error_ = true;
336 connection_error_handler_.Run(); 414 connection_error_handler_.Run();
337 } 415 }
338 } 416 }
339 417
340 } // namespace internal 418 } // namespace internal
341 } // namespace mojo 419 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/connector.h ('k') | mojo/public/cpp/bindings/lib/interface_ptr_state.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698