| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <algorithm> | 10 #include <algorithm> |
| (...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 117 if (is_closed_ || in_transit_) | 117 if (is_closed_ || in_transit_) |
| 118 return MOJO_RESULT_INVALID_ARGUMENT; | 118 return MOJO_RESULT_INVALID_ARGUMENT; |
| 119 | 119 |
| 120 return awakable_list_.RemoveWatcher(context); | 120 return awakable_list_.RemoveWatcher(context); |
| 121 } | 121 } |
| 122 | 122 |
| 123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, | 123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
| 124 uint32_t* num_bytes, | 124 uint32_t* num_bytes, |
| 125 MojoReadDataFlags flags) { | 125 MojoReadDataFlags flags) { |
| 126 base::AutoLock lock(lock_); | 126 base::AutoLock lock(lock_); |
| 127 new_data_available_ = false; |
| 128 |
| 127 if (!shared_ring_buffer_ || in_transit_) | 129 if (!shared_ring_buffer_ || in_transit_) |
| 128 return MOJO_RESULT_INVALID_ARGUMENT; | 130 return MOJO_RESULT_INVALID_ARGUMENT; |
| 129 | 131 |
| 130 if (in_two_phase_read_) | 132 if (in_two_phase_read_) |
| 131 return MOJO_RESULT_BUSY; | 133 return MOJO_RESULT_BUSY; |
| 132 | 134 |
| 133 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { | 135 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { |
| 134 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || | 136 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || |
| 135 (flags & MOJO_READ_DATA_FLAG_DISCARD)) | 137 (flags & MOJO_READ_DATA_FLAG_DISCARD)) |
| 136 return MOJO_RESULT_INVALID_ARGUMENT; | 138 return MOJO_RESULT_INVALID_ARGUMENT; |
| (...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 197 NotifyRead(bytes_to_read); | 199 NotifyRead(bytes_to_read); |
| 198 } | 200 } |
| 199 | 201 |
| 200 return MOJO_RESULT_OK; | 202 return MOJO_RESULT_OK; |
| 201 } | 203 } |
| 202 | 204 |
| 203 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, | 205 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, |
| 204 uint32_t* buffer_num_bytes, | 206 uint32_t* buffer_num_bytes, |
| 205 MojoReadDataFlags flags) { | 207 MojoReadDataFlags flags) { |
| 206 base::AutoLock lock(lock_); | 208 base::AutoLock lock(lock_); |
| 209 new_data_available_ = false; |
| 207 if (!shared_ring_buffer_ || in_transit_) | 210 if (!shared_ring_buffer_ || in_transit_) |
| 208 return MOJO_RESULT_INVALID_ARGUMENT; | 211 return MOJO_RESULT_INVALID_ARGUMENT; |
| 209 | 212 |
| 210 if (in_two_phase_read_) | 213 if (in_two_phase_read_) |
| 211 return MOJO_RESULT_BUSY; | 214 return MOJO_RESULT_BUSY; |
| 212 | 215 |
| 213 // These flags may not be used in two-phase mode. | 216 // These flags may not be used in two-phase mode. |
| 214 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || | 217 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || |
| 215 (flags & MOJO_READ_DATA_FLAG_QUERY) || | 218 (flags & MOJO_READ_DATA_FLAG_QUERY) || |
| 216 (flags & MOJO_READ_DATA_FLAG_PEEK)) | 219 (flags & MOJO_READ_DATA_FLAG_PEEK)) |
| (...skipping 195 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 412 | 415 |
| 413 scoped_refptr<DataPipeConsumerDispatcher> dispatcher = | 416 scoped_refptr<DataPipeConsumerDispatcher> dispatcher = |
| 414 new DataPipeConsumerDispatcher(node_controller, port, ring_buffer, | 417 new DataPipeConsumerDispatcher(node_controller, port, ring_buffer, |
| 415 state->options, false /* initialized */, | 418 state->options, false /* initialized */, |
| 416 state->pipe_id); | 419 state->pipe_id); |
| 417 | 420 |
| 418 { | 421 { |
| 419 base::AutoLock lock(dispatcher->lock_); | 422 base::AutoLock lock(dispatcher->lock_); |
| 420 dispatcher->read_offset_ = state->read_offset; | 423 dispatcher->read_offset_ = state->read_offset; |
| 421 dispatcher->bytes_available_ = state->bytes_available; | 424 dispatcher->bytes_available_ = state->bytes_available; |
| 425 dispatcher->new_data_available_ = state->bytes_available > 0; |
| 422 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; | 426 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; |
| 423 dispatcher->InitializeNoLock(); | 427 dispatcher->InitializeNoLock(); |
| 424 dispatcher->UpdateSignalsStateNoLock(); | 428 dispatcher->UpdateSignalsStateNoLock(); |
| 425 } | 429 } |
| 426 | 430 |
| 427 return dispatcher; | 431 return dispatcher; |
| 428 } | 432 } |
| 429 | 433 |
| 430 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { | 434 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { |
| 431 DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ && | 435 DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ && |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 467 | 471 |
| 468 return MOJO_RESULT_OK; | 472 return MOJO_RESULT_OK; |
| 469 } | 473 } |
| 470 | 474 |
| 471 HandleSignalsState | 475 HandleSignalsState |
| 472 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { | 476 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { |
| 473 lock_.AssertAcquired(); | 477 lock_.AssertAcquired(); |
| 474 | 478 |
| 475 HandleSignalsState rv; | 479 HandleSignalsState rv; |
| 476 if (shared_ring_buffer_ && bytes_available_) { | 480 if (shared_ring_buffer_ && bytes_available_) { |
| 477 if (!in_two_phase_read_) | 481 if (!in_two_phase_read_) { |
| 478 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 482 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 483 if (new_data_available_) |
| 484 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; |
| 485 } |
| 479 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 480 } else if (!peer_closed_ && shared_ring_buffer_) { | 487 } else if (!peer_closed_ && shared_ring_buffer_) { |
| 481 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 488 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 482 } | 489 } |
| 483 | 490 |
| 491 if (shared_ring_buffer_) { |
| 492 if (new_data_available_ || !peer_closed_) |
| 493 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; |
| 494 } |
| 495 |
| 484 if (peer_closed_) | 496 if (peer_closed_) |
| 485 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 497 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 498 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 499 |
| 487 return rv; | 500 return rv; |
| 488 } | 501 } |
| 489 | 502 |
| 490 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) { | 503 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) { |
| 491 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: " | 504 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: " |
| 492 << num_bytes << " bytes read. [control_port=" | 505 << num_bytes << " bytes read. [control_port=" |
| 493 << control_port_.name() << "]"; | 506 << control_port_.name() << "]"; |
| 494 | 507 |
| 495 SendDataPipeControlMessage(node_controller_, control_port_, | 508 SendDataPipeControlMessage(node_controller_, control_port_, |
| 496 DataPipeCommand::DATA_WAS_READ, num_bytes); | 509 DataPipeCommand::DATA_WAS_READ, num_bytes); |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 556 | 569 |
| 557 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that " | 570 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that " |
| 558 << m->num_bytes << " bytes were written. [control_port=" | 571 << m->num_bytes << " bytes were written. [control_port=" |
| 559 << control_port_.name() << "]"; | 572 << control_port_.name() << "]"; |
| 560 | 573 |
| 561 bytes_available_ += m->num_bytes; | 574 bytes_available_ += m->num_bytes; |
| 562 } | 575 } |
| 563 } while (message); | 576 } while (message); |
| 564 } | 577 } |
| 565 | 578 |
| 566 if (peer_closed_ != was_peer_closed || | 579 bool has_new_data = bytes_available_ != previous_bytes_available; |
| 567 bytes_available_ != previous_bytes_available) { | 580 if (has_new_data) |
| 581 new_data_available_ = true; |
| 582 |
| 583 if (peer_closed_ != was_peer_closed || has_new_data) { |
| 568 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 584 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 569 } | 585 } |
| 570 } | 586 } |
| 571 | 587 |
| 572 } // namespace edk | 588 } // namespace edk |
| 573 } // namespace mojo | 589 } // namespace mojo |
| OLD | NEW |