Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(74)

Side by Side Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 2744943002: Mojo: Move waiting APIs to public library (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <algorithm> 10 #include <algorithm>
(...skipping 230 matching lines...) Expand 10 before | Expand all | Expand 10 after
241 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { 241 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
242 base::AutoLock lock(lock_); 242 base::AutoLock lock(lock_);
243 if (!in_two_phase_read_) 243 if (!in_two_phase_read_)
244 return MOJO_RESULT_FAILED_PRECONDITION; 244 return MOJO_RESULT_FAILED_PRECONDITION;
245 245
246 if (in_transit_) 246 if (in_transit_)
247 return MOJO_RESULT_INVALID_ARGUMENT; 247 return MOJO_RESULT_INVALID_ARGUMENT;
248 248
249 CHECK(shared_ring_buffer_); 249 CHECK(shared_ring_buffer_);
250 250
251 HandleSignalsState old_state = GetHandleSignalsStateNoLock();
252 MojoResult rv; 251 MojoResult rv;
253 if (num_bytes_read > two_phase_max_bytes_read_ || 252 if (num_bytes_read > two_phase_max_bytes_read_ ||
254 num_bytes_read % options_.element_num_bytes != 0) { 253 num_bytes_read % options_.element_num_bytes != 0) {
255 rv = MOJO_RESULT_INVALID_ARGUMENT; 254 rv = MOJO_RESULT_INVALID_ARGUMENT;
256 } else { 255 } else {
257 rv = MOJO_RESULT_OK; 256 rv = MOJO_RESULT_OK;
258 read_offset_ = 257 read_offset_ =
259 (read_offset_ + num_bytes_read) % options_.capacity_num_bytes; 258 (read_offset_ + num_bytes_read) % options_.capacity_num_bytes;
260 259
261 DCHECK_GE(bytes_available_, num_bytes_read); 260 DCHECK_GE(bytes_available_, num_bytes_read);
262 bytes_available_ -= num_bytes_read; 261 bytes_available_ -= num_bytes_read;
263 262
264 base::AutoUnlock unlock(lock_); 263 base::AutoUnlock unlock(lock_);
265 NotifyRead(num_bytes_read); 264 NotifyRead(num_bytes_read);
266 } 265 }
267 266
268 in_two_phase_read_ = false; 267 in_two_phase_read_ = false;
269 two_phase_max_bytes_read_ = 0; 268 two_phase_max_bytes_read_ = 0;
270 269
271 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 270 watchers_.NotifyState(GetHandleSignalsStateNoLock());
272 if (!new_state.equals(old_state))
273 awakable_list_.AwakeForStateChange(new_state);
274 watchers_.NotifyState(new_state);
275 271
276 return rv; 272 return rv;
277 } 273 }
278 274
279 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { 275 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
280 base::AutoLock lock(lock_); 276 base::AutoLock lock(lock_);
281 return GetHandleSignalsStateNoLock(); 277 return GetHandleSignalsStateNoLock();
282 } 278 }
283 279
284 MojoResult DataPipeConsumerDispatcher::AddWatcherRef( 280 MojoResult DataPipeConsumerDispatcher::AddWatcherRef(
285 const scoped_refptr<WatcherDispatcher>& watcher, 281 const scoped_refptr<WatcherDispatcher>& watcher,
286 uintptr_t context) { 282 uintptr_t context) {
287 base::AutoLock lock(lock_); 283 base::AutoLock lock(lock_);
288 if (is_closed_ || in_transit_) 284 if (is_closed_ || in_transit_)
289 return MOJO_RESULT_INVALID_ARGUMENT; 285 return MOJO_RESULT_INVALID_ARGUMENT;
290 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); 286 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
291 } 287 }
292 288
293 MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef( 289 MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef(
294 WatcherDispatcher* watcher, 290 WatcherDispatcher* watcher,
295 uintptr_t context) { 291 uintptr_t context) {
296 base::AutoLock lock(lock_); 292 base::AutoLock lock(lock_);
297 if (is_closed_ || in_transit_) 293 if (is_closed_ || in_transit_)
298 return MOJO_RESULT_INVALID_ARGUMENT; 294 return MOJO_RESULT_INVALID_ARGUMENT;
299 return watchers_.Remove(watcher, context); 295 return watchers_.Remove(watcher, context);
300 } 296 }
301 297
302 MojoResult DataPipeConsumerDispatcher::AddAwakable(
303 Awakable* awakable,
304 MojoHandleSignals signals,
305 uintptr_t context,
306 HandleSignalsState* signals_state) {
307 base::AutoLock lock(lock_);
308 if (!shared_ring_buffer_ || in_transit_) {
309 if (signals_state)
310 *signals_state = HandleSignalsState();
311 return MOJO_RESULT_INVALID_ARGUMENT;
312 }
313 UpdateSignalsStateNoLock();
314 HandleSignalsState state = GetHandleSignalsStateNoLock();
315 if (state.satisfies(signals)) {
316 if (signals_state)
317 *signals_state = state;
318 return MOJO_RESULT_ALREADY_EXISTS;
319 }
320 if (!state.can_satisfy(signals)) {
321 if (signals_state)
322 *signals_state = state;
323 return MOJO_RESULT_FAILED_PRECONDITION;
324 }
325
326 awakable_list_.Add(awakable, signals, context);
327 return MOJO_RESULT_OK;
328 }
329
330 void DataPipeConsumerDispatcher::RemoveAwakable(
331 Awakable* awakable,
332 HandleSignalsState* signals_state) {
333 base::AutoLock lock(lock_);
334 if ((!shared_ring_buffer_ || in_transit_) && signals_state)
335 *signals_state = HandleSignalsState();
336 else if (signals_state)
337 *signals_state = GetHandleSignalsStateNoLock();
338 awakable_list_.Remove(awakable);
339 }
340
341 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes, 298 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes,
342 uint32_t* num_ports, 299 uint32_t* num_ports,
343 uint32_t* num_handles) { 300 uint32_t* num_handles) {
344 base::AutoLock lock(lock_); 301 base::AutoLock lock(lock_);
345 DCHECK(in_transit_); 302 DCHECK(in_transit_);
346 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState)); 303 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
347 *num_ports = 1; 304 *num_ports = 1;
348 *num_handles = 1; 305 *num_handles = 1;
349 } 306 }
350 307
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after
473 } 430 }
474 431
475 MojoResult DataPipeConsumerDispatcher::CloseNoLock() { 432 MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
476 lock_.AssertAcquired(); 433 lock_.AssertAcquired();
477 if (is_closed_ || in_transit_) 434 if (is_closed_ || in_transit_)
478 return MOJO_RESULT_INVALID_ARGUMENT; 435 return MOJO_RESULT_INVALID_ARGUMENT;
479 is_closed_ = true; 436 is_closed_ = true;
480 ring_buffer_mapping_.reset(); 437 ring_buffer_mapping_.reset();
481 shared_ring_buffer_ = nullptr; 438 shared_ring_buffer_ = nullptr;
482 439
483 awakable_list_.CancelAll();
484 watchers_.NotifyClosed(); 440 watchers_.NotifyClosed();
485 if (!transferred_) { 441 if (!transferred_) {
486 base::AutoUnlock unlock(lock_); 442 base::AutoUnlock unlock(lock_);
487 node_controller_->ClosePort(control_port_); 443 node_controller_->ClosePort(control_port_);
488 } 444 }
489 445
490 return MOJO_RESULT_OK; 446 return MOJO_RESULT_OK;
491 } 447 }
492 448
493 HandleSignalsState 449 HandleSignalsState
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
591 547
592 bytes_available_ += m->num_bytes; 548 bytes_available_ += m->num_bytes;
593 } 549 }
594 } while (message); 550 } while (message);
595 } 551 }
596 552
597 bool has_new_data = bytes_available_ != previous_bytes_available; 553 bool has_new_data = bytes_available_ != previous_bytes_available;
598 if (has_new_data) 554 if (has_new_data)
599 new_data_available_ = true; 555 new_data_available_ = true;
600 556
601 if (peer_closed_ != was_peer_closed || has_new_data) { 557 if (peer_closed_ != was_peer_closed || has_new_data)
602 HandleSignalsState state = GetHandleSignalsStateNoLock(); 558 watchers_.NotifyState(GetHandleSignalsStateNoLock());
603 awakable_list_.AwakeForStateChange(state);
604 watchers_.NotifyState(state);
605 }
606 } 559 }
607 560
608 } // namespace edk 561 } // namespace edk
609 } // namespace mojo 562 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/data_pipe_consumer_dispatcher.h ('k') | mojo/edk/system/data_pipe_producer_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698