| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/connector.h" | 5 #include "mojo/public/cpp/bindings/connector.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 #include <utility> | 8 #include <utility> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 181 return sync_watcher_->SyncWatch(should_stop); | 181 return sync_watcher_->SyncWatch(should_stop); |
| 182 } | 182 } |
| 183 | 183 |
| 184 void Connector::SetWatcherHeapProfilerTag(const char* tag) { | 184 void Connector::SetWatcherHeapProfilerTag(const char* tag) { |
| 185 heap_profiler_tag_ = tag; | 185 heap_profiler_tag_ = tag; |
| 186 if (handle_watcher_) { | 186 if (handle_watcher_) { |
| 187 handle_watcher_->set_heap_profiler_tag(tag); | 187 handle_watcher_->set_heap_profiler_tag(tag); |
| 188 } | 188 } |
| 189 } | 189 } |
| 190 | 190 |
| 191 void Connector::EnableNestedDispatch(bool enabled) { | |
| 192 nested_dispatch_enabled_ = enabled; | |
| 193 handle_watcher_.reset(); | |
| 194 WaitToReadMore(); | |
| 195 } | |
| 196 | |
| 197 void Connector::OnWatcherHandleReady(MojoResult result) { | 191 void Connector::OnWatcherHandleReady(MojoResult result) { |
| 198 OnHandleReadyInternal(result); | 192 OnHandleReadyInternal(result); |
| 199 } | 193 } |
| 200 | 194 |
| 201 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { | 195 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { |
| 202 base::WeakPtr<Connector> weak_self(weak_self_); | 196 base::WeakPtr<Connector> weak_self(weak_self_); |
| 203 | 197 |
| 204 sync_handle_watcher_callback_count_++; | 198 sync_handle_watcher_callback_count_++; |
| 205 OnHandleReadyInternal(result); | 199 OnHandleReadyInternal(result); |
| 206 // At this point, this object might have been deleted. | 200 // At this point, this object might have been deleted. |
| 207 if (weak_self) { | 201 if (weak_self) { |
| 208 DCHECK_LT(0u, sync_handle_watcher_callback_count_); | 202 DCHECK_LT(0u, sync_handle_watcher_callback_count_); |
| 209 sync_handle_watcher_callback_count_--; | 203 sync_handle_watcher_callback_count_--; |
| 210 } | 204 } |
| 211 } | 205 } |
| 212 | 206 |
| 213 void Connector::OnHandleReadyInternal(MojoResult result) { | 207 void Connector::OnHandleReadyInternal(MojoResult result) { |
| 214 DCHECK(thread_checker_.CalledOnValidThread()); | 208 DCHECK(thread_checker_.CalledOnValidThread()); |
| 215 | 209 |
| 216 if (result != MOJO_RESULT_OK) { | 210 if (result != MOJO_RESULT_OK) { |
| 217 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); | 211 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
| 218 return; | 212 return; |
| 219 } | 213 } |
| 220 | |
| 221 ReadAllAvailableMessages(); | 214 ReadAllAvailableMessages(); |
| 222 // At this point, this object might have been deleted. Return. | 215 // At this point, this object might have been deleted. Return. |
| 223 } | 216 } |
| 224 | 217 |
| 225 void Connector::WaitToReadMore() { | 218 void Connector::WaitToReadMore() { |
| 226 CHECK(!paused_); | 219 CHECK(!paused_); |
| 227 DCHECK(!handle_watcher_); | 220 DCHECK(!handle_watcher_); |
| 228 | 221 |
| 229 handle_watcher_.reset(new SimpleWatcher( | 222 handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_)); |
| 230 FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_)); | |
| 231 if (heap_profiler_tag_) | 223 if (heap_profiler_tag_) |
| 232 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); | 224 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); |
| 233 MojoResult rv = handle_watcher_->Watch( | 225 MojoResult rv = handle_watcher_->Start( |
| 234 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 226 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| 235 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); | 227 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); |
| 236 | 228 |
| 237 if (rv != MOJO_RESULT_OK) { | 229 if (rv != MOJO_RESULT_OK) { |
| 238 // If the watch failed because the handle is invalid or its conditions can | 230 // If the watch failed because the handle is invalid or its conditions can |
| 239 // no longer be met, we signal the error asynchronously to avoid reentry. | 231 // no longer be met, we signal the error asynchronously to avoid reentry. |
| 240 task_runner_->PostTask( | 232 task_runner_->PostTask( |
| 241 FROM_HERE, | 233 FROM_HERE, |
| 242 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); | 234 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); |
| 243 } else { | |
| 244 handle_watcher_->ArmOrNotify(); | |
| 245 } | 235 } |
| 246 | 236 |
| 247 if (allow_woken_up_by_others_) { | 237 if (allow_woken_up_by_others_) { |
| 248 EnsureSyncWatcherExists(); | 238 EnsureSyncWatcherExists(); |
| 249 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 239 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| 250 } | 240 } |
| 251 } | 241 } |
| 252 | 242 |
| 253 bool Connector::ReadSingleMessage(MojoResult* read_result) { | 243 bool Connector::ReadSingleMessage(MojoResult* read_result) { |
| 254 CHECK(!paused_); | 244 CHECK(!paused_); |
| 255 | 245 |
| 256 bool receiver_result = false; | 246 bool receiver_result = false; |
| 257 | 247 |
| 258 // Detect if |this| was destroyed or the message pipe was closed/transferred | 248 // Detect if |this| was destroyed or the message pipe was closed/transferred |
| 259 // during message dispatch. | 249 // during message dispatch. |
| 260 base::WeakPtr<Connector> weak_self = weak_self_; | 250 base::WeakPtr<Connector> weak_self = weak_self_; |
| 261 | 251 |
| 262 Message message; | 252 Message message; |
| 263 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); | 253 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); |
| 264 *read_result = rv; | 254 *read_result = rv; |
| 265 | 255 |
| 266 if (nested_dispatch_enabled_) { | |
| 267 // When supporting nested dispatch, we have to rearm the Watcher immediately | |
| 268 // after reading each message (i.e. before dispatch) to ensure that the next | |
| 269 // inbound message can trigger OnHandleReady on the nested loop. | |
| 270 handle_watcher_->ArmOrNotify(); | |
| 271 } | |
| 272 | |
| 273 if (rv == MOJO_RESULT_OK) { | 256 if (rv == MOJO_RESULT_OK) { |
| 274 receiver_result = | 257 receiver_result = |
| 275 incoming_receiver_ && incoming_receiver_->Accept(&message); | 258 incoming_receiver_ && incoming_receiver_->Accept(&message); |
| 276 } | 259 } |
| 277 | 260 |
| 278 if (!weak_self) | 261 if (!weak_self) |
| 279 return false; | 262 return false; |
| 280 | 263 |
| 281 if (rv == MOJO_RESULT_SHOULD_WAIT) | 264 if (rv == MOJO_RESULT_SHOULD_WAIT) |
| 282 return true; | 265 return true; |
| 283 | 266 |
| 284 if (rv != MOJO_RESULT_OK) { | 267 if (rv != MOJO_RESULT_OK) { |
| 285 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); | 268 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); |
| 286 return false; | 269 return false; |
| 287 } | 270 } |
| 288 | 271 |
| 289 if (enforce_errors_from_incoming_receiver_ && !receiver_result) { | 272 if (enforce_errors_from_incoming_receiver_ && !receiver_result) { |
| 290 HandleError(true, false); | 273 HandleError(true, false); |
| 291 return false; | 274 return false; |
| 292 } | 275 } |
| 293 return true; | 276 return true; |
| 294 } | 277 } |
| 295 | 278 |
| 296 void Connector::ReadAllAvailableMessages() { | 279 void Connector::ReadAllAvailableMessages() { |
| 297 while (!error_) { | 280 while (!error_) { |
| 298 base::WeakPtr<Connector> weak_self = weak_self_; | |
| 299 MojoResult rv; | 281 MojoResult rv; |
| 300 | 282 |
| 301 // May delete |this.| | 283 if (!ReadSingleMessage(&rv)) { |
| 302 if (!ReadSingleMessage(&rv)) | 284 // Return immediately without touching any members. |this| may have been |
| 285 // destroyed. |
| 286 return; |
| 287 } |
| 288 |
| 289 if (paused_) |
| 303 return; | 290 return; |
| 304 | 291 |
| 305 if (!weak_self || paused_) | 292 if (rv == MOJO_RESULT_SHOULD_WAIT) |
| 306 return; | 293 break; |
| 307 | |
| 308 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT); | |
| 309 | |
| 310 if (rv == MOJO_RESULT_SHOULD_WAIT) { | |
| 311 // Attempt to re-arm the Watcher. | |
| 312 MojoResult ready_result; | |
| 313 MojoResult arm_result = handle_watcher_->Arm(&ready_result); | |
| 314 if (arm_result == MOJO_RESULT_OK) | |
| 315 return; | |
| 316 | |
| 317 // The watcher is already ready to notify again. | |
| 318 DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, arm_result); | |
| 319 | |
| 320 if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) { | |
| 321 HandleError(false, false); | |
| 322 return; | |
| 323 } | |
| 324 | |
| 325 // There's more to read now, so we'll just keep looping. | |
| 326 DCHECK_EQ(MOJO_RESULT_OK, ready_result); | |
| 327 } | |
| 328 } | 294 } |
| 329 } | 295 } |
| 330 | 296 |
| 331 void Connector::CancelWait() { | 297 void Connector::CancelWait() { |
| 332 handle_watcher_.reset(); | 298 handle_watcher_.reset(); |
| 333 sync_watcher_.reset(); | 299 sync_watcher_.reset(); |
| 334 } | 300 } |
| 335 | 301 |
| 336 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { | 302 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { |
| 337 if (error_ || !message_pipe_.is_valid()) | 303 if (error_ || !message_pipe_.is_valid()) |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 370 void Connector::EnsureSyncWatcherExists() { | 336 void Connector::EnsureSyncWatcherExists() { |
| 371 if (sync_watcher_) | 337 if (sync_watcher_) |
| 372 return; | 338 return; |
| 373 sync_watcher_.reset(new SyncHandleWatcher( | 339 sync_watcher_.reset(new SyncHandleWatcher( |
| 374 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 340 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| 375 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, | 341 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
| 376 base::Unretained(this)))); | 342 base::Unretained(this)))); |
| 377 } | 343 } |
| 378 | 344 |
| 379 } // namespace mojo | 345 } // namespace mojo |
| OLD | NEW |