OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/connector.h" | 5 #include "mojo/public/cpp/bindings/connector.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 #include <utility> | 8 #include <utility> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
11 #include "base/location.h" | 11 #include "base/location.h" |
12 #include "base/logging.h" | 12 #include "base/logging.h" |
13 #include "base/macros.h" | 13 #include "base/macros.h" |
14 #include "base/synchronization/lock.h" | 14 #include "base/synchronization/lock.h" |
15 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" | 15 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" |
16 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" | 16 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" |
17 | 17 |
18 namespace mojo { | 18 namespace mojo { |
19 | 19 |
20 Connector::Connector(ScopedMessagePipeHandle message_pipe, | 20 Connector::Connector(ScopedMessagePipeHandle message_pipe, |
21 ConnectorConfig config, | 21 ConnectorConfig config, |
22 scoped_refptr<base::SingleThreadTaskRunner> runner) | 22 scoped_refptr<base::SingleThreadTaskRunner> runner) |
23 : message_pipe_(std::move(message_pipe)), | 23 : message_pipe_(std::move(message_pipe)), |
24 task_runner_(std::move(runner)), | 24 task_runner_(std::move(runner)), |
25 handle_watcher_(task_runner_), | |
26 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), | 25 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), |
27 weak_factory_(this) { | 26 weak_factory_(this) { |
28 weak_self_ = weak_factory_.GetWeakPtr(); | 27 weak_self_ = weak_factory_.GetWeakPtr(); |
29 // Even though we don't have an incoming receiver, we still want to monitor | 28 // Even though we don't have an incoming receiver, we still want to monitor |
30 // the message pipe to know if is closed or encounters an error. | 29 // the message pipe to know if is closed or encounters an error. |
31 WaitToReadMore(); | 30 WaitToReadMore(); |
32 } | 31 } |
33 | 32 |
34 Connector::~Connector() { | 33 Connector::~Connector() { |
35 { | 34 { |
36 // Allow for quick destruction on any thread if the pipe is already closed. | 35 // Allow for quick destruction on any thread if the pipe is already closed. |
37 base::AutoLock lock(connected_lock_); | 36 base::AutoLock lock(connected_lock_); |
38 if (!connected_) | 37 if (!connected_) |
39 return; | 38 return; |
40 } | 39 } |
41 | 40 |
42 DCHECK(thread_checker_.CalledOnValidThread()); | 41 DCHECK(thread_checker_.CalledOnValidThread()); |
43 CancelWait(); | 42 CancelWait(); |
44 } | 43 } |
45 | 44 |
46 void Connector::CloseMessagePipe() { | 45 void Connector::CloseMessagePipe() { |
47 DCHECK(thread_checker_.CalledOnValidThread()); | 46 // Throw away the returned message pipe. |
48 | 47 PassMessagePipe(); |
49 CancelWait(); | |
50 internal::MayAutoLock locker(lock_.get()); | |
51 message_pipe_.reset(); | |
52 | |
53 base::AutoLock lock(connected_lock_); | |
54 connected_ = false; | |
55 } | 48 } |
56 | 49 |
57 ScopedMessagePipeHandle Connector::PassMessagePipe() { | 50 ScopedMessagePipeHandle Connector::PassMessagePipe() { |
58 DCHECK(thread_checker_.CalledOnValidThread()); | 51 DCHECK(thread_checker_.CalledOnValidThread()); |
59 | 52 |
60 CancelWait(); | 53 CancelWait(); |
61 internal::MayAutoLock locker(lock_.get()); | 54 internal::MayAutoLock locker(lock_.get()); |
62 ScopedMessagePipeHandle message_pipe = std::move(message_pipe_); | 55 ScopedMessagePipeHandle message_pipe = std::move(message_pipe_); |
| 56 weak_factory_.InvalidateWeakPtrs(); |
| 57 sync_handle_watcher_callback_count_ = 0; |
63 | 58 |
64 base::AutoLock lock(connected_lock_); | 59 base::AutoLock lock(connected_lock_); |
65 connected_ = false; | 60 connected_ = false; |
66 return message_pipe; | 61 return message_pipe; |
67 } | 62 } |
68 | 63 |
69 void Connector::RaiseError() { | 64 void Connector::RaiseError() { |
70 DCHECK(thread_checker_.CalledOnValidThread()); | 65 DCHECK(thread_checker_.CalledOnValidThread()); |
71 | 66 |
72 HandleError(true, true); | 67 HandleError(true, true); |
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
187 void Connector::OnWatcherHandleReady(MojoResult result) { | 182 void Connector::OnWatcherHandleReady(MojoResult result) { |
188 OnHandleReadyInternal(result); | 183 OnHandleReadyInternal(result); |
189 } | 184 } |
190 | 185 |
191 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { | 186 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { |
192 base::WeakPtr<Connector> weak_self(weak_self_); | 187 base::WeakPtr<Connector> weak_self(weak_self_); |
193 | 188 |
194 sync_handle_watcher_callback_count_++; | 189 sync_handle_watcher_callback_count_++; |
195 OnHandleReadyInternal(result); | 190 OnHandleReadyInternal(result); |
196 // At this point, this object might have been deleted. | 191 // At this point, this object might have been deleted. |
197 if (weak_self) | 192 if (weak_self) { |
| 193 DCHECK_LT(0u, sync_handle_watcher_callback_count_); |
198 sync_handle_watcher_callback_count_--; | 194 sync_handle_watcher_callback_count_--; |
| 195 } |
199 } | 196 } |
200 | 197 |
201 void Connector::OnHandleReadyInternal(MojoResult result) { | 198 void Connector::OnHandleReadyInternal(MojoResult result) { |
202 DCHECK(thread_checker_.CalledOnValidThread()); | 199 DCHECK(thread_checker_.CalledOnValidThread()); |
203 | 200 |
204 if (result != MOJO_RESULT_OK) { | 201 if (result != MOJO_RESULT_OK) { |
205 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); | 202 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
206 return; | 203 return; |
207 } | 204 } |
208 ReadAllAvailableMessages(); | 205 ReadAllAvailableMessages(); |
209 // At this point, this object might have been deleted. Return. | 206 // At this point, this object might have been deleted. Return. |
210 } | 207 } |
211 | 208 |
212 void Connector::WaitToReadMore() { | 209 void Connector::WaitToReadMore() { |
213 CHECK(!paused_); | 210 CHECK(!paused_); |
214 DCHECK(!handle_watcher_.IsWatching()); | 211 DCHECK(!handle_watcher_); |
215 | 212 |
216 MojoResult rv = handle_watcher_.Start( | 213 handle_watcher_.reset(new Watcher(task_runner_)); |
| 214 MojoResult rv = handle_watcher_->Start( |
217 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 215 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
218 base::Bind(&Connector::OnWatcherHandleReady, | 216 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); |
219 base::Unretained(this))); | |
220 | 217 |
221 if (rv != MOJO_RESULT_OK) { | 218 if (rv != MOJO_RESULT_OK) { |
222 // If the watch failed because the handle is invalid or its conditions can | 219 // If the watch failed because the handle is invalid or its conditions can |
223 // no longer be met, we signal the error asynchronously to avoid reentry. | 220 // no longer be met, we signal the error asynchronously to avoid reentry. |
224 task_runner_->PostTask( | 221 task_runner_->PostTask( |
225 FROM_HERE, | 222 FROM_HERE, |
226 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); | 223 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); |
227 } | 224 } |
228 | 225 |
229 if (allow_woken_up_by_others_) { | 226 if (allow_woken_up_by_others_) { |
230 EnsureSyncWatcherExists(); | 227 EnsureSyncWatcherExists(); |
231 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 228 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
232 } | 229 } |
233 } | 230 } |
234 | 231 |
235 bool Connector::ReadSingleMessage(MojoResult* read_result) { | 232 bool Connector::ReadSingleMessage(MojoResult* read_result) { |
236 CHECK(!paused_); | 233 CHECK(!paused_); |
237 | 234 |
238 bool receiver_result = false; | 235 bool receiver_result = false; |
239 | 236 |
240 // Detect if |this| was destroyed during message dispatch. Allow for the | 237 // Detect if |this| was destroyed or the message pipe was closed/transferred |
241 // possibility of re-entering ReadMore() through message dispatch. | 238 // during message dispatch. |
242 base::WeakPtr<Connector> weak_self = weak_self_; | 239 base::WeakPtr<Connector> weak_self = weak_self_; |
243 | 240 |
244 Message message; | 241 Message message; |
245 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); | 242 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); |
246 *read_result = rv; | 243 *read_result = rv; |
247 | 244 |
248 if (rv == MOJO_RESULT_OK) { | 245 if (rv == MOJO_RESULT_OK) { |
249 receiver_result = | 246 receiver_result = |
250 incoming_receiver_ && incoming_receiver_->Accept(&message); | 247 incoming_receiver_ && incoming_receiver_->Accept(&message); |
251 } | 248 } |
(...skipping 13 matching lines...) Expand all Loading... |
265 HandleError(true, false); | 262 HandleError(true, false); |
266 return false; | 263 return false; |
267 } | 264 } |
268 return true; | 265 return true; |
269 } | 266 } |
270 | 267 |
271 void Connector::ReadAllAvailableMessages() { | 268 void Connector::ReadAllAvailableMessages() { |
272 while (!error_) { | 269 while (!error_) { |
273 MojoResult rv; | 270 MojoResult rv; |
274 | 271 |
275 // Return immediately if |this| was destroyed. Do not touch any members! | 272 if (!ReadSingleMessage(&rv)) { |
276 if (!ReadSingleMessage(&rv)) | 273 // Return immediately without touching any members. |this| may have been |
| 274 // destroyed. |
277 return; | 275 return; |
| 276 } |
278 | 277 |
279 if (paused_) | 278 if (paused_) |
280 return; | 279 return; |
281 | 280 |
282 if (rv == MOJO_RESULT_SHOULD_WAIT) | 281 if (rv == MOJO_RESULT_SHOULD_WAIT) |
283 break; | 282 break; |
284 } | 283 } |
285 } | 284 } |
286 | 285 |
287 void Connector::CancelWait() { | 286 void Connector::CancelWait() { |
288 handle_watcher_.Cancel(); | 287 handle_watcher_.reset(); |
289 sync_watcher_.reset(); | 288 sync_watcher_.reset(); |
290 } | 289 } |
291 | 290 |
292 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { | 291 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { |
293 if (error_ || !message_pipe_.is_valid()) | 292 if (error_ || !message_pipe_.is_valid()) |
294 return; | 293 return; |
295 | 294 |
296 if (paused_) { | 295 if (paused_) { |
297 // Enforce calling the error handler asynchronously if the user has paused | 296 // Enforce calling the error handler asynchronously if the user has paused |
298 // receiving messages. We need to wait until the user starts receiving | 297 // receiving messages. We need to wait until the user starts receiving |
(...skipping 27 matching lines...) Expand all Loading... |
326 void Connector::EnsureSyncWatcherExists() { | 325 void Connector::EnsureSyncWatcherExists() { |
327 if (sync_watcher_) | 326 if (sync_watcher_) |
328 return; | 327 return; |
329 sync_watcher_.reset(new SyncHandleWatcher( | 328 sync_watcher_.reset(new SyncHandleWatcher( |
330 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 329 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
331 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, | 330 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
332 base::Unretained(this)))); | 331 base::Unretained(this)))); |
333 } | 332 } |
334 | 333 |
335 } // namespace mojo | 334 } // namespace mojo |
OLD | NEW |