Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(7)

Side by Side Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <algorithm> 10 #include <algorithm>
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 NodeController* node_controller, 73 NodeController* node_controller,
74 const ports::PortRef& control_port, 74 const ports::PortRef& control_port,
75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, 75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
76 const MojoCreateDataPipeOptions& options, 76 const MojoCreateDataPipeOptions& options,
77 bool initialized, 77 bool initialized,
78 uint64_t pipe_id) 78 uint64_t pipe_id)
79 : options_(options), 79 : options_(options),
80 node_controller_(node_controller), 80 node_controller_(node_controller),
81 control_port_(control_port), 81 control_port_(control_port),
82 pipe_id_(pipe_id), 82 pipe_id_(pipe_id),
83 watchers_(this),
83 shared_ring_buffer_(shared_ring_buffer) { 84 shared_ring_buffer_(shared_ring_buffer) {
84 if (initialized) { 85 if (initialized) {
85 base::AutoLock lock(lock_); 86 base::AutoLock lock(lock_);
86 InitializeNoLock(); 87 InitializeNoLock();
87 } 88 }
88 } 89 }
89 90
90 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { 91 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
91 return Type::DATA_PIPE_CONSUMER; 92 return Type::DATA_PIPE_CONSUMER;
92 } 93 }
93 94
94 MojoResult DataPipeConsumerDispatcher::Close() { 95 MojoResult DataPipeConsumerDispatcher::Close() {
95 base::AutoLock lock(lock_); 96 base::AutoLock lock(lock_);
96 DVLOG(1) << "Closing data pipe consumer " << pipe_id_; 97 DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
97 return CloseNoLock(); 98 return CloseNoLock();
98 } 99 }
99 100
100
101 MojoResult DataPipeConsumerDispatcher::Watch(
102 MojoHandleSignals signals,
103 const Watcher::WatchCallback& callback,
104 uintptr_t context) {
105 base::AutoLock lock(lock_);
106
107 if (is_closed_ || in_transit_)
108 return MOJO_RESULT_INVALID_ARGUMENT;
109
110 return awakable_list_.AddWatcher(
111 signals, callback, context, GetHandleSignalsStateNoLock());
112 }
113
114 MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
115 base::AutoLock lock(lock_);
116
117 if (is_closed_ || in_transit_)
118 return MOJO_RESULT_INVALID_ARGUMENT;
119
120 return awakable_list_.RemoveWatcher(context);
121 }
122
123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, 101 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
124 uint32_t* num_bytes, 102 uint32_t* num_bytes,
125 MojoReadDataFlags flags) { 103 MojoReadDataFlags flags) {
126 base::AutoLock lock(lock_); 104 base::AutoLock lock(lock_);
127 new_data_available_ = false;
128 105
129 if (!shared_ring_buffer_ || in_transit_) 106 if (!shared_ring_buffer_ || in_transit_)
130 return MOJO_RESULT_INVALID_ARGUMENT; 107 return MOJO_RESULT_INVALID_ARGUMENT;
131 108
132 if (in_two_phase_read_) 109 if (in_two_phase_read_)
133 return MOJO_RESULT_BUSY; 110 return MOJO_RESULT_BUSY;
134 111
112 const bool had_new_data = new_data_available_;
113 new_data_available_ = false;
114
135 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { 115 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
136 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || 116 if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
137 (flags & MOJO_READ_DATA_FLAG_DISCARD)) 117 (flags & MOJO_READ_DATA_FLAG_DISCARD))
138 return MOJO_RESULT_INVALID_ARGUMENT; 118 return MOJO_RESULT_INVALID_ARGUMENT;
139 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. 119 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
140 DVLOG_IF(2, elements) 120 DVLOG_IF(2, elements)
141 << "Query mode: ignoring non-null |elements|"; 121 << "Query mode: ignoring non-null |elements|";
142 *num_bytes = static_cast<uint32_t>(bytes_available_); 122 *num_bytes = static_cast<uint32_t>(bytes_available_);
123
124 if (had_new_data)
125 watchers_.NotifyState(GetHandleSignalsStateNoLock());
143 return MOJO_RESULT_OK; 126 return MOJO_RESULT_OK;
144 } 127 }
145 128
146 bool discard = false; 129 bool discard = false;
147 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { 130 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
148 // These flags are mutally exclusive. 131 // These flags are mutally exclusive.
149 if (flags & MOJO_READ_DATA_FLAG_PEEK) 132 if (flags & MOJO_READ_DATA_FLAG_PEEK)
150 return MOJO_RESULT_INVALID_ARGUMENT; 133 return MOJO_RESULT_INVALID_ARGUMENT;
151 DVLOG_IF(2, elements) 134 DVLOG_IF(2, elements)
152 << "Discard mode: ignoring non-null |elements|"; 135 << "Discard mode: ignoring non-null |elements|";
153 discard = true; 136 discard = true;
154 } 137 }
155 138
156 uint32_t max_num_bytes_to_read = *num_bytes; 139 uint32_t max_num_bytes_to_read = *num_bytes;
157 if (max_num_bytes_to_read % options_.element_num_bytes != 0) 140 if (max_num_bytes_to_read % options_.element_num_bytes != 0)
158 return MOJO_RESULT_INVALID_ARGUMENT; 141 return MOJO_RESULT_INVALID_ARGUMENT;
159 142
160 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; 143 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
161 uint32_t min_num_bytes_to_read = 144 uint32_t min_num_bytes_to_read =
162 all_or_none ? max_num_bytes_to_read : 0; 145 all_or_none ? max_num_bytes_to_read : 0;
163 146
164 if (min_num_bytes_to_read > bytes_available_) { 147 if (min_num_bytes_to_read > bytes_available_) {
148 if (had_new_data)
149 watchers_.NotifyState(GetHandleSignalsStateNoLock());
165 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 150 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
166 : MOJO_RESULT_OUT_OF_RANGE; 151 : MOJO_RESULT_OUT_OF_RANGE;
167 } 152 }
168 153
169 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); 154 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
170 if (bytes_to_read == 0) { 155 if (bytes_to_read == 0) {
156 if (had_new_data)
157 watchers_.NotifyState(GetHandleSignalsStateNoLock());
171 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 158 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
172 : MOJO_RESULT_SHOULD_WAIT; 159 : MOJO_RESULT_SHOULD_WAIT;
173 } 160 }
174 161
175 if (!discard) { 162 if (!discard) {
176 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); 163 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
177 CHECK(data); 164 CHECK(data);
178 165
179 uint8_t* destination = static_cast<uint8_t*>(elements); 166 uint8_t* destination = static_cast<uint8_t*>(elements);
180 CHECK(destination); 167 CHECK(destination);
(...skipping 11 matching lines...) Expand all
192 179
193 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); 180 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
194 if (discard || !peek) { 181 if (discard || !peek) {
195 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; 182 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
196 bytes_available_ -= bytes_to_read; 183 bytes_available_ -= bytes_to_read;
197 184
198 base::AutoUnlock unlock(lock_); 185 base::AutoUnlock unlock(lock_);
199 NotifyRead(bytes_to_read); 186 NotifyRead(bytes_to_read);
200 } 187 }
201 188
189 // We may have just read the last available data and thus changed the signals
190 // state.
191 watchers_.NotifyState(GetHandleSignalsStateNoLock());
192
202 return MOJO_RESULT_OK; 193 return MOJO_RESULT_OK;
203 } 194 }
204 195
205 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, 196 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
206 uint32_t* buffer_num_bytes, 197 uint32_t* buffer_num_bytes,
207 MojoReadDataFlags flags) { 198 MojoReadDataFlags flags) {
208 base::AutoLock lock(lock_); 199 base::AutoLock lock(lock_);
209 new_data_available_ = false;
210 if (!shared_ring_buffer_ || in_transit_) 200 if (!shared_ring_buffer_ || in_transit_)
211 return MOJO_RESULT_INVALID_ARGUMENT; 201 return MOJO_RESULT_INVALID_ARGUMENT;
212 202
213 if (in_two_phase_read_) 203 if (in_two_phase_read_)
214 return MOJO_RESULT_BUSY; 204 return MOJO_RESULT_BUSY;
215 205
216 // These flags may not be used in two-phase mode. 206 // These flags may not be used in two-phase mode.
217 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || 207 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
218 (flags & MOJO_READ_DATA_FLAG_QUERY) || 208 (flags & MOJO_READ_DATA_FLAG_QUERY) ||
219 (flags & MOJO_READ_DATA_FLAG_PEEK)) 209 (flags & MOJO_READ_DATA_FLAG_PEEK))
220 return MOJO_RESULT_INVALID_ARGUMENT; 210 return MOJO_RESULT_INVALID_ARGUMENT;
221 211
212 const bool had_new_data = new_data_available_;
213 new_data_available_ = false;
214
222 if (bytes_available_ == 0) { 215 if (bytes_available_ == 0) {
216 if (had_new_data)
217 watchers_.NotifyState(GetHandleSignalsStateNoLock());
223 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION 218 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
224 : MOJO_RESULT_SHOULD_WAIT; 219 : MOJO_RESULT_SHOULD_WAIT;
225 } 220 }
226 221
227 DCHECK_LT(read_offset_, options_.capacity_num_bytes); 222 DCHECK_LT(read_offset_, options_.capacity_num_bytes);
228 uint32_t bytes_to_read = std::min(bytes_available_, 223 uint32_t bytes_to_read = std::min(bytes_available_,
229 options_.capacity_num_bytes - read_offset_); 224 options_.capacity_num_bytes - read_offset_);
230 225
231 CHECK(ring_buffer_mapping_); 226 CHECK(ring_buffer_mapping_);
232 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); 227 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
233 CHECK(data); 228 CHECK(data);
234 229
235 in_two_phase_read_ = true; 230 in_two_phase_read_ = true;
236 *buffer = data + read_offset_; 231 *buffer = data + read_offset_;
237 *buffer_num_bytes = bytes_to_read; 232 *buffer_num_bytes = bytes_to_read;
238 two_phase_max_bytes_read_ = bytes_to_read; 233 two_phase_max_bytes_read_ = bytes_to_read;
239 234
235 if (had_new_data)
236 watchers_.NotifyState(GetHandleSignalsStateNoLock());
237
240 return MOJO_RESULT_OK; 238 return MOJO_RESULT_OK;
241 } 239 }
242 240
243 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { 241 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
244 base::AutoLock lock(lock_); 242 base::AutoLock lock(lock_);
245 if (!in_two_phase_read_) 243 if (!in_two_phase_read_)
246 return MOJO_RESULT_FAILED_PRECONDITION; 244 return MOJO_RESULT_FAILED_PRECONDITION;
247 245
248 if (in_transit_) 246 if (in_transit_)
249 return MOJO_RESULT_INVALID_ARGUMENT; 247 return MOJO_RESULT_INVALID_ARGUMENT;
(...skipping 16 matching lines...) Expand all
266 base::AutoUnlock unlock(lock_); 264 base::AutoUnlock unlock(lock_);
267 NotifyRead(num_bytes_read); 265 NotifyRead(num_bytes_read);
268 } 266 }
269 267
270 in_two_phase_read_ = false; 268 in_two_phase_read_ = false;
271 two_phase_max_bytes_read_ = 0; 269 two_phase_max_bytes_read_ = 0;
272 270
273 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 271 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
274 if (!new_state.equals(old_state)) 272 if (!new_state.equals(old_state))
275 awakable_list_.AwakeForStateChange(new_state); 273 awakable_list_.AwakeForStateChange(new_state);
274 watchers_.NotifyState(new_state);
276 275
277 return rv; 276 return rv;
278 } 277 }
279 278
280 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { 279 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
281 base::AutoLock lock(lock_); 280 base::AutoLock lock(lock_);
282 return GetHandleSignalsStateNoLock(); 281 return GetHandleSignalsStateNoLock();
283 } 282 }
284 283
284 MojoResult DataPipeConsumerDispatcher::AddWatcherRef(
285 const scoped_refptr<WatcherDispatcher>& watcher,
286 uintptr_t context) {
287 base::AutoLock lock(lock_);
288 if (is_closed_ || in_transit_)
289 return MOJO_RESULT_INVALID_ARGUMENT;
290 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
291 }
292
293 MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef(
294 WatcherDispatcher* watcher,
295 uintptr_t context) {
296 base::AutoLock lock(lock_);
297 if (is_closed_ || in_transit_)
298 return MOJO_RESULT_INVALID_ARGUMENT;
299 return watchers_.Remove(watcher, context);
300 }
301
285 MojoResult DataPipeConsumerDispatcher::AddAwakable( 302 MojoResult DataPipeConsumerDispatcher::AddAwakable(
286 Awakable* awakable, 303 Awakable* awakable,
287 MojoHandleSignals signals, 304 MojoHandleSignals signals,
288 uintptr_t context, 305 uintptr_t context,
289 HandleSignalsState* signals_state) { 306 HandleSignalsState* signals_state) {
290 base::AutoLock lock(lock_); 307 base::AutoLock lock(lock_);
291 if (!shared_ring_buffer_ || in_transit_) { 308 if (!shared_ring_buffer_ || in_transit_) {
292 if (signals_state) 309 if (signals_state)
293 *signals_state = HandleSignalsState(); 310 *signals_state = HandleSignalsState();
294 return MOJO_RESULT_INVALID_ARGUMENT; 311 return MOJO_RESULT_INVALID_ARGUMENT;
(...skipping 162 matching lines...) Expand 10 before | Expand all | Expand 10 after
457 474
458 MojoResult DataPipeConsumerDispatcher::CloseNoLock() { 475 MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
459 lock_.AssertAcquired(); 476 lock_.AssertAcquired();
460 if (is_closed_ || in_transit_) 477 if (is_closed_ || in_transit_)
461 return MOJO_RESULT_INVALID_ARGUMENT; 478 return MOJO_RESULT_INVALID_ARGUMENT;
462 is_closed_ = true; 479 is_closed_ = true;
463 ring_buffer_mapping_.reset(); 480 ring_buffer_mapping_.reset();
464 shared_ring_buffer_ = nullptr; 481 shared_ring_buffer_ = nullptr;
465 482
466 awakable_list_.CancelAll(); 483 awakable_list_.CancelAll();
484 watchers_.NotifyClosed();
467 if (!transferred_) { 485 if (!transferred_) {
468 base::AutoUnlock unlock(lock_); 486 base::AutoUnlock unlock(lock_);
469 node_controller_->ClosePort(control_port_); 487 node_controller_->ClosePort(control_port_);
470 } 488 }
471 489
472 return MOJO_RESULT_OK; 490 return MOJO_RESULT_OK;
473 } 491 }
474 492
475 HandleSignalsState 493 HandleSignalsState
476 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { 494 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
574 bytes_available_ += m->num_bytes; 592 bytes_available_ += m->num_bytes;
575 } 593 }
576 } while (message); 594 } while (message);
577 } 595 }
578 596
579 bool has_new_data = bytes_available_ != previous_bytes_available; 597 bool has_new_data = bytes_available_ != previous_bytes_available;
580 if (has_new_data) 598 if (has_new_data)
581 new_data_available_ = true; 599 new_data_available_ = true;
582 600
583 if (peer_closed_ != was_peer_closed || has_new_data) { 601 if (peer_closed_ != was_peer_closed || has_new_data) {
584 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 602 HandleSignalsState state = GetHandleSignalsStateNoLock();
603 awakable_list_.AwakeForStateChange(state);
604 watchers_.NotifyState(state);
585 } 605 }
586 } 606 }
587 607
588 } // namespace edk 608 } // namespace edk
589 } // namespace mojo 609 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/data_pipe_consumer_dispatcher.h ('k') | mojo/edk/system/data_pipe_producer_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698