Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(696)

Side by Side Diff: mojo/public/cpp/bindings/lib/connector.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/connector.h" 5 #include "mojo/public/cpp/bindings/connector.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after
181 return sync_watcher_->SyncWatch(should_stop); 181 return sync_watcher_->SyncWatch(should_stop);
182 } 182 }
183 183
184 void Connector::SetWatcherHeapProfilerTag(const char* tag) { 184 void Connector::SetWatcherHeapProfilerTag(const char* tag) {
185 heap_profiler_tag_ = tag; 185 heap_profiler_tag_ = tag;
186 if (handle_watcher_) { 186 if (handle_watcher_) {
187 handle_watcher_->set_heap_profiler_tag(tag); 187 handle_watcher_->set_heap_profiler_tag(tag);
188 } 188 }
189 } 189 }
190 190
191 void Connector::EnableNestedDispatch(bool enabled) {
192 nested_dispatch_enabled_ = enabled;
193 CancelWait();
yzshen1 2017/03/03 00:03:50 Will this cause problem while we are in the middle
Ken Rockot(use gerrit already) 2017/03/03 00:37:05 Good point. Now I just reset handle_watcher_ befor
194 WaitToReadMore();
195 }
196
191 void Connector::OnWatcherHandleReady(MojoResult result) { 197 void Connector::OnWatcherHandleReady(MojoResult result) {
192 OnHandleReadyInternal(result); 198 OnHandleReadyInternal(result);
193 } 199 }
194 200
195 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { 201 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
196 base::WeakPtr<Connector> weak_self(weak_self_); 202 base::WeakPtr<Connector> weak_self(weak_self_);
197 203
198 sync_handle_watcher_callback_count_++; 204 sync_handle_watcher_callback_count_++;
199 OnHandleReadyInternal(result); 205 OnHandleReadyInternal(result);
200 // At this point, this object might have been deleted. 206 // At this point, this object might have been deleted.
201 if (weak_self) { 207 if (weak_self) {
202 DCHECK_LT(0u, sync_handle_watcher_callback_count_); 208 DCHECK_LT(0u, sync_handle_watcher_callback_count_);
203 sync_handle_watcher_callback_count_--; 209 sync_handle_watcher_callback_count_--;
204 } 210 }
205 } 211 }
206 212
207 void Connector::OnHandleReadyInternal(MojoResult result) { 213 void Connector::OnHandleReadyInternal(MojoResult result) {
208 DCHECK(thread_checker_.CalledOnValidThread()); 214 DCHECK(thread_checker_.CalledOnValidThread());
209 215
210 if (result != MOJO_RESULT_OK) { 216 if (result != MOJO_RESULT_OK) {
211 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); 217 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
212 return; 218 return;
213 } 219 }
214 ReadAllAvailableMessages(); 220
215 // At this point, this object might have been deleted. Return. 221 for (;;) {
yzshen1 2017/03/03 00:03:50 nit: Does it make sense to merge ReadAllAvailableM
Ken Rockot(use gerrit already) 2017/03/03 00:37:05 Done
222 DestructionTracker::Flag was_destroyed(&destruction_tracker_);
223
224 // May delete |this|.
225 ReadAllAvailableMessages();
226
227 if (was_destroyed)
228 return;
229
230 // We also may have been paused by some dispatch, in which case we're done.
231 if (!handle_watcher_)
232 return;
233
234 // Attempt to re-arm the Watcher.
235 result = handle_watcher_->Arm();
236 switch (result) {
237 case MOJO_RESULT_OK:
238 // Everything's cool. No more work to do.
239 return;
240
241 case MOJO_RESULT_ALREADY_EXISTS:
242 // The handle is already readable again. Continue reading messagexs.
243 break;
244
245 case MOJO_RESULT_FAILED_PRECONDITION:
246 // The handle will never be readable again. Notify of error immediately.
247 // May delete |this|.
248 HandleError(false, false);
249 return;
250
251 default:
252 NOTREACHED();
253 break;
254 }
255 }
216 } 256 }
217 257
218 void Connector::WaitToReadMore() { 258 void Connector::WaitToReadMore() {
219 CHECK(!paused_); 259 CHECK(!paused_);
220 DCHECK(!handle_watcher_); 260 DCHECK(!handle_watcher_);
221 261
222 handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_)); 262 handle_watcher_.reset(
263 new Watcher(FROM_HERE, Watcher::ArmingPolicy::MANUAL, task_runner_));
223 if (heap_profiler_tag_) 264 if (heap_profiler_tag_)
224 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); 265 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_);
225 MojoResult rv = handle_watcher_->Start( 266 MojoResult rv = handle_watcher_->Start(
226 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, 267 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
227 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); 268 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));
228 269
229 if (rv != MOJO_RESULT_OK) { 270 if (rv != MOJO_RESULT_OK) {
230 // If the watch failed because the handle is invalid or its conditions can 271 // If the watch failed because the handle is invalid or its conditions can
231 // no longer be met, we signal the error asynchronously to avoid reentry. 272 // no longer be met, we signal the error asynchronously to avoid reentry.
232 task_runner_->PostTask( 273 task_runner_->PostTask(
233 FROM_HERE, 274 FROM_HERE,
234 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); 275 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
276 } else {
277 handle_watcher_->ArmOrNotify();
235 } 278 }
236 279
237 if (allow_woken_up_by_others_) { 280 if (allow_woken_up_by_others_) {
238 EnsureSyncWatcherExists(); 281 EnsureSyncWatcherExists();
239 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); 282 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
240 } 283 }
241 } 284 }
242 285
243 bool Connector::ReadSingleMessage(MojoResult* read_result) { 286 bool Connector::ReadSingleMessage(MojoResult* read_result) {
244 CHECK(!paused_); 287 CHECK(!paused_);
245 288
246 bool receiver_result = false; 289 bool receiver_result = false;
247 290
248 // Detect if |this| was destroyed or the message pipe was closed/transferred 291 // Detect if |this| was destroyed or the message pipe was closed/transferred
249 // during message dispatch. 292 // during message dispatch.
250 base::WeakPtr<Connector> weak_self = weak_self_; 293 base::WeakPtr<Connector> weak_self = weak_self_;
251 294
252 Message message; 295 Message message;
253 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); 296 const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
254 *read_result = rv; 297 *read_result = rv;
255 298
299 if (nested_dispatch_enabled_) {
300 // When supporting nested dispatch, we have to rearm the Watcher immediately
301 // after reading each message (i.e. before dispatch) to ensure that the next
302 // inbound message can trigger OnHandleReady on the nested loop.
303 handle_watcher_->ArmOrNotify();
304 }
305
256 if (rv == MOJO_RESULT_OK) { 306 if (rv == MOJO_RESULT_OK) {
257 receiver_result = 307 receiver_result =
258 incoming_receiver_ && incoming_receiver_->Accept(&message); 308 incoming_receiver_ && incoming_receiver_->Accept(&message);
259 } 309 }
260 310
261 if (!weak_self) 311 if (!weak_self)
262 return false; 312 return false;
263 313
264 if (rv == MOJO_RESULT_SHOULD_WAIT) 314 if (rv == MOJO_RESULT_SHOULD_WAIT)
265 return true; 315 return true;
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
336 void Connector::EnsureSyncWatcherExists() { 386 void Connector::EnsureSyncWatcherExists() {
337 if (sync_watcher_) 387 if (sync_watcher_)
338 return; 388 return;
339 sync_watcher_.reset(new SyncHandleWatcher( 389 sync_watcher_.reset(new SyncHandleWatcher(
340 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, 390 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
341 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, 391 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
342 base::Unretained(this)))); 392 base::Unretained(this))));
343 } 393 }
344 394
345 } // namespace mojo 395 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698