Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1169)

Side by Side Diff: mojo/public/cpp/bindings/lib/connector.cc

Issue 1720093002: Revert of Mojo C++ bindings: support sync methods - part 2 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/connector.h" 5 #include "mojo/public/cpp/bindings/lib/connector.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h"
11 #include "base/logging.h" 10 #include "base/logging.h"
12 #include "base/macros.h" 11 #include "base/macros.h"
13 #include "base/synchronization/lock.h" 12 #include "base/synchronization/lock.h"
14 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h"
15 13
16 namespace mojo { 14 namespace mojo {
17 namespace internal { 15 namespace internal {
18 16
19 namespace { 17 namespace {
20 18
21 // Similar to base::AutoLock, except that it does nothing if |lock| passed into 19 // Similar to base::AutoLock, except that it does nothing if |lock| passed into
22 // the constructor is null. 20 // the constructor is null.
23 class MayAutoLock { 21 class MayAutoLock {
24 public: 22 public:
(...skipping 22 matching lines...) Expand all
47 ConnectorConfig config, 45 ConnectorConfig config,
48 const MojoAsyncWaiter* waiter) 46 const MojoAsyncWaiter* waiter)
49 : waiter_(waiter), 47 : waiter_(waiter),
50 message_pipe_(std::move(message_pipe)), 48 message_pipe_(std::move(message_pipe)),
51 incoming_receiver_(nullptr), 49 incoming_receiver_(nullptr),
52 async_wait_id_(0), 50 async_wait_id_(0),
53 error_(false), 51 error_(false),
54 drop_writes_(false), 52 drop_writes_(false),
55 enforce_errors_from_incoming_receiver_(true), 53 enforce_errors_from_incoming_receiver_(true),
56 paused_(false), 54 paused_(false),
57 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), 55 destroyed_flag_(nullptr),
58 register_sync_handle_watch_count_(0), 56 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr) {
59 registered_with_sync_handle_watcher_(false),
60 sync_handle_watcher_callback_count_(0),
61 weak_factory_(this) {
62 // Even though we don't have an incoming receiver, we still want to monitor 57 // Even though we don't have an incoming receiver, we still want to monitor
63 // the message pipe to know if is closed or encounters an error. 58 // the message pipe to know if is closed or encounters an error.
64 WaitToReadMore(); 59 WaitToReadMore();
65 } 60 }
66 61
67 Connector::~Connector() { 62 Connector::~Connector() {
68 DCHECK(thread_checker_.CalledOnValidThread()); 63 DCHECK(thread_checker_.CalledOnValidThread());
69 64
65 if (destroyed_flag_)
66 *destroyed_flag_ = true;
67
70 CancelWait(); 68 CancelWait();
71 } 69 }
72 70
73 void Connector::CloseMessagePipe() { 71 void Connector::CloseMessagePipe() {
74 DCHECK(thread_checker_.CalledOnValidThread()); 72 DCHECK(thread_checker_.CalledOnValidThread());
75 73
76 CancelWait(); 74 CancelWait();
77 MayAutoLock locker(lock_.get()); 75 MayAutoLock locker(lock_.get());
78 Close(std::move(message_pipe_)); 76 Close(std::move(message_pipe_));
79 } 77 }
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
186 CHECK(false) << "Race condition or other bug detected"; 184 CHECK(false) << "Race condition or other bug detected";
187 return false; 185 return false;
188 default: 186 default:
189 // This particular write was rejected, presumably because of bad input. 187 // This particular write was rejected, presumably because of bad input.
190 // The pipe is not necessarily in a bad state. 188 // The pipe is not necessarily in a bad state.
191 return false; 189 return false;
192 } 190 }
193 return true; 191 return true;
194 } 192 }
195 193
196 bool Connector::RegisterSyncHandleWatch() {
197 DCHECK(thread_checker_.CalledOnValidThread());
198
199 if (error_)
200 return false;
201
202 register_sync_handle_watch_count_++;
203
204 if (!registered_with_sync_handle_watcher_ && !paused_) {
205 registered_with_sync_handle_watcher_ =
206 SyncHandleWatcher::current()->RegisterHandle(
207 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
208 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
209 base::Unretained(this)));
210 }
211 return true;
212 }
213
214 void Connector::UnregisterSyncHandleWatch() {
215 DCHECK(thread_checker_.CalledOnValidThread());
216
217 if (register_sync_handle_watch_count_ == 0) {
218 NOTREACHED();
219 return;
220 }
221
222 register_sync_handle_watch_count_--;
223 if (register_sync_handle_watch_count_ > 0)
224 return;
225
226 if (registered_with_sync_handle_watcher_) {
227 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get());
228 registered_with_sync_handle_watcher_ = false;
229 }
230 }
231
232 bool Connector::RunSyncHandleWatch(const bool* should_stop) {
233 DCHECK(thread_checker_.CalledOnValidThread());
234 DCHECK_GT(register_sync_handle_watch_count_, 0u);
235
236 if (error_)
237 return false;
238
239 ResumeIncomingMethodCallProcessing();
240
241 return SyncHandleWatcher::current()->WatchAllHandles(message_pipe_.get(),
242 should_stop);
243 }
244
245 // static 194 // static
246 void Connector::CallOnHandleReady(void* closure, MojoResult result) { 195 void Connector::CallOnHandleReady(void* closure, MojoResult result) {
247 Connector* self = static_cast<Connector*>(closure); 196 Connector* self = static_cast<Connector*>(closure);
248 CHECK(self->async_wait_id_ != 0); 197 self->OnHandleReady(result);
249 self->async_wait_id_ = 0;
250 self->OnHandleReadyInternal(result);
251 } 198 }
252 199
253 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { 200 void Connector::OnHandleReady(MojoResult result) {
254 base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr());
255
256 sync_handle_watcher_callback_count_++;
257 OnHandleReadyInternal(result);
258 // At this point, this object might have been deleted.
259 if (weak_self)
260 sync_handle_watcher_callback_count_--;
261 }
262
263 void Connector::OnHandleReadyInternal(MojoResult result) {
264 DCHECK(thread_checker_.CalledOnValidThread()); 201 DCHECK(thread_checker_.CalledOnValidThread());
265 202
203 CHECK(async_wait_id_ != 0);
204 async_wait_id_ = 0;
266 if (result != MOJO_RESULT_OK) { 205 if (result != MOJO_RESULT_OK) {
267 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); 206 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
268 return; 207 return;
269 } 208 }
270 ReadAllAvailableMessages(); 209 ReadAllAvailableMessages();
271 // At this point, this object might have been deleted. Return. 210 // At this point, this object might have been deleted. Return.
272 } 211 }
273 212
274 void Connector::WaitToReadMore() { 213 void Connector::WaitToReadMore() {
275 CHECK(!async_wait_id_); 214 CHECK(!async_wait_id_);
276 CHECK(!paused_); 215 CHECK(!paused_);
277 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(), 216 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(),
278 MOJO_HANDLE_SIGNAL_READABLE, 217 MOJO_HANDLE_SIGNAL_READABLE,
279 MOJO_DEADLINE_INDEFINITE, 218 MOJO_DEADLINE_INDEFINITE,
280 &Connector::CallOnHandleReady, 219 &Connector::CallOnHandleReady,
281 this); 220 this);
282
283 if (register_sync_handle_watch_count_ > 0 &&
284 !registered_with_sync_handle_watcher_) {
285 registered_with_sync_handle_watcher_ =
286 SyncHandleWatcher::current()->RegisterHandle(
287 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
288 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
289 base::Unretained(this)));
290 }
291 } 221 }
292 222
293 bool Connector::ReadSingleMessage(MojoResult* read_result) { 223 bool Connector::ReadSingleMessage(MojoResult* read_result) {
294 CHECK(!paused_); 224 CHECK(!paused_);
295 225
296 bool receiver_result = false; 226 bool receiver_result = false;
297 227
298 // Detect if |this| was destroyed during message dispatch. Allow for the 228 // Detect if |this| was destroyed during message dispatch. Allow for the
299 // possibility of re-entering ReadMore() through message dispatch. 229 // possibility of re-entering ReadMore() through message dispatch.
300 base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr(); 230 bool was_destroyed_during_dispatch = false;
231 bool* previous_destroyed_flag = destroyed_flag_;
232 destroyed_flag_ = &was_destroyed_during_dispatch;
301 233
302 Message message; 234 Message message;
303 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); 235 const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
304 *read_result = rv; 236 *read_result = rv;
305 237
306 if (rv == MOJO_RESULT_OK) { 238 if (rv == MOJO_RESULT_OK) {
307 // Dispatching the message may spin in a nested message loop. To ensure we 239 // Dispatching the message may spin in a nested message loop. To ensure we
308 // continue dispatching messages when this happens start listening for 240 // continue dispatching messages when this happens start listening for
309 // messagse now. 241 // messagse now.
310 if (!async_wait_id_) { 242 if (!async_wait_id_) {
311 // TODO: Need to evaluate the perf impact of this. 243 // TODO: Need to evaluate the perf impact of this.
312 WaitToReadMore(); 244 WaitToReadMore();
313 } 245 }
314 receiver_result = 246 receiver_result =
315 incoming_receiver_ && incoming_receiver_->Accept(&message); 247 incoming_receiver_ && incoming_receiver_->Accept(&message);
316 } 248 }
317 249
318 if (!weak_self) 250 if (was_destroyed_during_dispatch) {
251 if (previous_destroyed_flag)
252 *previous_destroyed_flag = true; // Propagate flag.
319 return false; 253 return false;
254 }
255
256 destroyed_flag_ = previous_destroyed_flag;
320 257
321 if (rv == MOJO_RESULT_SHOULD_WAIT) 258 if (rv == MOJO_RESULT_SHOULD_WAIT)
322 return true; 259 return true;
323 260
324 if (rv != MOJO_RESULT_OK) { 261 if (rv != MOJO_RESULT_OK) {
325 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); 262 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
326 return false; 263 return false;
327 } 264 }
328 265
329 if (enforce_errors_from_incoming_receiver_ && !receiver_result) { 266 if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
(...skipping 21 matching lines...) Expand all
351 // because of thread hops. In that case, there'll still be an async 288 // because of thread hops. In that case, there'll still be an async
352 // waiter. 289 // waiter.
353 if (!async_wait_id_) 290 if (!async_wait_id_)
354 WaitToReadMore(); 291 WaitToReadMore();
355 break; 292 break;
356 } 293 }
357 } 294 }
358 } 295 }
359 296
360 void Connector::CancelWait() { 297 void Connector::CancelWait() {
361 if (async_wait_id_) { 298 if (!async_wait_id_)
362 waiter_->CancelWait(async_wait_id_); 299 return;
363 async_wait_id_ = 0;
364 }
365 300
366 if (registered_with_sync_handle_watcher_) { 301 waiter_->CancelWait(async_wait_id_);
367 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); 302 async_wait_id_ = 0;
368 registered_with_sync_handle_watcher_ = false;
369 }
370 } 303 }
371 304
372 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { 305 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
373 if (error_ || !message_pipe_.is_valid()) 306 if (error_ || !message_pipe_.is_valid())
374 return; 307 return;
375 308
376 if (!force_pipe_reset && force_async_handler) 309 if (!force_pipe_reset && force_async_handler)
377 force_pipe_reset = true; 310 force_pipe_reset = true;
378 311
379 if (paused_) { 312 if (paused_) {
(...skipping 19 matching lines...) Expand all
399 if (!paused_) 332 if (!paused_)
400 WaitToReadMore(); 333 WaitToReadMore();
401 } else { 334 } else {
402 error_ = true; 335 error_ = true;
403 connection_error_handler_.Run(); 336 connection_error_handler_.Run();
404 } 337 }
405 } 338 }
406 339
407 } // namespace internal 340 } // namespace internal
408 } // namespace mojo 341 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/connector.h ('k') | mojo/public/cpp/bindings/lib/interface_ptr_state.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698