Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(508)

Side by Side Diff: mojo/public/cpp/bindings/lib/connector.cc

Issue 2750373002: Revert of Mojo: Armed Watchers (Closed)
Patch Set: Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/connector.h" 5 #include "mojo/public/cpp/bindings/connector.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after
181 return sync_watcher_->SyncWatch(should_stop); 181 return sync_watcher_->SyncWatch(should_stop);
182 } 182 }
183 183
184 void Connector::SetWatcherHeapProfilerTag(const char* tag) { 184 void Connector::SetWatcherHeapProfilerTag(const char* tag) {
185 heap_profiler_tag_ = tag; 185 heap_profiler_tag_ = tag;
186 if (handle_watcher_) { 186 if (handle_watcher_) {
187 handle_watcher_->set_heap_profiler_tag(tag); 187 handle_watcher_->set_heap_profiler_tag(tag);
188 } 188 }
189 } 189 }
190 190
191 void Connector::EnableNestedDispatch(bool enabled) {
192 nested_dispatch_enabled_ = enabled;
193 handle_watcher_.reset();
194 WaitToReadMore();
195 }
196
197 void Connector::OnWatcherHandleReady(MojoResult result) { 191 void Connector::OnWatcherHandleReady(MojoResult result) {
198 OnHandleReadyInternal(result); 192 OnHandleReadyInternal(result);
199 } 193 }
200 194
201 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { 195 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
202 base::WeakPtr<Connector> weak_self(weak_self_); 196 base::WeakPtr<Connector> weak_self(weak_self_);
203 197
204 sync_handle_watcher_callback_count_++; 198 sync_handle_watcher_callback_count_++;
205 OnHandleReadyInternal(result); 199 OnHandleReadyInternal(result);
206 // At this point, this object might have been deleted. 200 // At this point, this object might have been deleted.
207 if (weak_self) { 201 if (weak_self) {
208 DCHECK_LT(0u, sync_handle_watcher_callback_count_); 202 DCHECK_LT(0u, sync_handle_watcher_callback_count_);
209 sync_handle_watcher_callback_count_--; 203 sync_handle_watcher_callback_count_--;
210 } 204 }
211 } 205 }
212 206
213 void Connector::OnHandleReadyInternal(MojoResult result) { 207 void Connector::OnHandleReadyInternal(MojoResult result) {
214 DCHECK(thread_checker_.CalledOnValidThread()); 208 DCHECK(thread_checker_.CalledOnValidThread());
215 209
216 if (result != MOJO_RESULT_OK) { 210 if (result != MOJO_RESULT_OK) {
217 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); 211 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
218 return; 212 return;
219 } 213 }
220
221 ReadAllAvailableMessages(); 214 ReadAllAvailableMessages();
222 // At this point, this object might have been deleted. Return. 215 // At this point, this object might have been deleted. Return.
223 } 216 }
224 217
225 void Connector::WaitToReadMore() { 218 void Connector::WaitToReadMore() {
226 CHECK(!paused_); 219 CHECK(!paused_);
227 DCHECK(!handle_watcher_); 220 DCHECK(!handle_watcher_);
228 221
229 handle_watcher_.reset(new SimpleWatcher( 222 handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_));
230 FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_));
231 if (heap_profiler_tag_) 223 if (heap_profiler_tag_)
232 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); 224 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_);
233 MojoResult rv = handle_watcher_->Watch( 225 MojoResult rv = handle_watcher_->Start(
234 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, 226 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
235 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); 227 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));
236 228
237 if (rv != MOJO_RESULT_OK) { 229 if (rv != MOJO_RESULT_OK) {
238 // If the watch failed because the handle is invalid or its conditions can 230 // If the watch failed because the handle is invalid or its conditions can
239 // no longer be met, we signal the error asynchronously to avoid reentry. 231 // no longer be met, we signal the error asynchronously to avoid reentry.
240 task_runner_->PostTask( 232 task_runner_->PostTask(
241 FROM_HERE, 233 FROM_HERE,
242 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); 234 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
243 } else {
244 handle_watcher_->ArmOrNotify();
245 } 235 }
246 236
247 if (allow_woken_up_by_others_) { 237 if (allow_woken_up_by_others_) {
248 EnsureSyncWatcherExists(); 238 EnsureSyncWatcherExists();
249 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); 239 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
250 } 240 }
251 } 241 }
252 242
253 bool Connector::ReadSingleMessage(MojoResult* read_result) { 243 bool Connector::ReadSingleMessage(MojoResult* read_result) {
254 CHECK(!paused_); 244 CHECK(!paused_);
255 245
256 bool receiver_result = false; 246 bool receiver_result = false;
257 247
258 // Detect if |this| was destroyed or the message pipe was closed/transferred 248 // Detect if |this| was destroyed or the message pipe was closed/transferred
259 // during message dispatch. 249 // during message dispatch.
260 base::WeakPtr<Connector> weak_self = weak_self_; 250 base::WeakPtr<Connector> weak_self = weak_self_;
261 251
262 Message message; 252 Message message;
263 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); 253 const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
264 *read_result = rv; 254 *read_result = rv;
265 255
266 if (nested_dispatch_enabled_) {
267 // When supporting nested dispatch, we have to rearm the Watcher immediately
268 // after reading each message (i.e. before dispatch) to ensure that the next
269 // inbound message can trigger OnHandleReady on the nested loop.
270 handle_watcher_->ArmOrNotify();
271 }
272
273 if (rv == MOJO_RESULT_OK) { 256 if (rv == MOJO_RESULT_OK) {
274 receiver_result = 257 receiver_result =
275 incoming_receiver_ && incoming_receiver_->Accept(&message); 258 incoming_receiver_ && incoming_receiver_->Accept(&message);
276 } 259 }
277 260
278 if (!weak_self) 261 if (!weak_self)
279 return false; 262 return false;
280 263
281 if (rv == MOJO_RESULT_SHOULD_WAIT) 264 if (rv == MOJO_RESULT_SHOULD_WAIT)
282 return true; 265 return true;
283 266
284 if (rv != MOJO_RESULT_OK) { 267 if (rv != MOJO_RESULT_OK) {
285 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); 268 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
286 return false; 269 return false;
287 } 270 }
288 271
289 if (enforce_errors_from_incoming_receiver_ && !receiver_result) { 272 if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
290 HandleError(true, false); 273 HandleError(true, false);
291 return false; 274 return false;
292 } 275 }
293 return true; 276 return true;
294 } 277 }
295 278
296 void Connector::ReadAllAvailableMessages() { 279 void Connector::ReadAllAvailableMessages() {
297 while (!error_) { 280 while (!error_) {
298 base::WeakPtr<Connector> weak_self = weak_self_;
299 MojoResult rv; 281 MojoResult rv;
300 282
301 // May delete |this.| 283 if (!ReadSingleMessage(&rv)) {
302 if (!ReadSingleMessage(&rv)) 284 // Return immediately without touching any members. |this| may have been
285 // destroyed.
286 return;
287 }
288
289 if (paused_)
303 return; 290 return;
304 291
305 if (!weak_self || paused_) 292 if (rv == MOJO_RESULT_SHOULD_WAIT)
306 return; 293 break;
307
308 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT);
309
310 if (rv == MOJO_RESULT_SHOULD_WAIT) {
311 // Attempt to re-arm the Watcher.
312 MojoResult ready_result;
313 MojoResult arm_result = handle_watcher_->Arm(&ready_result);
314 if (arm_result == MOJO_RESULT_OK)
315 return;
316
317 // The watcher is already ready to notify again.
318 DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, arm_result);
319
320 if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) {
321 HandleError(false, false);
322 return;
323 }
324
325 // There's more to read now, so we'll just keep looping.
326 DCHECK_EQ(MOJO_RESULT_OK, ready_result);
327 }
328 } 294 }
329 } 295 }
330 296
331 void Connector::CancelWait() { 297 void Connector::CancelWait() {
332 handle_watcher_.reset(); 298 handle_watcher_.reset();
333 sync_watcher_.reset(); 299 sync_watcher_.reset();
334 } 300 }
335 301
336 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { 302 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
337 if (error_ || !message_pipe_.is_valid()) 303 if (error_ || !message_pipe_.is_valid())
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
370 void Connector::EnsureSyncWatcherExists() { 336 void Connector::EnsureSyncWatcherExists() {
371 if (sync_watcher_) 337 if (sync_watcher_)
372 return; 338 return;
373 sync_watcher_.reset(new SyncHandleWatcher( 339 sync_watcher_.reset(new SyncHandleWatcher(
374 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, 340 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
375 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, 341 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
376 base::Unretained(this)))); 342 base::Unretained(this))));
377 } 343 }
378 344
379 } // namespace mojo 345 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/binding_state.cc ('k') | mojo/public/cpp/bindings/lib/interface_ptr_state.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698