OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/connector.h" | 5 #include "mojo/public/cpp/bindings/connector.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 #include <utility> | 8 #include <utility> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
181 return sync_watcher_->SyncWatch(should_stop); | 181 return sync_watcher_->SyncWatch(should_stop); |
182 } | 182 } |
183 | 183 |
184 void Connector::SetWatcherHeapProfilerTag(const char* tag) { | 184 void Connector::SetWatcherHeapProfilerTag(const char* tag) { |
185 heap_profiler_tag_ = tag; | 185 heap_profiler_tag_ = tag; |
186 if (handle_watcher_) { | 186 if (handle_watcher_) { |
187 handle_watcher_->set_heap_profiler_tag(tag); | 187 handle_watcher_->set_heap_profiler_tag(tag); |
188 } | 188 } |
189 } | 189 } |
190 | 190 |
| 191 void Connector::EnableNestedDispatch(bool enabled) { |
| 192 nested_dispatch_enabled_ = enabled; |
| 193 handle_watcher_.reset(); |
| 194 WaitToReadMore(); |
| 195 } |
| 196 |
191 void Connector::OnWatcherHandleReady(MojoResult result) { | 197 void Connector::OnWatcherHandleReady(MojoResult result) { |
192 OnHandleReadyInternal(result); | 198 OnHandleReadyInternal(result); |
193 } | 199 } |
194 | 200 |
195 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { | 201 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { |
196 base::WeakPtr<Connector> weak_self(weak_self_); | 202 base::WeakPtr<Connector> weak_self(weak_self_); |
197 | 203 |
198 sync_handle_watcher_callback_count_++; | 204 sync_handle_watcher_callback_count_++; |
199 OnHandleReadyInternal(result); | 205 OnHandleReadyInternal(result); |
200 // At this point, this object might have been deleted. | 206 // At this point, this object might have been deleted. |
201 if (weak_self) { | 207 if (weak_self) { |
202 DCHECK_LT(0u, sync_handle_watcher_callback_count_); | 208 DCHECK_LT(0u, sync_handle_watcher_callback_count_); |
203 sync_handle_watcher_callback_count_--; | 209 sync_handle_watcher_callback_count_--; |
204 } | 210 } |
205 } | 211 } |
206 | 212 |
207 void Connector::OnHandleReadyInternal(MojoResult result) { | 213 void Connector::OnHandleReadyInternal(MojoResult result) { |
208 DCHECK(thread_checker_.CalledOnValidThread()); | 214 DCHECK(thread_checker_.CalledOnValidThread()); |
209 | 215 |
210 if (result != MOJO_RESULT_OK) { | 216 if (result != MOJO_RESULT_OK) { |
211 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); | 217 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
212 return; | 218 return; |
213 } | 219 } |
| 220 |
214 ReadAllAvailableMessages(); | 221 ReadAllAvailableMessages(); |
215 // At this point, this object might have been deleted. Return. | 222 // At this point, this object might have been deleted. Return. |
216 } | 223 } |
217 | 224 |
218 void Connector::WaitToReadMore() { | 225 void Connector::WaitToReadMore() { |
219 CHECK(!paused_); | 226 CHECK(!paused_); |
220 DCHECK(!handle_watcher_); | 227 DCHECK(!handle_watcher_); |
221 | 228 |
222 handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_)); | 229 handle_watcher_.reset(new SimpleWatcher( |
| 230 FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_)); |
223 if (heap_profiler_tag_) | 231 if (heap_profiler_tag_) |
224 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); | 232 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); |
225 MojoResult rv = handle_watcher_->Start( | 233 MojoResult rv = handle_watcher_->Watch( |
226 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 234 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
227 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); | 235 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); |
228 | 236 |
229 if (rv != MOJO_RESULT_OK) { | 237 if (rv != MOJO_RESULT_OK) { |
230 // If the watch failed because the handle is invalid or its conditions can | 238 // If the watch failed because the handle is invalid or its conditions can |
231 // no longer be met, we signal the error asynchronously to avoid reentry. | 239 // no longer be met, we signal the error asynchronously to avoid reentry. |
232 task_runner_->PostTask( | 240 task_runner_->PostTask( |
233 FROM_HERE, | 241 FROM_HERE, |
234 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); | 242 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); |
| 243 } else { |
| 244 handle_watcher_->ArmOrNotify(); |
235 } | 245 } |
236 | 246 |
237 if (allow_woken_up_by_others_) { | 247 if (allow_woken_up_by_others_) { |
238 EnsureSyncWatcherExists(); | 248 EnsureSyncWatcherExists(); |
239 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 249 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
240 } | 250 } |
241 } | 251 } |
242 | 252 |
243 bool Connector::ReadSingleMessage(MojoResult* read_result) { | 253 bool Connector::ReadSingleMessage(MojoResult* read_result) { |
244 CHECK(!paused_); | 254 CHECK(!paused_); |
245 | 255 |
246 bool receiver_result = false; | 256 bool receiver_result = false; |
247 | 257 |
248 // Detect if |this| was destroyed or the message pipe was closed/transferred | 258 // Detect if |this| was destroyed or the message pipe was closed/transferred |
249 // during message dispatch. | 259 // during message dispatch. |
250 base::WeakPtr<Connector> weak_self = weak_self_; | 260 base::WeakPtr<Connector> weak_self = weak_self_; |
251 | 261 |
252 Message message; | 262 Message message; |
253 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); | 263 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); |
254 *read_result = rv; | 264 *read_result = rv; |
255 | 265 |
| 266 if (nested_dispatch_enabled_) { |
| 267 // When supporting nested dispatch, we have to rearm the Watcher immediately |
| 268 // after reading each message (i.e. before dispatch) to ensure that the next |
| 269 // inbound message can trigger OnHandleReady on the nested loop. |
| 270 handle_watcher_->ArmOrNotify(); |
| 271 } |
| 272 |
256 if (rv == MOJO_RESULT_OK) { | 273 if (rv == MOJO_RESULT_OK) { |
257 receiver_result = | 274 receiver_result = |
258 incoming_receiver_ && incoming_receiver_->Accept(&message); | 275 incoming_receiver_ && incoming_receiver_->Accept(&message); |
259 } | 276 } |
260 | 277 |
261 if (!weak_self) | 278 if (!weak_self) |
262 return false; | 279 return false; |
263 | 280 |
264 if (rv == MOJO_RESULT_SHOULD_WAIT) | 281 if (rv == MOJO_RESULT_SHOULD_WAIT) |
265 return true; | 282 return true; |
266 | 283 |
267 if (rv != MOJO_RESULT_OK) { | 284 if (rv != MOJO_RESULT_OK) { |
268 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); | 285 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); |
269 return false; | 286 return false; |
270 } | 287 } |
271 | 288 |
272 if (enforce_errors_from_incoming_receiver_ && !receiver_result) { | 289 if (enforce_errors_from_incoming_receiver_ && !receiver_result) { |
273 HandleError(true, false); | 290 HandleError(true, false); |
274 return false; | 291 return false; |
275 } | 292 } |
276 return true; | 293 return true; |
277 } | 294 } |
278 | 295 |
279 void Connector::ReadAllAvailableMessages() { | 296 void Connector::ReadAllAvailableMessages() { |
280 while (!error_) { | 297 while (!error_) { |
| 298 base::WeakPtr<Connector> weak_self = weak_self_; |
281 MojoResult rv; | 299 MojoResult rv; |
282 | 300 |
283 if (!ReadSingleMessage(&rv)) { | 301 // May delete |this.| |
284 // Return immediately without touching any members. |this| may have been | 302 if (!ReadSingleMessage(&rv)) |
285 // destroyed. | |
286 return; | |
287 } | |
288 | |
289 if (paused_) | |
290 return; | 303 return; |
291 | 304 |
292 if (rv == MOJO_RESULT_SHOULD_WAIT) | 305 if (!weak_self || paused_) |
293 break; | 306 return; |
| 307 |
| 308 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT); |
| 309 |
| 310 if (rv == MOJO_RESULT_SHOULD_WAIT) { |
| 311 // Attempt to re-arm the Watcher. |
| 312 MojoResult ready_result; |
| 313 MojoResult arm_result = handle_watcher_->Arm(&ready_result); |
| 314 if (arm_result == MOJO_RESULT_OK) |
| 315 return; |
| 316 |
| 317 // The watcher is already ready to notify again. |
| 318 DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, arm_result); |
| 319 |
| 320 if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) { |
| 321 HandleError(false, false); |
| 322 return; |
| 323 } |
| 324 |
| 325 // There's more to read now, so we'll just keep looping. |
| 326 DCHECK_EQ(MOJO_RESULT_OK, ready_result); |
| 327 } |
294 } | 328 } |
295 } | 329 } |
296 | 330 |
297 void Connector::CancelWait() { | 331 void Connector::CancelWait() { |
298 handle_watcher_.reset(); | 332 handle_watcher_.reset(); |
299 sync_watcher_.reset(); | 333 sync_watcher_.reset(); |
300 } | 334 } |
301 | 335 |
302 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { | 336 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { |
303 if (error_ || !message_pipe_.is_valid()) | 337 if (error_ || !message_pipe_.is_valid()) |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
336 void Connector::EnsureSyncWatcherExists() { | 370 void Connector::EnsureSyncWatcherExists() { |
337 if (sync_watcher_) | 371 if (sync_watcher_) |
338 return; | 372 return; |
339 sync_watcher_.reset(new SyncHandleWatcher( | 373 sync_watcher_.reset(new SyncHandleWatcher( |
340 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 374 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
341 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, | 375 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
342 base::Unretained(this)))); | 376 base::Unretained(this)))); |
343 } | 377 } |
344 | 378 |
345 } // namespace mojo | 379 } // namespace mojo |
OLD | NEW |