OLD | NEW |
---|---|
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/connector.h" | 5 #include "mojo/public/cpp/bindings/connector.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 #include <utility> | 8 #include <utility> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
181 return sync_watcher_->SyncWatch(should_stop); | 181 return sync_watcher_->SyncWatch(should_stop); |
182 } | 182 } |
183 | 183 |
184 void Connector::SetWatcherHeapProfilerTag(const char* tag) { | 184 void Connector::SetWatcherHeapProfilerTag(const char* tag) { |
185 heap_profiler_tag_ = tag; | 185 heap_profiler_tag_ = tag; |
186 if (handle_watcher_) { | 186 if (handle_watcher_) { |
187 handle_watcher_->set_heap_profiler_tag(tag); | 187 handle_watcher_->set_heap_profiler_tag(tag); |
188 } | 188 } |
189 } | 189 } |
190 | 190 |
191 void Connector::EnableNestedDispatch(bool enabled) { | |
192 nested_dispatch_enabled_ = enabled; | |
193 CancelWait(); | |
yzshen1
2017/03/03 00:03:50
Will this cause problem while we are in the middle
Ken Rockot(use gerrit already)
2017/03/03 00:37:05
Good point. Now I just reset handle_watcher_ befor
| |
194 WaitToReadMore(); | |
195 } | |
196 | |
191 void Connector::OnWatcherHandleReady(MojoResult result) { | 197 void Connector::OnWatcherHandleReady(MojoResult result) { |
192 OnHandleReadyInternal(result); | 198 OnHandleReadyInternal(result); |
193 } | 199 } |
194 | 200 |
195 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { | 201 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { |
196 base::WeakPtr<Connector> weak_self(weak_self_); | 202 base::WeakPtr<Connector> weak_self(weak_self_); |
197 | 203 |
198 sync_handle_watcher_callback_count_++; | 204 sync_handle_watcher_callback_count_++; |
199 OnHandleReadyInternal(result); | 205 OnHandleReadyInternal(result); |
200 // At this point, this object might have been deleted. | 206 // At this point, this object might have been deleted. |
201 if (weak_self) { | 207 if (weak_self) { |
202 DCHECK_LT(0u, sync_handle_watcher_callback_count_); | 208 DCHECK_LT(0u, sync_handle_watcher_callback_count_); |
203 sync_handle_watcher_callback_count_--; | 209 sync_handle_watcher_callback_count_--; |
204 } | 210 } |
205 } | 211 } |
206 | 212 |
207 void Connector::OnHandleReadyInternal(MojoResult result) { | 213 void Connector::OnHandleReadyInternal(MojoResult result) { |
208 DCHECK(thread_checker_.CalledOnValidThread()); | 214 DCHECK(thread_checker_.CalledOnValidThread()); |
209 | 215 |
210 if (result != MOJO_RESULT_OK) { | 216 if (result != MOJO_RESULT_OK) { |
211 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); | 217 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
212 return; | 218 return; |
213 } | 219 } |
214 ReadAllAvailableMessages(); | 220 |
215 // At this point, this object might have been deleted. Return. | 221 for (;;) { |
yzshen1
2017/03/03 00:03:50
nit: Does it make sense to merge ReadAllAvailableM
Ken Rockot(use gerrit already)
2017/03/03 00:37:05
Done
| |
222 DestructionTracker::Flag was_destroyed(&destruction_tracker_); | |
223 | |
224 // May delete |this|. | |
225 ReadAllAvailableMessages(); | |
226 | |
227 if (was_destroyed) | |
228 return; | |
229 | |
230 // We also may have been paused by some dispatch, in which case we're done. | |
231 if (!handle_watcher_) | |
232 return; | |
233 | |
234 // Attempt to re-arm the Watcher. | |
235 result = handle_watcher_->Arm(); | |
236 switch (result) { | |
237 case MOJO_RESULT_OK: | |
238 // Everything's cool. No more work to do. | |
239 return; | |
240 | |
241 case MOJO_RESULT_ALREADY_EXISTS: | |
242 // The handle is already readable again. Continue reading messagexs. | |
243 break; | |
244 | |
245 case MOJO_RESULT_FAILED_PRECONDITION: | |
246 // The handle will never be readable again. Notify of error immediately. | |
247 // May delete |this|. | |
248 HandleError(false, false); | |
249 return; | |
250 | |
251 default: | |
252 NOTREACHED(); | |
253 break; | |
254 } | |
255 } | |
216 } | 256 } |
217 | 257 |
218 void Connector::WaitToReadMore() { | 258 void Connector::WaitToReadMore() { |
219 CHECK(!paused_); | 259 CHECK(!paused_); |
220 DCHECK(!handle_watcher_); | 260 DCHECK(!handle_watcher_); |
221 | 261 |
222 handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_)); | 262 handle_watcher_.reset( |
263 new Watcher(FROM_HERE, Watcher::ArmingPolicy::MANUAL, task_runner_)); | |
223 if (heap_profiler_tag_) | 264 if (heap_profiler_tag_) |
224 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); | 265 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); |
225 MojoResult rv = handle_watcher_->Start( | 266 MojoResult rv = handle_watcher_->Start( |
226 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 267 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
227 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); | 268 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); |
228 | 269 |
229 if (rv != MOJO_RESULT_OK) { | 270 if (rv != MOJO_RESULT_OK) { |
230 // If the watch failed because the handle is invalid or its conditions can | 271 // If the watch failed because the handle is invalid or its conditions can |
231 // no longer be met, we signal the error asynchronously to avoid reentry. | 272 // no longer be met, we signal the error asynchronously to avoid reentry. |
232 task_runner_->PostTask( | 273 task_runner_->PostTask( |
233 FROM_HERE, | 274 FROM_HERE, |
234 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); | 275 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); |
276 } else { | |
277 handle_watcher_->ArmOrNotify(); | |
235 } | 278 } |
236 | 279 |
237 if (allow_woken_up_by_others_) { | 280 if (allow_woken_up_by_others_) { |
238 EnsureSyncWatcherExists(); | 281 EnsureSyncWatcherExists(); |
239 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 282 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
240 } | 283 } |
241 } | 284 } |
242 | 285 |
243 bool Connector::ReadSingleMessage(MojoResult* read_result) { | 286 bool Connector::ReadSingleMessage(MojoResult* read_result) { |
244 CHECK(!paused_); | 287 CHECK(!paused_); |
245 | 288 |
246 bool receiver_result = false; | 289 bool receiver_result = false; |
247 | 290 |
248 // Detect if |this| was destroyed or the message pipe was closed/transferred | 291 // Detect if |this| was destroyed or the message pipe was closed/transferred |
249 // during message dispatch. | 292 // during message dispatch. |
250 base::WeakPtr<Connector> weak_self = weak_self_; | 293 base::WeakPtr<Connector> weak_self = weak_self_; |
251 | 294 |
252 Message message; | 295 Message message; |
253 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); | 296 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); |
254 *read_result = rv; | 297 *read_result = rv; |
255 | 298 |
299 if (nested_dispatch_enabled_) { | |
300 // When supporting nested dispatch, we have to rearm the Watcher immediately | |
301 // after reading each message (i.e. before dispatch) to ensure that the next | |
302 // inbound message can trigger OnHandleReady on the nested loop. | |
303 handle_watcher_->ArmOrNotify(); | |
304 } | |
305 | |
256 if (rv == MOJO_RESULT_OK) { | 306 if (rv == MOJO_RESULT_OK) { |
257 receiver_result = | 307 receiver_result = |
258 incoming_receiver_ && incoming_receiver_->Accept(&message); | 308 incoming_receiver_ && incoming_receiver_->Accept(&message); |
259 } | 309 } |
260 | 310 |
261 if (!weak_self) | 311 if (!weak_self) |
262 return false; | 312 return false; |
263 | 313 |
264 if (rv == MOJO_RESULT_SHOULD_WAIT) | 314 if (rv == MOJO_RESULT_SHOULD_WAIT) |
265 return true; | 315 return true; |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
336 void Connector::EnsureSyncWatcherExists() { | 386 void Connector::EnsureSyncWatcherExists() { |
337 if (sync_watcher_) | 387 if (sync_watcher_) |
338 return; | 388 return; |
339 sync_watcher_.reset(new SyncHandleWatcher( | 389 sync_watcher_.reset(new SyncHandleWatcher( |
340 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 390 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
341 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, | 391 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
342 base::Unretained(this)))); | 392 base::Unretained(this)))); |
343 } | 393 } |
344 | 394 |
345 } // namespace mojo | 395 } // namespace mojo |
OLD | NEW |