OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/connector.h" | 5 #include "mojo/public/cpp/bindings/connector.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 #include <utility> | 8 #include <utility> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
181 return sync_watcher_->SyncWatch(should_stop); | 181 return sync_watcher_->SyncWatch(should_stop); |
182 } | 182 } |
183 | 183 |
184 void Connector::SetWatcherHeapProfilerTag(const char* tag) { | 184 void Connector::SetWatcherHeapProfilerTag(const char* tag) { |
185 heap_profiler_tag_ = tag; | 185 heap_profiler_tag_ = tag; |
186 if (handle_watcher_) { | 186 if (handle_watcher_) { |
187 handle_watcher_->set_heap_profiler_tag(tag); | 187 handle_watcher_->set_heap_profiler_tag(tag); |
188 } | 188 } |
189 } | 189 } |
190 | 190 |
191 void Connector::EnableNestedDispatch(bool enabled) { | |
192 nested_dispatch_enabled_ = enabled; | |
193 handle_watcher_.reset(); | |
194 WaitToReadMore(); | |
195 } | |
196 | |
197 void Connector::OnWatcherHandleReady(MojoResult result) { | 191 void Connector::OnWatcherHandleReady(MojoResult result) { |
198 OnHandleReadyInternal(result); | 192 OnHandleReadyInternal(result); |
199 } | 193 } |
200 | 194 |
201 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { | 195 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { |
202 base::WeakPtr<Connector> weak_self(weak_self_); | 196 base::WeakPtr<Connector> weak_self(weak_self_); |
203 | 197 |
204 sync_handle_watcher_callback_count_++; | 198 sync_handle_watcher_callback_count_++; |
205 OnHandleReadyInternal(result); | 199 OnHandleReadyInternal(result); |
206 // At this point, this object might have been deleted. | 200 // At this point, this object might have been deleted. |
207 if (weak_self) { | 201 if (weak_self) { |
208 DCHECK_LT(0u, sync_handle_watcher_callback_count_); | 202 DCHECK_LT(0u, sync_handle_watcher_callback_count_); |
209 sync_handle_watcher_callback_count_--; | 203 sync_handle_watcher_callback_count_--; |
210 } | 204 } |
211 } | 205 } |
212 | 206 |
213 void Connector::OnHandleReadyInternal(MojoResult result) { | 207 void Connector::OnHandleReadyInternal(MojoResult result) { |
214 DCHECK(thread_checker_.CalledOnValidThread()); | 208 DCHECK(thread_checker_.CalledOnValidThread()); |
215 | 209 |
216 if (result != MOJO_RESULT_OK) { | 210 if (result != MOJO_RESULT_OK) { |
217 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); | 211 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
218 return; | 212 return; |
219 } | 213 } |
220 | |
221 ReadAllAvailableMessages(); | 214 ReadAllAvailableMessages(); |
222 // At this point, this object might have been deleted. Return. | 215 // At this point, this object might have been deleted. Return. |
223 } | 216 } |
224 | 217 |
225 void Connector::WaitToReadMore() { | 218 void Connector::WaitToReadMore() { |
226 CHECK(!paused_); | 219 CHECK(!paused_); |
227 DCHECK(!handle_watcher_); | 220 DCHECK(!handle_watcher_); |
228 | 221 |
229 handle_watcher_.reset(new SimpleWatcher( | 222 handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_)); |
230 FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_)); | |
231 if (heap_profiler_tag_) | 223 if (heap_profiler_tag_) |
232 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); | 224 handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); |
233 MojoResult rv = handle_watcher_->Watch( | 225 MojoResult rv = handle_watcher_->Start( |
234 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 226 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
235 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); | 227 base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); |
236 | 228 |
237 if (rv != MOJO_RESULT_OK) { | 229 if (rv != MOJO_RESULT_OK) { |
238 // If the watch failed because the handle is invalid or its conditions can | 230 // If the watch failed because the handle is invalid or its conditions can |
239 // no longer be met, we signal the error asynchronously to avoid reentry. | 231 // no longer be met, we signal the error asynchronously to avoid reentry. |
240 task_runner_->PostTask( | 232 task_runner_->PostTask( |
241 FROM_HERE, | 233 FROM_HERE, |
242 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); | 234 base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); |
243 } else { | |
244 handle_watcher_->ArmOrNotify(); | |
245 } | 235 } |
246 | 236 |
247 if (allow_woken_up_by_others_) { | 237 if (allow_woken_up_by_others_) { |
248 EnsureSyncWatcherExists(); | 238 EnsureSyncWatcherExists(); |
249 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 239 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
250 } | 240 } |
251 } | 241 } |
252 | 242 |
253 bool Connector::ReadSingleMessage(MojoResult* read_result) { | 243 bool Connector::ReadSingleMessage(MojoResult* read_result) { |
254 CHECK(!paused_); | 244 CHECK(!paused_); |
255 | 245 |
256 bool receiver_result = false; | 246 bool receiver_result = false; |
257 | 247 |
258 // Detect if |this| was destroyed or the message pipe was closed/transferred | 248 // Detect if |this| was destroyed or the message pipe was closed/transferred |
259 // during message dispatch. | 249 // during message dispatch. |
260 base::WeakPtr<Connector> weak_self = weak_self_; | 250 base::WeakPtr<Connector> weak_self = weak_self_; |
261 | 251 |
262 Message message; | 252 Message message; |
263 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); | 253 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); |
264 *read_result = rv; | 254 *read_result = rv; |
265 | 255 |
266 if (nested_dispatch_enabled_) { | |
267 // When supporting nested dispatch, we have to rearm the Watcher immediately | |
268 // after reading each message (i.e. before dispatch) to ensure that the next | |
269 // inbound message can trigger OnHandleReady on the nested loop. | |
270 handle_watcher_->ArmOrNotify(); | |
271 } | |
272 | |
273 if (rv == MOJO_RESULT_OK) { | 256 if (rv == MOJO_RESULT_OK) { |
274 receiver_result = | 257 receiver_result = |
275 incoming_receiver_ && incoming_receiver_->Accept(&message); | 258 incoming_receiver_ && incoming_receiver_->Accept(&message); |
276 } | 259 } |
277 | 260 |
278 if (!weak_self) | 261 if (!weak_self) |
279 return false; | 262 return false; |
280 | 263 |
281 if (rv == MOJO_RESULT_SHOULD_WAIT) | 264 if (rv == MOJO_RESULT_SHOULD_WAIT) |
282 return true; | 265 return true; |
283 | 266 |
284 if (rv != MOJO_RESULT_OK) { | 267 if (rv != MOJO_RESULT_OK) { |
285 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); | 268 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); |
286 return false; | 269 return false; |
287 } | 270 } |
288 | 271 |
289 if (enforce_errors_from_incoming_receiver_ && !receiver_result) { | 272 if (enforce_errors_from_incoming_receiver_ && !receiver_result) { |
290 HandleError(true, false); | 273 HandleError(true, false); |
291 return false; | 274 return false; |
292 } | 275 } |
293 return true; | 276 return true; |
294 } | 277 } |
295 | 278 |
296 void Connector::ReadAllAvailableMessages() { | 279 void Connector::ReadAllAvailableMessages() { |
297 while (!error_) { | 280 while (!error_) { |
298 base::WeakPtr<Connector> weak_self = weak_self_; | |
299 MojoResult rv; | 281 MojoResult rv; |
300 | 282 |
301 // May delete |this.| | 283 if (!ReadSingleMessage(&rv)) { |
302 if (!ReadSingleMessage(&rv)) | 284 // Return immediately without touching any members. |this| may have been |
| 285 // destroyed. |
| 286 return; |
| 287 } |
| 288 |
| 289 if (paused_) |
303 return; | 290 return; |
304 | 291 |
305 if (!weak_self || paused_) | 292 if (rv == MOJO_RESULT_SHOULD_WAIT) |
306 return; | 293 break; |
307 | |
308 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT); | |
309 | |
310 if (rv == MOJO_RESULT_SHOULD_WAIT) { | |
311 // Attempt to re-arm the Watcher. | |
312 MojoResult ready_result; | |
313 MojoResult arm_result = handle_watcher_->Arm(&ready_result); | |
314 if (arm_result == MOJO_RESULT_OK) | |
315 return; | |
316 | |
317 // The watcher is already ready to notify again. | |
318 DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, arm_result); | |
319 | |
320 if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) { | |
321 HandleError(false, false); | |
322 return; | |
323 } | |
324 | |
325 // There's more to read now, so we'll just keep looping. | |
326 DCHECK_EQ(MOJO_RESULT_OK, ready_result); | |
327 } | |
328 } | 294 } |
329 } | 295 } |
330 | 296 |
331 void Connector::CancelWait() { | 297 void Connector::CancelWait() { |
332 handle_watcher_.reset(); | 298 handle_watcher_.reset(); |
333 sync_watcher_.reset(); | 299 sync_watcher_.reset(); |
334 } | 300 } |
335 | 301 |
336 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { | 302 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { |
337 if (error_ || !message_pipe_.is_valid()) | 303 if (error_ || !message_pipe_.is_valid()) |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
370 void Connector::EnsureSyncWatcherExists() { | 336 void Connector::EnsureSyncWatcherExists() { |
371 if (sync_watcher_) | 337 if (sync_watcher_) |
372 return; | 338 return; |
373 sync_watcher_.reset(new SyncHandleWatcher( | 339 sync_watcher_.reset(new SyncHandleWatcher( |
374 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 340 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
375 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, | 341 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
376 base::Unretained(this)))); | 342 base::Unretained(this)))); |
377 } | 343 } |
378 | 344 |
379 } // namespace mojo | 345 } // namespace mojo |
OLD | NEW |