Chromium Code Reviews| Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc |
| diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
| index ec8d2d855e4bbb6f62940b91f5176b99e2ab1d46..c908e3ae8bf7d4f9668d9fa3836d1e8b79f8f97b 100644 |
| --- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
| +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
| @@ -124,6 +124,8 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
| uint32_t* num_bytes, |
| MojoReadDataFlags flags) { |
| base::AutoLock lock(lock_); |
| + new_data_available_ = false; |
|
Ken Rockot(use gerrit already)
2017/03/13 15:31:03
Note: No Awakable updates here since we're only ca
Ken Rockot(use gerrit already)
2017/03/13 15:31:03
Note: No Awakable updates here since we're only ca
|
| + |
| if (!shared_ring_buffer_ || in_transit_) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| @@ -204,6 +206,7 @@ MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, |
| uint32_t* buffer_num_bytes, |
| MojoReadDataFlags flags) { |
| base::AutoLock lock(lock_); |
| + new_data_available_ = false; |
| if (!shared_ring_buffer_ || in_transit_) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| @@ -419,6 +422,7 @@ DataPipeConsumerDispatcher::Deserialize(const void* data, |
| base::AutoLock lock(dispatcher->lock_); |
| dispatcher->read_offset_ = state->read_offset; |
| dispatcher->bytes_available_ = state->bytes_available; |
| + dispatcher->new_data_available_ = state->bytes_available > 0; |
| dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; |
| dispatcher->InitializeNoLock(); |
| dispatcher->UpdateSignalsStateNoLock(); |
| @@ -474,16 +478,25 @@ DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { |
| HandleSignalsState rv; |
| if (shared_ring_buffer_ && bytes_available_) { |
| - if (!in_two_phase_read_) |
| + if (!in_two_phase_read_) { |
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| + if (new_data_available_) |
| + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; |
| + } |
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| } else if (!peer_closed_ && shared_ring_buffer_) { |
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| } |
| + if (shared_ring_buffer_) { |
| + if (new_data_available_ || !peer_closed_) |
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; |
| + } |
| + |
| if (peer_closed_) |
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| + |
| return rv; |
| } |
| @@ -563,8 +576,11 @@ void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { |
| } while (message); |
| } |
| - if (peer_closed_ != was_peer_closed || |
| - bytes_available_ != previous_bytes_available) { |
| + bool has_new_data = bytes_available_ != previous_bytes_available; |
| + if (has_new_data) |
| + new_data_available_ = true; |
| + |
| + if (peer_closed_ != was_peer_closed || has_new_data) { |
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| } |
| } |