Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(911)

Side by Side Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <algorithm> 10 #include <algorithm>
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 NodeController* node_controller, 73 NodeController* node_controller,
74 const ports::PortRef& control_port, 74 const ports::PortRef& control_port,
75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, 75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
76 const MojoCreateDataPipeOptions& options, 76 const MojoCreateDataPipeOptions& options,
77 bool initialized, 77 bool initialized,
78 uint64_t pipe_id) 78 uint64_t pipe_id)
79 : options_(options), 79 : options_(options),
80 node_controller_(node_controller), 80 node_controller_(node_controller),
81 control_port_(control_port), 81 control_port_(control_port),
82 pipe_id_(pipe_id), 82 pipe_id_(pipe_id),
83 watchers_(this),
83 shared_ring_buffer_(shared_ring_buffer) { 84 shared_ring_buffer_(shared_ring_buffer) {
84 if (initialized) { 85 if (initialized) {
85 base::AutoLock lock(lock_); 86 base::AutoLock lock(lock_);
86 InitializeNoLock(); 87 InitializeNoLock();
87 } 88 }
88 } 89 }
89 90
90 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { 91 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
91 return Type::DATA_PIPE_CONSUMER; 92 return Type::DATA_PIPE_CONSUMER;
92 } 93 }
93 94
94 MojoResult DataPipeConsumerDispatcher::Close() { 95 MojoResult DataPipeConsumerDispatcher::Close() {
95 base::AutoLock lock(lock_); 96 base::AutoLock lock(lock_);
96 DVLOG(1) << "Closing data pipe consumer " << pipe_id_; 97 DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
97 return CloseNoLock(); 98 return CloseNoLock();
98 } 99 }
99 100
100
101 MojoResult DataPipeConsumerDispatcher::Watch(
102 MojoHandleSignals signals,
103 const Watcher::WatchCallback& callback,
104 uintptr_t context) {
105 base::AutoLock lock(lock_);
106
107 if (is_closed_ || in_transit_)
108 return MOJO_RESULT_INVALID_ARGUMENT;
109
110 return awakable_list_.AddWatcher(
111 signals, callback, context, GetHandleSignalsStateNoLock());
112 }
113
114 MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
115 base::AutoLock lock(lock_);
116
117 if (is_closed_ || in_transit_)
118 return MOJO_RESULT_INVALID_ARGUMENT;
119
120 return awakable_list_.RemoveWatcher(context);
121 }
122
123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, 101 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
124 uint32_t* num_bytes, 102 uint32_t* num_bytes,
125 MojoReadDataFlags flags) { 103 MojoReadDataFlags flags) {
126 base::AutoLock lock(lock_); 104 base::AutoLock lock(lock_);
127 if (!shared_ring_buffer_ || in_transit_) 105 if (!shared_ring_buffer_ || in_transit_)
128 return MOJO_RESULT_INVALID_ARGUMENT; 106 return MOJO_RESULT_INVALID_ARGUMENT;
129 107
130 if (in_two_phase_read_) 108 if (in_two_phase_read_)
131 return MOJO_RESULT_BUSY; 109 return MOJO_RESULT_BUSY;
132 110
(...skipping 20 matching lines...) Expand all
153 131
154 uint32_t max_num_bytes_to_read = *num_bytes; 132 uint32_t max_num_bytes_to_read = *num_bytes;
155 if (max_num_bytes_to_read % options_.element_num_bytes != 0) 133 if (max_num_bytes_to_read % options_.element_num_bytes != 0)
156 return MOJO_RESULT_INVALID_ARGUMENT; 134 return MOJO_RESULT_INVALID_ARGUMENT;
157 135
158 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; 136 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
159 uint32_t min_num_bytes_to_read = 137 uint32_t min_num_bytes_to_read =
160 all_or_none ? max_num_bytes_to_read : 0; 138 all_or_none ? max_num_bytes_to_read : 0;
161 139
162 if (min_num_bytes_to_read > bytes_available_) { 140 if (min_num_bytes_to_read > bytes_available_) {
141 if (flags & MOJO_READ_DATA_FLAG_CLEAR_SIGNAL) {
142 suppress_readable_signal_ = true;
143 watchers_.NotifyState(GetHandleSignalsStateNoLock());
144 }
163 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 145 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
164 : MOJO_RESULT_OUT_OF_RANGE; 146 : MOJO_RESULT_OUT_OF_RANGE;
165 } 147 }
166 148
167 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); 149 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
168 if (bytes_to_read == 0) { 150 if (bytes_to_read == 0) {
169 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 151 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
170 : MOJO_RESULT_SHOULD_WAIT; 152 : MOJO_RESULT_SHOULD_WAIT;
171 } 153 }
172 154
(...skipping 17 matching lines...) Expand all
190 172
191 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); 173 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
192 if (discard || !peek) { 174 if (discard || !peek) {
193 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; 175 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
194 bytes_available_ -= bytes_to_read; 176 bytes_available_ -= bytes_to_read;
195 177
196 base::AutoUnlock unlock(lock_); 178 base::AutoUnlock unlock(lock_);
197 NotifyRead(bytes_to_read); 179 NotifyRead(bytes_to_read);
198 } 180 }
199 181
182 // We may have just read the last available data and thus changed the signals
183 // state.
184 watchers_.NotifyState(GetHandleSignalsStateNoLock());
185
200 return MOJO_RESULT_OK; 186 return MOJO_RESULT_OK;
201 } 187 }
202 188
203 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, 189 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
204 uint32_t* buffer_num_bytes, 190 uint32_t* buffer_num_bytes,
205 MojoReadDataFlags flags) { 191 MojoReadDataFlags flags) {
206 base::AutoLock lock(lock_); 192 base::AutoLock lock(lock_);
207 if (!shared_ring_buffer_ || in_transit_) 193 if (!shared_ring_buffer_ || in_transit_)
208 return MOJO_RESULT_INVALID_ARGUMENT; 194 return MOJO_RESULT_INVALID_ARGUMENT;
209 195
210 if (in_two_phase_read_) 196 if (in_two_phase_read_)
211 return MOJO_RESULT_BUSY; 197 return MOJO_RESULT_BUSY;
212 198
213 // These flags may not be used in two-phase mode. 199 // These flags may not be used in two-phase mode.
214 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || 200 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
215 (flags & MOJO_READ_DATA_FLAG_QUERY) || 201 (flags & MOJO_READ_DATA_FLAG_QUERY) ||
216 (flags & MOJO_READ_DATA_FLAG_PEEK)) 202 (flags & MOJO_READ_DATA_FLAG_PEEK) ||
203 (flags & MOJO_READ_DATA_FLAG_CLEAR_SIGNAL))
217 return MOJO_RESULT_INVALID_ARGUMENT; 204 return MOJO_RESULT_INVALID_ARGUMENT;
218 205
219 if (bytes_available_ == 0) { 206 if (bytes_available_ == 0) {
220 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 207 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
221 : MOJO_RESULT_SHOULD_WAIT; 208 : MOJO_RESULT_SHOULD_WAIT;
222 } 209 }
223 210
224 DCHECK_LT(read_offset_, options_.capacity_num_bytes); 211 DCHECK_LT(read_offset_, options_.capacity_num_bytes);
225 uint32_t bytes_to_read = std::min(bytes_available_, 212 uint32_t bytes_to_read = std::min(bytes_available_,
226 options_.capacity_num_bytes - read_offset_); 213 options_.capacity_num_bytes - read_offset_);
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
263 base::AutoUnlock unlock(lock_); 250 base::AutoUnlock unlock(lock_);
264 NotifyRead(num_bytes_read); 251 NotifyRead(num_bytes_read);
265 } 252 }
266 253
267 in_two_phase_read_ = false; 254 in_two_phase_read_ = false;
268 two_phase_max_bytes_read_ = 0; 255 two_phase_max_bytes_read_ = 0;
269 256
270 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 257 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
271 if (!new_state.equals(old_state)) 258 if (!new_state.equals(old_state))
272 awakable_list_.AwakeForStateChange(new_state); 259 awakable_list_.AwakeForStateChange(new_state);
260 watchers_.NotifyState(new_state);
273 261
274 return rv; 262 return rv;
275 } 263 }
276 264
277 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { 265 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
278 base::AutoLock lock(lock_); 266 base::AutoLock lock(lock_);
279 return GetHandleSignalsStateNoLock(); 267 return GetHandleSignalsStateNoLock();
280 } 268 }
281 269
270 MojoResult DataPipeConsumerDispatcher::AddWatcherRef(
271 const scoped_refptr<WatcherDispatcher>& watcher,
272 uintptr_t context) {
273 base::AutoLock lock(lock_);
274 if (is_closed_ || in_transit_)
275 return MOJO_RESULT_INVALID_ARGUMENT;
276 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
277 }
278
279 MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef(
280 WatcherDispatcher* watcher,
281 uintptr_t context) {
282 base::AutoLock lock(lock_);
283 if (is_closed_ || in_transit_)
284 return MOJO_RESULT_INVALID_ARGUMENT;
285 return watchers_.Remove(watcher, context);
286 }
287
282 MojoResult DataPipeConsumerDispatcher::AddAwakable( 288 MojoResult DataPipeConsumerDispatcher::AddAwakable(
283 Awakable* awakable, 289 Awakable* awakable,
284 MojoHandleSignals signals, 290 MojoHandleSignals signals,
285 uintptr_t context, 291 uintptr_t context,
286 HandleSignalsState* signals_state) { 292 HandleSignalsState* signals_state) {
287 base::AutoLock lock(lock_); 293 base::AutoLock lock(lock_);
288 if (!shared_ring_buffer_ || in_transit_) { 294 if (!shared_ring_buffer_ || in_transit_) {
289 if (signals_state) 295 if (signals_state)
290 *signals_state = HandleSignalsState(); 296 *signals_state = HandleSignalsState();
291 return MOJO_RESULT_INVALID_ARGUMENT; 297 return MOJO_RESULT_INVALID_ARGUMENT;
(...skipping 161 matching lines...) Expand 10 before | Expand all | Expand 10 after
453 459
454 MojoResult DataPipeConsumerDispatcher::CloseNoLock() { 460 MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
455 lock_.AssertAcquired(); 461 lock_.AssertAcquired();
456 if (is_closed_ || in_transit_) 462 if (is_closed_ || in_transit_)
457 return MOJO_RESULT_INVALID_ARGUMENT; 463 return MOJO_RESULT_INVALID_ARGUMENT;
458 is_closed_ = true; 464 is_closed_ = true;
459 ring_buffer_mapping_.reset(); 465 ring_buffer_mapping_.reset();
460 shared_ring_buffer_ = nullptr; 466 shared_ring_buffer_ = nullptr;
461 467
462 awakable_list_.CancelAll(); 468 awakable_list_.CancelAll();
469 watchers_.NotifyClosed();
463 if (!transferred_) { 470 if (!transferred_) {
464 base::AutoUnlock unlock(lock_); 471 base::AutoUnlock unlock(lock_);
465 node_controller_->ClosePort(control_port_); 472 node_controller_->ClosePort(control_port_);
466 } 473 }
467 474
468 return MOJO_RESULT_OK; 475 return MOJO_RESULT_OK;
469 } 476 }
470 477
471 HandleSignalsState 478 HandleSignalsState
472 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { 479 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
473 lock_.AssertAcquired(); 480 lock_.AssertAcquired();
474 481
475 HandleSignalsState rv; 482 HandleSignalsState rv;
476 if (shared_ring_buffer_ && bytes_available_) { 483 if (shared_ring_buffer_ && bytes_available_) {
477 if (!in_two_phase_read_) 484 if (!in_two_phase_read_ && bytes_available_ && !suppress_readable_signal_)
yzshen1 2017/03/11 00:44:58 nit: there is no need to check |bytes_available_|
Ken Rockot(use gerrit already) 2017/03/12 22:24:13 Done
478 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; 485 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
479 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
480 } else if (!peer_closed_ && shared_ring_buffer_) { 487 } else if (!peer_closed_ && shared_ring_buffer_) {
481 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 488 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
482 } 489 }
483 490
484 if (peer_closed_) 491 if (peer_closed_)
485 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 492 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 493 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
487 return rv; 494 return rv;
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
558 << m->num_bytes << " bytes were written. [control_port=" 565 << m->num_bytes << " bytes were written. [control_port="
559 << control_port_.name() << "]"; 566 << control_port_.name() << "]";
560 567
561 bytes_available_ += m->num_bytes; 568 bytes_available_ += m->num_bytes;
562 } 569 }
563 } while (message); 570 } while (message);
564 } 571 }
565 572
566 if (peer_closed_ != was_peer_closed || 573 if (peer_closed_ != was_peer_closed ||
567 bytes_available_ != previous_bytes_available) { 574 bytes_available_ != previous_bytes_available) {
568 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 575 suppress_readable_signal_ = false;
576 HandleSignalsState state = GetHandleSignalsStateNoLock();
577 awakable_list_.AwakeForStateChange(state);
578 watchers_.NotifyState(state);
569 } 579 }
570 } 580 }
571 581
572 } // namespace edk 582 } // namespace edk
573 } // namespace mojo 583 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698