| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 121 // Don't return "should wait" since you can't wait for a specified amount of | 121 // Don't return "should wait" since you can't wait for a specified amount of |
| 122 // data. | 122 // data. |
| 123 return MOJO_RESULT_OUT_OF_RANGE; | 123 return MOJO_RESULT_OUT_OF_RANGE; |
| 124 } | 124 } |
| 125 | 125 |
| 126 DCHECK_LE(available_capacity_, options_.capacity_num_bytes); | 126 DCHECK_LE(available_capacity_, options_.capacity_num_bytes); |
| 127 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); | 127 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); |
| 128 if (num_bytes_to_write == 0) | 128 if (num_bytes_to_write == 0) |
| 129 return MOJO_RESULT_SHOULD_WAIT; | 129 return MOJO_RESULT_SHOULD_WAIT; |
| 130 | 130 |
| 131 HandleSignalsState old_state = GetHandleSignalsStateNoLock(); | |
| 132 | |
| 133 *num_bytes = num_bytes_to_write; | 131 *num_bytes = num_bytes_to_write; |
| 134 | 132 |
| 135 CHECK(ring_buffer_mapping_); | 133 CHECK(ring_buffer_mapping_); |
| 136 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); | 134 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); |
| 137 CHECK(data); | 135 CHECK(data); |
| 138 | 136 |
| 139 const uint8_t* source = static_cast<const uint8_t*>(elements); | 137 const uint8_t* source = static_cast<const uint8_t*>(elements); |
| 140 CHECK(source); | 138 CHECK(source); |
| 141 | 139 |
| 142 DCHECK_LE(write_offset_, options_.capacity_num_bytes); | 140 DCHECK_LE(write_offset_, options_.capacity_num_bytes); |
| 143 uint32_t tail_bytes_to_write = | 141 uint32_t tail_bytes_to_write = |
| 144 std::min(options_.capacity_num_bytes - write_offset_, | 142 std::min(options_.capacity_num_bytes - write_offset_, |
| 145 num_bytes_to_write); | 143 num_bytes_to_write); |
| 146 uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write; | 144 uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write; |
| 147 | 145 |
| 148 DCHECK_GT(tail_bytes_to_write, 0u); | 146 DCHECK_GT(tail_bytes_to_write, 0u); |
| 149 memcpy(data + write_offset_, source, tail_bytes_to_write); | 147 memcpy(data + write_offset_, source, tail_bytes_to_write); |
| 150 if (head_bytes_to_write > 0) | 148 if (head_bytes_to_write > 0) |
| 151 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); | 149 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); |
| 152 | 150 |
| 153 DCHECK_LE(num_bytes_to_write, available_capacity_); | 151 DCHECK_LE(num_bytes_to_write, available_capacity_); |
| 154 available_capacity_ -= num_bytes_to_write; | 152 available_capacity_ -= num_bytes_to_write; |
| 155 write_offset_ = (write_offset_ + num_bytes_to_write) % | 153 write_offset_ = (write_offset_ + num_bytes_to_write) % |
| 156 options_.capacity_num_bytes; | 154 options_.capacity_num_bytes; |
| 157 | 155 |
| 158 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 156 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
| 159 if (!new_state.equals(old_state)) | |
| 160 awakable_list_.AwakeForStateChange(new_state); | |
| 161 watchers_.NotifyState(new_state); | |
| 162 | 157 |
| 163 base::AutoUnlock unlock(lock_); | 158 base::AutoUnlock unlock(lock_); |
| 164 NotifyWrite(num_bytes_to_write); | 159 NotifyWrite(num_bytes_to_write); |
| 165 | 160 |
| 166 return MOJO_RESULT_OK; | 161 return MOJO_RESULT_OK; |
| 167 } | 162 } |
| 168 | 163 |
| 169 MojoResult DataPipeProducerDispatcher::BeginWriteData( | 164 MojoResult DataPipeProducerDispatcher::BeginWriteData( |
| 170 void** buffer, | 165 void** buffer, |
| 171 uint32_t* buffer_num_bytes, | 166 uint32_t* buffer_num_bytes, |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 225 write_offset_ = (write_offset_ + num_bytes_written) % | 220 write_offset_ = (write_offset_ + num_bytes_written) % |
| 226 options_.capacity_num_bytes; | 221 options_.capacity_num_bytes; |
| 227 | 222 |
| 228 base::AutoUnlock unlock(lock_); | 223 base::AutoUnlock unlock(lock_); |
| 229 NotifyWrite(num_bytes_written); | 224 NotifyWrite(num_bytes_written); |
| 230 } | 225 } |
| 231 | 226 |
| 232 in_two_phase_write_ = false; | 227 in_two_phase_write_ = false; |
| 233 | 228 |
| 234 // If we're now writable, we *became* writable (since we weren't writable | 229 // If we're now writable, we *became* writable (since we weren't writable |
| 235 // during the two-phase write), so awake producer awakables. | 230 // during the two-phase write), so notify watchers. |
| 236 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 231 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
| 237 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) | |
| 238 awakable_list_.AwakeForStateChange(new_state); | |
| 239 watchers_.NotifyState(new_state); | |
| 240 | 232 |
| 241 return rv; | 233 return rv; |
| 242 } | 234 } |
| 243 | 235 |
| 244 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { | 236 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { |
| 245 base::AutoLock lock(lock_); | 237 base::AutoLock lock(lock_); |
| 246 return GetHandleSignalsStateNoLock(); | 238 return GetHandleSignalsStateNoLock(); |
| 247 } | 239 } |
| 248 | 240 |
| 249 MojoResult DataPipeProducerDispatcher::AddWatcherRef( | 241 MojoResult DataPipeProducerDispatcher::AddWatcherRef( |
| 250 const scoped_refptr<WatcherDispatcher>& watcher, | 242 const scoped_refptr<WatcherDispatcher>& watcher, |
| 251 uintptr_t context) { | 243 uintptr_t context) { |
| 252 base::AutoLock lock(lock_); | 244 base::AutoLock lock(lock_); |
| 253 if (is_closed_ || in_transit_) | 245 if (is_closed_ || in_transit_) |
| 254 return MOJO_RESULT_INVALID_ARGUMENT; | 246 return MOJO_RESULT_INVALID_ARGUMENT; |
| 255 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); | 247 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
| 256 } | 248 } |
| 257 | 249 |
| 258 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef( | 250 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef( |
| 259 WatcherDispatcher* watcher, | 251 WatcherDispatcher* watcher, |
| 260 uintptr_t context) { | 252 uintptr_t context) { |
| 261 base::AutoLock lock(lock_); | 253 base::AutoLock lock(lock_); |
| 262 if (is_closed_ || in_transit_) | 254 if (is_closed_ || in_transit_) |
| 263 return MOJO_RESULT_INVALID_ARGUMENT; | 255 return MOJO_RESULT_INVALID_ARGUMENT; |
| 264 return watchers_.Remove(watcher, context); | 256 return watchers_.Remove(watcher, context); |
| 265 } | 257 } |
| 266 | 258 |
| 267 MojoResult DataPipeProducerDispatcher::AddAwakable( | |
| 268 Awakable* awakable, | |
| 269 MojoHandleSignals signals, | |
| 270 uintptr_t context, | |
| 271 HandleSignalsState* signals_state) { | |
| 272 base::AutoLock lock(lock_); | |
| 273 if (!shared_ring_buffer_ || in_transit_) { | |
| 274 if (signals_state) | |
| 275 *signals_state = HandleSignalsState(); | |
| 276 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 277 } | |
| 278 UpdateSignalsStateNoLock(); | |
| 279 HandleSignalsState state = GetHandleSignalsStateNoLock(); | |
| 280 if (state.satisfies(signals)) { | |
| 281 if (signals_state) | |
| 282 *signals_state = state; | |
| 283 return MOJO_RESULT_ALREADY_EXISTS; | |
| 284 } | |
| 285 if (!state.can_satisfy(signals)) { | |
| 286 if (signals_state) | |
| 287 *signals_state = state; | |
| 288 return MOJO_RESULT_FAILED_PRECONDITION; | |
| 289 } | |
| 290 | |
| 291 awakable_list_.Add(awakable, signals, context); | |
| 292 return MOJO_RESULT_OK; | |
| 293 } | |
| 294 | |
| 295 void DataPipeProducerDispatcher::RemoveAwakable( | |
| 296 Awakable* awakable, | |
| 297 HandleSignalsState* signals_state) { | |
| 298 base::AutoLock lock(lock_); | |
| 299 if ((!shared_ring_buffer_ || in_transit_) && signals_state) | |
| 300 *signals_state = HandleSignalsState(); | |
| 301 else if (signals_state) | |
| 302 *signals_state = GetHandleSignalsStateNoLock(); | |
| 303 awakable_list_.Remove(awakable); | |
| 304 } | |
| 305 | |
| 306 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes, | 259 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes, |
| 307 uint32_t* num_ports, | 260 uint32_t* num_ports, |
| 308 uint32_t* num_handles) { | 261 uint32_t* num_handles) { |
| 309 base::AutoLock lock(lock_); | 262 base::AutoLock lock(lock_); |
| 310 DCHECK(in_transit_); | 263 DCHECK(in_transit_); |
| 311 *num_bytes = sizeof(SerializedState); | 264 *num_bytes = sizeof(SerializedState); |
| 312 *num_ports = 1; | 265 *num_ports = 1; |
| 313 *num_handles = 1; | 266 *num_handles = 1; |
| 314 } | 267 } |
| 315 | 268 |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 355 CloseNoLock(); | 308 CloseNoLock(); |
| 356 } | 309 } |
| 357 | 310 |
| 358 void DataPipeProducerDispatcher::CancelTransit() { | 311 void DataPipeProducerDispatcher::CancelTransit() { |
| 359 base::AutoLock lock(lock_); | 312 base::AutoLock lock(lock_); |
| 360 DCHECK(in_transit_); | 313 DCHECK(in_transit_); |
| 361 in_transit_ = false; | 314 in_transit_ = false; |
| 362 buffer_handle_for_transit_.reset(); | 315 buffer_handle_for_transit_.reset(); |
| 363 | 316 |
| 364 HandleSignalsState state = GetHandleSignalsStateNoLock(); | 317 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 365 awakable_list_.AwakeForStateChange(state); | |
| 366 watchers_.NotifyState(state); | 318 watchers_.NotifyState(state); |
| 367 } | 319 } |
| 368 | 320 |
| 369 // static | 321 // static |
| 370 scoped_refptr<DataPipeProducerDispatcher> | 322 scoped_refptr<DataPipeProducerDispatcher> |
| 371 DataPipeProducerDispatcher::Deserialize(const void* data, | 323 DataPipeProducerDispatcher::Deserialize(const void* data, |
| 372 size_t num_bytes, | 324 size_t num_bytes, |
| 373 const ports::PortName* ports, | 325 const ports::PortName* ports, |
| 374 size_t num_ports, | 326 size_t num_ports, |
| 375 PlatformHandle* handles, | 327 PlatformHandle* handles, |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 439 } | 391 } |
| 440 | 392 |
| 441 MojoResult DataPipeProducerDispatcher::CloseNoLock() { | 393 MojoResult DataPipeProducerDispatcher::CloseNoLock() { |
| 442 lock_.AssertAcquired(); | 394 lock_.AssertAcquired(); |
| 443 if (is_closed_ || in_transit_) | 395 if (is_closed_ || in_transit_) |
| 444 return MOJO_RESULT_INVALID_ARGUMENT; | 396 return MOJO_RESULT_INVALID_ARGUMENT; |
| 445 is_closed_ = true; | 397 is_closed_ = true; |
| 446 ring_buffer_mapping_.reset(); | 398 ring_buffer_mapping_.reset(); |
| 447 shared_ring_buffer_ = nullptr; | 399 shared_ring_buffer_ = nullptr; |
| 448 | 400 |
| 449 awakable_list_.CancelAll(); | |
| 450 watchers_.NotifyClosed(); | 401 watchers_.NotifyClosed(); |
| 451 if (!transferred_) { | 402 if (!transferred_) { |
| 452 base::AutoUnlock unlock(lock_); | 403 base::AutoUnlock unlock(lock_); |
| 453 node_controller_->ClosePort(control_port_); | 404 node_controller_->ClosePort(control_port_); |
| 454 } | 405 } |
| 455 | 406 |
| 456 return MOJO_RESULT_OK; | 407 return MOJO_RESULT_OK; |
| 457 } | 408 } |
| 458 | 409 |
| 459 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() | 410 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() |
| (...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 541 << m->num_bytes << " bytes were read. [control_port=" | 492 << m->num_bytes << " bytes were read. [control_port=" |
| 542 << control_port_.name() << "]"; | 493 << control_port_.name() << "]"; |
| 543 | 494 |
| 544 available_capacity_ += m->num_bytes; | 495 available_capacity_ += m->num_bytes; |
| 545 } | 496 } |
| 546 } while (message); | 497 } while (message); |
| 547 } | 498 } |
| 548 | 499 |
| 549 if (peer_closed_ != was_peer_closed || | 500 if (peer_closed_ != was_peer_closed || |
| 550 available_capacity_ != previous_capacity) { | 501 available_capacity_ != previous_capacity) { |
| 551 HandleSignalsState state = GetHandleSignalsStateNoLock(); | 502 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
| 552 awakable_list_.AwakeForStateChange(state); | |
| 553 watchers_.NotifyState(state); | |
| 554 } | 503 } |
| 555 } | 504 } |
| 556 | 505 |
| 557 } // namespace edk | 506 } // namespace edk |
| 558 } // namespace mojo | 507 } // namespace mojo |
| OLD | NEW |