OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
72 NodeController* node_controller, | 72 NodeController* node_controller, |
73 const ports::PortRef& control_port, | 73 const ports::PortRef& control_port, |
74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, | 74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, |
75 const MojoCreateDataPipeOptions& options, | 75 const MojoCreateDataPipeOptions& options, |
76 bool initialized, | 76 bool initialized, |
77 uint64_t pipe_id) | 77 uint64_t pipe_id) |
78 : options_(options), | 78 : options_(options), |
79 node_controller_(node_controller), | 79 node_controller_(node_controller), |
80 control_port_(control_port), | 80 control_port_(control_port), |
81 pipe_id_(pipe_id), | 81 pipe_id_(pipe_id), |
| 82 watchers_(this), |
82 shared_ring_buffer_(shared_ring_buffer), | 83 shared_ring_buffer_(shared_ring_buffer), |
83 available_capacity_(options_.capacity_num_bytes) { | 84 available_capacity_(options_.capacity_num_bytes) { |
84 if (initialized) { | 85 if (initialized) { |
85 base::AutoLock lock(lock_); | 86 base::AutoLock lock(lock_); |
86 InitializeNoLock(); | 87 InitializeNoLock(); |
87 } | 88 } |
88 } | 89 } |
89 | 90 |
90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { | 91 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { |
91 return Type::DATA_PIPE_PRODUCER; | 92 return Type::DATA_PIPE_PRODUCER; |
92 } | 93 } |
93 | 94 |
94 MojoResult DataPipeProducerDispatcher::Close() { | 95 MojoResult DataPipeProducerDispatcher::Close() { |
95 base::AutoLock lock(lock_); | 96 base::AutoLock lock(lock_); |
96 DVLOG(1) << "Closing data pipe producer " << pipe_id_; | 97 DVLOG(1) << "Closing data pipe producer " << pipe_id_; |
97 return CloseNoLock(); | 98 return CloseNoLock(); |
98 } | 99 } |
99 | 100 |
100 MojoResult DataPipeProducerDispatcher::Watch( | |
101 MojoHandleSignals signals, | |
102 const Watcher::WatchCallback& callback, | |
103 uintptr_t context) { | |
104 base::AutoLock lock(lock_); | |
105 | |
106 if (is_closed_ || in_transit_) | |
107 return MOJO_RESULT_INVALID_ARGUMENT; | |
108 | |
109 return awakable_list_.AddWatcher( | |
110 signals, callback, context, GetHandleSignalsStateNoLock()); | |
111 } | |
112 | |
113 MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) { | |
114 base::AutoLock lock(lock_); | |
115 | |
116 if (is_closed_ || in_transit_) | |
117 return MOJO_RESULT_INVALID_ARGUMENT; | |
118 | |
119 return awakable_list_.RemoveWatcher(context); | |
120 } | |
121 | |
122 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, | 101 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, |
123 uint32_t* num_bytes, | 102 uint32_t* num_bytes, |
124 MojoWriteDataFlags flags) { | 103 MojoWriteDataFlags flags) { |
125 base::AutoLock lock(lock_); | 104 base::AutoLock lock(lock_); |
126 if (!shared_ring_buffer_ || in_transit_) | 105 if (!shared_ring_buffer_ || in_transit_) |
127 return MOJO_RESULT_INVALID_ARGUMENT; | 106 return MOJO_RESULT_INVALID_ARGUMENT; |
128 | 107 |
129 if (in_two_phase_write_) | 108 if (in_two_phase_write_) |
130 return MOJO_RESULT_BUSY; | 109 return MOJO_RESULT_BUSY; |
131 | 110 |
132 if (peer_closed_) | 111 if (peer_closed_) |
133 return MOJO_RESULT_FAILED_PRECONDITION; | 112 return MOJO_RESULT_FAILED_PRECONDITION; |
134 | 113 |
135 if (*num_bytes % options_.element_num_bytes != 0) | 114 if (*num_bytes % options_.element_num_bytes != 0) |
136 return MOJO_RESULT_INVALID_ARGUMENT; | 115 return MOJO_RESULT_INVALID_ARGUMENT; |
137 if (*num_bytes == 0) | 116 if (*num_bytes == 0) |
138 return MOJO_RESULT_OK; // Nothing to do. | 117 return MOJO_RESULT_OK; // Nothing to do. |
139 | 118 |
140 if ((flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) && | 119 if ((flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) && |
141 (*num_bytes > available_capacity_)) { | 120 (*num_bytes > available_capacity_)) { |
| 121 if (flags & MOJO_WRITE_DATA_FLAG_CLEAR_SIGNAL) { |
| 122 suppress_writable_signal_ = true; |
| 123 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
| 124 } |
142 // Don't return "should wait" since you can't wait for a specified amount of | 125 // Don't return "should wait" since you can't wait for a specified amount of |
143 // data. | 126 // data. |
| 127 // |
| 128 // TODO(rockot): Reconsider this case now that we have support for minimum |
| 129 // signaling capacity. |
144 return MOJO_RESULT_OUT_OF_RANGE; | 130 return MOJO_RESULT_OUT_OF_RANGE; |
145 } | 131 } |
146 | 132 |
147 DCHECK_LE(available_capacity_, options_.capacity_num_bytes); | 133 DCHECK_LE(available_capacity_, options_.capacity_num_bytes); |
148 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); | 134 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); |
149 if (num_bytes_to_write == 0) | 135 if (num_bytes_to_write == 0) |
150 return MOJO_RESULT_SHOULD_WAIT; | 136 return MOJO_RESULT_SHOULD_WAIT; |
151 | 137 |
152 HandleSignalsState old_state = GetHandleSignalsStateNoLock(); | 138 HandleSignalsState old_state = GetHandleSignalsStateNoLock(); |
153 | 139 |
(...skipping 18 matching lines...) Expand all Loading... |
172 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); | 158 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); |
173 | 159 |
174 DCHECK_LE(num_bytes_to_write, available_capacity_); | 160 DCHECK_LE(num_bytes_to_write, available_capacity_); |
175 available_capacity_ -= num_bytes_to_write; | 161 available_capacity_ -= num_bytes_to_write; |
176 write_offset_ = (write_offset_ + num_bytes_to_write) % | 162 write_offset_ = (write_offset_ + num_bytes_to_write) % |
177 options_.capacity_num_bytes; | 163 options_.capacity_num_bytes; |
178 | 164 |
179 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 165 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
180 if (!new_state.equals(old_state)) | 166 if (!new_state.equals(old_state)) |
181 awakable_list_.AwakeForStateChange(new_state); | 167 awakable_list_.AwakeForStateChange(new_state); |
| 168 watchers_.NotifyState(new_state); |
182 | 169 |
183 base::AutoUnlock unlock(lock_); | 170 base::AutoUnlock unlock(lock_); |
184 NotifyWrite(num_bytes_to_write); | 171 NotifyWrite(num_bytes_to_write); |
185 | 172 |
186 return MOJO_RESULT_OK; | 173 return MOJO_RESULT_OK; |
187 } | 174 } |
188 | 175 |
189 MojoResult DataPipeProducerDispatcher::BeginWriteData( | 176 MojoResult DataPipeProducerDispatcher::BeginWriteData( |
190 void** buffer, | 177 void** buffer, |
191 uint32_t* buffer_num_bytes, | 178 uint32_t* buffer_num_bytes, |
192 MojoWriteDataFlags flags) { | 179 MojoWriteDataFlags flags) { |
193 base::AutoLock lock(lock_); | 180 base::AutoLock lock(lock_); |
194 if (!shared_ring_buffer_ || in_transit_) | 181 if (!shared_ring_buffer_ || in_transit_) |
195 return MOJO_RESULT_INVALID_ARGUMENT; | 182 return MOJO_RESULT_INVALID_ARGUMENT; |
| 183 |
| 184 // These flags may not be used in two-phase mode. |
| 185 if ((flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) || |
| 186 (flags & MOJO_WRITE_DATA_FLAG_CLEAR_SIGNAL)) |
| 187 return MOJO_RESULT_INVALID_ARGUMENT; |
| 188 |
196 if (in_two_phase_write_) | 189 if (in_two_phase_write_) |
197 return MOJO_RESULT_BUSY; | 190 return MOJO_RESULT_BUSY; |
198 if (peer_closed_) | 191 if (peer_closed_) |
199 return MOJO_RESULT_FAILED_PRECONDITION; | 192 return MOJO_RESULT_FAILED_PRECONDITION; |
200 | 193 |
201 if (available_capacity_ == 0) { | 194 if (available_capacity_ == 0) { |
202 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION | 195 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
203 : MOJO_RESULT_SHOULD_WAIT; | 196 : MOJO_RESULT_SHOULD_WAIT; |
204 } | 197 } |
205 | 198 |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
244 NotifyWrite(num_bytes_written); | 237 NotifyWrite(num_bytes_written); |
245 } | 238 } |
246 | 239 |
247 in_two_phase_write_ = false; | 240 in_two_phase_write_ = false; |
248 | 241 |
249 // If we're now writable, we *became* writable (since we weren't writable | 242 // If we're now writable, we *became* writable (since we weren't writable |
250 // during the two-phase write), so awake producer awakables. | 243 // during the two-phase write), so awake producer awakables. |
251 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 244 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
252 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) | 245 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
253 awakable_list_.AwakeForStateChange(new_state); | 246 awakable_list_.AwakeForStateChange(new_state); |
| 247 watchers_.NotifyState(new_state); |
254 | 248 |
255 return rv; | 249 return rv; |
256 } | 250 } |
257 | 251 |
258 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { | 252 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { |
259 base::AutoLock lock(lock_); | 253 base::AutoLock lock(lock_); |
260 return GetHandleSignalsStateNoLock(); | 254 return GetHandleSignalsStateNoLock(); |
261 } | 255 } |
262 | 256 |
| 257 MojoResult DataPipeProducerDispatcher::AddWatcherRef( |
| 258 const scoped_refptr<WatcherDispatcher>& watcher, |
| 259 uintptr_t context) { |
| 260 base::AutoLock lock(lock_); |
| 261 if (is_closed_ || in_transit_) |
| 262 return MOJO_RESULT_INVALID_ARGUMENT; |
| 263 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
| 264 } |
| 265 |
| 266 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef( |
| 267 WatcherDispatcher* watcher, |
| 268 uintptr_t context) { |
| 269 base::AutoLock lock(lock_); |
| 270 if (is_closed_ || in_transit_) |
| 271 return MOJO_RESULT_INVALID_ARGUMENT; |
| 272 return watchers_.Remove(watcher, context); |
| 273 } |
| 274 |
263 MojoResult DataPipeProducerDispatcher::AddAwakable( | 275 MojoResult DataPipeProducerDispatcher::AddAwakable( |
264 Awakable* awakable, | 276 Awakable* awakable, |
265 MojoHandleSignals signals, | 277 MojoHandleSignals signals, |
266 uintptr_t context, | 278 uintptr_t context, |
267 HandleSignalsState* signals_state) { | 279 HandleSignalsState* signals_state) { |
268 base::AutoLock lock(lock_); | 280 base::AutoLock lock(lock_); |
269 if (!shared_ring_buffer_ || in_transit_) { | 281 if (!shared_ring_buffer_ || in_transit_) { |
270 if (signals_state) | 282 if (signals_state) |
271 *signals_state = HandleSignalsState(); | 283 *signals_state = HandleSignalsState(); |
272 return MOJO_RESULT_INVALID_ARGUMENT; | 284 return MOJO_RESULT_INVALID_ARGUMENT; |
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
349 in_transit_ = false; | 361 in_transit_ = false; |
350 ignore_result(buffer_handle_for_transit_.release()); | 362 ignore_result(buffer_handle_for_transit_.release()); |
351 CloseNoLock(); | 363 CloseNoLock(); |
352 } | 364 } |
353 | 365 |
354 void DataPipeProducerDispatcher::CancelTransit() { | 366 void DataPipeProducerDispatcher::CancelTransit() { |
355 base::AutoLock lock(lock_); | 367 base::AutoLock lock(lock_); |
356 DCHECK(in_transit_); | 368 DCHECK(in_transit_); |
357 in_transit_ = false; | 369 in_transit_ = false; |
358 buffer_handle_for_transit_.reset(); | 370 buffer_handle_for_transit_.reset(); |
359 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 371 |
| 372 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 373 awakable_list_.AwakeForStateChange(state); |
| 374 watchers_.NotifyState(state); |
360 } | 375 } |
361 | 376 |
362 // static | 377 // static |
363 scoped_refptr<DataPipeProducerDispatcher> | 378 scoped_refptr<DataPipeProducerDispatcher> |
364 DataPipeProducerDispatcher::Deserialize(const void* data, | 379 DataPipeProducerDispatcher::Deserialize(const void* data, |
365 size_t num_bytes, | 380 size_t num_bytes, |
366 const ports::PortName* ports, | 381 const ports::PortName* ports, |
367 size_t num_ports, | 382 size_t num_ports, |
368 PlatformHandle* handles, | 383 PlatformHandle* handles, |
369 size_t num_handles) { | 384 size_t num_handles) { |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
433 | 448 |
434 MojoResult DataPipeProducerDispatcher::CloseNoLock() { | 449 MojoResult DataPipeProducerDispatcher::CloseNoLock() { |
435 lock_.AssertAcquired(); | 450 lock_.AssertAcquired(); |
436 if (is_closed_ || in_transit_) | 451 if (is_closed_ || in_transit_) |
437 return MOJO_RESULT_INVALID_ARGUMENT; | 452 return MOJO_RESULT_INVALID_ARGUMENT; |
438 is_closed_ = true; | 453 is_closed_ = true; |
439 ring_buffer_mapping_.reset(); | 454 ring_buffer_mapping_.reset(); |
440 shared_ring_buffer_ = nullptr; | 455 shared_ring_buffer_ = nullptr; |
441 | 456 |
442 awakable_list_.CancelAll(); | 457 awakable_list_.CancelAll(); |
| 458 watchers_.NotifyClosed(); |
443 if (!transferred_) { | 459 if (!transferred_) { |
444 base::AutoUnlock unlock(lock_); | 460 base::AutoUnlock unlock(lock_); |
445 node_controller_->ClosePort(control_port_); | 461 node_controller_->ClosePort(control_port_); |
446 } | 462 } |
447 | 463 |
448 return MOJO_RESULT_OK; | 464 return MOJO_RESULT_OK; |
449 } | 465 } |
450 | 466 |
451 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() | 467 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() |
452 const { | 468 const { |
453 lock_.AssertAcquired(); | 469 lock_.AssertAcquired(); |
454 HandleSignalsState rv; | 470 HandleSignalsState rv; |
455 if (!peer_closed_) { | 471 if (!peer_closed_) { |
456 if (!in_two_phase_write_ && shared_ring_buffer_ && | 472 if (!in_two_phase_write_ && shared_ring_buffer_ && |
457 available_capacity_ > 0) | 473 available_capacity_ > 0 && !suppress_writable_signal_) |
458 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 474 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
459 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 475 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
460 } else { | 476 } else { |
461 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 477 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
462 } | 478 } |
463 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 479 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
464 return rv; | 480 return rv; |
465 } | 481 } |
466 | 482 |
467 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { | 483 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
534 << m->num_bytes << " bytes were read. [control_port=" | 550 << m->num_bytes << " bytes were read. [control_port=" |
535 << control_port_.name() << "]"; | 551 << control_port_.name() << "]"; |
536 | 552 |
537 available_capacity_ += m->num_bytes; | 553 available_capacity_ += m->num_bytes; |
538 } | 554 } |
539 } while (message); | 555 } while (message); |
540 } | 556 } |
541 | 557 |
542 if (peer_closed_ != was_peer_closed || | 558 if (peer_closed_ != was_peer_closed || |
543 available_capacity_ != previous_capacity) { | 559 available_capacity_ != previous_capacity) { |
544 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 560 suppress_writable_signal_ = false; |
| 561 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 562 awakable_list_.AwakeForStateChange(state); |
| 563 watchers_.NotifyState(state); |
545 } | 564 } |
546 } | 565 } |
547 | 566 |
548 } // namespace edk | 567 } // namespace edk |
549 } // namespace mojo | 568 } // namespace mojo |
OLD | NEW |