Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(55)

Side by Side Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { 90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
91 return Type::DATA_PIPE_PRODUCER; 91 return Type::DATA_PIPE_PRODUCER;
92 } 92 }
93 93
94 MojoResult DataPipeProducerDispatcher::Close() { 94 MojoResult DataPipeProducerDispatcher::Close() {
95 base::AutoLock lock(lock_); 95 base::AutoLock lock(lock_);
96 DVLOG(1) << "Closing data pipe producer " << pipe_id_; 96 DVLOG(1) << "Closing data pipe producer " << pipe_id_;
97 return CloseNoLock(); 97 return CloseNoLock();
98 } 98 }
99 99
100 MojoResult DataPipeProducerDispatcher::Watch( 100 MojoResult DataPipeProducerDispatcher::RegisterWatcher(
101 MojoHandleSignals signals, 101 MojoHandleSignals signals,
102 const Watcher::WatchCallback& callback, 102 const Watcher::WatchCallback& callback,
103 uintptr_t context) { 103 uintptr_t context) {
104 base::AutoLock lock(lock_); 104 base::AutoLock lock(lock_);
105 105
106 if (is_closed_ || in_transit_) 106 if (is_closed_ || in_transit_)
107 return MOJO_RESULT_INVALID_ARGUMENT; 107 return MOJO_RESULT_INVALID_ARGUMENT;
108 108
109 return awakable_list_.AddWatcher( 109 return watchers_.Add(signals, callback, context,
110 signals, callback, context, GetHandleSignalsStateNoLock()); 110 GetHandleSignalsStateNoLock());
111 } 111 }
112 112
113 MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) { 113 MojoResult DataPipeProducerDispatcher::ArmWatcher(uintptr_t context) {
114 base::AutoLock lock(lock_); 114 base::AutoLock lock(lock_);
115 115
116 if (is_closed_ || in_transit_) 116 if (is_closed_ || in_transit_)
117 return MOJO_RESULT_INVALID_ARGUMENT; 117 return MOJO_RESULT_INVALID_ARGUMENT;
118 118
119 return awakable_list_.RemoveWatcher(context); 119 return watchers_.Arm(context, GetHandleSignalsStateNoLock());
120 }
121
122 MojoResult DataPipeProducerDispatcher::UnregisterWatcher(uintptr_t context) {
123 base::AutoLock lock(lock_);
124
125 if (is_closed_ || in_transit_)
126 return MOJO_RESULT_INVALID_ARGUMENT;
127
128 return watchers_.Remove(context);
120 } 129 }
121 130
122 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, 131 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
123 uint32_t* num_bytes, 132 uint32_t* num_bytes,
124 MojoWriteDataFlags flags) { 133 MojoWriteDataFlags flags) {
125 base::AutoLock lock(lock_); 134 base::AutoLock lock(lock_);
126 if (!shared_ring_buffer_ || in_transit_) 135 if (!shared_ring_buffer_ || in_transit_)
127 return MOJO_RESULT_INVALID_ARGUMENT; 136 return MOJO_RESULT_INVALID_ARGUMENT;
128 137
129 if (in_two_phase_write_) 138 if (in_two_phase_write_)
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
170 memcpy(data + write_offset_, source, tail_bytes_to_write); 179 memcpy(data + write_offset_, source, tail_bytes_to_write);
171 if (head_bytes_to_write > 0) 180 if (head_bytes_to_write > 0)
172 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); 181 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
173 182
174 DCHECK_LE(num_bytes_to_write, available_capacity_); 183 DCHECK_LE(num_bytes_to_write, available_capacity_);
175 available_capacity_ -= num_bytes_to_write; 184 available_capacity_ -= num_bytes_to_write;
176 write_offset_ = (write_offset_ + num_bytes_to_write) % 185 write_offset_ = (write_offset_ + num_bytes_to_write) %
177 options_.capacity_num_bytes; 186 options_.capacity_num_bytes;
178 187
179 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 188 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
180 if (!new_state.equals(old_state)) 189 if (!new_state.equals(old_state)) {
181 awakable_list_.AwakeForStateChange(new_state); 190 awakable_list_.AwakeForStateChange(new_state);
191 watchers_.NotifyState(new_state);
192 }
182 193
183 base::AutoUnlock unlock(lock_); 194 base::AutoUnlock unlock(lock_);
184 NotifyWrite(num_bytes_to_write); 195 NotifyWrite(num_bytes_to_write);
185 196
186 return MOJO_RESULT_OK; 197 return MOJO_RESULT_OK;
187 } 198 }
188 199
189 MojoResult DataPipeProducerDispatcher::BeginWriteData( 200 MojoResult DataPipeProducerDispatcher::BeginWriteData(
190 void** buffer, 201 void** buffer,
191 uint32_t* buffer_num_bytes, 202 uint32_t* buffer_num_bytes,
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
242 253
243 base::AutoUnlock unlock(lock_); 254 base::AutoUnlock unlock(lock_);
244 NotifyWrite(num_bytes_written); 255 NotifyWrite(num_bytes_written);
245 } 256 }
246 257
247 in_two_phase_write_ = false; 258 in_two_phase_write_ = false;
248 259
249 // If we're now writable, we *became* writable (since we weren't writable 260 // If we're now writable, we *became* writable (since we weren't writable
250 // during the two-phase write), so awake producer awakables. 261 // during the two-phase write), so awake producer awakables.
251 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); 262 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
252 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) 263 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) {
253 awakable_list_.AwakeForStateChange(new_state); 264 awakable_list_.AwakeForStateChange(new_state);
265 watchers_.NotifyState(new_state);
266 }
254 267
255 return rv; 268 return rv;
256 } 269 }
257 270
258 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { 271 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
259 base::AutoLock lock(lock_); 272 base::AutoLock lock(lock_);
260 return GetHandleSignalsStateNoLock(); 273 return GetHandleSignalsStateNoLock();
261 } 274 }
262 275
263 MojoResult DataPipeProducerDispatcher::AddAwakable( 276 MojoResult DataPipeProducerDispatcher::AddAwakable(
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
349 in_transit_ = false; 362 in_transit_ = false;
350 ignore_result(buffer_handle_for_transit_.release()); 363 ignore_result(buffer_handle_for_transit_.release());
351 CloseNoLock(); 364 CloseNoLock();
352 } 365 }
353 366
354 void DataPipeProducerDispatcher::CancelTransit() { 367 void DataPipeProducerDispatcher::CancelTransit() {
355 base::AutoLock lock(lock_); 368 base::AutoLock lock(lock_);
356 DCHECK(in_transit_); 369 DCHECK(in_transit_);
357 in_transit_ = false; 370 in_transit_ = false;
358 buffer_handle_for_transit_.reset(); 371 buffer_handle_for_transit_.reset();
359 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 372
373 HandleSignalsState state = GetHandleSignalsStateNoLock();
374 awakable_list_.AwakeForStateChange(state);
375 watchers_.NotifyState(state);
360 } 376 }
361 377
362 // static 378 // static
363 scoped_refptr<DataPipeProducerDispatcher> 379 scoped_refptr<DataPipeProducerDispatcher>
364 DataPipeProducerDispatcher::Deserialize(const void* data, 380 DataPipeProducerDispatcher::Deserialize(const void* data,
365 size_t num_bytes, 381 size_t num_bytes,
366 const ports::PortName* ports, 382 const ports::PortName* ports,
367 size_t num_ports, 383 size_t num_ports,
368 PlatformHandle* handles, 384 PlatformHandle* handles,
369 size_t num_handles) { 385 size_t num_handles) {
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
433 449
434 MojoResult DataPipeProducerDispatcher::CloseNoLock() { 450 MojoResult DataPipeProducerDispatcher::CloseNoLock() {
435 lock_.AssertAcquired(); 451 lock_.AssertAcquired();
436 if (is_closed_ || in_transit_) 452 if (is_closed_ || in_transit_)
437 return MOJO_RESULT_INVALID_ARGUMENT; 453 return MOJO_RESULT_INVALID_ARGUMENT;
438 is_closed_ = true; 454 is_closed_ = true;
439 ring_buffer_mapping_.reset(); 455 ring_buffer_mapping_.reset();
440 shared_ring_buffer_ = nullptr; 456 shared_ring_buffer_ = nullptr;
441 457
442 awakable_list_.CancelAll(); 458 awakable_list_.CancelAll();
459 watchers_.NotifyClosed();
443 if (!transferred_) { 460 if (!transferred_) {
444 base::AutoUnlock unlock(lock_); 461 base::AutoUnlock unlock(lock_);
445 node_controller_->ClosePort(control_port_); 462 node_controller_->ClosePort(control_port_);
446 } 463 }
447 464
448 return MOJO_RESULT_OK; 465 return MOJO_RESULT_OK;
449 } 466 }
450 467
451 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() 468 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
452 const { 469 const {
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
534 << m->num_bytes << " bytes were read. [control_port=" 551 << m->num_bytes << " bytes were read. [control_port="
535 << control_port_.name() << "]"; 552 << control_port_.name() << "]";
536 553
537 available_capacity_ += m->num_bytes; 554 available_capacity_ += m->num_bytes;
538 } 555 }
539 } while (message); 556 } while (message);
540 } 557 }
541 558
542 if (peer_closed_ != was_peer_closed || 559 if (peer_closed_ != was_peer_closed ||
543 available_capacity_ != previous_capacity) { 560 available_capacity_ != previous_capacity) {
544 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 561 HandleSignalsState state = GetHandleSignalsStateNoLock();
562 awakable_list_.AwakeForStateChange(state);
563 watchers_.NotifyState(state);
545 } 564 }
546 } 565 }
547 566
548 } // namespace edk 567 } // namespace edk
549 } // namespace mojo 568 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698