Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(127)

Side by Side Diff: mojo/public/cpp/bindings/lib/connector.cc

Issue 2350883003: Mojo C++ bindings: fix Connector teardown. (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/connector.h" 5 #include "mojo/public/cpp/bindings/connector.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/location.h" 11 #include "base/location.h"
12 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/macros.h" 13 #include "base/macros.h"
14 #include "base/synchronization/lock.h" 14 #include "base/synchronization/lock.h"
15 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" 15 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
16 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" 16 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
17 17
18 namespace mojo { 18 namespace mojo {
19 19
20 Connector::Connector(ScopedMessagePipeHandle message_pipe, 20 Connector::Connector(ScopedMessagePipeHandle message_pipe,
21 ConnectorConfig config, 21 ConnectorConfig config,
22 scoped_refptr<base::SingleThreadTaskRunner> runner) 22 scoped_refptr<base::SingleThreadTaskRunner> runner)
23 : message_pipe_(std::move(message_pipe)), 23 : message_pipe_(std::move(message_pipe)),
24 task_runner_(std::move(runner)), 24 task_runner_(std::move(runner)),
25 handle_watcher_(task_runner_),
26 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), 25 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr),
27 weak_factory_(this) { 26 weak_factory_(this) {
28 weak_self_ = weak_factory_.GetWeakPtr(); 27 weak_self_ = weak_factory_.GetWeakPtr();
29 // Even though we don't have an incoming receiver, we still want to monitor 28 // Even though we don't have an incoming receiver, we still want to monitor
30 // the message pipe to know if is closed or encounters an error. 29 // the message pipe to know if is closed or encounters an error.
31 WaitToReadMore(); 30 WaitToReadMore();
32 } 31 }
33 32
34 Connector::~Connector() { 33 Connector::~Connector() {
35 { 34 {
36 // Allow for quick destruction on any thread if the pipe is already closed. 35 // Allow for quick destruction on any thread if the pipe is already closed.
37 base::AutoLock lock(connected_lock_); 36 base::AutoLock lock(connected_lock_);
38 if (!connected_) 37 if (!connected_)
39 return; 38 return;
40 } 39 }
41 40
42 DCHECK(thread_checker_.CalledOnValidThread()); 41 DCHECK(thread_checker_.CalledOnValidThread());
43 CancelWait(); 42 CancelWait();
44 } 43 }
45 44
46 void Connector::CloseMessagePipe() { 45 void Connector::CloseMessagePipe() {
47 DCHECK(thread_checker_.CalledOnValidThread()); 46 // Throw away the returned message pipe.
48 47 PassMessagePipe();
49 CancelWait();
50 internal::MayAutoLock locker(lock_.get());
51 message_pipe_.reset();
52
53 base::AutoLock lock(connected_lock_);
54 connected_ = false;
55 } 48 }
56 49
57 ScopedMessagePipeHandle Connector::PassMessagePipe() { 50 ScopedMessagePipeHandle Connector::PassMessagePipe() {
58 DCHECK(thread_checker_.CalledOnValidThread()); 51 DCHECK(thread_checker_.CalledOnValidThread());
59 52
60 CancelWait(); 53 CancelWait();
61 internal::MayAutoLock locker(lock_.get()); 54 internal::MayAutoLock locker(lock_.get());
62 ScopedMessagePipeHandle message_pipe = std::move(message_pipe_); 55 ScopedMessagePipeHandle message_pipe = std::move(message_pipe_);
56 weak_factory_.InvalidateWeakPtrs();
57 sync_handle_watcher_callback_count_ = 0;
63 58
64 base::AutoLock lock(connected_lock_); 59 base::AutoLock lock(connected_lock_);
65 connected_ = false; 60 connected_ = false;
66 return message_pipe; 61 return message_pipe;
67 } 62 }
68 63
69 void Connector::RaiseError() { 64 void Connector::RaiseError() {
70 DCHECK(thread_checker_.CalledOnValidThread()); 65 DCHECK(thread_checker_.CalledOnValidThread());
71 66
72 HandleError(true, true); 67 HandleError(true, true);
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after
187 void Connector::OnWatcherHandleReady(MojoResult result) { 182 void Connector::OnWatcherHandleReady(MojoResult result) {
188 OnHandleReadyInternal(result); 183 OnHandleReadyInternal(result);
189 } 184 }
190 185
191 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { 186 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
192 base::WeakPtr<Connector> weak_self(weak_self_); 187 base::WeakPtr<Connector> weak_self(weak_self_);
193 188
194 sync_handle_watcher_callback_count_++; 189 sync_handle_watcher_callback_count_++;
195 OnHandleReadyInternal(result); 190 OnHandleReadyInternal(result);
196 // At this point, this object might have been deleted. 191 // At this point, this object might have been deleted.
197 if (weak_self) 192 if (weak_self) {
193 DCHECK_LT(0u, sync_handle_watcher_callback_count_);
198 sync_handle_watcher_callback_count_--; 194 sync_handle_watcher_callback_count_--;
195 }
199 } 196 }
200 197
201 void Connector::OnHandleReadyInternal(MojoResult result) { 198 void Connector::OnHandleReadyInternal(MojoResult result) {
202 DCHECK(thread_checker_.CalledOnValidThread()); 199 DCHECK(thread_checker_.CalledOnValidThread());
203 200
204 if (result != MOJO_RESULT_OK) { 201 if (result != MOJO_RESULT_OK) {
205 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); 202 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
206 return; 203 return;
207 } 204 }
208 ReadAllAvailableMessages(); 205 ReadAllAvailableMessages();
209 // At this point, this object might have been deleted. Return. 206 // At this point, this object might have been deleted. Return.
210 } 207 }
211 208
212 void Connector::WaitToReadMore() { 209 void Connector::WaitToReadMore() {
213 CHECK(!paused_); 210 CHECK(!paused_);
214 DCHECK(!handle_watcher_.IsWatching()); 211 DCHECK(!handle_watcher_);
215 212
216 MojoResult rv = handle_watcher_.Start( 213 handle_watcher_.reset(new Watcher(task_runner_));
214 MojoResult rv = handle_watcher_->Start(
217 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, 215 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
218 base::Bind(&Connector::OnWatcherHandleReady, 216 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));
219 base::Unretained(this)));
220 217
221 if (rv != MOJO_RESULT_OK) { 218 if (rv != MOJO_RESULT_OK) {
222 // If the watch failed because the handle is invalid or its conditions can 219 // If the watch failed because the handle is invalid or its conditions can
223 // no longer be met, we signal the error asynchronously to avoid reentry. 220 // no longer be met, we signal the error asynchronously to avoid reentry.
224 task_runner_->PostTask( 221 task_runner_->PostTask(
225 FROM_HERE, 222 FROM_HERE,
226 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); 223 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
227 } 224 }
228 225
229 if (allow_woken_up_by_others_) { 226 if (allow_woken_up_by_others_) {
230 EnsureSyncWatcherExists(); 227 EnsureSyncWatcherExists();
231 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); 228 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
232 } 229 }
233 } 230 }
234 231
235 bool Connector::ReadSingleMessage(MojoResult* read_result) { 232 bool Connector::ReadSingleMessage(MojoResult* read_result) {
236 CHECK(!paused_); 233 CHECK(!paused_);
237 234
238 bool receiver_result = false; 235 bool receiver_result = false;
239 236
240 // Detect if |this| was destroyed during message dispatch. Allow for the 237 // Detect if |this| was destroyed or the message pipe was closed/transferred
241 // possibility of re-entering ReadMore() through message dispatch. 238 // during message dispatch.
242 base::WeakPtr<Connector> weak_self = weak_self_; 239 base::WeakPtr<Connector> weak_self = weak_self_;
243 240
244 Message message; 241 Message message;
245 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); 242 const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
246 *read_result = rv; 243 *read_result = rv;
247 244
248 if (rv == MOJO_RESULT_OK) { 245 if (rv == MOJO_RESULT_OK) {
249 receiver_result = 246 receiver_result =
250 incoming_receiver_ && incoming_receiver_->Accept(&message); 247 incoming_receiver_ && incoming_receiver_->Accept(&message);
251 } 248 }
(...skipping 13 matching lines...) Expand all
265 HandleError(true, false); 262 HandleError(true, false);
266 return false; 263 return false;
267 } 264 }
268 return true; 265 return true;
269 } 266 }
270 267
271 void Connector::ReadAllAvailableMessages() { 268 void Connector::ReadAllAvailableMessages() {
272 while (!error_) { 269 while (!error_) {
273 MojoResult rv; 270 MojoResult rv;
274 271
275 // Return immediately if |this| was destroyed. Do not touch any members! 272 if (!ReadSingleMessage(&rv)) {
276 if (!ReadSingleMessage(&rv)) 273 // Return immediately without touching any members. |this| may have been
274 // destroyed.
277 return; 275 return;
276 }
278 277
279 if (paused_) 278 if (paused_)
280 return; 279 return;
281 280
282 if (rv == MOJO_RESULT_SHOULD_WAIT) 281 if (rv == MOJO_RESULT_SHOULD_WAIT)
283 break; 282 break;
284 } 283 }
285 } 284 }
286 285
287 void Connector::CancelWait() { 286 void Connector::CancelWait() {
288 handle_watcher_.Cancel(); 287 handle_watcher_.reset();
289 sync_watcher_.reset(); 288 sync_watcher_.reset();
290 } 289 }
291 290
292 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { 291 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
293 if (error_ || !message_pipe_.is_valid()) 292 if (error_ || !message_pipe_.is_valid())
294 return; 293 return;
295 294
296 if (paused_) { 295 if (paused_) {
297 // Enforce calling the error handler asynchronously if the user has paused 296 // Enforce calling the error handler asynchronously if the user has paused
298 // receiving messages. We need to wait until the user starts receiving 297 // receiving messages. We need to wait until the user starts receiving
(...skipping 27 matching lines...) Expand all
326 void Connector::EnsureSyncWatcherExists() { 325 void Connector::EnsureSyncWatcherExists() {
327 if (sync_watcher_) 326 if (sync_watcher_)
328 return; 327 return;
329 sync_watcher_.reset(new SyncHandleWatcher( 328 sync_watcher_.reset(new SyncHandleWatcher(
330 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, 329 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
331 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, 330 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
332 base::Unretained(this)))); 331 base::Unretained(this))));
333 } 332 }
334 333
335 } // namespace mojo 334 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/connector.h ('k') | mojo/public/cpp/bindings/tests/connector_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698