OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
72 NodeController* node_controller, | 72 NodeController* node_controller, |
73 const ports::PortRef& control_port, | 73 const ports::PortRef& control_port, |
74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, | 74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, |
75 const MojoCreateDataPipeOptions& options, | 75 const MojoCreateDataPipeOptions& options, |
76 bool initialized, | 76 bool initialized, |
77 uint64_t pipe_id) | 77 uint64_t pipe_id) |
78 : options_(options), | 78 : options_(options), |
79 node_controller_(node_controller), | 79 node_controller_(node_controller), |
80 control_port_(control_port), | 80 control_port_(control_port), |
81 pipe_id_(pipe_id), | 81 pipe_id_(pipe_id), |
82 watchers_(this), | |
83 shared_ring_buffer_(shared_ring_buffer), | 82 shared_ring_buffer_(shared_ring_buffer), |
84 available_capacity_(options_.capacity_num_bytes) { | 83 available_capacity_(options_.capacity_num_bytes) { |
85 if (initialized) { | 84 if (initialized) { |
86 base::AutoLock lock(lock_); | 85 base::AutoLock lock(lock_); |
87 InitializeNoLock(); | 86 InitializeNoLock(); |
88 } | 87 } |
89 } | 88 } |
90 | 89 |
91 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { | 90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { |
92 return Type::DATA_PIPE_PRODUCER; | 91 return Type::DATA_PIPE_PRODUCER; |
93 } | 92 } |
94 | 93 |
95 MojoResult DataPipeProducerDispatcher::Close() { | 94 MojoResult DataPipeProducerDispatcher::Close() { |
96 base::AutoLock lock(lock_); | 95 base::AutoLock lock(lock_); |
97 DVLOG(1) << "Closing data pipe producer " << pipe_id_; | 96 DVLOG(1) << "Closing data pipe producer " << pipe_id_; |
98 return CloseNoLock(); | 97 return CloseNoLock(); |
99 } | 98 } |
100 | 99 |
| 100 MojoResult DataPipeProducerDispatcher::Watch( |
| 101 MojoHandleSignals signals, |
| 102 const Watcher::WatchCallback& callback, |
| 103 uintptr_t context) { |
| 104 base::AutoLock lock(lock_); |
| 105 |
| 106 if (is_closed_ || in_transit_) |
| 107 return MOJO_RESULT_INVALID_ARGUMENT; |
| 108 |
| 109 return awakable_list_.AddWatcher( |
| 110 signals, callback, context, GetHandleSignalsStateNoLock()); |
| 111 } |
| 112 |
| 113 MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) { |
| 114 base::AutoLock lock(lock_); |
| 115 |
| 116 if (is_closed_ || in_transit_) |
| 117 return MOJO_RESULT_INVALID_ARGUMENT; |
| 118 |
| 119 return awakable_list_.RemoveWatcher(context); |
| 120 } |
| 121 |
101 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, | 122 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, |
102 uint32_t* num_bytes, | 123 uint32_t* num_bytes, |
103 MojoWriteDataFlags flags) { | 124 MojoWriteDataFlags flags) { |
104 base::AutoLock lock(lock_); | 125 base::AutoLock lock(lock_); |
105 if (!shared_ring_buffer_ || in_transit_) | 126 if (!shared_ring_buffer_ || in_transit_) |
106 return MOJO_RESULT_INVALID_ARGUMENT; | 127 return MOJO_RESULT_INVALID_ARGUMENT; |
107 | 128 |
108 if (in_two_phase_write_) | 129 if (in_two_phase_write_) |
109 return MOJO_RESULT_BUSY; | 130 return MOJO_RESULT_BUSY; |
110 | 131 |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
151 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); | 172 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); |
152 | 173 |
153 DCHECK_LE(num_bytes_to_write, available_capacity_); | 174 DCHECK_LE(num_bytes_to_write, available_capacity_); |
154 available_capacity_ -= num_bytes_to_write; | 175 available_capacity_ -= num_bytes_to_write; |
155 write_offset_ = (write_offset_ + num_bytes_to_write) % | 176 write_offset_ = (write_offset_ + num_bytes_to_write) % |
156 options_.capacity_num_bytes; | 177 options_.capacity_num_bytes; |
157 | 178 |
158 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 179 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
159 if (!new_state.equals(old_state)) | 180 if (!new_state.equals(old_state)) |
160 awakable_list_.AwakeForStateChange(new_state); | 181 awakable_list_.AwakeForStateChange(new_state); |
161 watchers_.NotifyState(new_state); | |
162 | 182 |
163 base::AutoUnlock unlock(lock_); | 183 base::AutoUnlock unlock(lock_); |
164 NotifyWrite(num_bytes_to_write); | 184 NotifyWrite(num_bytes_to_write); |
165 | 185 |
166 return MOJO_RESULT_OK; | 186 return MOJO_RESULT_OK; |
167 } | 187 } |
168 | 188 |
169 MojoResult DataPipeProducerDispatcher::BeginWriteData( | 189 MojoResult DataPipeProducerDispatcher::BeginWriteData( |
170 void** buffer, | 190 void** buffer, |
171 uint32_t* buffer_num_bytes, | 191 uint32_t* buffer_num_bytes, |
172 MojoWriteDataFlags flags) { | 192 MojoWriteDataFlags flags) { |
173 base::AutoLock lock(lock_); | 193 base::AutoLock lock(lock_); |
174 if (!shared_ring_buffer_ || in_transit_) | 194 if (!shared_ring_buffer_ || in_transit_) |
175 return MOJO_RESULT_INVALID_ARGUMENT; | 195 return MOJO_RESULT_INVALID_ARGUMENT; |
176 | |
177 // These flags may not be used in two-phase mode. | |
178 if (flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) | |
179 return MOJO_RESULT_INVALID_ARGUMENT; | |
180 | |
181 if (in_two_phase_write_) | 196 if (in_two_phase_write_) |
182 return MOJO_RESULT_BUSY; | 197 return MOJO_RESULT_BUSY; |
183 if (peer_closed_) | 198 if (peer_closed_) |
184 return MOJO_RESULT_FAILED_PRECONDITION; | 199 return MOJO_RESULT_FAILED_PRECONDITION; |
185 | 200 |
186 if (available_capacity_ == 0) { | 201 if (available_capacity_ == 0) { |
187 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION | 202 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
188 : MOJO_RESULT_SHOULD_WAIT; | 203 : MOJO_RESULT_SHOULD_WAIT; |
189 } | 204 } |
190 | 205 |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
229 NotifyWrite(num_bytes_written); | 244 NotifyWrite(num_bytes_written); |
230 } | 245 } |
231 | 246 |
232 in_two_phase_write_ = false; | 247 in_two_phase_write_ = false; |
233 | 248 |
234 // If we're now writable, we *became* writable (since we weren't writable | 249 // If we're now writable, we *became* writable (since we weren't writable |
235 // during the two-phase write), so awake producer awakables. | 250 // during the two-phase write), so awake producer awakables. |
236 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 251 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
237 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) | 252 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
238 awakable_list_.AwakeForStateChange(new_state); | 253 awakable_list_.AwakeForStateChange(new_state); |
239 watchers_.NotifyState(new_state); | |
240 | 254 |
241 return rv; | 255 return rv; |
242 } | 256 } |
243 | 257 |
244 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { | 258 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { |
245 base::AutoLock lock(lock_); | 259 base::AutoLock lock(lock_); |
246 return GetHandleSignalsStateNoLock(); | 260 return GetHandleSignalsStateNoLock(); |
247 } | 261 } |
248 | 262 |
249 MojoResult DataPipeProducerDispatcher::AddWatcherRef( | |
250 const scoped_refptr<WatcherDispatcher>& watcher, | |
251 uintptr_t context) { | |
252 base::AutoLock lock(lock_); | |
253 if (is_closed_ || in_transit_) | |
254 return MOJO_RESULT_INVALID_ARGUMENT; | |
255 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); | |
256 } | |
257 | |
258 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef( | |
259 WatcherDispatcher* watcher, | |
260 uintptr_t context) { | |
261 base::AutoLock lock(lock_); | |
262 if (is_closed_ || in_transit_) | |
263 return MOJO_RESULT_INVALID_ARGUMENT; | |
264 return watchers_.Remove(watcher, context); | |
265 } | |
266 | |
267 MojoResult DataPipeProducerDispatcher::AddAwakable( | 263 MojoResult DataPipeProducerDispatcher::AddAwakable( |
268 Awakable* awakable, | 264 Awakable* awakable, |
269 MojoHandleSignals signals, | 265 MojoHandleSignals signals, |
270 uintptr_t context, | 266 uintptr_t context, |
271 HandleSignalsState* signals_state) { | 267 HandleSignalsState* signals_state) { |
272 base::AutoLock lock(lock_); | 268 base::AutoLock lock(lock_); |
273 if (!shared_ring_buffer_ || in_transit_) { | 269 if (!shared_ring_buffer_ || in_transit_) { |
274 if (signals_state) | 270 if (signals_state) |
275 *signals_state = HandleSignalsState(); | 271 *signals_state = HandleSignalsState(); |
276 return MOJO_RESULT_INVALID_ARGUMENT; | 272 return MOJO_RESULT_INVALID_ARGUMENT; |
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
353 in_transit_ = false; | 349 in_transit_ = false; |
354 ignore_result(buffer_handle_for_transit_.release()); | 350 ignore_result(buffer_handle_for_transit_.release()); |
355 CloseNoLock(); | 351 CloseNoLock(); |
356 } | 352 } |
357 | 353 |
358 void DataPipeProducerDispatcher::CancelTransit() { | 354 void DataPipeProducerDispatcher::CancelTransit() { |
359 base::AutoLock lock(lock_); | 355 base::AutoLock lock(lock_); |
360 DCHECK(in_transit_); | 356 DCHECK(in_transit_); |
361 in_transit_ = false; | 357 in_transit_ = false; |
362 buffer_handle_for_transit_.reset(); | 358 buffer_handle_for_transit_.reset(); |
363 | 359 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
364 HandleSignalsState state = GetHandleSignalsStateNoLock(); | |
365 awakable_list_.AwakeForStateChange(state); | |
366 watchers_.NotifyState(state); | |
367 } | 360 } |
368 | 361 |
369 // static | 362 // static |
370 scoped_refptr<DataPipeProducerDispatcher> | 363 scoped_refptr<DataPipeProducerDispatcher> |
371 DataPipeProducerDispatcher::Deserialize(const void* data, | 364 DataPipeProducerDispatcher::Deserialize(const void* data, |
372 size_t num_bytes, | 365 size_t num_bytes, |
373 const ports::PortName* ports, | 366 const ports::PortName* ports, |
374 size_t num_ports, | 367 size_t num_ports, |
375 PlatformHandle* handles, | 368 PlatformHandle* handles, |
376 size_t num_handles) { | 369 size_t num_handles) { |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
440 | 433 |
441 MojoResult DataPipeProducerDispatcher::CloseNoLock() { | 434 MojoResult DataPipeProducerDispatcher::CloseNoLock() { |
442 lock_.AssertAcquired(); | 435 lock_.AssertAcquired(); |
443 if (is_closed_ || in_transit_) | 436 if (is_closed_ || in_transit_) |
444 return MOJO_RESULT_INVALID_ARGUMENT; | 437 return MOJO_RESULT_INVALID_ARGUMENT; |
445 is_closed_ = true; | 438 is_closed_ = true; |
446 ring_buffer_mapping_.reset(); | 439 ring_buffer_mapping_.reset(); |
447 shared_ring_buffer_ = nullptr; | 440 shared_ring_buffer_ = nullptr; |
448 | 441 |
449 awakable_list_.CancelAll(); | 442 awakable_list_.CancelAll(); |
450 watchers_.NotifyClosed(); | |
451 if (!transferred_) { | 443 if (!transferred_) { |
452 base::AutoUnlock unlock(lock_); | 444 base::AutoUnlock unlock(lock_); |
453 node_controller_->ClosePort(control_port_); | 445 node_controller_->ClosePort(control_port_); |
454 } | 446 } |
455 | 447 |
456 return MOJO_RESULT_OK; | 448 return MOJO_RESULT_OK; |
457 } | 449 } |
458 | 450 |
459 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() | 451 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() |
460 const { | 452 const { |
461 lock_.AssertAcquired(); | 453 lock_.AssertAcquired(); |
462 HandleSignalsState rv; | 454 HandleSignalsState rv; |
463 if (!peer_closed_) { | 455 if (!peer_closed_) { |
464 if (!in_two_phase_write_ && shared_ring_buffer_ && available_capacity_ > 0) | 456 if (!in_two_phase_write_ && shared_ring_buffer_ && |
| 457 available_capacity_ > 0) |
465 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 458 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
466 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 459 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
467 } else { | 460 } else { |
468 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 461 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
469 } | 462 } |
470 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 463 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
471 return rv; | 464 return rv; |
472 } | 465 } |
473 | 466 |
474 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { | 467 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
541 << m->num_bytes << " bytes were read. [control_port=" | 534 << m->num_bytes << " bytes were read. [control_port=" |
542 << control_port_.name() << "]"; | 535 << control_port_.name() << "]"; |
543 | 536 |
544 available_capacity_ += m->num_bytes; | 537 available_capacity_ += m->num_bytes; |
545 } | 538 } |
546 } while (message); | 539 } while (message); |
547 } | 540 } |
548 | 541 |
549 if (peer_closed_ != was_peer_closed || | 542 if (peer_closed_ != was_peer_closed || |
550 available_capacity_ != previous_capacity) { | 543 available_capacity_ != previous_capacity) { |
551 HandleSignalsState state = GetHandleSignalsStateNoLock(); | 544 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
552 awakable_list_.AwakeForStateChange(state); | |
553 watchers_.NotifyState(state); | |
554 } | 545 } |
555 } | 546 } |
556 | 547 |
557 } // namespace edk | 548 } // namespace edk |
558 } // namespace mojo | 549 } // namespace mojo |
OLD | NEW |