Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <algorithm> | 10 #include <algorithm> |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 73 NodeController* node_controller, | 73 NodeController* node_controller, |
| 74 const ports::PortRef& control_port, | 74 const ports::PortRef& control_port, |
| 75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, | 75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, |
| 76 const MojoCreateDataPipeOptions& options, | 76 const MojoCreateDataPipeOptions& options, |
| 77 bool initialized, | 77 bool initialized, |
| 78 uint64_t pipe_id) | 78 uint64_t pipe_id) |
| 79 : options_(options), | 79 : options_(options), |
| 80 node_controller_(node_controller), | 80 node_controller_(node_controller), |
| 81 control_port_(control_port), | 81 control_port_(control_port), |
| 82 pipe_id_(pipe_id), | 82 pipe_id_(pipe_id), |
| 83 watchers_(this), | |
| 83 shared_ring_buffer_(shared_ring_buffer) { | 84 shared_ring_buffer_(shared_ring_buffer) { |
| 84 if (initialized) { | 85 if (initialized) { |
| 85 base::AutoLock lock(lock_); | 86 base::AutoLock lock(lock_); |
| 86 InitializeNoLock(); | 87 InitializeNoLock(); |
| 87 } | 88 } |
| 88 } | 89 } |
| 89 | 90 |
| 90 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { | 91 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { |
| 91 return Type::DATA_PIPE_CONSUMER; | 92 return Type::DATA_PIPE_CONSUMER; |
| 92 } | 93 } |
| 93 | 94 |
| 94 MojoResult DataPipeConsumerDispatcher::Close() { | 95 MojoResult DataPipeConsumerDispatcher::Close() { |
| 95 base::AutoLock lock(lock_); | 96 base::AutoLock lock(lock_); |
| 96 DVLOG(1) << "Closing data pipe consumer " << pipe_id_; | 97 DVLOG(1) << "Closing data pipe consumer " << pipe_id_; |
| 97 return CloseNoLock(); | 98 return CloseNoLock(); |
| 98 } | 99 } |
| 99 | 100 |
| 100 | |
| 101 MojoResult DataPipeConsumerDispatcher::Watch( | |
| 102 MojoHandleSignals signals, | |
| 103 const Watcher::WatchCallback& callback, | |
| 104 uintptr_t context) { | |
| 105 base::AutoLock lock(lock_); | |
| 106 | |
| 107 if (is_closed_ || in_transit_) | |
| 108 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 109 | |
| 110 return awakable_list_.AddWatcher( | |
| 111 signals, callback, context, GetHandleSignalsStateNoLock()); | |
| 112 } | |
| 113 | |
| 114 MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) { | |
| 115 base::AutoLock lock(lock_); | |
| 116 | |
| 117 if (is_closed_ || in_transit_) | |
| 118 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 119 | |
| 120 return awakable_list_.RemoveWatcher(context); | |
| 121 } | |
| 122 | |
| 123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, | 101 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
| 124 uint32_t* num_bytes, | 102 uint32_t* num_bytes, |
| 125 MojoReadDataFlags flags) { | 103 MojoReadDataFlags flags) { |
| 126 base::AutoLock lock(lock_); | 104 base::AutoLock lock(lock_); |
| 127 if (!shared_ring_buffer_ || in_transit_) | 105 if (!shared_ring_buffer_ || in_transit_) |
| 128 return MOJO_RESULT_INVALID_ARGUMENT; | 106 return MOJO_RESULT_INVALID_ARGUMENT; |
| 129 | 107 |
| 130 if (in_two_phase_read_) | 108 if (in_two_phase_read_) |
| 131 return MOJO_RESULT_BUSY; | 109 return MOJO_RESULT_BUSY; |
| 132 | 110 |
| (...skipping 20 matching lines...) Expand all Loading... | |
| 153 | 131 |
| 154 uint32_t max_num_bytes_to_read = *num_bytes; | 132 uint32_t max_num_bytes_to_read = *num_bytes; |
| 155 if (max_num_bytes_to_read % options_.element_num_bytes != 0) | 133 if (max_num_bytes_to_read % options_.element_num_bytes != 0) |
| 156 return MOJO_RESULT_INVALID_ARGUMENT; | 134 return MOJO_RESULT_INVALID_ARGUMENT; |
| 157 | 135 |
| 158 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; | 136 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; |
| 159 uint32_t min_num_bytes_to_read = | 137 uint32_t min_num_bytes_to_read = |
| 160 all_or_none ? max_num_bytes_to_read : 0; | 138 all_or_none ? max_num_bytes_to_read : 0; |
| 161 | 139 |
| 162 if (min_num_bytes_to_read > bytes_available_) { | 140 if (min_num_bytes_to_read > bytes_available_) { |
| 141 if (flags & MOJO_READ_DATA_FLAG_CLEAR_SIGNAL) { | |
| 142 suppress_readable_signal_ = true; | |
| 143 watchers_.NotifyState(GetHandleSignalsStateNoLock()); | |
| 144 } | |
| 163 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION | 145 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
| 164 : MOJO_RESULT_OUT_OF_RANGE; | 146 : MOJO_RESULT_OUT_OF_RANGE; |
| 165 } | 147 } |
| 166 | 148 |
| 167 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); | 149 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); |
| 168 if (bytes_to_read == 0) { | 150 if (bytes_to_read == 0) { |
| 169 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION | 151 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
| 170 : MOJO_RESULT_SHOULD_WAIT; | 152 : MOJO_RESULT_SHOULD_WAIT; |
| 171 } | 153 } |
| 172 | 154 |
| (...skipping 17 matching lines...) Expand all Loading... | |
| 190 | 172 |
| 191 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); | 173 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); |
| 192 if (discard || !peek) { | 174 if (discard || !peek) { |
| 193 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; | 175 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; |
| 194 bytes_available_ -= bytes_to_read; | 176 bytes_available_ -= bytes_to_read; |
| 195 | 177 |
| 196 base::AutoUnlock unlock(lock_); | 178 base::AutoUnlock unlock(lock_); |
| 197 NotifyRead(bytes_to_read); | 179 NotifyRead(bytes_to_read); |
| 198 } | 180 } |
| 199 | 181 |
| 182 // We may have just read the last available data and thus changed the signals | |
| 183 // state. | |
| 184 watchers_.NotifyState(GetHandleSignalsStateNoLock()); | |
| 185 | |
| 200 return MOJO_RESULT_OK; | 186 return MOJO_RESULT_OK; |
| 201 } | 187 } |
| 202 | 188 |
| 203 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, | 189 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, |
| 204 uint32_t* buffer_num_bytes, | 190 uint32_t* buffer_num_bytes, |
| 205 MojoReadDataFlags flags) { | 191 MojoReadDataFlags flags) { |
| 206 base::AutoLock lock(lock_); | 192 base::AutoLock lock(lock_); |
| 207 if (!shared_ring_buffer_ || in_transit_) | 193 if (!shared_ring_buffer_ || in_transit_) |
| 208 return MOJO_RESULT_INVALID_ARGUMENT; | 194 return MOJO_RESULT_INVALID_ARGUMENT; |
| 209 | 195 |
| 210 if (in_two_phase_read_) | 196 if (in_two_phase_read_) |
| 211 return MOJO_RESULT_BUSY; | 197 return MOJO_RESULT_BUSY; |
| 212 | 198 |
| 213 // These flags may not be used in two-phase mode. | 199 // These flags may not be used in two-phase mode. |
| 214 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || | 200 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || |
| 215 (flags & MOJO_READ_DATA_FLAG_QUERY) || | 201 (flags & MOJO_READ_DATA_FLAG_QUERY) || |
| 216 (flags & MOJO_READ_DATA_FLAG_PEEK)) | 202 (flags & MOJO_READ_DATA_FLAG_PEEK) || |
| 203 (flags & MOJO_READ_DATA_FLAG_CLEAR_SIGNAL)) | |
| 217 return MOJO_RESULT_INVALID_ARGUMENT; | 204 return MOJO_RESULT_INVALID_ARGUMENT; |
| 218 | 205 |
| 219 if (bytes_available_ == 0) { | 206 if (bytes_available_ == 0) { |
| 220 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION | 207 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
| 221 : MOJO_RESULT_SHOULD_WAIT; | 208 : MOJO_RESULT_SHOULD_WAIT; |
| 222 } | 209 } |
| 223 | 210 |
| 224 DCHECK_LT(read_offset_, options_.capacity_num_bytes); | 211 DCHECK_LT(read_offset_, options_.capacity_num_bytes); |
| 225 uint32_t bytes_to_read = std::min(bytes_available_, | 212 uint32_t bytes_to_read = std::min(bytes_available_, |
| 226 options_.capacity_num_bytes - read_offset_); | 213 options_.capacity_num_bytes - read_offset_); |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 263 base::AutoUnlock unlock(lock_); | 250 base::AutoUnlock unlock(lock_); |
| 264 NotifyRead(num_bytes_read); | 251 NotifyRead(num_bytes_read); |
| 265 } | 252 } |
| 266 | 253 |
| 267 in_two_phase_read_ = false; | 254 in_two_phase_read_ = false; |
| 268 two_phase_max_bytes_read_ = 0; | 255 two_phase_max_bytes_read_ = 0; |
| 269 | 256 |
| 270 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 257 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
| 271 if (!new_state.equals(old_state)) | 258 if (!new_state.equals(old_state)) |
| 272 awakable_list_.AwakeForStateChange(new_state); | 259 awakable_list_.AwakeForStateChange(new_state); |
| 260 watchers_.NotifyState(new_state); | |
| 273 | 261 |
| 274 return rv; | 262 return rv; |
| 275 } | 263 } |
| 276 | 264 |
| 277 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { | 265 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { |
| 278 base::AutoLock lock(lock_); | 266 base::AutoLock lock(lock_); |
| 279 return GetHandleSignalsStateNoLock(); | 267 return GetHandleSignalsStateNoLock(); |
| 280 } | 268 } |
| 281 | 269 |
| 270 MojoResult DataPipeConsumerDispatcher::AddWatcherRef( | |
| 271 const scoped_refptr<WatcherDispatcher>& watcher, | |
| 272 uintptr_t context) { | |
| 273 base::AutoLock lock(lock_); | |
| 274 if (is_closed_ || in_transit_) | |
| 275 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 276 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); | |
| 277 } | |
| 278 | |
| 279 MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef( | |
| 280 WatcherDispatcher* watcher, | |
| 281 uintptr_t context) { | |
| 282 base::AutoLock lock(lock_); | |
| 283 if (is_closed_ || in_transit_) | |
| 284 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 285 return watchers_.Remove(watcher, context); | |
| 286 } | |
| 287 | |
| 282 MojoResult DataPipeConsumerDispatcher::AddAwakable( | 288 MojoResult DataPipeConsumerDispatcher::AddAwakable( |
| 283 Awakable* awakable, | 289 Awakable* awakable, |
| 284 MojoHandleSignals signals, | 290 MojoHandleSignals signals, |
| 285 uintptr_t context, | 291 uintptr_t context, |
| 286 HandleSignalsState* signals_state) { | 292 HandleSignalsState* signals_state) { |
| 287 base::AutoLock lock(lock_); | 293 base::AutoLock lock(lock_); |
| 288 if (!shared_ring_buffer_ || in_transit_) { | 294 if (!shared_ring_buffer_ || in_transit_) { |
| 289 if (signals_state) | 295 if (signals_state) |
| 290 *signals_state = HandleSignalsState(); | 296 *signals_state = HandleSignalsState(); |
| 291 return MOJO_RESULT_INVALID_ARGUMENT; | 297 return MOJO_RESULT_INVALID_ARGUMENT; |
| (...skipping 161 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 453 | 459 |
| 454 MojoResult DataPipeConsumerDispatcher::CloseNoLock() { | 460 MojoResult DataPipeConsumerDispatcher::CloseNoLock() { |
| 455 lock_.AssertAcquired(); | 461 lock_.AssertAcquired(); |
| 456 if (is_closed_ || in_transit_) | 462 if (is_closed_ || in_transit_) |
| 457 return MOJO_RESULT_INVALID_ARGUMENT; | 463 return MOJO_RESULT_INVALID_ARGUMENT; |
| 458 is_closed_ = true; | 464 is_closed_ = true; |
| 459 ring_buffer_mapping_.reset(); | 465 ring_buffer_mapping_.reset(); |
| 460 shared_ring_buffer_ = nullptr; | 466 shared_ring_buffer_ = nullptr; |
| 461 | 467 |
| 462 awakable_list_.CancelAll(); | 468 awakable_list_.CancelAll(); |
| 469 watchers_.NotifyClosed(); | |
| 463 if (!transferred_) { | 470 if (!transferred_) { |
| 464 base::AutoUnlock unlock(lock_); | 471 base::AutoUnlock unlock(lock_); |
| 465 node_controller_->ClosePort(control_port_); | 472 node_controller_->ClosePort(control_port_); |
| 466 } | 473 } |
| 467 | 474 |
| 468 return MOJO_RESULT_OK; | 475 return MOJO_RESULT_OK; |
| 469 } | 476 } |
| 470 | 477 |
| 471 HandleSignalsState | 478 HandleSignalsState |
| 472 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { | 479 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { |
| 473 lock_.AssertAcquired(); | 480 lock_.AssertAcquired(); |
| 474 | 481 |
| 475 HandleSignalsState rv; | 482 HandleSignalsState rv; |
| 476 if (shared_ring_buffer_ && bytes_available_) { | 483 if (shared_ring_buffer_ && bytes_available_) { |
| 477 if (!in_two_phase_read_) | 484 if (!in_two_phase_read_ && bytes_available_ && !suppress_readable_signal_) |
|
yzshen1
2017/03/11 00:44:58
nit: there is no need to check |bytes_available_|
Ken Rockot(use gerrit already)
2017/03/12 22:24:13
Done
| |
| 478 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 485 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 479 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 480 } else if (!peer_closed_ && shared_ring_buffer_) { | 487 } else if (!peer_closed_ && shared_ring_buffer_) { |
| 481 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 488 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 482 } | 489 } |
| 483 | 490 |
| 484 if (peer_closed_) | 491 if (peer_closed_) |
| 485 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 492 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 493 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 487 return rv; | 494 return rv; |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 558 << m->num_bytes << " bytes were written. [control_port=" | 565 << m->num_bytes << " bytes were written. [control_port=" |
| 559 << control_port_.name() << "]"; | 566 << control_port_.name() << "]"; |
| 560 | 567 |
| 561 bytes_available_ += m->num_bytes; | 568 bytes_available_ += m->num_bytes; |
| 562 } | 569 } |
| 563 } while (message); | 570 } while (message); |
| 564 } | 571 } |
| 565 | 572 |
| 566 if (peer_closed_ != was_peer_closed || | 573 if (peer_closed_ != was_peer_closed || |
| 567 bytes_available_ != previous_bytes_available) { | 574 bytes_available_ != previous_bytes_available) { |
| 568 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 575 suppress_readable_signal_ = false; |
| 576 HandleSignalsState state = GetHandleSignalsStateNoLock(); | |
| 577 awakable_list_.AwakeForStateChange(state); | |
| 578 watchers_.NotifyState(state); | |
| 569 } | 579 } |
| 570 } | 580 } |
| 571 | 581 |
| 572 } // namespace edk | 582 } // namespace edk |
| 573 } // namespace mojo | 583 } // namespace mojo |
| OLD | NEW |