OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <algorithm> | 10 #include <algorithm> |
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
117 if (is_closed_ || in_transit_) | 117 if (is_closed_ || in_transit_) |
118 return MOJO_RESULT_INVALID_ARGUMENT; | 118 return MOJO_RESULT_INVALID_ARGUMENT; |
119 | 119 |
120 return awakable_list_.RemoveWatcher(context); | 120 return awakable_list_.RemoveWatcher(context); |
121 } | 121 } |
122 | 122 |
123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, | 123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
124 uint32_t* num_bytes, | 124 uint32_t* num_bytes, |
125 MojoReadDataFlags flags) { | 125 MojoReadDataFlags flags) { |
126 base::AutoLock lock(lock_); | 126 base::AutoLock lock(lock_); |
| 127 new_data_available_ = false; |
| 128 |
127 if (!shared_ring_buffer_ || in_transit_) | 129 if (!shared_ring_buffer_ || in_transit_) |
128 return MOJO_RESULT_INVALID_ARGUMENT; | 130 return MOJO_RESULT_INVALID_ARGUMENT; |
129 | 131 |
130 if (in_two_phase_read_) | 132 if (in_two_phase_read_) |
131 return MOJO_RESULT_BUSY; | 133 return MOJO_RESULT_BUSY; |
132 | 134 |
133 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { | 135 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { |
134 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || | 136 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || |
135 (flags & MOJO_READ_DATA_FLAG_DISCARD)) | 137 (flags & MOJO_READ_DATA_FLAG_DISCARD)) |
136 return MOJO_RESULT_INVALID_ARGUMENT; | 138 return MOJO_RESULT_INVALID_ARGUMENT; |
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
197 NotifyRead(bytes_to_read); | 199 NotifyRead(bytes_to_read); |
198 } | 200 } |
199 | 201 |
200 return MOJO_RESULT_OK; | 202 return MOJO_RESULT_OK; |
201 } | 203 } |
202 | 204 |
203 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, | 205 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, |
204 uint32_t* buffer_num_bytes, | 206 uint32_t* buffer_num_bytes, |
205 MojoReadDataFlags flags) { | 207 MojoReadDataFlags flags) { |
206 base::AutoLock lock(lock_); | 208 base::AutoLock lock(lock_); |
| 209 new_data_available_ = false; |
207 if (!shared_ring_buffer_ || in_transit_) | 210 if (!shared_ring_buffer_ || in_transit_) |
208 return MOJO_RESULT_INVALID_ARGUMENT; | 211 return MOJO_RESULT_INVALID_ARGUMENT; |
209 | 212 |
210 if (in_two_phase_read_) | 213 if (in_two_phase_read_) |
211 return MOJO_RESULT_BUSY; | 214 return MOJO_RESULT_BUSY; |
212 | 215 |
213 // These flags may not be used in two-phase mode. | 216 // These flags may not be used in two-phase mode. |
214 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || | 217 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || |
215 (flags & MOJO_READ_DATA_FLAG_QUERY) || | 218 (flags & MOJO_READ_DATA_FLAG_QUERY) || |
216 (flags & MOJO_READ_DATA_FLAG_PEEK)) | 219 (flags & MOJO_READ_DATA_FLAG_PEEK)) |
(...skipping 195 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
412 | 415 |
413 scoped_refptr<DataPipeConsumerDispatcher> dispatcher = | 416 scoped_refptr<DataPipeConsumerDispatcher> dispatcher = |
414 new DataPipeConsumerDispatcher(node_controller, port, ring_buffer, | 417 new DataPipeConsumerDispatcher(node_controller, port, ring_buffer, |
415 state->options, false /* initialized */, | 418 state->options, false /* initialized */, |
416 state->pipe_id); | 419 state->pipe_id); |
417 | 420 |
418 { | 421 { |
419 base::AutoLock lock(dispatcher->lock_); | 422 base::AutoLock lock(dispatcher->lock_); |
420 dispatcher->read_offset_ = state->read_offset; | 423 dispatcher->read_offset_ = state->read_offset; |
421 dispatcher->bytes_available_ = state->bytes_available; | 424 dispatcher->bytes_available_ = state->bytes_available; |
| 425 dispatcher->new_data_available_ = state->bytes_available > 0; |
422 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; | 426 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; |
423 dispatcher->InitializeNoLock(); | 427 dispatcher->InitializeNoLock(); |
424 dispatcher->UpdateSignalsStateNoLock(); | 428 dispatcher->UpdateSignalsStateNoLock(); |
425 } | 429 } |
426 | 430 |
427 return dispatcher; | 431 return dispatcher; |
428 } | 432 } |
429 | 433 |
430 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { | 434 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { |
431 DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ && | 435 DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ && |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
467 | 471 |
468 return MOJO_RESULT_OK; | 472 return MOJO_RESULT_OK; |
469 } | 473 } |
470 | 474 |
471 HandleSignalsState | 475 HandleSignalsState |
472 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { | 476 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { |
473 lock_.AssertAcquired(); | 477 lock_.AssertAcquired(); |
474 | 478 |
475 HandleSignalsState rv; | 479 HandleSignalsState rv; |
476 if (shared_ring_buffer_ && bytes_available_) { | 480 if (shared_ring_buffer_ && bytes_available_) { |
477 if (!in_two_phase_read_) | 481 if (!in_two_phase_read_) { |
478 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 482 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 483 if (new_data_available_) |
| 484 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; |
| 485 } |
479 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
480 } else if (!peer_closed_ && shared_ring_buffer_) { | 487 } else if (!peer_closed_ && shared_ring_buffer_) { |
481 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 488 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
482 } | 489 } |
483 | 490 |
| 491 if (shared_ring_buffer_) { |
| 492 if (new_data_available_ || !peer_closed_) |
| 493 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; |
| 494 } |
| 495 |
484 if (peer_closed_) | 496 if (peer_closed_) |
485 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 497 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 498 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 499 |
487 return rv; | 500 return rv; |
488 } | 501 } |
489 | 502 |
490 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) { | 503 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) { |
491 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: " | 504 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: " |
492 << num_bytes << " bytes read. [control_port=" | 505 << num_bytes << " bytes read. [control_port=" |
493 << control_port_.name() << "]"; | 506 << control_port_.name() << "]"; |
494 | 507 |
495 SendDataPipeControlMessage(node_controller_, control_port_, | 508 SendDataPipeControlMessage(node_controller_, control_port_, |
496 DataPipeCommand::DATA_WAS_READ, num_bytes); | 509 DataPipeCommand::DATA_WAS_READ, num_bytes); |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
556 | 569 |
557 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that " | 570 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that " |
558 << m->num_bytes << " bytes were written. [control_port=" | 571 << m->num_bytes << " bytes were written. [control_port=" |
559 << control_port_.name() << "]"; | 572 << control_port_.name() << "]"; |
560 | 573 |
561 bytes_available_ += m->num_bytes; | 574 bytes_available_ += m->num_bytes; |
562 } | 575 } |
563 } while (message); | 576 } while (message); |
564 } | 577 } |
565 | 578 |
566 if (peer_closed_ != was_peer_closed || | 579 bool has_new_data = bytes_available_ != previous_bytes_available; |
567 bytes_available_ != previous_bytes_available) { | 580 if (has_new_data) |
| 581 new_data_available_ = true; |
| 582 |
| 583 if (peer_closed_ != was_peer_closed || has_new_data) { |
568 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 584 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
569 } | 585 } |
570 } | 586 } |
571 | 587 |
572 } // namespace edk | 588 } // namespace edk |
573 } // namespace mojo | 589 } // namespace mojo |
OLD | NEW |