OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
121 // Don't return "should wait" since you can't wait for a specified amount of | 121 // Don't return "should wait" since you can't wait for a specified amount of |
122 // data. | 122 // data. |
123 return MOJO_RESULT_OUT_OF_RANGE; | 123 return MOJO_RESULT_OUT_OF_RANGE; |
124 } | 124 } |
125 | 125 |
126 DCHECK_LE(available_capacity_, options_.capacity_num_bytes); | 126 DCHECK_LE(available_capacity_, options_.capacity_num_bytes); |
127 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); | 127 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); |
128 if (num_bytes_to_write == 0) | 128 if (num_bytes_to_write == 0) |
129 return MOJO_RESULT_SHOULD_WAIT; | 129 return MOJO_RESULT_SHOULD_WAIT; |
130 | 130 |
131 HandleSignalsState old_state = GetHandleSignalsStateNoLock(); | |
132 | |
133 *num_bytes = num_bytes_to_write; | 131 *num_bytes = num_bytes_to_write; |
134 | 132 |
135 CHECK(ring_buffer_mapping_); | 133 CHECK(ring_buffer_mapping_); |
136 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); | 134 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); |
137 CHECK(data); | 135 CHECK(data); |
138 | 136 |
139 const uint8_t* source = static_cast<const uint8_t*>(elements); | 137 const uint8_t* source = static_cast<const uint8_t*>(elements); |
140 CHECK(source); | 138 CHECK(source); |
141 | 139 |
142 DCHECK_LE(write_offset_, options_.capacity_num_bytes); | 140 DCHECK_LE(write_offset_, options_.capacity_num_bytes); |
143 uint32_t tail_bytes_to_write = | 141 uint32_t tail_bytes_to_write = |
144 std::min(options_.capacity_num_bytes - write_offset_, | 142 std::min(options_.capacity_num_bytes - write_offset_, |
145 num_bytes_to_write); | 143 num_bytes_to_write); |
146 uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write; | 144 uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write; |
147 | 145 |
148 DCHECK_GT(tail_bytes_to_write, 0u); | 146 DCHECK_GT(tail_bytes_to_write, 0u); |
149 memcpy(data + write_offset_, source, tail_bytes_to_write); | 147 memcpy(data + write_offset_, source, tail_bytes_to_write); |
150 if (head_bytes_to_write > 0) | 148 if (head_bytes_to_write > 0) |
151 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); | 149 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); |
152 | 150 |
153 DCHECK_LE(num_bytes_to_write, available_capacity_); | 151 DCHECK_LE(num_bytes_to_write, available_capacity_); |
154 available_capacity_ -= num_bytes_to_write; | 152 available_capacity_ -= num_bytes_to_write; |
155 write_offset_ = (write_offset_ + num_bytes_to_write) % | 153 write_offset_ = (write_offset_ + num_bytes_to_write) % |
156 options_.capacity_num_bytes; | 154 options_.capacity_num_bytes; |
157 | 155 |
158 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 156 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
159 if (!new_state.equals(old_state)) | |
160 awakable_list_.AwakeForStateChange(new_state); | |
161 watchers_.NotifyState(new_state); | |
162 | 157 |
163 base::AutoUnlock unlock(lock_); | 158 base::AutoUnlock unlock(lock_); |
164 NotifyWrite(num_bytes_to_write); | 159 NotifyWrite(num_bytes_to_write); |
165 | 160 |
166 return MOJO_RESULT_OK; | 161 return MOJO_RESULT_OK; |
167 } | 162 } |
168 | 163 |
169 MojoResult DataPipeProducerDispatcher::BeginWriteData( | 164 MojoResult DataPipeProducerDispatcher::BeginWriteData( |
170 void** buffer, | 165 void** buffer, |
171 uint32_t* buffer_num_bytes, | 166 uint32_t* buffer_num_bytes, |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
225 write_offset_ = (write_offset_ + num_bytes_written) % | 220 write_offset_ = (write_offset_ + num_bytes_written) % |
226 options_.capacity_num_bytes; | 221 options_.capacity_num_bytes; |
227 | 222 |
228 base::AutoUnlock unlock(lock_); | 223 base::AutoUnlock unlock(lock_); |
229 NotifyWrite(num_bytes_written); | 224 NotifyWrite(num_bytes_written); |
230 } | 225 } |
231 | 226 |
232 in_two_phase_write_ = false; | 227 in_two_phase_write_ = false; |
233 | 228 |
234 // If we're now writable, we *became* writable (since we weren't writable | 229 // If we're now writable, we *became* writable (since we weren't writable |
235 // during the two-phase write), so awake producer awakables. | 230 // during the two-phase write), so notify watchers. |
236 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); | 231 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
237 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) | |
238 awakable_list_.AwakeForStateChange(new_state); | |
239 watchers_.NotifyState(new_state); | |
240 | 232 |
241 return rv; | 233 return rv; |
242 } | 234 } |
243 | 235 |
244 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { | 236 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { |
245 base::AutoLock lock(lock_); | 237 base::AutoLock lock(lock_); |
246 return GetHandleSignalsStateNoLock(); | 238 return GetHandleSignalsStateNoLock(); |
247 } | 239 } |
248 | 240 |
249 MojoResult DataPipeProducerDispatcher::AddWatcherRef( | 241 MojoResult DataPipeProducerDispatcher::AddWatcherRef( |
250 const scoped_refptr<WatcherDispatcher>& watcher, | 242 const scoped_refptr<WatcherDispatcher>& watcher, |
251 uintptr_t context) { | 243 uintptr_t context) { |
252 base::AutoLock lock(lock_); | 244 base::AutoLock lock(lock_); |
253 if (is_closed_ || in_transit_) | 245 if (is_closed_ || in_transit_) |
254 return MOJO_RESULT_INVALID_ARGUMENT; | 246 return MOJO_RESULT_INVALID_ARGUMENT; |
255 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); | 247 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
256 } | 248 } |
257 | 249 |
258 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef( | 250 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef( |
259 WatcherDispatcher* watcher, | 251 WatcherDispatcher* watcher, |
260 uintptr_t context) { | 252 uintptr_t context) { |
261 base::AutoLock lock(lock_); | 253 base::AutoLock lock(lock_); |
262 if (is_closed_ || in_transit_) | 254 if (is_closed_ || in_transit_) |
263 return MOJO_RESULT_INVALID_ARGUMENT; | 255 return MOJO_RESULT_INVALID_ARGUMENT; |
264 return watchers_.Remove(watcher, context); | 256 return watchers_.Remove(watcher, context); |
265 } | 257 } |
266 | 258 |
267 MojoResult DataPipeProducerDispatcher::AddAwakable( | |
268 Awakable* awakable, | |
269 MojoHandleSignals signals, | |
270 uintptr_t context, | |
271 HandleSignalsState* signals_state) { | |
272 base::AutoLock lock(lock_); | |
273 if (!shared_ring_buffer_ || in_transit_) { | |
274 if (signals_state) | |
275 *signals_state = HandleSignalsState(); | |
276 return MOJO_RESULT_INVALID_ARGUMENT; | |
277 } | |
278 UpdateSignalsStateNoLock(); | |
279 HandleSignalsState state = GetHandleSignalsStateNoLock(); | |
280 if (state.satisfies(signals)) { | |
281 if (signals_state) | |
282 *signals_state = state; | |
283 return MOJO_RESULT_ALREADY_EXISTS; | |
284 } | |
285 if (!state.can_satisfy(signals)) { | |
286 if (signals_state) | |
287 *signals_state = state; | |
288 return MOJO_RESULT_FAILED_PRECONDITION; | |
289 } | |
290 | |
291 awakable_list_.Add(awakable, signals, context); | |
292 return MOJO_RESULT_OK; | |
293 } | |
294 | |
295 void DataPipeProducerDispatcher::RemoveAwakable( | |
296 Awakable* awakable, | |
297 HandleSignalsState* signals_state) { | |
298 base::AutoLock lock(lock_); | |
299 if ((!shared_ring_buffer_ || in_transit_) && signals_state) | |
300 *signals_state = HandleSignalsState(); | |
301 else if (signals_state) | |
302 *signals_state = GetHandleSignalsStateNoLock(); | |
303 awakable_list_.Remove(awakable); | |
304 } | |
305 | |
306 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes, | 259 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes, |
307 uint32_t* num_ports, | 260 uint32_t* num_ports, |
308 uint32_t* num_handles) { | 261 uint32_t* num_handles) { |
309 base::AutoLock lock(lock_); | 262 base::AutoLock lock(lock_); |
310 DCHECK(in_transit_); | 263 DCHECK(in_transit_); |
311 *num_bytes = sizeof(SerializedState); | 264 *num_bytes = sizeof(SerializedState); |
312 *num_ports = 1; | 265 *num_ports = 1; |
313 *num_handles = 1; | 266 *num_handles = 1; |
314 } | 267 } |
315 | 268 |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
355 CloseNoLock(); | 308 CloseNoLock(); |
356 } | 309 } |
357 | 310 |
358 void DataPipeProducerDispatcher::CancelTransit() { | 311 void DataPipeProducerDispatcher::CancelTransit() { |
359 base::AutoLock lock(lock_); | 312 base::AutoLock lock(lock_); |
360 DCHECK(in_transit_); | 313 DCHECK(in_transit_); |
361 in_transit_ = false; | 314 in_transit_ = false; |
362 buffer_handle_for_transit_.reset(); | 315 buffer_handle_for_transit_.reset(); |
363 | 316 |
364 HandleSignalsState state = GetHandleSignalsStateNoLock(); | 317 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
365 awakable_list_.AwakeForStateChange(state); | |
366 watchers_.NotifyState(state); | 318 watchers_.NotifyState(state); |
367 } | 319 } |
368 | 320 |
369 // static | 321 // static |
370 scoped_refptr<DataPipeProducerDispatcher> | 322 scoped_refptr<DataPipeProducerDispatcher> |
371 DataPipeProducerDispatcher::Deserialize(const void* data, | 323 DataPipeProducerDispatcher::Deserialize(const void* data, |
372 size_t num_bytes, | 324 size_t num_bytes, |
373 const ports::PortName* ports, | 325 const ports::PortName* ports, |
374 size_t num_ports, | 326 size_t num_ports, |
375 PlatformHandle* handles, | 327 PlatformHandle* handles, |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
439 } | 391 } |
440 | 392 |
441 MojoResult DataPipeProducerDispatcher::CloseNoLock() { | 393 MojoResult DataPipeProducerDispatcher::CloseNoLock() { |
442 lock_.AssertAcquired(); | 394 lock_.AssertAcquired(); |
443 if (is_closed_ || in_transit_) | 395 if (is_closed_ || in_transit_) |
444 return MOJO_RESULT_INVALID_ARGUMENT; | 396 return MOJO_RESULT_INVALID_ARGUMENT; |
445 is_closed_ = true; | 397 is_closed_ = true; |
446 ring_buffer_mapping_.reset(); | 398 ring_buffer_mapping_.reset(); |
447 shared_ring_buffer_ = nullptr; | 399 shared_ring_buffer_ = nullptr; |
448 | 400 |
449 awakable_list_.CancelAll(); | |
450 watchers_.NotifyClosed(); | 401 watchers_.NotifyClosed(); |
451 if (!transferred_) { | 402 if (!transferred_) { |
452 base::AutoUnlock unlock(lock_); | 403 base::AutoUnlock unlock(lock_); |
453 node_controller_->ClosePort(control_port_); | 404 node_controller_->ClosePort(control_port_); |
454 } | 405 } |
455 | 406 |
456 return MOJO_RESULT_OK; | 407 return MOJO_RESULT_OK; |
457 } | 408 } |
458 | 409 |
459 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() | 410 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
541 << m->num_bytes << " bytes were read. [control_port=" | 492 << m->num_bytes << " bytes were read. [control_port=" |
542 << control_port_.name() << "]"; | 493 << control_port_.name() << "]"; |
543 | 494 |
544 available_capacity_ += m->num_bytes; | 495 available_capacity_ += m->num_bytes; |
545 } | 496 } |
546 } while (message); | 497 } while (message); |
547 } | 498 } |
548 | 499 |
549 if (peer_closed_ != was_peer_closed || | 500 if (peer_closed_ != was_peer_closed || |
550 available_capacity_ != previous_capacity) { | 501 available_capacity_ != previous_capacity) { |
551 HandleSignalsState state = GetHandleSignalsStateNoLock(); | 502 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
552 awakable_list_.AwakeForStateChange(state); | |
553 watchers_.NotifyState(state); | |
554 } | 503 } |
555 } | 504 } |
556 | 505 |
557 } // namespace edk | 506 } // namespace edk |
558 } // namespace mojo | 507 } // namespace mojo |
OLD | NEW |