Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/connector.h" | 5 #include "mojo/public/cpp/bindings/connector.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 #include <utility> | 8 #include <utility> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 181 return sync_watcher_->SyncWatch(should_stop); | 181 return sync_watcher_->SyncWatch(should_stop); |
| 182 } | 182 } |
| 183 | 183 |
| 184 void Connector::SetWatcherHeapProfilerTag(const char* tag) { | 184 void Connector::SetWatcherHeapProfilerTag(const char* tag) { |
| 185 heap_profiler_tag_ = tag; | 185 heap_profiler_tag_ = tag; |
| 186 if (handle_watcher_) { | 186 if (handle_watcher_) { |
| 187 handle_watcher_->set_heap_profiler_tag(tag); | 187 handle_watcher_->set_heap_profiler_tag(tag); |
| 188 } | 188 } |
| 189 } | 189 } |
| 190 | 190 |
| 191 void Connector::EnableNestedDispatch(bool enabled) { | |
| 192 nested_dispatch_enabled_ = enabled; | |
| 193 CancelWait(); | |
|
yzshen1
2017/03/03 00:03:50
Will this cause problem while we are in the middle
Ken Rockot(use gerrit already)
2017/03/03 00:37:05
Good point. Now I just reset handle_watcher_ befor
| |
| 194 WaitToReadMore(); | |
| 195 } | |
| 196 | |
| 191 void Connector::OnWatcherHandleReady(MojoResult result) { | 197 void Connector::OnWatcherHandleReady(MojoResult result) { |
| 192 OnHandleReadyInternal(result); | 198 OnHandleReadyInternal(result); |
| 193 } | 199 } |
| 194 | 200 |
| 195 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { | 201 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { |
| 196 base::WeakPtr<Connector> weak_self(weak_self_); | 202 base::WeakPtr<Connector> weak_self(weak_self_); |
| 197 | 203 |
| 198 sync_handle_watcher_callback_count_++; | 204 sync_handle_watcher_callback_count_++; |
| 199 OnHandleReadyInternal(result); | 205 OnHandleReadyInternal(result); |
| 200 // At this point, this object might have been deleted. | 206 // At this point, this object might have been deleted. |
| 201 if (weak_self) { | 207 if (weak_self) { |
| 202 DCHECK_LT(0u, sync_handle_watcher_callback_count_); | 208 DCHECK_LT(0u, sync_handle_watcher_callback_count_); |
| 203 sync_handle_watcher_callback_count_--; | 209 sync_handle_watcher_callback_count_--; |
| 204 } | 210 } |
| 205 } | 211 } |
| 206 | 212 |
| 207 void Connector::OnHandleReadyInternal(MojoResult result) { | 213 void Connector::OnHandleReadyInternal(MojoResult result) { |
| 208 DCHECK(thread_checker_.CalledOnValidThread()); | 214 DCHECK(thread_checker_.CalledOnValidThread()); |
| 209 | 215 |
| 210 if (result != MOJO_RESULT_OK) { | 216 if (result != MOJO_RESULT_OK) { |
| 211 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); | 217 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
| 212 return; | 218 return; |
| 213 } | 219 } |
| 214 ReadAllAvailableMessages(); | 220 |
| 215 // At this point, this object might have been deleted. Return. | 221 for (;;) { |
|
yzshen1
2017/03/03 00:03:50
nit: Does it make sense to merge ReadAllAvailableM
Ken Rockot(use gerrit already)
2017/03/03 00:37:05
Done
| |
| 222 DestructionTracker::Flag was_destroyed(&destruction_tracker_); | |
| 223 | |
| 224 // May delete |this|. | |
| 225 ReadAllAvailableMessages(); | |
| 226 | |
| 227 if (was_destroyed) | |
| 228 return; | |
| 229 | |
| 230 // We also may have been paused by some dispatch, in which case we're done. | |
| 231 if (!handle_watcher_) | |
| 232 return; | |
| 233 | |
| 234 // Attempt to re-arm the Watcher. | |
| 235 result = handle_watcher_->Arm(); | |
| 236 switch (result) { | |
| 237 case MOJO_RESULT_OK: | |
| 238 // Everything's cool. No more work to do. | |
| 239 return; | |
| 240 | |
| 241 case MOJO_RESULT_ALREADY_EXISTS: | |
| 242 // The handle is already readable again. Continue reading messagexs. | |
| 243 break; | |
| 244 | |
| 245 case MOJO_RESULT_FAILED_PRECONDITION: | |
| 246 // The handle will never be readable again. Notify of error immediately. | |
| 247 // May delete |this|. | |
| 248 HandleError(false, false); | |
| 249 return; | |
| 250 | |
| 251 default: | |
| 252 NOTREACHED(); | |
| 253 break; | |
| 254 } | |
| 255 } | |
| 216 } | 256 } |
| 217 | 257 |
| 218 void Connector::WaitToReadMore() { | 258 void Connector::WaitToReadMore() { |
| 219 CHECK(!paused_); | 259 CHECK(!paused_); |
| 220 DCHECK(!handle_watcher_); | 260 DCHECK(!handle_watcher_); |
| 221 | 261 |
| 222 handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_)); | 262 handle_watcher_.reset( |
| 263 new Watcher(FROM_HERE, Watcher::ArmingPolicy::MANUAL, task_runner_)); | |
| 223 if (heap_profiler_tag_) | 264 if (heap_profiler_tag_) |
| 224 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); | 265 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); |
| 225 MojoResult rv = handle_watcher_->Start( | 266 MojoResult rv = handle_watcher_->Start( |
| 226 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 267 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| 227 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); | 268 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); |
| 228 | 269 |
| 229 if (rv != MOJO_RESULT_OK) { | 270 if (rv != MOJO_RESULT_OK) { |
| 230 // If the watch failed because the handle is invalid or its conditions can | 271 // If the watch failed because the handle is invalid or its conditions can |
| 231 // no longer be met, we signal the error asynchronously to avoid reentry. | 272 // no longer be met, we signal the error asynchronously to avoid reentry. |
| 232 task_runner_->PostTask( | 273 task_runner_->PostTask( |
| 233 FROM_HERE, | 274 FROM_HERE, |
| 234 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); | 275 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); |
| 276 } else { | |
| 277 handle_watcher_->ArmOrNotify(); | |
| 235 } | 278 } |
| 236 | 279 |
| 237 if (allow_woken_up_by_others_) { | 280 if (allow_woken_up_by_others_) { |
| 238 EnsureSyncWatcherExists(); | 281 EnsureSyncWatcherExists(); |
| 239 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 282 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| 240 } | 283 } |
| 241 } | 284 } |
| 242 | 285 |
| 243 bool Connector::ReadSingleMessage(MojoResult* read_result) { | 286 bool Connector::ReadSingleMessage(MojoResult* read_result) { |
| 244 CHECK(!paused_); | 287 CHECK(!paused_); |
| 245 | 288 |
| 246 bool receiver_result = false; | 289 bool receiver_result = false; |
| 247 | 290 |
| 248 // Detect if |this| was destroyed or the message pipe was closed/transferred | 291 // Detect if |this| was destroyed or the message pipe was closed/transferred |
| 249 // during message dispatch. | 292 // during message dispatch. |
| 250 base::WeakPtr<Connector> weak_self = weak_self_; | 293 base::WeakPtr<Connector> weak_self = weak_self_; |
| 251 | 294 |
| 252 Message message; | 295 Message message; |
| 253 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); | 296 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); |
| 254 *read_result = rv; | 297 *read_result = rv; |
| 255 | 298 |
| 299 if (nested_dispatch_enabled_) { | |
| 300 // When supporting nested dispatch, we have to rearm the Watcher immediately | |
| 301 // after reading each message (i.e. before dispatch) to ensure that the next | |
| 302 // inbound message can trigger OnHandleReady on the nested loop. | |
| 303 handle_watcher_->ArmOrNotify(); | |
| 304 } | |
| 305 | |
| 256 if (rv == MOJO_RESULT_OK) { | 306 if (rv == MOJO_RESULT_OK) { |
| 257 receiver_result = | 307 receiver_result = |
| 258 incoming_receiver_ && incoming_receiver_->Accept(&message); | 308 incoming_receiver_ && incoming_receiver_->Accept(&message); |
| 259 } | 309 } |
| 260 | 310 |
| 261 if (!weak_self) | 311 if (!weak_self) |
| 262 return false; | 312 return false; |
| 263 | 313 |
| 264 if (rv == MOJO_RESULT_SHOULD_WAIT) | 314 if (rv == MOJO_RESULT_SHOULD_WAIT) |
| 265 return true; | 315 return true; |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 336 void Connector::EnsureSyncWatcherExists() { | 386 void Connector::EnsureSyncWatcherExists() { |
| 337 if (sync_watcher_) | 387 if (sync_watcher_) |
| 338 return; | 388 return; |
| 339 sync_watcher_.reset(new SyncHandleWatcher( | 389 sync_watcher_.reset(new SyncHandleWatcher( |
| 340 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 390 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| 341 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, | 391 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
| 342 base::Unretained(this)))); | 392 base::Unretained(this)))); |
| 343 } | 393 } |
| 344 | 394 |
| 345 } // namespace mojo | 395 } // namespace mojo |
| OLD | NEW |