Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(633)

Side by Side Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 2750373002: Revert of Mojo: Armed Watchers (Closed)
Patch Set: Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <algorithm> 10 #include <algorithm>
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 NodeController* node_controller, 73 NodeController* node_controller,
74 const ports::PortRef& control_port, 74 const ports::PortRef& control_port,
75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, 75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
76 const MojoCreateDataPipeOptions& options, 76 const MojoCreateDataPipeOptions& options,
77 bool initialized, 77 bool initialized,
78 uint64_t pipe_id) 78 uint64_t pipe_id)
79 : options_(options), 79 : options_(options),
80 node_controller_(node_controller), 80 node_controller_(node_controller),
81 control_port_(control_port), 81 control_port_(control_port),
82 pipe_id_(pipe_id), 82 pipe_id_(pipe_id),
83 watchers_(this),
84 shared_ring_buffer_(shared_ring_buffer) { 83 shared_ring_buffer_(shared_ring_buffer) {
85 if (initialized) { 84 if (initialized) {
86 base::AutoLock lock(lock_); 85 base::AutoLock lock(lock_);
87 InitializeNoLock(); 86 InitializeNoLock();
88 } 87 }
89 } 88 }
90 89
91 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { 90 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
92 return Type::DATA_PIPE_CONSUMER; 91 return Type::DATA_PIPE_CONSUMER;
93 } 92 }
94 93
95 MojoResult DataPipeConsumerDispatcher::Close() { 94 MojoResult DataPipeConsumerDispatcher::Close() {
96 base::AutoLock lock(lock_); 95 base::AutoLock lock(lock_);
97 DVLOG(1) << "Closing data pipe consumer " << pipe_id_; 96 DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
98 return CloseNoLock(); 97 return CloseNoLock();
99 } 98 }
100 99
100
101 MojoResult DataPipeConsumerDispatcher::Watch(
102 MojoHandleSignals signals,
103 const Watcher::WatchCallback& callback,
104 uintptr_t context) {
105 base::AutoLock lock(lock_);
106
107 if (is_closed_ || in_transit_)
108 return MOJO_RESULT_INVALID_ARGUMENT;
109
110 return awakable_list_.AddWatcher(
111 signals, callback, context, GetHandleSignalsStateNoLock());
112 }
113
114 MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
115 base::AutoLock lock(lock_);
116
117 if (is_closed_ || in_transit_)
118 return MOJO_RESULT_INVALID_ARGUMENT;
119
120 return awakable_list_.RemoveWatcher(context);
121 }
122
101 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, 123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
102 uint32_t* num_bytes, 124 uint32_t* num_bytes,
103 MojoReadDataFlags flags) { 125 MojoReadDataFlags flags) {
104 base::AutoLock lock(lock_); 126 base::AutoLock lock(lock_);
127 new_data_available_ = false;
105 128
106 if (!shared_ring_buffer_ || in_transit_) 129 if (!shared_ring_buffer_ || in_transit_)
107 return MOJO_RESULT_INVALID_ARGUMENT; 130 return MOJO_RESULT_INVALID_ARGUMENT;
108 131
109 if (in_two_phase_read_) 132 if (in_two_phase_read_)
110 return MOJO_RESULT_BUSY; 133 return MOJO_RESULT_BUSY;
111 134
112 const bool had_new_data = new_data_available_;
113 new_data_available_ = false;
114
115 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { 135 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
116 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || 136 if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
117 (flags & MOJO_READ_DATA_FLAG_DISCARD)) 137 (flags & MOJO_READ_DATA_FLAG_DISCARD))
118 return MOJO_RESULT_INVALID_ARGUMENT; 138 return MOJO_RESULT_INVALID_ARGUMENT;
119 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. 139 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
120 DVLOG_IF(2, elements) 140 DVLOG_IF(2, elements)
121 << "Query mode: ignoring non-null |elements|"; 141 << "Query mode: ignoring non-null |elements|";
122 *num_bytes = static_cast<uint32_t>(bytes_available_); 142 *num_bytes = static_cast<uint32_t>(bytes_available_);
123
124 if (had_new_data)
125 watchers_.NotifyState(GetHandleSignalsStateNoLock());
126 return MOJO_RESULT_OK; 143 return MOJO_RESULT_OK;
127 } 144 }
128 145
129 bool discard = false; 146 bool discard = false;
130 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { 147 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
131 // These flags are mutally exclusive. 148 // These flags are mutally exclusive.
132 if (flags & MOJO_READ_DATA_FLAG_PEEK) 149 if (flags & MOJO_READ_DATA_FLAG_PEEK)
133 return MOJO_RESULT_INVALID_ARGUMENT; 150 return MOJO_RESULT_INVALID_ARGUMENT;
134 DVLOG_IF(2, elements) 151 DVLOG_IF(2, elements)
135 << "Discard mode: ignoring non-null |elements|"; 152 << "Discard mode: ignoring non-null |elements|";
136 discard = true; 153 discard = true;
137 } 154 }
138 155
139 uint32_t max_num_bytes_to_read = *num_bytes; 156 uint32_t max_num_bytes_to_read = *num_bytes;
140 if (max_num_bytes_to_read % options_.element_num_bytes != 0) 157 if (max_num_bytes_to_read % options_.element_num_bytes != 0)
141 return MOJO_RESULT_INVALID_ARGUMENT; 158 return MOJO_RESULT_INVALID_ARGUMENT;
142 159
143 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; 160 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
144 uint32_t min_num_bytes_to_read = 161 uint32_t min_num_bytes_to_read =
145 all_or_none ? max_num_bytes_to_read : 0; 162 all_or_none ? max_num_bytes_to_read : 0;
146 163
147 if (min_num_bytes_to_read > bytes_available_) { 164 if (min_num_bytes_to_read > bytes_available_) {
148 if (had_new_data)
149 watchers_.NotifyState(GetHandleSignalsStateNoLock());
150 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 165 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
151 : MOJO_RESULT_OUT_OF_RANGE; 166 : MOJO_RESULT_OUT_OF_RANGE;
152 } 167 }
153 168
154 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); 169 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
155 if (bytes_to_read == 0) { 170 if (bytes_to_read == 0) {
156 if (had_new_data)
157 watchers_.NotifyState(GetHandleSignalsStateNoLock());
158 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 171 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
159 : MOJO_RESULT_SHOULD_WAIT; 172 : MOJO_RESULT_SHOULD_WAIT;
160 } 173 }
161 174
162 if (!discard) { 175 if (!discard) {
163 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); 176 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
164 CHECK(data); 177 CHECK(data);
165 178
166 uint8_t* destination = static_cast<uint8_t*>(elements); 179 uint8_t* destination = static_cast<uint8_t*>(elements);
167 CHECK(destination); 180 CHECK(destination);
(...skipping 11 matching lines...) Expand all
179 192
180 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); 193 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
181 if (discard || !peek) { 194 if (discard || !peek) {
182 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; 195 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
183 bytes_available_ -= bytes_to_read; 196 bytes_available_ -= bytes_to_read;
184 197
185 base::AutoUnlock unlock(lock_); 198 base::AutoUnlock unlock(lock_);
186 NotifyRead(bytes_to_read); 199 NotifyRead(bytes_to_read);
187 } 200 }
188 201
189 // We may have just read the last available data and thus changed the signals
190 // state.
191 watchers_.NotifyState(GetHandleSignalsStateNoLock());
192
193 return MOJO_RESULT_OK; 202 return MOJO_RESULT_OK;
194 } 203 }
195 204
196 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, 205 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
197 uint32_t* buffer_num_bytes, 206 uint32_t* buffer_num_bytes,
198 MojoReadDataFlags flags) { 207 MojoReadDataFlags flags) {
199 base::AutoLock lock(lock_); 208 base::AutoLock lock(lock_);
209 new_data_available_ = false;
200 if (!shared_ring_buffer_ || in_transit_) 210 if (!shared_ring_buffer_ || in_transit_)
201 return MOJO_RESULT_INVALID_ARGUMENT; 211 return MOJO_RESULT_INVALID_ARGUMENT;
202 212
203 if (in_two_phase_read_) 213 if (in_two_phase_read_)
204 return MOJO_RESULT_BUSY; 214 return MOJO_RESULT_BUSY;
205 215
206 // These flags may not be used in two-phase mode. 216 // These flags may not be used in two-phase mode.
207 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || 217 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
208 (flags & MOJO_READ_DATA_FLAG_QUERY) || 218 (flags & MOJO_READ_DATA_FLAG_QUERY) ||
209 (flags & MOJO_READ_DATA_FLAG_PEEK)) 219 (flags & MOJO_READ_DATA_FLAG_PEEK))
210 return MOJO_RESULT_INVALID_ARGUMENT; 220 return MOJO_RESULT_INVALID_ARGUMENT;
211 221
212 const bool had_new_data = new_data_available_;
213 new_data_available_ = false;
214
215 if (bytes_available_ == 0) { 222 if (bytes_available_ == 0) {
216 if (had_new_data)
217 watchers_.NotifyState(GetHandleSignalsStateNoLock());
218 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 223 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
219 : MOJO_RESULT_SHOULD_WAIT; 224 : MOJO_RESULT_SHOULD_WAIT;
220 } 225 }
221 226
222 DCHECK_LT(read_offset_, options_.capacity_num_bytes); 227 DCHECK_LT(read_offset_, options_.capacity_num_bytes);
223 uint32_t bytes_to_read = std::min(bytes_available_, 228 uint32_t bytes_to_read = std::min(bytes_available_,
224 options_.capacity_num_bytes - read_offset_); 229 options_.capacity_num_bytes - read_offset_);
225 230
226 CHECK(ring_buffer_mapping_); 231 CHECK(ring_buffer_mapping_);
227 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); 232 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
228 CHECK(data); 233 CHECK(data);
229 234
230 in_two_phase_read_ = true; 235 in_two_phase_read_ = true;
231 *buffer = data + read_offset_; 236 *buffer = data + read_offset_;
232 *buffer_num_bytes = bytes_to_read; 237 *buffer_num_bytes = bytes_to_read;
233 two_phase_max_bytes_read_ = bytes_to_read; 238 two_phase_max_bytes_read_ = bytes_to_read;
234 239
235 if (had_new_data)
236 watchers_.NotifyState(GetHandleSignalsStateNoLock());
237
238 return MOJO_RESULT_OK; 240 return MOJO_RESULT_OK;
239 } 241 }
240 242
241 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { 243 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
242 base::AutoLock lock(lock_); 244 base::AutoLock lock(lock_);
243 if (!in_two_phase_read_) 245 if (!in_two_phase_read_)
244 return MOJO_RESULT_FAILED_PRECONDITION; 246 return MOJO_RESULT_FAILED_PRECONDITION;
245 247
246 if (in_transit_) 248 if (in_transit_)
247 return MOJO_RESULT_INVALID_ARGUMENT; 249 return MOJO_RESULT_INVALID_ARGUMENT;
(...skipping 16 matching lines...) Expand all
264 base::AutoUnlock unlock(lock_); 266 base::AutoUnlock unlock(lock_);
265 NotifyRead(num_bytes_read); 267 NotifyRead(num_bytes_read);
266 } 268 }
267 269
268 in_two_phase_read_ = false; 270 in_two_phase_read_ = false;
269 two_phase_max_bytes_read_ = 0; 271 two_phase_max_bytes_read_ = 0;
270 272
271 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 273 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
272 if (!new_state.equals(old_state)) 274 if (!new_state.equals(old_state))
273 awakable_list_.AwakeForStateChange(new_state); 275 awakable_list_.AwakeForStateChange(new_state);
274 watchers_.NotifyState(new_state);
275 276
276 return rv; 277 return rv;
277 } 278 }
278 279
279 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { 280 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
280 base::AutoLock lock(lock_); 281 base::AutoLock lock(lock_);
281 return GetHandleSignalsStateNoLock(); 282 return GetHandleSignalsStateNoLock();
282 } 283 }
283 284
284 MojoResult DataPipeConsumerDispatcher::AddWatcherRef(
285 const scoped_refptr<WatcherDispatcher>& watcher,
286 uintptr_t context) {
287 base::AutoLock lock(lock_);
288 if (is_closed_ || in_transit_)
289 return MOJO_RESULT_INVALID_ARGUMENT;
290 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
291 }
292
293 MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef(
294 WatcherDispatcher* watcher,
295 uintptr_t context) {
296 base::AutoLock lock(lock_);
297 if (is_closed_ || in_transit_)
298 return MOJO_RESULT_INVALID_ARGUMENT;
299 return watchers_.Remove(watcher, context);
300 }
301
302 MojoResult DataPipeConsumerDispatcher::AddAwakable( 285 MojoResult DataPipeConsumerDispatcher::AddAwakable(
303 Awakable* awakable, 286 Awakable* awakable,
304 MojoHandleSignals signals, 287 MojoHandleSignals signals,
305 uintptr_t context, 288 uintptr_t context,
306 HandleSignalsState* signals_state) { 289 HandleSignalsState* signals_state) {
307 base::AutoLock lock(lock_); 290 base::AutoLock lock(lock_);
308 if (!shared_ring_buffer_ || in_transit_) { 291 if (!shared_ring_buffer_ || in_transit_) {
309 if (signals_state) 292 if (signals_state)
310 *signals_state = HandleSignalsState(); 293 *signals_state = HandleSignalsState();
311 return MOJO_RESULT_INVALID_ARGUMENT; 294 return MOJO_RESULT_INVALID_ARGUMENT;
(...skipping 162 matching lines...) Expand 10 before | Expand all | Expand 10 after
474 457
475 MojoResult DataPipeConsumerDispatcher::CloseNoLock() { 458 MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
476 lock_.AssertAcquired(); 459 lock_.AssertAcquired();
477 if (is_closed_ || in_transit_) 460 if (is_closed_ || in_transit_)
478 return MOJO_RESULT_INVALID_ARGUMENT; 461 return MOJO_RESULT_INVALID_ARGUMENT;
479 is_closed_ = true; 462 is_closed_ = true;
480 ring_buffer_mapping_.reset(); 463 ring_buffer_mapping_.reset();
481 shared_ring_buffer_ = nullptr; 464 shared_ring_buffer_ = nullptr;
482 465
483 awakable_list_.CancelAll(); 466 awakable_list_.CancelAll();
484 watchers_.NotifyClosed();
485 if (!transferred_) { 467 if (!transferred_) {
486 base::AutoUnlock unlock(lock_); 468 base::AutoUnlock unlock(lock_);
487 node_controller_->ClosePort(control_port_); 469 node_controller_->ClosePort(control_port_);
488 } 470 }
489 471
490 return MOJO_RESULT_OK; 472 return MOJO_RESULT_OK;
491 } 473 }
492 474
493 HandleSignalsState 475 HandleSignalsState
494 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { 476 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
592 bytes_available_ += m->num_bytes; 574 bytes_available_ += m->num_bytes;
593 } 575 }
594 } while (message); 576 } while (message);
595 } 577 }
596 578
597 bool has_new_data = bytes_available_ != previous_bytes_available; 579 bool has_new_data = bytes_available_ != previous_bytes_available;
598 if (has_new_data) 580 if (has_new_data)
599 new_data_available_ = true; 581 new_data_available_ = true;
600 582
601 if (peer_closed_ != was_peer_closed || has_new_data) { 583 if (peer_closed_ != was_peer_closed || has_new_data) {
602 HandleSignalsState state = GetHandleSignalsStateNoLock(); 584 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
603 awakable_list_.AwakeForStateChange(state);
604 watchers_.NotifyState(state);
605 } 585 }
606 } 586 }
607 587
608 } // namespace edk 588 } // namespace edk
609 } // namespace mojo 589 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/data_pipe_consumer_dispatcher.h ('k') | mojo/edk/system/data_pipe_producer_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698