| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 72 NodeController* node_controller, | 72 NodeController* node_controller, |
| 73 const ports::PortRef& control_port, | 73 const ports::PortRef& control_port, |
| 74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, | 74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, |
| 75 const MojoCreateDataPipeOptions& options, | 75 const MojoCreateDataPipeOptions& options, |
| 76 bool initialized, | 76 bool initialized, |
| 77 uint64_t pipe_id) | 77 uint64_t pipe_id) |
| 78 : options_(options), | 78 : options_(options), |
| 79 node_controller_(node_controller), | 79 node_controller_(node_controller), |
| 80 control_port_(control_port), | 80 control_port_(control_port), |
| 81 pipe_id_(pipe_id), | 81 pipe_id_(pipe_id), |
| 82 watchers_(this), |
| 82 shared_ring_buffer_(shared_ring_buffer), | 83 shared_ring_buffer_(shared_ring_buffer), |
| 83 available_capacity_(options_.capacity_num_bytes) { | 84 available_capacity_(options_.capacity_num_bytes) { |
| 84 if (initialized) { | 85 if (initialized) { |
| 85 base::AutoLock lock(lock_); | 86 base::AutoLock lock(lock_); |
| 86 InitializeNoLock(); | 87 InitializeNoLock(); |
| 87 } | 88 } |
| 88 } | 89 } |
| 89 | 90 |
| 90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { | 91 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { |
| 91 return Type::DATA_PIPE_PRODUCER; | 92 return Type::DATA_PIPE_PRODUCER; |
| 92 } | 93 } |
| 93 | 94 |
| 94 MojoResult DataPipeProducerDispatcher::Close() { | 95 MojoResult DataPipeProducerDispatcher::Close() { |
| 95 base::AutoLock lock(lock_); | 96 base::AutoLock lock(lock_); |
| 96 DVLOG(1) << "Closing data pipe producer " << pipe_id_; | 97 DVLOG(1) << "Closing data pipe producer " << pipe_id_; |
| 97 return CloseNoLock(); | 98 return CloseNoLock(); |
| 98 } | 99 } |
| 99 | 100 |
| 100 MojoResult DataPipeProducerDispatcher::Watch( | |
| 101 MojoHandleSignals signals, | |
| 102 const Watcher::WatchCallback& callback, | |
| 103 uintptr_t context) { | |
| 104 base::AutoLock lock(lock_); | |
| 105 | |
| 106 if (is_closed_ || in_transit_) | |
| 107 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 108 | |
| 109 return awakable_list_.AddWatcher( | |
| 110 signals, callback, context, GetHandleSignalsStateNoLock()); | |
| 111 } | |
| 112 | |
| 113 MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) { | |
| 114 base::AutoLock lock(lock_); | |
| 115 | |
| 116 if (is_closed_ || in_transit_) | |
| 117 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 118 | |
| 119 return awakable_list_.RemoveWatcher(context); | |
| 120 } | |
| 121 | |
| 122 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, | 101 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, |
| 123 uint32_t* num_bytes, | 102 uint32_t* num_bytes, |
| 124 MojoWriteDataFlags flags) { | 103 MojoWriteDataFlags flags) { |
| 125 base::AutoLock lock(lock_); | 104 base::AutoLock lock(lock_); |
| 126 if (!shared_ring_buffer_ || in_transit_) | 105 if (!shared_ring_buffer_ || in_transit_) |
| 127 return MOJO_RESULT_INVALID_ARGUMENT; | 106 return MOJO_RESULT_INVALID_ARGUMENT; |
| 128 | 107 |
| 129 if (in_two_phase_write_) | 108 if (in_two_phase_write_) |
| 130 return MOJO_RESULT_BUSY; | 109 return MOJO_RESULT_BUSY; |
| 131 | 110 |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 172 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); | 151 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); |
| 173 | 152 |
| 174 DCHECK_LE(num_bytes_to_write, available_capacity_); | 153 DCHECK_LE(num_bytes_to_write, available_capacity_); |
| 175 available_capacity_ -= num_bytes_to_write; | 154 available_capacity_ -= num_bytes_to_write; |
| 176 write_offset_ = (write_offset_ + num_bytes_to_write) % | 155 write_offset_ = (write_offset_ + num_bytes_to_write) % |
| 177 options_.capacity_num_bytes; | 156 options_.capacity_num_bytes; |
| 178 | 157 |
| 179 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 158 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
| 180 if (!new_state.equals(old_state)) | 159 if (!new_state.equals(old_state)) |
| 181 awakable_list_.AwakeForStateChange(new_state); | 160 awakable_list_.AwakeForStateChange(new_state); |
| 161 watchers_.NotifyState(new_state); |
| 182 | 162 |
| 183 base::AutoUnlock unlock(lock_); | 163 base::AutoUnlock unlock(lock_); |
| 184 NotifyWrite(num_bytes_to_write); | 164 NotifyWrite(num_bytes_to_write); |
| 185 | 165 |
| 186 return MOJO_RESULT_OK; | 166 return MOJO_RESULT_OK; |
| 187 } | 167 } |
| 188 | 168 |
| 189 MojoResult DataPipeProducerDispatcher::BeginWriteData( | 169 MojoResult DataPipeProducerDispatcher::BeginWriteData( |
| 190 void** buffer, | 170 void** buffer, |
| 191 uint32_t* buffer_num_bytes, | 171 uint32_t* buffer_num_bytes, |
| 192 MojoWriteDataFlags flags) { | 172 MojoWriteDataFlags flags) { |
| 193 base::AutoLock lock(lock_); | 173 base::AutoLock lock(lock_); |
| 194 if (!shared_ring_buffer_ || in_transit_) | 174 if (!shared_ring_buffer_ || in_transit_) |
| 195 return MOJO_RESULT_INVALID_ARGUMENT; | 175 return MOJO_RESULT_INVALID_ARGUMENT; |
| 176 |
| 177 // These flags may not be used in two-phase mode. |
| 178 if (flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) |
| 179 return MOJO_RESULT_INVALID_ARGUMENT; |
| 180 |
| 196 if (in_two_phase_write_) | 181 if (in_two_phase_write_) |
| 197 return MOJO_RESULT_BUSY; | 182 return MOJO_RESULT_BUSY; |
| 198 if (peer_closed_) | 183 if (peer_closed_) |
| 199 return MOJO_RESULT_FAILED_PRECONDITION; | 184 return MOJO_RESULT_FAILED_PRECONDITION; |
| 200 | 185 |
| 201 if (available_capacity_ == 0) { | 186 if (available_capacity_ == 0) { |
| 202 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION | 187 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
| 203 : MOJO_RESULT_SHOULD_WAIT; | 188 : MOJO_RESULT_SHOULD_WAIT; |
| 204 } | 189 } |
| 205 | 190 |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 244 NotifyWrite(num_bytes_written); | 229 NotifyWrite(num_bytes_written); |
| 245 } | 230 } |
| 246 | 231 |
| 247 in_two_phase_write_ = false; | 232 in_two_phase_write_ = false; |
| 248 | 233 |
| 249 // If we're now writable, we *became* writable (since we weren't writable | 234 // If we're now writable, we *became* writable (since we weren't writable |
| 250 // during the two-phase write), so awake producer awakables. | 235 // during the two-phase write), so awake producer awakables. |
| 251 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 236 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
| 252 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) | 237 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
| 253 awakable_list_.AwakeForStateChange(new_state); | 238 awakable_list_.AwakeForStateChange(new_state); |
| 239 watchers_.NotifyState(new_state); |
| 254 | 240 |
| 255 return rv; | 241 return rv; |
| 256 } | 242 } |
| 257 | 243 |
| 258 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { | 244 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { |
| 259 base::AutoLock lock(lock_); | 245 base::AutoLock lock(lock_); |
| 260 return GetHandleSignalsStateNoLock(); | 246 return GetHandleSignalsStateNoLock(); |
| 261 } | 247 } |
| 262 | 248 |
| 249 MojoResult DataPipeProducerDispatcher::AddWatcherRef( |
| 250 const scoped_refptr<WatcherDispatcher>& watcher, |
| 251 uintptr_t context) { |
| 252 base::AutoLock lock(lock_); |
| 253 if (is_closed_ || in_transit_) |
| 254 return MOJO_RESULT_INVALID_ARGUMENT; |
| 255 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
| 256 } |
| 257 |
| 258 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef( |
| 259 WatcherDispatcher* watcher, |
| 260 uintptr_t context) { |
| 261 base::AutoLock lock(lock_); |
| 262 if (is_closed_ || in_transit_) |
| 263 return MOJO_RESULT_INVALID_ARGUMENT; |
| 264 return watchers_.Remove(watcher, context); |
| 265 } |
| 266 |
| 263 MojoResult DataPipeProducerDispatcher::AddAwakable( | 267 MojoResult DataPipeProducerDispatcher::AddAwakable( |
| 264 Awakable* awakable, | 268 Awakable* awakable, |
| 265 MojoHandleSignals signals, | 269 MojoHandleSignals signals, |
| 266 uintptr_t context, | 270 uintptr_t context, |
| 267 HandleSignalsState* signals_state) { | 271 HandleSignalsState* signals_state) { |
| 268 base::AutoLock lock(lock_); | 272 base::AutoLock lock(lock_); |
| 269 if (!shared_ring_buffer_ || in_transit_) { | 273 if (!shared_ring_buffer_ || in_transit_) { |
| 270 if (signals_state) | 274 if (signals_state) |
| 271 *signals_state = HandleSignalsState(); | 275 *signals_state = HandleSignalsState(); |
| 272 return MOJO_RESULT_INVALID_ARGUMENT; | 276 return MOJO_RESULT_INVALID_ARGUMENT; |
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 349 in_transit_ = false; | 353 in_transit_ = false; |
| 350 ignore_result(buffer_handle_for_transit_.release()); | 354 ignore_result(buffer_handle_for_transit_.release()); |
| 351 CloseNoLock(); | 355 CloseNoLock(); |
| 352 } | 356 } |
| 353 | 357 |
| 354 void DataPipeProducerDispatcher::CancelTransit() { | 358 void DataPipeProducerDispatcher::CancelTransit() { |
| 355 base::AutoLock lock(lock_); | 359 base::AutoLock lock(lock_); |
| 356 DCHECK(in_transit_); | 360 DCHECK(in_transit_); |
| 357 in_transit_ = false; | 361 in_transit_ = false; |
| 358 buffer_handle_for_transit_.reset(); | 362 buffer_handle_for_transit_.reset(); |
| 359 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 363 |
| 364 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 365 awakable_list_.AwakeForStateChange(state); |
| 366 watchers_.NotifyState(state); |
| 360 } | 367 } |
| 361 | 368 |
| 362 // static | 369 // static |
| 363 scoped_refptr<DataPipeProducerDispatcher> | 370 scoped_refptr<DataPipeProducerDispatcher> |
| 364 DataPipeProducerDispatcher::Deserialize(const void* data, | 371 DataPipeProducerDispatcher::Deserialize(const void* data, |
| 365 size_t num_bytes, | 372 size_t num_bytes, |
| 366 const ports::PortName* ports, | 373 const ports::PortName* ports, |
| 367 size_t num_ports, | 374 size_t num_ports, |
| 368 PlatformHandle* handles, | 375 PlatformHandle* handles, |
| 369 size_t num_handles) { | 376 size_t num_handles) { |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 433 | 440 |
| 434 MojoResult DataPipeProducerDispatcher::CloseNoLock() { | 441 MojoResult DataPipeProducerDispatcher::CloseNoLock() { |
| 435 lock_.AssertAcquired(); | 442 lock_.AssertAcquired(); |
| 436 if (is_closed_ || in_transit_) | 443 if (is_closed_ || in_transit_) |
| 437 return MOJO_RESULT_INVALID_ARGUMENT; | 444 return MOJO_RESULT_INVALID_ARGUMENT; |
| 438 is_closed_ = true; | 445 is_closed_ = true; |
| 439 ring_buffer_mapping_.reset(); | 446 ring_buffer_mapping_.reset(); |
| 440 shared_ring_buffer_ = nullptr; | 447 shared_ring_buffer_ = nullptr; |
| 441 | 448 |
| 442 awakable_list_.CancelAll(); | 449 awakable_list_.CancelAll(); |
| 450 watchers_.NotifyClosed(); |
| 443 if (!transferred_) { | 451 if (!transferred_) { |
| 444 base::AutoUnlock unlock(lock_); | 452 base::AutoUnlock unlock(lock_); |
| 445 node_controller_->ClosePort(control_port_); | 453 node_controller_->ClosePort(control_port_); |
| 446 } | 454 } |
| 447 | 455 |
| 448 return MOJO_RESULT_OK; | 456 return MOJO_RESULT_OK; |
| 449 } | 457 } |
| 450 | 458 |
| 451 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() | 459 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() |
| 452 const { | 460 const { |
| 453 lock_.AssertAcquired(); | 461 lock_.AssertAcquired(); |
| 454 HandleSignalsState rv; | 462 HandleSignalsState rv; |
| 455 if (!peer_closed_) { | 463 if (!peer_closed_) { |
| 456 if (!in_two_phase_write_ && shared_ring_buffer_ && | 464 if (!in_two_phase_write_ && shared_ring_buffer_ && available_capacity_ > 0) |
| 457 available_capacity_ > 0) | |
| 458 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 465 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 459 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 466 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 460 } else { | 467 } else { |
| 461 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 468 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 462 } | 469 } |
| 463 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 470 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 464 return rv; | 471 return rv; |
| 465 } | 472 } |
| 466 | 473 |
| 467 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { | 474 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 534 << m->num_bytes << " bytes were read. [control_port=" | 541 << m->num_bytes << " bytes were read. [control_port=" |
| 535 << control_port_.name() << "]"; | 542 << control_port_.name() << "]"; |
| 536 | 543 |
| 537 available_capacity_ += m->num_bytes; | 544 available_capacity_ += m->num_bytes; |
| 538 } | 545 } |
| 539 } while (message); | 546 } while (message); |
| 540 } | 547 } |
| 541 | 548 |
| 542 if (peer_closed_ != was_peer_closed || | 549 if (peer_closed_ != was_peer_closed || |
| 543 available_capacity_ != previous_capacity) { | 550 available_capacity_ != previous_capacity) { |
| 544 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 551 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 552 awakable_list_.AwakeForStateChange(state); |
| 553 watchers_.NotifyState(state); |
| 545 } | 554 } |
| 546 } | 555 } |
| 547 | 556 |
| 548 } // namespace edk | 557 } // namespace edk |
| 549 } // namespace mojo | 558 } // namespace mojo |
| OLD | NEW |