OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { | 90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { |
91 return Type::DATA_PIPE_PRODUCER; | 91 return Type::DATA_PIPE_PRODUCER; |
92 } | 92 } |
93 | 93 |
94 MojoResult DataPipeProducerDispatcher::Close() { | 94 MojoResult DataPipeProducerDispatcher::Close() { |
95 base::AutoLock lock(lock_); | 95 base::AutoLock lock(lock_); |
96 DVLOG(1) << "Closing data pipe producer " << pipe_id_; | 96 DVLOG(1) << "Closing data pipe producer " << pipe_id_; |
97 return CloseNoLock(); | 97 return CloseNoLock(); |
98 } | 98 } |
99 | 99 |
100 MojoResult DataPipeProducerDispatcher::Watch( | 100 MojoResult DataPipeProducerDispatcher::RegisterWatcher( |
101 MojoHandleSignals signals, | 101 MojoHandleSignals signals, |
102 const Watcher::WatchCallback& callback, | 102 const Watcher::WatchCallback& callback, |
103 uintptr_t context) { | 103 uintptr_t context) { |
104 base::AutoLock lock(lock_); | 104 base::AutoLock lock(lock_); |
105 | 105 |
106 if (is_closed_ || in_transit_) | 106 if (is_closed_ || in_transit_) |
107 return MOJO_RESULT_INVALID_ARGUMENT; | 107 return MOJO_RESULT_INVALID_ARGUMENT; |
108 | 108 |
109 return awakable_list_.AddWatcher( | 109 return watchers_.Add(signals, callback, context, |
110 signals, callback, context, GetHandleSignalsStateNoLock()); | 110 GetHandleSignalsStateNoLock()); |
111 } | 111 } |
112 | 112 |
113 MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) { | 113 MojoResult DataPipeProducerDispatcher::ArmWatcher(uintptr_t context) { |
114 base::AutoLock lock(lock_); | 114 base::AutoLock lock(lock_); |
115 | 115 |
116 if (is_closed_ || in_transit_) | 116 if (is_closed_ || in_transit_) |
117 return MOJO_RESULT_INVALID_ARGUMENT; | 117 return MOJO_RESULT_INVALID_ARGUMENT; |
118 | 118 |
119 return awakable_list_.RemoveWatcher(context); | 119 return watchers_.Arm(context, GetHandleSignalsStateNoLock()); |
| 120 } |
| 121 |
| 122 MojoResult DataPipeProducerDispatcher::UnregisterWatcher(uintptr_t context) { |
| 123 base::AutoLock lock(lock_); |
| 124 |
| 125 if (is_closed_ || in_transit_) |
| 126 return MOJO_RESULT_INVALID_ARGUMENT; |
| 127 |
| 128 return watchers_.Remove(context); |
120 } | 129 } |
121 | 130 |
122 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, | 131 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, |
123 uint32_t* num_bytes, | 132 uint32_t* num_bytes, |
124 MojoWriteDataFlags flags) { | 133 MojoWriteDataFlags flags) { |
125 base::AutoLock lock(lock_); | 134 base::AutoLock lock(lock_); |
126 if (!shared_ring_buffer_ || in_transit_) | 135 if (!shared_ring_buffer_ || in_transit_) |
127 return MOJO_RESULT_INVALID_ARGUMENT; | 136 return MOJO_RESULT_INVALID_ARGUMENT; |
128 | 137 |
129 if (in_two_phase_write_) | 138 if (in_two_phase_write_) |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
170 memcpy(data + write_offset_, source, tail_bytes_to_write); | 179 memcpy(data + write_offset_, source, tail_bytes_to_write); |
171 if (head_bytes_to_write > 0) | 180 if (head_bytes_to_write > 0) |
172 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); | 181 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); |
173 | 182 |
174 DCHECK_LE(num_bytes_to_write, available_capacity_); | 183 DCHECK_LE(num_bytes_to_write, available_capacity_); |
175 available_capacity_ -= num_bytes_to_write; | 184 available_capacity_ -= num_bytes_to_write; |
176 write_offset_ = (write_offset_ + num_bytes_to_write) % | 185 write_offset_ = (write_offset_ + num_bytes_to_write) % |
177 options_.capacity_num_bytes; | 186 options_.capacity_num_bytes; |
178 | 187 |
179 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 188 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
180 if (!new_state.equals(old_state)) | 189 if (!new_state.equals(old_state)) { |
181 awakable_list_.AwakeForStateChange(new_state); | 190 awakable_list_.AwakeForStateChange(new_state); |
| 191 watchers_.NotifyState(new_state); |
| 192 } |
182 | 193 |
183 base::AutoUnlock unlock(lock_); | 194 base::AutoUnlock unlock(lock_); |
184 NotifyWrite(num_bytes_to_write); | 195 NotifyWrite(num_bytes_to_write); |
185 | 196 |
186 return MOJO_RESULT_OK; | 197 return MOJO_RESULT_OK; |
187 } | 198 } |
188 | 199 |
189 MojoResult DataPipeProducerDispatcher::BeginWriteData( | 200 MojoResult DataPipeProducerDispatcher::BeginWriteData( |
190 void** buffer, | 201 void** buffer, |
191 uint32_t* buffer_num_bytes, | 202 uint32_t* buffer_num_bytes, |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
242 | 253 |
243 base::AutoUnlock unlock(lock_); | 254 base::AutoUnlock unlock(lock_); |
244 NotifyWrite(num_bytes_written); | 255 NotifyWrite(num_bytes_written); |
245 } | 256 } |
246 | 257 |
247 in_two_phase_write_ = false; | 258 in_two_phase_write_ = false; |
248 | 259 |
249 // If we're now writable, we *became* writable (since we weren't writable | 260 // If we're now writable, we *became* writable (since we weren't writable |
250 // during the two-phase write), so awake producer awakables. | 261 // during the two-phase write), so awake producer awakables. |
251 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 262 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
252 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) | 263 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) { |
253 awakable_list_.AwakeForStateChange(new_state); | 264 awakable_list_.AwakeForStateChange(new_state); |
| 265 watchers_.NotifyState(new_state); |
| 266 } |
254 | 267 |
255 return rv; | 268 return rv; |
256 } | 269 } |
257 | 270 |
258 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { | 271 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { |
259 base::AutoLock lock(lock_); | 272 base::AutoLock lock(lock_); |
260 return GetHandleSignalsStateNoLock(); | 273 return GetHandleSignalsStateNoLock(); |
261 } | 274 } |
262 | 275 |
263 MojoResult DataPipeProducerDispatcher::AddAwakable( | 276 MojoResult DataPipeProducerDispatcher::AddAwakable( |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
349 in_transit_ = false; | 362 in_transit_ = false; |
350 ignore_result(buffer_handle_for_transit_.release()); | 363 ignore_result(buffer_handle_for_transit_.release()); |
351 CloseNoLock(); | 364 CloseNoLock(); |
352 } | 365 } |
353 | 366 |
354 void DataPipeProducerDispatcher::CancelTransit() { | 367 void DataPipeProducerDispatcher::CancelTransit() { |
355 base::AutoLock lock(lock_); | 368 base::AutoLock lock(lock_); |
356 DCHECK(in_transit_); | 369 DCHECK(in_transit_); |
357 in_transit_ = false; | 370 in_transit_ = false; |
358 buffer_handle_for_transit_.reset(); | 371 buffer_handle_for_transit_.reset(); |
359 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 372 |
| 373 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 374 awakable_list_.AwakeForStateChange(state); |
| 375 watchers_.NotifyState(state); |
360 } | 376 } |
361 | 377 |
362 // static | 378 // static |
363 scoped_refptr<DataPipeProducerDispatcher> | 379 scoped_refptr<DataPipeProducerDispatcher> |
364 DataPipeProducerDispatcher::Deserialize(const void* data, | 380 DataPipeProducerDispatcher::Deserialize(const void* data, |
365 size_t num_bytes, | 381 size_t num_bytes, |
366 const ports::PortName* ports, | 382 const ports::PortName* ports, |
367 size_t num_ports, | 383 size_t num_ports, |
368 PlatformHandle* handles, | 384 PlatformHandle* handles, |
369 size_t num_handles) { | 385 size_t num_handles) { |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
433 | 449 |
434 MojoResult DataPipeProducerDispatcher::CloseNoLock() { | 450 MojoResult DataPipeProducerDispatcher::CloseNoLock() { |
435 lock_.AssertAcquired(); | 451 lock_.AssertAcquired(); |
436 if (is_closed_ || in_transit_) | 452 if (is_closed_ || in_transit_) |
437 return MOJO_RESULT_INVALID_ARGUMENT; | 453 return MOJO_RESULT_INVALID_ARGUMENT; |
438 is_closed_ = true; | 454 is_closed_ = true; |
439 ring_buffer_mapping_.reset(); | 455 ring_buffer_mapping_.reset(); |
440 shared_ring_buffer_ = nullptr; | 456 shared_ring_buffer_ = nullptr; |
441 | 457 |
442 awakable_list_.CancelAll(); | 458 awakable_list_.CancelAll(); |
| 459 watchers_.NotifyClosed(); |
443 if (!transferred_) { | 460 if (!transferred_) { |
444 base::AutoUnlock unlock(lock_); | 461 base::AutoUnlock unlock(lock_); |
445 node_controller_->ClosePort(control_port_); | 462 node_controller_->ClosePort(control_port_); |
446 } | 463 } |
447 | 464 |
448 return MOJO_RESULT_OK; | 465 return MOJO_RESULT_OK; |
449 } | 466 } |
450 | 467 |
451 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() | 468 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() |
452 const { | 469 const { |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
534 << m->num_bytes << " bytes were read. [control_port=" | 551 << m->num_bytes << " bytes were read. [control_port=" |
535 << control_port_.name() << "]"; | 552 << control_port_.name() << "]"; |
536 | 553 |
537 available_capacity_ += m->num_bytes; | 554 available_capacity_ += m->num_bytes; |
538 } | 555 } |
539 } while (message); | 556 } while (message); |
540 } | 557 } |
541 | 558 |
542 if (peer_closed_ != was_peer_closed || | 559 if (peer_closed_ != was_peer_closed || |
543 available_capacity_ != previous_capacity) { | 560 available_capacity_ != previous_capacity) { |
544 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 561 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 562 awakable_list_.AwakeForStateChange(state); |
| 563 watchers_.NotifyState(state); |
545 } | 564 } |
546 } | 565 } |
547 | 566 |
548 } // namespace edk | 567 } // namespace edk |
549 } // namespace mojo | 568 } // namespace mojo |
OLD | NEW |