OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <algorithm> | 10 #include <algorithm> |
(...skipping 230 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
241 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { | 241 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { |
242 base::AutoLock lock(lock_); | 242 base::AutoLock lock(lock_); |
243 if (!in_two_phase_read_) | 243 if (!in_two_phase_read_) |
244 return MOJO_RESULT_FAILED_PRECONDITION; | 244 return MOJO_RESULT_FAILED_PRECONDITION; |
245 | 245 |
246 if (in_transit_) | 246 if (in_transit_) |
247 return MOJO_RESULT_INVALID_ARGUMENT; | 247 return MOJO_RESULT_INVALID_ARGUMENT; |
248 | 248 |
249 CHECK(shared_ring_buffer_); | 249 CHECK(shared_ring_buffer_); |
250 | 250 |
251 HandleSignalsState old_state = GetHandleSignalsStateNoLock(); | |
252 MojoResult rv; | 251 MojoResult rv; |
253 if (num_bytes_read > two_phase_max_bytes_read_ || | 252 if (num_bytes_read > two_phase_max_bytes_read_ || |
254 num_bytes_read % options_.element_num_bytes != 0) { | 253 num_bytes_read % options_.element_num_bytes != 0) { |
255 rv = MOJO_RESULT_INVALID_ARGUMENT; | 254 rv = MOJO_RESULT_INVALID_ARGUMENT; |
256 } else { | 255 } else { |
257 rv = MOJO_RESULT_OK; | 256 rv = MOJO_RESULT_OK; |
258 read_offset_ = | 257 read_offset_ = |
259 (read_offset_ + num_bytes_read) % options_.capacity_num_bytes; | 258 (read_offset_ + num_bytes_read) % options_.capacity_num_bytes; |
260 | 259 |
261 DCHECK_GE(bytes_available_, num_bytes_read); | 260 DCHECK_GE(bytes_available_, num_bytes_read); |
262 bytes_available_ -= num_bytes_read; | 261 bytes_available_ -= num_bytes_read; |
263 | 262 |
264 base::AutoUnlock unlock(lock_); | 263 base::AutoUnlock unlock(lock_); |
265 NotifyRead(num_bytes_read); | 264 NotifyRead(num_bytes_read); |
266 } | 265 } |
267 | 266 |
268 in_two_phase_read_ = false; | 267 in_two_phase_read_ = false; |
269 two_phase_max_bytes_read_ = 0; | 268 two_phase_max_bytes_read_ = 0; |
270 | 269 |
271 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 270 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
272 if (!new_state.equals(old_state)) | |
273 awakable_list_.AwakeForStateChange(new_state); | |
274 watchers_.NotifyState(new_state); | |
275 | 271 |
276 return rv; | 272 return rv; |
277 } | 273 } |
278 | 274 |
279 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { | 275 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { |
280 base::AutoLock lock(lock_); | 276 base::AutoLock lock(lock_); |
281 return GetHandleSignalsStateNoLock(); | 277 return GetHandleSignalsStateNoLock(); |
282 } | 278 } |
283 | 279 |
284 MojoResult DataPipeConsumerDispatcher::AddWatcherRef( | 280 MojoResult DataPipeConsumerDispatcher::AddWatcherRef( |
285 const scoped_refptr<WatcherDispatcher>& watcher, | 281 const scoped_refptr<WatcherDispatcher>& watcher, |
286 uintptr_t context) { | 282 uintptr_t context) { |
287 base::AutoLock lock(lock_); | 283 base::AutoLock lock(lock_); |
288 if (is_closed_ || in_transit_) | 284 if (is_closed_ || in_transit_) |
289 return MOJO_RESULT_INVALID_ARGUMENT; | 285 return MOJO_RESULT_INVALID_ARGUMENT; |
290 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); | 286 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
291 } | 287 } |
292 | 288 |
293 MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef( | 289 MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef( |
294 WatcherDispatcher* watcher, | 290 WatcherDispatcher* watcher, |
295 uintptr_t context) { | 291 uintptr_t context) { |
296 base::AutoLock lock(lock_); | 292 base::AutoLock lock(lock_); |
297 if (is_closed_ || in_transit_) | 293 if (is_closed_ || in_transit_) |
298 return MOJO_RESULT_INVALID_ARGUMENT; | 294 return MOJO_RESULT_INVALID_ARGUMENT; |
299 return watchers_.Remove(watcher, context); | 295 return watchers_.Remove(watcher, context); |
300 } | 296 } |
301 | 297 |
302 MojoResult DataPipeConsumerDispatcher::AddAwakable( | |
303 Awakable* awakable, | |
304 MojoHandleSignals signals, | |
305 uintptr_t context, | |
306 HandleSignalsState* signals_state) { | |
307 base::AutoLock lock(lock_); | |
308 if (!shared_ring_buffer_ || in_transit_) { | |
309 if (signals_state) | |
310 *signals_state = HandleSignalsState(); | |
311 return MOJO_RESULT_INVALID_ARGUMENT; | |
312 } | |
313 UpdateSignalsStateNoLock(); | |
314 HandleSignalsState state = GetHandleSignalsStateNoLock(); | |
315 if (state.satisfies(signals)) { | |
316 if (signals_state) | |
317 *signals_state = state; | |
318 return MOJO_RESULT_ALREADY_EXISTS; | |
319 } | |
320 if (!state.can_satisfy(signals)) { | |
321 if (signals_state) | |
322 *signals_state = state; | |
323 return MOJO_RESULT_FAILED_PRECONDITION; | |
324 } | |
325 | |
326 awakable_list_.Add(awakable, signals, context); | |
327 return MOJO_RESULT_OK; | |
328 } | |
329 | |
330 void DataPipeConsumerDispatcher::RemoveAwakable( | |
331 Awakable* awakable, | |
332 HandleSignalsState* signals_state) { | |
333 base::AutoLock lock(lock_); | |
334 if ((!shared_ring_buffer_ || in_transit_) && signals_state) | |
335 *signals_state = HandleSignalsState(); | |
336 else if (signals_state) | |
337 *signals_state = GetHandleSignalsStateNoLock(); | |
338 awakable_list_.Remove(awakable); | |
339 } | |
340 | |
341 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes, | 298 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes, |
342 uint32_t* num_ports, | 299 uint32_t* num_ports, |
343 uint32_t* num_handles) { | 300 uint32_t* num_handles) { |
344 base::AutoLock lock(lock_); | 301 base::AutoLock lock(lock_); |
345 DCHECK(in_transit_); | 302 DCHECK(in_transit_); |
346 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState)); | 303 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState)); |
347 *num_ports = 1; | 304 *num_ports = 1; |
348 *num_handles = 1; | 305 *num_handles = 1; |
349 } | 306 } |
350 | 307 |
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
473 } | 430 } |
474 | 431 |
475 MojoResult DataPipeConsumerDispatcher::CloseNoLock() { | 432 MojoResult DataPipeConsumerDispatcher::CloseNoLock() { |
476 lock_.AssertAcquired(); | 433 lock_.AssertAcquired(); |
477 if (is_closed_ || in_transit_) | 434 if (is_closed_ || in_transit_) |
478 return MOJO_RESULT_INVALID_ARGUMENT; | 435 return MOJO_RESULT_INVALID_ARGUMENT; |
479 is_closed_ = true; | 436 is_closed_ = true; |
480 ring_buffer_mapping_.reset(); | 437 ring_buffer_mapping_.reset(); |
481 shared_ring_buffer_ = nullptr; | 438 shared_ring_buffer_ = nullptr; |
482 | 439 |
483 awakable_list_.CancelAll(); | |
484 watchers_.NotifyClosed(); | 440 watchers_.NotifyClosed(); |
485 if (!transferred_) { | 441 if (!transferred_) { |
486 base::AutoUnlock unlock(lock_); | 442 base::AutoUnlock unlock(lock_); |
487 node_controller_->ClosePort(control_port_); | 443 node_controller_->ClosePort(control_port_); |
488 } | 444 } |
489 | 445 |
490 return MOJO_RESULT_OK; | 446 return MOJO_RESULT_OK; |
491 } | 447 } |
492 | 448 |
493 HandleSignalsState | 449 HandleSignalsState |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
591 | 547 |
592 bytes_available_ += m->num_bytes; | 548 bytes_available_ += m->num_bytes; |
593 } | 549 } |
594 } while (message); | 550 } while (message); |
595 } | 551 } |
596 | 552 |
597 bool has_new_data = bytes_available_ != previous_bytes_available; | 553 bool has_new_data = bytes_available_ != previous_bytes_available; |
598 if (has_new_data) | 554 if (has_new_data) |
599 new_data_available_ = true; | 555 new_data_available_ = true; |
600 | 556 |
601 if (peer_closed_ != was_peer_closed || has_new_data) { | 557 if (peer_closed_ != was_peer_closed || has_new_data) |
602 HandleSignalsState state = GetHandleSignalsStateNoLock(); | 558 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
603 awakable_list_.AwakeForStateChange(state); | |
604 watchers_.NotifyState(state); | |
605 } | |
606 } | 559 } |
607 | 560 |
608 } // namespace edk | 561 } // namespace edk |
609 } // namespace mojo | 562 } // namespace mojo |
OLD | NEW |