OLD | NEW |
---|---|
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <algorithm> | 10 #include <algorithm> |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
73 NodeController* node_controller, | 73 NodeController* node_controller, |
74 const ports::PortRef& control_port, | 74 const ports::PortRef& control_port, |
75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, | 75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, |
76 const MojoCreateDataPipeOptions& options, | 76 const MojoCreateDataPipeOptions& options, |
77 bool initialized, | 77 bool initialized, |
78 uint64_t pipe_id) | 78 uint64_t pipe_id) |
79 : options_(options), | 79 : options_(options), |
80 node_controller_(node_controller), | 80 node_controller_(node_controller), |
81 control_port_(control_port), | 81 control_port_(control_port), |
82 pipe_id_(pipe_id), | 82 pipe_id_(pipe_id), |
83 watchers_(this), | |
83 shared_ring_buffer_(shared_ring_buffer) { | 84 shared_ring_buffer_(shared_ring_buffer) { |
84 if (initialized) { | 85 if (initialized) { |
85 base::AutoLock lock(lock_); | 86 base::AutoLock lock(lock_); |
86 InitializeNoLock(); | 87 InitializeNoLock(); |
87 } | 88 } |
88 } | 89 } |
89 | 90 |
90 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { | 91 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { |
91 return Type::DATA_PIPE_CONSUMER; | 92 return Type::DATA_PIPE_CONSUMER; |
92 } | 93 } |
93 | 94 |
94 MojoResult DataPipeConsumerDispatcher::Close() { | 95 MojoResult DataPipeConsumerDispatcher::Close() { |
95 base::AutoLock lock(lock_); | 96 base::AutoLock lock(lock_); |
96 DVLOG(1) << "Closing data pipe consumer " << pipe_id_; | 97 DVLOG(1) << "Closing data pipe consumer " << pipe_id_; |
97 return CloseNoLock(); | 98 return CloseNoLock(); |
98 } | 99 } |
99 | 100 |
100 | |
101 MojoResult DataPipeConsumerDispatcher::Watch( | |
102 MojoHandleSignals signals, | |
103 const Watcher::WatchCallback& callback, | |
104 uintptr_t context) { | |
105 base::AutoLock lock(lock_); | |
106 | |
107 if (is_closed_ || in_transit_) | |
108 return MOJO_RESULT_INVALID_ARGUMENT; | |
109 | |
110 return awakable_list_.AddWatcher( | |
111 signals, callback, context, GetHandleSignalsStateNoLock()); | |
112 } | |
113 | |
114 MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) { | |
115 base::AutoLock lock(lock_); | |
116 | |
117 if (is_closed_ || in_transit_) | |
118 return MOJO_RESULT_INVALID_ARGUMENT; | |
119 | |
120 return awakable_list_.RemoveWatcher(context); | |
121 } | |
122 | |
123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, | 101 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
124 uint32_t* num_bytes, | 102 uint32_t* num_bytes, |
125 MojoReadDataFlags flags) { | 103 MojoReadDataFlags flags) { |
126 base::AutoLock lock(lock_); | 104 base::AutoLock lock(lock_); |
127 if (!shared_ring_buffer_ || in_transit_) | 105 if (!shared_ring_buffer_ || in_transit_) |
128 return MOJO_RESULT_INVALID_ARGUMENT; | 106 return MOJO_RESULT_INVALID_ARGUMENT; |
129 | 107 |
130 if (in_two_phase_read_) | 108 if (in_two_phase_read_) |
131 return MOJO_RESULT_BUSY; | 109 return MOJO_RESULT_BUSY; |
132 | 110 |
(...skipping 20 matching lines...) Expand all Loading... | |
153 | 131 |
154 uint32_t max_num_bytes_to_read = *num_bytes; | 132 uint32_t max_num_bytes_to_read = *num_bytes; |
155 if (max_num_bytes_to_read % options_.element_num_bytes != 0) | 133 if (max_num_bytes_to_read % options_.element_num_bytes != 0) |
156 return MOJO_RESULT_INVALID_ARGUMENT; | 134 return MOJO_RESULT_INVALID_ARGUMENT; |
157 | 135 |
158 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; | 136 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; |
159 uint32_t min_num_bytes_to_read = | 137 uint32_t min_num_bytes_to_read = |
160 all_or_none ? max_num_bytes_to_read : 0; | 138 all_or_none ? max_num_bytes_to_read : 0; |
161 | 139 |
162 if (min_num_bytes_to_read > bytes_available_) { | 140 if (min_num_bytes_to_read > bytes_available_) { |
141 if (flags & MOJO_READ_DATA_FLAG_CLEAR_SIGNAL) { | |
142 suppress_readable_signal_ = true; | |
143 watchers_.NotifyState(GetHandleSignalsStateNoLock()); | |
144 } | |
163 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION | 145 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
164 : MOJO_RESULT_OUT_OF_RANGE; | 146 : MOJO_RESULT_OUT_OF_RANGE; |
165 } | 147 } |
166 | 148 |
167 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); | 149 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); |
168 if (bytes_to_read == 0) { | 150 if (bytes_to_read == 0) { |
169 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION | 151 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
170 : MOJO_RESULT_SHOULD_WAIT; | 152 : MOJO_RESULT_SHOULD_WAIT; |
171 } | 153 } |
172 | 154 |
(...skipping 17 matching lines...) Expand all Loading... | |
190 | 172 |
191 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); | 173 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); |
192 if (discard || !peek) { | 174 if (discard || !peek) { |
193 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; | 175 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; |
194 bytes_available_ -= bytes_to_read; | 176 bytes_available_ -= bytes_to_read; |
195 | 177 |
196 base::AutoUnlock unlock(lock_); | 178 base::AutoUnlock unlock(lock_); |
197 NotifyRead(bytes_to_read); | 179 NotifyRead(bytes_to_read); |
198 } | 180 } |
199 | 181 |
182 // We may have just read the last available data and thus changed the signals | |
183 // state. | |
184 watchers_.NotifyState(GetHandleSignalsStateNoLock()); | |
185 | |
200 return MOJO_RESULT_OK; | 186 return MOJO_RESULT_OK; |
201 } | 187 } |
202 | 188 |
203 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, | 189 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, |
204 uint32_t* buffer_num_bytes, | 190 uint32_t* buffer_num_bytes, |
205 MojoReadDataFlags flags) { | 191 MojoReadDataFlags flags) { |
206 base::AutoLock lock(lock_); | 192 base::AutoLock lock(lock_); |
207 if (!shared_ring_buffer_ || in_transit_) | 193 if (!shared_ring_buffer_ || in_transit_) |
208 return MOJO_RESULT_INVALID_ARGUMENT; | 194 return MOJO_RESULT_INVALID_ARGUMENT; |
209 | 195 |
210 if (in_two_phase_read_) | 196 if (in_two_phase_read_) |
211 return MOJO_RESULT_BUSY; | 197 return MOJO_RESULT_BUSY; |
212 | 198 |
213 // These flags may not be used in two-phase mode. | 199 // These flags may not be used in two-phase mode. |
214 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || | 200 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || |
215 (flags & MOJO_READ_DATA_FLAG_QUERY) || | 201 (flags & MOJO_READ_DATA_FLAG_QUERY) || |
216 (flags & MOJO_READ_DATA_FLAG_PEEK)) | 202 (flags & MOJO_READ_DATA_FLAG_PEEK) || |
203 (flags & MOJO_READ_DATA_FLAG_CLEAR_SIGNAL)) | |
217 return MOJO_RESULT_INVALID_ARGUMENT; | 204 return MOJO_RESULT_INVALID_ARGUMENT; |
218 | 205 |
219 if (bytes_available_ == 0) { | 206 if (bytes_available_ == 0) { |
220 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION | 207 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
221 : MOJO_RESULT_SHOULD_WAIT; | 208 : MOJO_RESULT_SHOULD_WAIT; |
222 } | 209 } |
223 | 210 |
224 DCHECK_LT(read_offset_, options_.capacity_num_bytes); | 211 DCHECK_LT(read_offset_, options_.capacity_num_bytes); |
225 uint32_t bytes_to_read = std::min(bytes_available_, | 212 uint32_t bytes_to_read = std::min(bytes_available_, |
226 options_.capacity_num_bytes - read_offset_); | 213 options_.capacity_num_bytes - read_offset_); |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
263 base::AutoUnlock unlock(lock_); | 250 base::AutoUnlock unlock(lock_); |
264 NotifyRead(num_bytes_read); | 251 NotifyRead(num_bytes_read); |
265 } | 252 } |
266 | 253 |
267 in_two_phase_read_ = false; | 254 in_two_phase_read_ = false; |
268 two_phase_max_bytes_read_ = 0; | 255 two_phase_max_bytes_read_ = 0; |
269 | 256 |
270 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 257 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
271 if (!new_state.equals(old_state)) | 258 if (!new_state.equals(old_state)) |
272 awakable_list_.AwakeForStateChange(new_state); | 259 awakable_list_.AwakeForStateChange(new_state); |
260 watchers_.NotifyState(new_state); | |
273 | 261 |
274 return rv; | 262 return rv; |
275 } | 263 } |
276 | 264 |
277 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { | 265 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { |
278 base::AutoLock lock(lock_); | 266 base::AutoLock lock(lock_); |
279 return GetHandleSignalsStateNoLock(); | 267 return GetHandleSignalsStateNoLock(); |
280 } | 268 } |
281 | 269 |
270 MojoResult DataPipeConsumerDispatcher::AddWatcherRef( | |
271 const scoped_refptr<WatcherDispatcher>& watcher, | |
272 uintptr_t context) { | |
273 base::AutoLock lock(lock_); | |
274 if (is_closed_ || in_transit_) | |
275 return MOJO_RESULT_INVALID_ARGUMENT; | |
276 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); | |
277 } | |
278 | |
279 MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef( | |
280 WatcherDispatcher* watcher, | |
281 uintptr_t context) { | |
282 base::AutoLock lock(lock_); | |
283 if (is_closed_ || in_transit_) | |
284 return MOJO_RESULT_INVALID_ARGUMENT; | |
285 return watchers_.Remove(watcher, context); | |
286 } | |
287 | |
282 MojoResult DataPipeConsumerDispatcher::AddAwakable( | 288 MojoResult DataPipeConsumerDispatcher::AddAwakable( |
283 Awakable* awakable, | 289 Awakable* awakable, |
284 MojoHandleSignals signals, | 290 MojoHandleSignals signals, |
285 uintptr_t context, | 291 uintptr_t context, |
286 HandleSignalsState* signals_state) { | 292 HandleSignalsState* signals_state) { |
287 base::AutoLock lock(lock_); | 293 base::AutoLock lock(lock_); |
288 if (!shared_ring_buffer_ || in_transit_) { | 294 if (!shared_ring_buffer_ || in_transit_) { |
289 if (signals_state) | 295 if (signals_state) |
290 *signals_state = HandleSignalsState(); | 296 *signals_state = HandleSignalsState(); |
291 return MOJO_RESULT_INVALID_ARGUMENT; | 297 return MOJO_RESULT_INVALID_ARGUMENT; |
(...skipping 161 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
453 | 459 |
454 MojoResult DataPipeConsumerDispatcher::CloseNoLock() { | 460 MojoResult DataPipeConsumerDispatcher::CloseNoLock() { |
455 lock_.AssertAcquired(); | 461 lock_.AssertAcquired(); |
456 if (is_closed_ || in_transit_) | 462 if (is_closed_ || in_transit_) |
457 return MOJO_RESULT_INVALID_ARGUMENT; | 463 return MOJO_RESULT_INVALID_ARGUMENT; |
458 is_closed_ = true; | 464 is_closed_ = true; |
459 ring_buffer_mapping_.reset(); | 465 ring_buffer_mapping_.reset(); |
460 shared_ring_buffer_ = nullptr; | 466 shared_ring_buffer_ = nullptr; |
461 | 467 |
462 awakable_list_.CancelAll(); | 468 awakable_list_.CancelAll(); |
469 watchers_.NotifyClosed(); | |
463 if (!transferred_) { | 470 if (!transferred_) { |
464 base::AutoUnlock unlock(lock_); | 471 base::AutoUnlock unlock(lock_); |
465 node_controller_->ClosePort(control_port_); | 472 node_controller_->ClosePort(control_port_); |
466 } | 473 } |
467 | 474 |
468 return MOJO_RESULT_OK; | 475 return MOJO_RESULT_OK; |
469 } | 476 } |
470 | 477 |
471 HandleSignalsState | 478 HandleSignalsState |
472 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { | 479 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { |
473 lock_.AssertAcquired(); | 480 lock_.AssertAcquired(); |
474 | 481 |
475 HandleSignalsState rv; | 482 HandleSignalsState rv; |
476 if (shared_ring_buffer_ && bytes_available_) { | 483 if (shared_ring_buffer_ && bytes_available_) { |
477 if (!in_two_phase_read_) | 484 if (!in_two_phase_read_ && bytes_available_ && !suppress_readable_signal_) |
yzshen1
2017/03/11 00:44:58
nit: there is no need to check |bytes_available_|
Ken Rockot(use gerrit already)
2017/03/12 22:24:13
Done
| |
478 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 485 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
479 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
480 } else if (!peer_closed_ && shared_ring_buffer_) { | 487 } else if (!peer_closed_ && shared_ring_buffer_) { |
481 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 488 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
482 } | 489 } |
483 | 490 |
484 if (peer_closed_) | 491 if (peer_closed_) |
485 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 492 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 493 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
487 return rv; | 494 return rv; |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
558 << m->num_bytes << " bytes were written. [control_port=" | 565 << m->num_bytes << " bytes were written. [control_port=" |
559 << control_port_.name() << "]"; | 566 << control_port_.name() << "]"; |
560 | 567 |
561 bytes_available_ += m->num_bytes; | 568 bytes_available_ += m->num_bytes; |
562 } | 569 } |
563 } while (message); | 570 } while (message); |
564 } | 571 } |
565 | 572 |
566 if (peer_closed_ != was_peer_closed || | 573 if (peer_closed_ != was_peer_closed || |
567 bytes_available_ != previous_bytes_available) { | 574 bytes_available_ != previous_bytes_available) { |
568 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 575 suppress_readable_signal_ = false; |
576 HandleSignalsState state = GetHandleSignalsStateNoLock(); | |
577 awakable_list_.AwakeForStateChange(state); | |
578 watchers_.NotifyState(state); | |
569 } | 579 } |
570 } | 580 } |
571 | 581 |
572 } // namespace edk | 582 } // namespace edk |
573 } // namespace mojo | 583 } // namespace mojo |
OLD | NEW |