Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(182)

Side by Side Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 2744943002: Mojo: Move waiting APIs to public library (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/data_pipe_producer_dispatcher.h ('k') | mojo/edk/system/data_pipe_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after
121 // Don't return "should wait" since you can't wait for a specified amount of 121 // Don't return "should wait" since you can't wait for a specified amount of
122 // data. 122 // data.
123 return MOJO_RESULT_OUT_OF_RANGE; 123 return MOJO_RESULT_OUT_OF_RANGE;
124 } 124 }
125 125
126 DCHECK_LE(available_capacity_, options_.capacity_num_bytes); 126 DCHECK_LE(available_capacity_, options_.capacity_num_bytes);
127 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); 127 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_);
128 if (num_bytes_to_write == 0) 128 if (num_bytes_to_write == 0)
129 return MOJO_RESULT_SHOULD_WAIT; 129 return MOJO_RESULT_SHOULD_WAIT;
130 130
131 HandleSignalsState old_state = GetHandleSignalsStateNoLock();
132
133 *num_bytes = num_bytes_to_write; 131 *num_bytes = num_bytes_to_write;
134 132
135 CHECK(ring_buffer_mapping_); 133 CHECK(ring_buffer_mapping_);
136 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); 134 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
137 CHECK(data); 135 CHECK(data);
138 136
139 const uint8_t* source = static_cast<const uint8_t*>(elements); 137 const uint8_t* source = static_cast<const uint8_t*>(elements);
140 CHECK(source); 138 CHECK(source);
141 139
142 DCHECK_LE(write_offset_, options_.capacity_num_bytes); 140 DCHECK_LE(write_offset_, options_.capacity_num_bytes);
143 uint32_t tail_bytes_to_write = 141 uint32_t tail_bytes_to_write =
144 std::min(options_.capacity_num_bytes - write_offset_, 142 std::min(options_.capacity_num_bytes - write_offset_,
145 num_bytes_to_write); 143 num_bytes_to_write);
146 uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write; 144 uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write;
147 145
148 DCHECK_GT(tail_bytes_to_write, 0u); 146 DCHECK_GT(tail_bytes_to_write, 0u);
149 memcpy(data + write_offset_, source, tail_bytes_to_write); 147 memcpy(data + write_offset_, source, tail_bytes_to_write);
150 if (head_bytes_to_write > 0) 148 if (head_bytes_to_write > 0)
151 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); 149 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
152 150
153 DCHECK_LE(num_bytes_to_write, available_capacity_); 151 DCHECK_LE(num_bytes_to_write, available_capacity_);
154 available_capacity_ -= num_bytes_to_write; 152 available_capacity_ -= num_bytes_to_write;
155 write_offset_ = (write_offset_ + num_bytes_to_write) % 153 write_offset_ = (write_offset_ + num_bytes_to_write) %
156 options_.capacity_num_bytes; 154 options_.capacity_num_bytes;
157 155
158 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 156 watchers_.NotifyState(GetHandleSignalsStateNoLock());
159 if (!new_state.equals(old_state))
160 awakable_list_.AwakeForStateChange(new_state);
161 watchers_.NotifyState(new_state);
162 157
163 base::AutoUnlock unlock(lock_); 158 base::AutoUnlock unlock(lock_);
164 NotifyWrite(num_bytes_to_write); 159 NotifyWrite(num_bytes_to_write);
165 160
166 return MOJO_RESULT_OK; 161 return MOJO_RESULT_OK;
167 } 162 }
168 163
169 MojoResult DataPipeProducerDispatcher::BeginWriteData( 164 MojoResult DataPipeProducerDispatcher::BeginWriteData(
170 void** buffer, 165 void** buffer,
171 uint32_t* buffer_num_bytes, 166 uint32_t* buffer_num_bytes,
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
225 write_offset_ = (write_offset_ + num_bytes_written) % 220 write_offset_ = (write_offset_ + num_bytes_written) %
226 options_.capacity_num_bytes; 221 options_.capacity_num_bytes;
227 222
228 base::AutoUnlock unlock(lock_); 223 base::AutoUnlock unlock(lock_);
229 NotifyWrite(num_bytes_written); 224 NotifyWrite(num_bytes_written);
230 } 225 }
231 226
232 in_two_phase_write_ = false; 227 in_two_phase_write_ = false;
233 228
234 // If we're now writable, we *became* writable (since we weren't writable 229 // If we're now writable, we *became* writable (since we weren't writable
235 // during the two-phase write), so awake producer awakables. 230 // during the two-phase write), so notify watchers.
236 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 231 watchers_.NotifyState(GetHandleSignalsStateNoLock());
237 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
238 awakable_list_.AwakeForStateChange(new_state);
239 watchers_.NotifyState(new_state);
240 232
241 return rv; 233 return rv;
242 } 234 }
243 235
244 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { 236 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
245 base::AutoLock lock(lock_); 237 base::AutoLock lock(lock_);
246 return GetHandleSignalsStateNoLock(); 238 return GetHandleSignalsStateNoLock();
247 } 239 }
248 240
249 MojoResult DataPipeProducerDispatcher::AddWatcherRef( 241 MojoResult DataPipeProducerDispatcher::AddWatcherRef(
250 const scoped_refptr<WatcherDispatcher>& watcher, 242 const scoped_refptr<WatcherDispatcher>& watcher,
251 uintptr_t context) { 243 uintptr_t context) {
252 base::AutoLock lock(lock_); 244 base::AutoLock lock(lock_);
253 if (is_closed_ || in_transit_) 245 if (is_closed_ || in_transit_)
254 return MOJO_RESULT_INVALID_ARGUMENT; 246 return MOJO_RESULT_INVALID_ARGUMENT;
255 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); 247 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
256 } 248 }
257 249
258 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef( 250 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef(
259 WatcherDispatcher* watcher, 251 WatcherDispatcher* watcher,
260 uintptr_t context) { 252 uintptr_t context) {
261 base::AutoLock lock(lock_); 253 base::AutoLock lock(lock_);
262 if (is_closed_ || in_transit_) 254 if (is_closed_ || in_transit_)
263 return MOJO_RESULT_INVALID_ARGUMENT; 255 return MOJO_RESULT_INVALID_ARGUMENT;
264 return watchers_.Remove(watcher, context); 256 return watchers_.Remove(watcher, context);
265 } 257 }
266 258
267 MojoResult DataPipeProducerDispatcher::AddAwakable(
268 Awakable* awakable,
269 MojoHandleSignals signals,
270 uintptr_t context,
271 HandleSignalsState* signals_state) {
272 base::AutoLock lock(lock_);
273 if (!shared_ring_buffer_ || in_transit_) {
274 if (signals_state)
275 *signals_state = HandleSignalsState();
276 return MOJO_RESULT_INVALID_ARGUMENT;
277 }
278 UpdateSignalsStateNoLock();
279 HandleSignalsState state = GetHandleSignalsStateNoLock();
280 if (state.satisfies(signals)) {
281 if (signals_state)
282 *signals_state = state;
283 return MOJO_RESULT_ALREADY_EXISTS;
284 }
285 if (!state.can_satisfy(signals)) {
286 if (signals_state)
287 *signals_state = state;
288 return MOJO_RESULT_FAILED_PRECONDITION;
289 }
290
291 awakable_list_.Add(awakable, signals, context);
292 return MOJO_RESULT_OK;
293 }
294
295 void DataPipeProducerDispatcher::RemoveAwakable(
296 Awakable* awakable,
297 HandleSignalsState* signals_state) {
298 base::AutoLock lock(lock_);
299 if ((!shared_ring_buffer_ || in_transit_) && signals_state)
300 *signals_state = HandleSignalsState();
301 else if (signals_state)
302 *signals_state = GetHandleSignalsStateNoLock();
303 awakable_list_.Remove(awakable);
304 }
305
306 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes, 259 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes,
307 uint32_t* num_ports, 260 uint32_t* num_ports,
308 uint32_t* num_handles) { 261 uint32_t* num_handles) {
309 base::AutoLock lock(lock_); 262 base::AutoLock lock(lock_);
310 DCHECK(in_transit_); 263 DCHECK(in_transit_);
311 *num_bytes = sizeof(SerializedState); 264 *num_bytes = sizeof(SerializedState);
312 *num_ports = 1; 265 *num_ports = 1;
313 *num_handles = 1; 266 *num_handles = 1;
314 } 267 }
315 268
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
355 CloseNoLock(); 308 CloseNoLock();
356 } 309 }
357 310
358 void DataPipeProducerDispatcher::CancelTransit() { 311 void DataPipeProducerDispatcher::CancelTransit() {
359 base::AutoLock lock(lock_); 312 base::AutoLock lock(lock_);
360 DCHECK(in_transit_); 313 DCHECK(in_transit_);
361 in_transit_ = false; 314 in_transit_ = false;
362 buffer_handle_for_transit_.reset(); 315 buffer_handle_for_transit_.reset();
363 316
364 HandleSignalsState state = GetHandleSignalsStateNoLock(); 317 HandleSignalsState state = GetHandleSignalsStateNoLock();
365 awakable_list_.AwakeForStateChange(state);
366 watchers_.NotifyState(state); 318 watchers_.NotifyState(state);
367 } 319 }
368 320
369 // static 321 // static
370 scoped_refptr<DataPipeProducerDispatcher> 322 scoped_refptr<DataPipeProducerDispatcher>
371 DataPipeProducerDispatcher::Deserialize(const void* data, 323 DataPipeProducerDispatcher::Deserialize(const void* data,
372 size_t num_bytes, 324 size_t num_bytes,
373 const ports::PortName* ports, 325 const ports::PortName* ports,
374 size_t num_ports, 326 size_t num_ports,
375 PlatformHandle* handles, 327 PlatformHandle* handles,
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
439 } 391 }
440 392
441 MojoResult DataPipeProducerDispatcher::CloseNoLock() { 393 MojoResult DataPipeProducerDispatcher::CloseNoLock() {
442 lock_.AssertAcquired(); 394 lock_.AssertAcquired();
443 if (is_closed_ || in_transit_) 395 if (is_closed_ || in_transit_)
444 return MOJO_RESULT_INVALID_ARGUMENT; 396 return MOJO_RESULT_INVALID_ARGUMENT;
445 is_closed_ = true; 397 is_closed_ = true;
446 ring_buffer_mapping_.reset(); 398 ring_buffer_mapping_.reset();
447 shared_ring_buffer_ = nullptr; 399 shared_ring_buffer_ = nullptr;
448 400
449 awakable_list_.CancelAll();
450 watchers_.NotifyClosed(); 401 watchers_.NotifyClosed();
451 if (!transferred_) { 402 if (!transferred_) {
452 base::AutoUnlock unlock(lock_); 403 base::AutoUnlock unlock(lock_);
453 node_controller_->ClosePort(control_port_); 404 node_controller_->ClosePort(control_port_);
454 } 405 }
455 406
456 return MOJO_RESULT_OK; 407 return MOJO_RESULT_OK;
457 } 408 }
458 409
459 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() 410 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
541 << m->num_bytes << " bytes were read. [control_port=" 492 << m->num_bytes << " bytes were read. [control_port="
542 << control_port_.name() << "]"; 493 << control_port_.name() << "]";
543 494
544 available_capacity_ += m->num_bytes; 495 available_capacity_ += m->num_bytes;
545 } 496 }
546 } while (message); 497 } while (message);
547 } 498 }
548 499
549 if (peer_closed_ != was_peer_closed || 500 if (peer_closed_ != was_peer_closed ||
550 available_capacity_ != previous_capacity) { 501 available_capacity_ != previous_capacity) {
551 HandleSignalsState state = GetHandleSignalsStateNoLock(); 502 watchers_.NotifyState(GetHandleSignalsStateNoLock());
552 awakable_list_.AwakeForStateChange(state);
553 watchers_.NotifyState(state);
554 } 503 }
555 } 504 }
556 505
557 } // namespace edk 506 } // namespace edk
558 } // namespace mojo 507 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/data_pipe_producer_dispatcher.h ('k') | mojo/edk/system/data_pipe_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698