Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(58)

Side by Side Diff: mojo/public/cpp/bindings/lib/connector.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: rebase Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/connector.h" 5 #include "mojo/public/cpp/bindings/connector.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after
181 return sync_watcher_->SyncWatch(should_stop); 181 return sync_watcher_->SyncWatch(should_stop);
182 } 182 }
183 183
184 void Connector::SetWatcherHeapProfilerTag(const char* tag) { 184 void Connector::SetWatcherHeapProfilerTag(const char* tag) {
185 heap_profiler_tag_ = tag; 185 heap_profiler_tag_ = tag;
186 if (handle_watcher_) { 186 if (handle_watcher_) {
187 handle_watcher_->set_heap_profiler_tag(tag); 187 handle_watcher_->set_heap_profiler_tag(tag);
188 } 188 }
189 } 189 }
190 190
191 void Connector::EnableNestedDispatch(bool enabled) {
192 nested_dispatch_enabled_ = enabled;
193 handle_watcher_.reset();
194 WaitToReadMore();
195 }
196
191 void Connector::OnWatcherHandleReady(MojoResult result) { 197 void Connector::OnWatcherHandleReady(MojoResult result) {
192 OnHandleReadyInternal(result); 198 OnHandleReadyInternal(result);
193 } 199 }
194 200
195 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { 201 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
196 base::WeakPtr<Connector> weak_self(weak_self_); 202 base::WeakPtr<Connector> weak_self(weak_self_);
197 203
198 sync_handle_watcher_callback_count_++; 204 sync_handle_watcher_callback_count_++;
199 OnHandleReadyInternal(result); 205 OnHandleReadyInternal(result);
200 // At this point, this object might have been deleted. 206 // At this point, this object might have been deleted.
201 if (weak_self) { 207 if (weak_self) {
202 DCHECK_LT(0u, sync_handle_watcher_callback_count_); 208 DCHECK_LT(0u, sync_handle_watcher_callback_count_);
203 sync_handle_watcher_callback_count_--; 209 sync_handle_watcher_callback_count_--;
204 } 210 }
205 } 211 }
206 212
207 void Connector::OnHandleReadyInternal(MojoResult result) { 213 void Connector::OnHandleReadyInternal(MojoResult result) {
208 DCHECK(thread_checker_.CalledOnValidThread()); 214 DCHECK(thread_checker_.CalledOnValidThread());
209 215
210 if (result != MOJO_RESULT_OK) { 216 if (result != MOJO_RESULT_OK) {
211 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); 217 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
212 return; 218 return;
213 } 219 }
220
214 ReadAllAvailableMessages(); 221 ReadAllAvailableMessages();
215 // At this point, this object might have been deleted. Return. 222 // At this point, this object might have been deleted. Return.
216 } 223 }
217 224
218 void Connector::WaitToReadMore() { 225 void Connector::WaitToReadMore() {
219 CHECK(!paused_); 226 CHECK(!paused_);
220 DCHECK(!handle_watcher_); 227 DCHECK(!handle_watcher_);
221 228
222 handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_)); 229 handle_watcher_.reset(new SimpleWatcher(
230 FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_));
223 if (heap_profiler_tag_) 231 if (heap_profiler_tag_)
224 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); 232 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_);
225 MojoResult rv = handle_watcher_->Start( 233 MojoResult rv = handle_watcher_->Watch(
226 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, 234 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
227 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); 235 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));
228 236
229 if (rv != MOJO_RESULT_OK) { 237 if (rv != MOJO_RESULT_OK) {
230 // If the watch failed because the handle is invalid or its conditions can 238 // If the watch failed because the handle is invalid or its conditions can
231 // no longer be met, we signal the error asynchronously to avoid reentry. 239 // no longer be met, we signal the error asynchronously to avoid reentry.
232 task_runner_->PostTask( 240 task_runner_->PostTask(
233 FROM_HERE, 241 FROM_HERE,
234 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); 242 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
243 } else {
244 handle_watcher_->ArmOrNotify();
235 } 245 }
236 246
237 if (allow_woken_up_by_others_) { 247 if (allow_woken_up_by_others_) {
238 EnsureSyncWatcherExists(); 248 EnsureSyncWatcherExists();
239 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); 249 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
240 } 250 }
241 } 251 }
242 252
243 bool Connector::ReadSingleMessage(MojoResult* read_result) { 253 bool Connector::ReadSingleMessage(MojoResult* read_result) {
244 CHECK(!paused_); 254 CHECK(!paused_);
245 255
246 bool receiver_result = false; 256 bool receiver_result = false;
247 257
248 // Detect if |this| was destroyed or the message pipe was closed/transferred 258 // Detect if |this| was destroyed or the message pipe was closed/transferred
249 // during message dispatch. 259 // during message dispatch.
250 base::WeakPtr<Connector> weak_self = weak_self_; 260 base::WeakPtr<Connector> weak_self = weak_self_;
251 261
252 Message message; 262 Message message;
253 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); 263 const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
254 *read_result = rv; 264 *read_result = rv;
255 265
266 if (nested_dispatch_enabled_) {
267 // When supporting nested dispatch, we have to rearm the Watcher immediately
268 // after reading each message (i.e. before dispatch) to ensure that the next
269 // inbound message can trigger OnHandleReady on the nested loop.
270 handle_watcher_->ArmOrNotify();
271 }
272
256 if (rv == MOJO_RESULT_OK) { 273 if (rv == MOJO_RESULT_OK) {
257 receiver_result = 274 receiver_result =
258 incoming_receiver_ && incoming_receiver_->Accept(&message); 275 incoming_receiver_ && incoming_receiver_->Accept(&message);
259 } 276 }
260 277
261 if (!weak_self) 278 if (!weak_self)
262 return false; 279 return false;
263 280
264 if (rv == MOJO_RESULT_SHOULD_WAIT) 281 if (rv == MOJO_RESULT_SHOULD_WAIT)
265 return true; 282 return true;
266 283
267 if (rv != MOJO_RESULT_OK) { 284 if (rv != MOJO_RESULT_OK) {
268 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); 285 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
269 return false; 286 return false;
270 } 287 }
271 288
272 if (enforce_errors_from_incoming_receiver_ && !receiver_result) { 289 if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
273 HandleError(true, false); 290 HandleError(true, false);
274 return false; 291 return false;
275 } 292 }
276 return true; 293 return true;
277 } 294 }
278 295
279 void Connector::ReadAllAvailableMessages() { 296 void Connector::ReadAllAvailableMessages() {
280 while (!error_) { 297 while (!error_) {
298 base::WeakPtr<Connector> weak_self = weak_self_;
281 MojoResult rv; 299 MojoResult rv;
282 300
283 if (!ReadSingleMessage(&rv)) { 301 // May delete |this.|
284 // Return immediately without touching any members. |this| may have been 302 if (!ReadSingleMessage(&rv))
285 // destroyed.
286 return;
287 }
288
289 if (paused_)
290 return; 303 return;
291 304
292 if (rv == MOJO_RESULT_SHOULD_WAIT) 305 if (!weak_self || paused_)
293 break; 306 return;
307
308 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT);
309
310 if (rv == MOJO_RESULT_SHOULD_WAIT) {
311 // Attempt to re-arm the Watcher.
312 MojoResult ready_result;
313 MojoResult arm_result = handle_watcher_->Arm(&ready_result);
314 if (arm_result == MOJO_RESULT_OK)
315 return;
316
317 // The watcher is already ready to notify again.
318 DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, arm_result);
319
320 if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) {
321 HandleError(false, false);
322 return;
323 }
324
325 // There's more to read now, so we'll just keep looping.
326 DCHECK_EQ(MOJO_RESULT_OK, ready_result);
327 }
294 } 328 }
295 } 329 }
296 330
297 void Connector::CancelWait() { 331 void Connector::CancelWait() {
298 handle_watcher_.reset(); 332 handle_watcher_.reset();
299 sync_watcher_.reset(); 333 sync_watcher_.reset();
300 } 334 }
301 335
302 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { 336 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
303 if (error_ || !message_pipe_.is_valid()) 337 if (error_ || !message_pipe_.is_valid())
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
336 void Connector::EnsureSyncWatcherExists() { 370 void Connector::EnsureSyncWatcherExists() {
337 if (sync_watcher_) 371 if (sync_watcher_)
338 return; 372 return;
339 sync_watcher_.reset(new SyncHandleWatcher( 373 sync_watcher_.reset(new SyncHandleWatcher(
340 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, 374 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
341 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, 375 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
342 base::Unretained(this)))); 376 base::Unretained(this))));
343 } 377 }
344 378
345 } // namespace mojo 379 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698