OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe.h" | 5 #include "mojo/edk/system/data_pipe.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 #include <limits> | 10 #include <limits> |
11 | 11 |
12 #include "base/logging.h" | 12 #include "base/logging.h" |
13 #include "mojo/edk/system/awakable_list.h" | 13 #include "mojo/edk/system/awakable_list.h" |
14 #include "mojo/edk/system/configuration.h" | 14 #include "mojo/edk/system/configuration.h" |
| 15 #include "mojo/edk/system/data_pipe_impl.h" |
| 16 #include "mojo/edk/system/local_data_pipe_impl.h" |
15 #include "mojo/edk/system/memory.h" | 17 #include "mojo/edk/system/memory.h" |
16 #include "mojo/edk/system/options_validation.h" | 18 #include "mojo/edk/system/options_validation.h" |
17 | 19 |
18 namespace mojo { | 20 namespace mojo { |
19 namespace system { | 21 namespace system { |
20 | 22 |
21 // static | 23 // static |
22 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { | 24 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { |
23 MojoCreateDataPipeOptions result = { | 25 MojoCreateDataPipeOptions result = { |
24 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), | 26 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
76 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) | 78 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) |
77 return MOJO_RESULT_INVALID_ARGUMENT; | 79 return MOJO_RESULT_INVALID_ARGUMENT; |
78 if (reader.options().capacity_num_bytes > | 80 if (reader.options().capacity_num_bytes > |
79 GetConfiguration().max_data_pipe_capacity_bytes) | 81 GetConfiguration().max_data_pipe_capacity_bytes) |
80 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 82 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
81 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; | 83 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; |
82 | 84 |
83 return MOJO_RESULT_OK; | 85 return MOJO_RESULT_OK; |
84 } | 86 } |
85 | 87 |
| 88 // static |
| 89 DataPipe* DataPipe::CreateLocal( |
| 90 const MojoCreateDataPipeOptions& validated_options) { |
| 91 return new DataPipe(true, true, validated_options, |
| 92 make_scoped_ptr(new LocalDataPipeImpl())); |
| 93 } |
| 94 |
86 void DataPipe::ProducerCancelAllAwakables() { | 95 void DataPipe::ProducerCancelAllAwakables() { |
87 base::AutoLock locker(lock_); | 96 base::AutoLock locker(lock_); |
88 DCHECK(has_local_producer_no_lock()); | 97 DCHECK(has_local_producer_no_lock()); |
89 producer_awakable_list_->CancelAll(); | 98 producer_awakable_list_->CancelAll(); |
90 } | 99 } |
91 | 100 |
92 void DataPipe::ProducerClose() { | 101 void DataPipe::ProducerClose() { |
93 base::AutoLock locker(lock_); | 102 base::AutoLock locker(lock_); |
94 ProducerCloseNoLock(); | 103 ProducerCloseNoLock(); |
95 } | 104 } |
(...skipping 13 matching lines...) Expand all Loading... |
109 uint32_t max_num_bytes_to_write = num_bytes.Get(); | 118 uint32_t max_num_bytes_to_write = num_bytes.Get(); |
110 if (max_num_bytes_to_write % element_num_bytes_ != 0) | 119 if (max_num_bytes_to_write % element_num_bytes_ != 0) |
111 return MOJO_RESULT_INVALID_ARGUMENT; | 120 return MOJO_RESULT_INVALID_ARGUMENT; |
112 | 121 |
113 if (max_num_bytes_to_write == 0) | 122 if (max_num_bytes_to_write == 0) |
114 return MOJO_RESULT_OK; // Nothing to do. | 123 return MOJO_RESULT_OK; // Nothing to do. |
115 | 124 |
116 uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0; | 125 uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0; |
117 | 126 |
118 HandleSignalsState old_consumer_state = | 127 HandleSignalsState old_consumer_state = |
119 ConsumerGetHandleSignalsStateImplNoLock(); | 128 impl_->ConsumerGetHandleSignalsState(); |
120 MojoResult rv = ProducerWriteDataImplNoLock( | 129 MojoResult rv = impl_->ProducerWriteData( |
121 elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write); | 130 elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write); |
122 HandleSignalsState new_consumer_state = | 131 HandleSignalsState new_consumer_state = |
123 ConsumerGetHandleSignalsStateImplNoLock(); | 132 impl_->ConsumerGetHandleSignalsState(); |
124 if (!new_consumer_state.equals(old_consumer_state)) | 133 if (!new_consumer_state.equals(old_consumer_state)) |
125 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); | 134 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
126 return rv; | 135 return rv; |
127 } | 136 } |
128 | 137 |
129 MojoResult DataPipe::ProducerBeginWriteData( | 138 MojoResult DataPipe::ProducerBeginWriteData( |
130 UserPointer<void*> buffer, | 139 UserPointer<void*> buffer, |
131 UserPointer<uint32_t> buffer_num_bytes, | 140 UserPointer<uint32_t> buffer_num_bytes, |
132 bool all_or_none) { | 141 bool all_or_none) { |
133 base::AutoLock locker(lock_); | 142 base::AutoLock locker(lock_); |
134 DCHECK(has_local_producer_no_lock()); | 143 DCHECK(has_local_producer_no_lock()); |
135 | 144 |
136 if (producer_in_two_phase_write_no_lock()) | 145 if (producer_in_two_phase_write_no_lock()) |
137 return MOJO_RESULT_BUSY; | 146 return MOJO_RESULT_BUSY; |
138 if (!consumer_open_no_lock()) | 147 if (!consumer_open_no_lock()) |
139 return MOJO_RESULT_FAILED_PRECONDITION; | 148 return MOJO_RESULT_FAILED_PRECONDITION; |
140 | 149 |
141 uint32_t min_num_bytes_to_write = 0; | 150 uint32_t min_num_bytes_to_write = 0; |
142 if (all_or_none) { | 151 if (all_or_none) { |
143 min_num_bytes_to_write = buffer_num_bytes.Get(); | 152 min_num_bytes_to_write = buffer_num_bytes.Get(); |
144 if (min_num_bytes_to_write % element_num_bytes_ != 0) | 153 if (min_num_bytes_to_write % element_num_bytes_ != 0) |
145 return MOJO_RESULT_INVALID_ARGUMENT; | 154 return MOJO_RESULT_INVALID_ARGUMENT; |
146 } | 155 } |
147 | 156 |
148 MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes, | 157 MojoResult rv = impl_->ProducerBeginWriteData(buffer, buffer_num_bytes, |
149 min_num_bytes_to_write); | 158 min_num_bytes_to_write); |
150 if (rv != MOJO_RESULT_OK) | 159 if (rv != MOJO_RESULT_OK) |
151 return rv; | 160 return rv; |
152 // Note: No need to awake producer awakables, even though we're going from | 161 // Note: No need to awake producer awakables, even though we're going from |
153 // writable to non-writable (since you can't wait on non-writability). | 162 // writable to non-writable (since you can't wait on non-writability). |
154 // Similarly, though this may have discarded data (in "may discard" mode), | 163 // Similarly, though this may have discarded data (in "may discard" mode), |
155 // making it non-readable, there's still no need to awake consumer awakables. | 164 // making it non-readable, there's still no need to awake consumer awakables. |
156 DCHECK(producer_in_two_phase_write_no_lock()); | 165 DCHECK(producer_in_two_phase_write_no_lock()); |
157 return MOJO_RESULT_OK; | 166 return MOJO_RESULT_OK; |
158 } | 167 } |
159 | 168 |
160 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { | 169 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { |
161 base::AutoLock locker(lock_); | 170 base::AutoLock locker(lock_); |
162 DCHECK(has_local_producer_no_lock()); | 171 DCHECK(has_local_producer_no_lock()); |
163 | 172 |
164 if (!producer_in_two_phase_write_no_lock()) | 173 if (!producer_in_two_phase_write_no_lock()) |
165 return MOJO_RESULT_FAILED_PRECONDITION; | 174 return MOJO_RESULT_FAILED_PRECONDITION; |
166 // Note: Allow successful completion of the two-phase write even if the | 175 // Note: Allow successful completion of the two-phase write even if the |
167 // consumer has been closed. | 176 // consumer has been closed. |
168 | 177 |
169 HandleSignalsState old_consumer_state = | 178 HandleSignalsState old_consumer_state = |
170 ConsumerGetHandleSignalsStateImplNoLock(); | 179 impl_->ConsumerGetHandleSignalsState(); |
171 MojoResult rv; | 180 MojoResult rv; |
172 if (num_bytes_written > producer_two_phase_max_num_bytes_written_ || | 181 if (num_bytes_written > producer_two_phase_max_num_bytes_written_ || |
173 num_bytes_written % element_num_bytes_ != 0) { | 182 num_bytes_written % element_num_bytes_ != 0) { |
174 rv = MOJO_RESULT_INVALID_ARGUMENT; | 183 rv = MOJO_RESULT_INVALID_ARGUMENT; |
175 producer_two_phase_max_num_bytes_written_ = 0; | 184 producer_two_phase_max_num_bytes_written_ = 0; |
176 } else { | 185 } else { |
177 rv = ProducerEndWriteDataImplNoLock(num_bytes_written); | 186 rv = impl_->ProducerEndWriteData(num_bytes_written); |
178 } | 187 } |
179 // Two-phase write ended even on failure. | 188 // Two-phase write ended even on failure. |
180 DCHECK(!producer_in_two_phase_write_no_lock()); | 189 DCHECK(!producer_in_two_phase_write_no_lock()); |
181 // If we're now writable, we *became* writable (since we weren't writable | 190 // If we're now writable, we *became* writable (since we weren't writable |
182 // during the two-phase write), so awake producer awakables. | 191 // during the two-phase write), so awake producer awakables. |
183 HandleSignalsState new_producer_state = | 192 HandleSignalsState new_producer_state = |
184 ProducerGetHandleSignalsStateImplNoLock(); | 193 impl_->ProducerGetHandleSignalsState(); |
185 if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) | 194 if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
186 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); | 195 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
187 HandleSignalsState new_consumer_state = | 196 HandleSignalsState new_consumer_state = |
188 ConsumerGetHandleSignalsStateImplNoLock(); | 197 impl_->ConsumerGetHandleSignalsState(); |
189 if (!new_consumer_state.equals(old_consumer_state)) | 198 if (!new_consumer_state.equals(old_consumer_state)) |
190 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); | 199 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
191 return rv; | 200 return rv; |
192 } | 201 } |
193 | 202 |
194 HandleSignalsState DataPipe::ProducerGetHandleSignalsState() { | 203 HandleSignalsState DataPipe::ProducerGetHandleSignalsState() { |
195 base::AutoLock locker(lock_); | 204 base::AutoLock locker(lock_); |
196 DCHECK(has_local_producer_no_lock()); | 205 DCHECK(has_local_producer_no_lock()); |
197 return ProducerGetHandleSignalsStateImplNoLock(); | 206 return impl_->ProducerGetHandleSignalsState(); |
198 } | 207 } |
199 | 208 |
200 MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable, | 209 MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable, |
201 MojoHandleSignals signals, | 210 MojoHandleSignals signals, |
202 uint32_t context, | 211 uint32_t context, |
203 HandleSignalsState* signals_state) { | 212 HandleSignalsState* signals_state) { |
204 base::AutoLock locker(lock_); | 213 base::AutoLock locker(lock_); |
205 DCHECK(has_local_producer_no_lock()); | 214 DCHECK(has_local_producer_no_lock()); |
206 | 215 |
207 HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock(); | 216 HandleSignalsState producer_state = impl_->ProducerGetHandleSignalsState(); |
208 if (producer_state.satisfies(signals)) { | 217 if (producer_state.satisfies(signals)) { |
209 if (signals_state) | 218 if (signals_state) |
210 *signals_state = producer_state; | 219 *signals_state = producer_state; |
211 return MOJO_RESULT_ALREADY_EXISTS; | 220 return MOJO_RESULT_ALREADY_EXISTS; |
212 } | 221 } |
213 if (!producer_state.can_satisfy(signals)) { | 222 if (!producer_state.can_satisfy(signals)) { |
214 if (signals_state) | 223 if (signals_state) |
215 *signals_state = producer_state; | 224 *signals_state = producer_state; |
216 return MOJO_RESULT_FAILED_PRECONDITION; | 225 return MOJO_RESULT_FAILED_PRECONDITION; |
217 } | 226 } |
218 | 227 |
219 producer_awakable_list_->Add(awakable, signals, context); | 228 producer_awakable_list_->Add(awakable, signals, context); |
220 return MOJO_RESULT_OK; | 229 return MOJO_RESULT_OK; |
221 } | 230 } |
222 | 231 |
223 void DataPipe::ProducerRemoveAwakable(Awakable* awakable, | 232 void DataPipe::ProducerRemoveAwakable(Awakable* awakable, |
224 HandleSignalsState* signals_state) { | 233 HandleSignalsState* signals_state) { |
225 base::AutoLock locker(lock_); | 234 base::AutoLock locker(lock_); |
226 DCHECK(has_local_producer_no_lock()); | 235 DCHECK(has_local_producer_no_lock()); |
227 producer_awakable_list_->Remove(awakable); | 236 producer_awakable_list_->Remove(awakable); |
228 if (signals_state) | 237 if (signals_state) |
229 *signals_state = ProducerGetHandleSignalsStateImplNoLock(); | 238 *signals_state = impl_->ProducerGetHandleSignalsState(); |
230 } | 239 } |
231 | 240 |
232 void DataPipe::ProducerStartSerialize(Channel* channel, | 241 void DataPipe::ProducerStartSerialize(Channel* channel, |
233 size_t* max_size, | 242 size_t* max_size, |
234 size_t* max_platform_handles) { | 243 size_t* max_platform_handles) { |
235 base::AutoLock locker(lock_); | 244 base::AutoLock locker(lock_); |
236 DCHECK(has_local_producer_no_lock()); | 245 DCHECK(has_local_producer_no_lock()); |
237 ProducerStartSerializeImplNoLock(channel, max_size, max_platform_handles); | 246 impl_->ProducerStartSerialize(channel, max_size, max_platform_handles); |
238 } | 247 } |
239 | 248 |
240 bool DataPipe::ProducerEndSerialize( | 249 bool DataPipe::ProducerEndSerialize( |
241 Channel* channel, | 250 Channel* channel, |
242 void* destination, | 251 void* destination, |
243 size_t* actual_size, | 252 size_t* actual_size, |
244 embedder::PlatformHandleVector* platform_handles) { | 253 embedder::PlatformHandleVector* platform_handles) { |
245 base::AutoLock locker(lock_); | 254 base::AutoLock locker(lock_); |
246 DCHECK(has_local_producer_no_lock()); | 255 DCHECK(has_local_producer_no_lock()); |
247 return ProducerEndSerializeImplNoLock(channel, destination, actual_size, | 256 return impl_->ProducerEndSerialize(channel, destination, actual_size, |
248 platform_handles); | 257 platform_handles); |
249 } | 258 } |
250 | 259 |
251 bool DataPipe::ProducerIsBusy() const { | 260 bool DataPipe::ProducerIsBusy() const { |
252 base::AutoLock locker(lock_); | 261 base::AutoLock locker(lock_); |
253 return producer_in_two_phase_write_no_lock(); | 262 return producer_in_two_phase_write_no_lock(); |
254 } | 263 } |
255 | 264 |
256 void DataPipe::ConsumerCancelAllAwakables() { | 265 void DataPipe::ConsumerCancelAllAwakables() { |
257 base::AutoLock locker(lock_); | 266 base::AutoLock locker(lock_); |
258 DCHECK(has_local_consumer_no_lock()); | 267 DCHECK(has_local_consumer_no_lock()); |
(...skipping 18 matching lines...) Expand all Loading... |
277 uint32_t max_num_bytes_to_read = num_bytes.Get(); | 286 uint32_t max_num_bytes_to_read = num_bytes.Get(); |
278 if (max_num_bytes_to_read % element_num_bytes_ != 0) | 287 if (max_num_bytes_to_read % element_num_bytes_ != 0) |
279 return MOJO_RESULT_INVALID_ARGUMENT; | 288 return MOJO_RESULT_INVALID_ARGUMENT; |
280 | 289 |
281 if (max_num_bytes_to_read == 0) | 290 if (max_num_bytes_to_read == 0) |
282 return MOJO_RESULT_OK; // Nothing to do. | 291 return MOJO_RESULT_OK; // Nothing to do. |
283 | 292 |
284 uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0; | 293 uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0; |
285 | 294 |
286 HandleSignalsState old_producer_state = | 295 HandleSignalsState old_producer_state = |
287 ProducerGetHandleSignalsStateImplNoLock(); | 296 impl_->ProducerGetHandleSignalsState(); |
288 MojoResult rv = ConsumerReadDataImplNoLock( | 297 MojoResult rv = impl_->ConsumerReadData( |
289 elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek); | 298 elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek); |
290 HandleSignalsState new_producer_state = | 299 HandleSignalsState new_producer_state = |
291 ProducerGetHandleSignalsStateImplNoLock(); | 300 impl_->ProducerGetHandleSignalsState(); |
292 if (!new_producer_state.equals(old_producer_state)) | 301 if (!new_producer_state.equals(old_producer_state)) |
293 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); | 302 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
294 return rv; | 303 return rv; |
295 } | 304 } |
296 | 305 |
297 MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes, | 306 MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes, |
298 bool all_or_none) { | 307 bool all_or_none) { |
299 base::AutoLock locker(lock_); | 308 base::AutoLock locker(lock_); |
300 DCHECK(has_local_consumer_no_lock()); | 309 DCHECK(has_local_consumer_no_lock()); |
301 | 310 |
302 if (consumer_in_two_phase_read_no_lock()) | 311 if (consumer_in_two_phase_read_no_lock()) |
303 return MOJO_RESULT_BUSY; | 312 return MOJO_RESULT_BUSY; |
304 | 313 |
305 uint32_t max_num_bytes_to_discard = num_bytes.Get(); | 314 uint32_t max_num_bytes_to_discard = num_bytes.Get(); |
306 if (max_num_bytes_to_discard % element_num_bytes_ != 0) | 315 if (max_num_bytes_to_discard % element_num_bytes_ != 0) |
307 return MOJO_RESULT_INVALID_ARGUMENT; | 316 return MOJO_RESULT_INVALID_ARGUMENT; |
308 | 317 |
309 if (max_num_bytes_to_discard == 0) | 318 if (max_num_bytes_to_discard == 0) |
310 return MOJO_RESULT_OK; // Nothing to do. | 319 return MOJO_RESULT_OK; // Nothing to do. |
311 | 320 |
312 uint32_t min_num_bytes_to_discard = | 321 uint32_t min_num_bytes_to_discard = |
313 all_or_none ? max_num_bytes_to_discard : 0; | 322 all_or_none ? max_num_bytes_to_discard : 0; |
314 | 323 |
315 HandleSignalsState old_producer_state = | 324 HandleSignalsState old_producer_state = |
316 ProducerGetHandleSignalsStateImplNoLock(); | 325 impl_->ProducerGetHandleSignalsState(); |
317 MojoResult rv = ConsumerDiscardDataImplNoLock( | 326 MojoResult rv = impl_->ConsumerDiscardData( |
318 num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard); | 327 num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard); |
319 HandleSignalsState new_producer_state = | 328 HandleSignalsState new_producer_state = |
320 ProducerGetHandleSignalsStateImplNoLock(); | 329 impl_->ProducerGetHandleSignalsState(); |
321 if (!new_producer_state.equals(old_producer_state)) | 330 if (!new_producer_state.equals(old_producer_state)) |
322 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); | 331 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
323 return rv; | 332 return rv; |
324 } | 333 } |
325 | 334 |
326 MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) { | 335 MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) { |
327 base::AutoLock locker(lock_); | 336 base::AutoLock locker(lock_); |
328 DCHECK(has_local_consumer_no_lock()); | 337 DCHECK(has_local_consumer_no_lock()); |
329 | 338 |
330 if (consumer_in_two_phase_read_no_lock()) | 339 if (consumer_in_two_phase_read_no_lock()) |
331 return MOJO_RESULT_BUSY; | 340 return MOJO_RESULT_BUSY; |
332 | 341 |
333 // Note: Don't need to validate |*num_bytes| for query. | 342 // Note: Don't need to validate |*num_bytes| for query. |
334 return ConsumerQueryDataImplNoLock(num_bytes); | 343 return impl_->ConsumerQueryData(num_bytes); |
335 } | 344 } |
336 | 345 |
337 MojoResult DataPipe::ConsumerBeginReadData( | 346 MojoResult DataPipe::ConsumerBeginReadData( |
338 UserPointer<const void*> buffer, | 347 UserPointer<const void*> buffer, |
339 UserPointer<uint32_t> buffer_num_bytes, | 348 UserPointer<uint32_t> buffer_num_bytes, |
340 bool all_or_none) { | 349 bool all_or_none) { |
341 base::AutoLock locker(lock_); | 350 base::AutoLock locker(lock_); |
342 DCHECK(has_local_consumer_no_lock()); | 351 DCHECK(has_local_consumer_no_lock()); |
343 | 352 |
344 if (consumer_in_two_phase_read_no_lock()) | 353 if (consumer_in_two_phase_read_no_lock()) |
345 return MOJO_RESULT_BUSY; | 354 return MOJO_RESULT_BUSY; |
346 | 355 |
347 uint32_t min_num_bytes_to_read = 0; | 356 uint32_t min_num_bytes_to_read = 0; |
348 if (all_or_none) { | 357 if (all_or_none) { |
349 min_num_bytes_to_read = buffer_num_bytes.Get(); | 358 min_num_bytes_to_read = buffer_num_bytes.Get(); |
350 if (min_num_bytes_to_read % element_num_bytes_ != 0) | 359 if (min_num_bytes_to_read % element_num_bytes_ != 0) |
351 return MOJO_RESULT_INVALID_ARGUMENT; | 360 return MOJO_RESULT_INVALID_ARGUMENT; |
352 } | 361 } |
353 | 362 |
354 MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes, | 363 MojoResult rv = impl_->ConsumerBeginReadData(buffer, buffer_num_bytes, |
355 min_num_bytes_to_read); | 364 min_num_bytes_to_read); |
356 if (rv != MOJO_RESULT_OK) | 365 if (rv != MOJO_RESULT_OK) |
357 return rv; | 366 return rv; |
358 DCHECK(consumer_in_two_phase_read_no_lock()); | 367 DCHECK(consumer_in_two_phase_read_no_lock()); |
359 return MOJO_RESULT_OK; | 368 return MOJO_RESULT_OK; |
360 } | 369 } |
361 | 370 |
362 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { | 371 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { |
363 base::AutoLock locker(lock_); | 372 base::AutoLock locker(lock_); |
364 DCHECK(has_local_consumer_no_lock()); | 373 DCHECK(has_local_consumer_no_lock()); |
365 | 374 |
366 if (!consumer_in_two_phase_read_no_lock()) | 375 if (!consumer_in_two_phase_read_no_lock()) |
367 return MOJO_RESULT_FAILED_PRECONDITION; | 376 return MOJO_RESULT_FAILED_PRECONDITION; |
368 | 377 |
369 HandleSignalsState old_producer_state = | 378 HandleSignalsState old_producer_state = |
370 ProducerGetHandleSignalsStateImplNoLock(); | 379 impl_->ProducerGetHandleSignalsState(); |
371 MojoResult rv; | 380 MojoResult rv; |
372 if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ || | 381 if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ || |
373 num_bytes_read % element_num_bytes_ != 0) { | 382 num_bytes_read % element_num_bytes_ != 0) { |
374 rv = MOJO_RESULT_INVALID_ARGUMENT; | 383 rv = MOJO_RESULT_INVALID_ARGUMENT; |
375 consumer_two_phase_max_num_bytes_read_ = 0; | 384 consumer_two_phase_max_num_bytes_read_ = 0; |
376 } else { | 385 } else { |
377 rv = ConsumerEndReadDataImplNoLock(num_bytes_read); | 386 rv = impl_->ConsumerEndReadData(num_bytes_read); |
378 } | 387 } |
379 // Two-phase read ended even on failure. | 388 // Two-phase read ended even on failure. |
380 DCHECK(!consumer_in_two_phase_read_no_lock()); | 389 DCHECK(!consumer_in_two_phase_read_no_lock()); |
381 // If we're now readable, we *became* readable (since we weren't readable | 390 // If we're now readable, we *became* readable (since we weren't readable |
382 // during the two-phase read), so awake consumer awakables. | 391 // during the two-phase read), so awake consumer awakables. |
383 HandleSignalsState new_consumer_state = | 392 HandleSignalsState new_consumer_state = |
384 ConsumerGetHandleSignalsStateImplNoLock(); | 393 impl_->ConsumerGetHandleSignalsState(); |
385 if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE)) | 394 if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE)) |
386 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); | 395 AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
387 HandleSignalsState new_producer_state = | 396 HandleSignalsState new_producer_state = |
388 ProducerGetHandleSignalsStateImplNoLock(); | 397 impl_->ProducerGetHandleSignalsState(); |
389 if (!new_producer_state.equals(old_producer_state)) | 398 if (!new_producer_state.equals(old_producer_state)) |
390 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); | 399 AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
391 return rv; | 400 return rv; |
392 } | 401 } |
393 | 402 |
394 HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() { | 403 HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() { |
395 base::AutoLock locker(lock_); | 404 base::AutoLock locker(lock_); |
396 DCHECK(has_local_consumer_no_lock()); | 405 DCHECK(has_local_consumer_no_lock()); |
397 return ConsumerGetHandleSignalsStateImplNoLock(); | 406 return impl_->ConsumerGetHandleSignalsState(); |
398 } | 407 } |
399 | 408 |
400 MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable, | 409 MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable, |
401 MojoHandleSignals signals, | 410 MojoHandleSignals signals, |
402 uint32_t context, | 411 uint32_t context, |
403 HandleSignalsState* signals_state) { | 412 HandleSignalsState* signals_state) { |
404 base::AutoLock locker(lock_); | 413 base::AutoLock locker(lock_); |
405 DCHECK(has_local_consumer_no_lock()); | 414 DCHECK(has_local_consumer_no_lock()); |
406 | 415 |
407 HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock(); | 416 HandleSignalsState consumer_state = impl_->ConsumerGetHandleSignalsState(); |
408 if (consumer_state.satisfies(signals)) { | 417 if (consumer_state.satisfies(signals)) { |
409 if (signals_state) | 418 if (signals_state) |
410 *signals_state = consumer_state; | 419 *signals_state = consumer_state; |
411 return MOJO_RESULT_ALREADY_EXISTS; | 420 return MOJO_RESULT_ALREADY_EXISTS; |
412 } | 421 } |
413 if (!consumer_state.can_satisfy(signals)) { | 422 if (!consumer_state.can_satisfy(signals)) { |
414 if (signals_state) | 423 if (signals_state) |
415 *signals_state = consumer_state; | 424 *signals_state = consumer_state; |
416 return MOJO_RESULT_FAILED_PRECONDITION; | 425 return MOJO_RESULT_FAILED_PRECONDITION; |
417 } | 426 } |
418 | 427 |
419 consumer_awakable_list_->Add(awakable, signals, context); | 428 consumer_awakable_list_->Add(awakable, signals, context); |
420 return MOJO_RESULT_OK; | 429 return MOJO_RESULT_OK; |
421 } | 430 } |
422 | 431 |
423 void DataPipe::ConsumerRemoveAwakable(Awakable* awakable, | 432 void DataPipe::ConsumerRemoveAwakable(Awakable* awakable, |
424 HandleSignalsState* signals_state) { | 433 HandleSignalsState* signals_state) { |
425 base::AutoLock locker(lock_); | 434 base::AutoLock locker(lock_); |
426 DCHECK(has_local_consumer_no_lock()); | 435 DCHECK(has_local_consumer_no_lock()); |
427 consumer_awakable_list_->Remove(awakable); | 436 consumer_awakable_list_->Remove(awakable); |
428 if (signals_state) | 437 if (signals_state) |
429 *signals_state = ConsumerGetHandleSignalsStateImplNoLock(); | 438 *signals_state = impl_->ConsumerGetHandleSignalsState(); |
430 } | 439 } |
431 | 440 |
432 void DataPipe::ConsumerStartSerialize(Channel* channel, | 441 void DataPipe::ConsumerStartSerialize(Channel* channel, |
433 size_t* max_size, | 442 size_t* max_size, |
434 size_t* max_platform_handles) { | 443 size_t* max_platform_handles) { |
435 base::AutoLock locker(lock_); | 444 base::AutoLock locker(lock_); |
436 DCHECK(has_local_consumer_no_lock()); | 445 DCHECK(has_local_consumer_no_lock()); |
437 ConsumerStartSerializeImplNoLock(channel, max_size, max_platform_handles); | 446 impl_->ConsumerStartSerialize(channel, max_size, max_platform_handles); |
438 } | 447 } |
439 | 448 |
440 bool DataPipe::ConsumerEndSerialize( | 449 bool DataPipe::ConsumerEndSerialize( |
441 Channel* channel, | 450 Channel* channel, |
442 void* destination, | 451 void* destination, |
443 size_t* actual_size, | 452 size_t* actual_size, |
444 embedder::PlatformHandleVector* platform_handles) { | 453 embedder::PlatformHandleVector* platform_handles) { |
445 base::AutoLock locker(lock_); | 454 base::AutoLock locker(lock_); |
446 DCHECK(has_local_consumer_no_lock()); | 455 DCHECK(has_local_consumer_no_lock()); |
447 return ConsumerEndSerializeImplNoLock(channel, destination, actual_size, | 456 return impl_->ConsumerEndSerialize(channel, destination, actual_size, |
448 platform_handles); | 457 platform_handles); |
449 } | 458 } |
450 | 459 |
451 bool DataPipe::ConsumerIsBusy() const { | 460 bool DataPipe::ConsumerIsBusy() const { |
452 base::AutoLock locker(lock_); | 461 base::AutoLock locker(lock_); |
453 return consumer_in_two_phase_read_no_lock(); | 462 return consumer_in_two_phase_read_no_lock(); |
454 } | 463 } |
455 | 464 |
456 DataPipe::DataPipe(bool has_local_producer, | 465 DataPipe::DataPipe(bool has_local_producer, |
457 bool has_local_consumer, | 466 bool has_local_consumer, |
458 const MojoCreateDataPipeOptions& validated_options) | 467 const MojoCreateDataPipeOptions& validated_options, |
| 468 scoped_ptr<DataPipeImpl> impl) |
459 : may_discard_((validated_options.flags & | 469 : may_discard_((validated_options.flags & |
460 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)), | 470 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)), |
461 element_num_bytes_(validated_options.element_num_bytes), | 471 element_num_bytes_(validated_options.element_num_bytes), |
462 capacity_num_bytes_(validated_options.capacity_num_bytes), | 472 capacity_num_bytes_(validated_options.capacity_num_bytes), |
463 producer_open_(true), | 473 producer_open_(true), |
464 consumer_open_(true), | 474 consumer_open_(true), |
465 producer_awakable_list_(has_local_producer ? new AwakableList() | 475 producer_awakable_list_(has_local_producer ? new AwakableList() |
466 : nullptr), | 476 : nullptr), |
467 consumer_awakable_list_(has_local_consumer ? new AwakableList() | 477 consumer_awakable_list_(has_local_consumer ? new AwakableList() |
468 : nullptr), | 478 : nullptr), |
469 producer_two_phase_max_num_bytes_written_(0), | 479 producer_two_phase_max_num_bytes_written_(0), |
470 consumer_two_phase_max_num_bytes_read_(0) { | 480 consumer_two_phase_max_num_bytes_read_(0), |
| 481 impl_(impl.Pass()) { |
| 482 impl_->set_owner(this); |
| 483 |
| 484 #if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
471 // Check that the passed in options actually are validated. | 485 // Check that the passed in options actually are validated. |
472 MojoCreateDataPipeOptions unused = {0}; | 486 MojoCreateDataPipeOptions unused = {0}; |
473 DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused), | 487 DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused), |
474 MOJO_RESULT_OK); | 488 MOJO_RESULT_OK); |
| 489 #endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
475 } | 490 } |
476 | 491 |
477 DataPipe::~DataPipe() { | 492 DataPipe::~DataPipe() { |
478 DCHECK(!producer_open_); | 493 DCHECK(!producer_open_); |
479 DCHECK(!consumer_open_); | 494 DCHECK(!consumer_open_); |
480 DCHECK(!producer_awakable_list_); | 495 DCHECK(!producer_awakable_list_); |
481 DCHECK(!consumer_awakable_list_); | 496 DCHECK(!consumer_awakable_list_); |
482 } | 497 } |
483 | 498 |
484 void DataPipe::ProducerCloseNoLock() { | 499 void DataPipe::ProducerCloseNoLock() { |
485 lock_.AssertAcquired(); | 500 lock_.AssertAcquired(); |
486 DCHECK(producer_open_); | 501 DCHECK(producer_open_); |
487 producer_open_ = false; | 502 producer_open_ = false; |
488 DCHECK(has_local_producer_no_lock()); | 503 DCHECK(has_local_producer_no_lock()); |
489 producer_awakable_list_.reset(); | 504 producer_awakable_list_.reset(); |
490 // Not a bug, except possibly in "user" code. | 505 // Not a bug, except possibly in "user" code. |
491 DVLOG_IF(2, producer_in_two_phase_write_no_lock()) | 506 DVLOG_IF(2, producer_in_two_phase_write_no_lock()) |
492 << "Producer closed with active two-phase write"; | 507 << "Producer closed with active two-phase write"; |
493 producer_two_phase_max_num_bytes_written_ = 0; | 508 producer_two_phase_max_num_bytes_written_ = 0; |
494 ProducerCloseImplNoLock(); | 509 impl_->ProducerClose(); |
495 AwakeConsumerAwakablesForStateChangeNoLock( | 510 AwakeConsumerAwakablesForStateChangeNoLock( |
496 ConsumerGetHandleSignalsStateImplNoLock()); | 511 impl_->ConsumerGetHandleSignalsState()); |
497 } | 512 } |
498 | 513 |
499 void DataPipe::ConsumerCloseNoLock() { | 514 void DataPipe::ConsumerCloseNoLock() { |
500 lock_.AssertAcquired(); | 515 lock_.AssertAcquired(); |
501 DCHECK(consumer_open_); | 516 DCHECK(consumer_open_); |
502 consumer_open_ = false; | 517 consumer_open_ = false; |
503 DCHECK(has_local_consumer_no_lock()); | 518 DCHECK(has_local_consumer_no_lock()); |
504 consumer_awakable_list_.reset(); | 519 consumer_awakable_list_.reset(); |
505 // Not a bug, except possibly in "user" code. | 520 // Not a bug, except possibly in "user" code. |
506 DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) | 521 DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) |
507 << "Consumer closed with active two-phase read"; | 522 << "Consumer closed with active two-phase read"; |
508 consumer_two_phase_max_num_bytes_read_ = 0; | 523 consumer_two_phase_max_num_bytes_read_ = 0; |
509 ConsumerCloseImplNoLock(); | 524 impl_->ConsumerClose(); |
510 AwakeProducerAwakablesForStateChangeNoLock( | 525 AwakeProducerAwakablesForStateChangeNoLock( |
511 ProducerGetHandleSignalsStateImplNoLock()); | 526 impl_->ProducerGetHandleSignalsState()); |
512 } | 527 } |
513 | 528 |
514 void DataPipe::AwakeProducerAwakablesForStateChangeNoLock( | 529 void DataPipe::AwakeProducerAwakablesForStateChangeNoLock( |
515 const HandleSignalsState& new_producer_state) { | 530 const HandleSignalsState& new_producer_state) { |
516 lock_.AssertAcquired(); | 531 lock_.AssertAcquired(); |
517 if (!has_local_producer_no_lock()) | 532 if (!has_local_producer_no_lock()) |
518 return; | 533 return; |
519 producer_awakable_list_->AwakeForStateChange(new_producer_state); | 534 producer_awakable_list_->AwakeForStateChange(new_producer_state); |
520 } | 535 } |
521 | 536 |
522 void DataPipe::AwakeConsumerAwakablesForStateChangeNoLock( | 537 void DataPipe::AwakeConsumerAwakablesForStateChangeNoLock( |
523 const HandleSignalsState& new_consumer_state) { | 538 const HandleSignalsState& new_consumer_state) { |
524 lock_.AssertAcquired(); | 539 lock_.AssertAcquired(); |
525 if (!has_local_consumer_no_lock()) | 540 if (!has_local_consumer_no_lock()) |
526 return; | 541 return; |
527 consumer_awakable_list_->AwakeForStateChange(new_consumer_state); | 542 consumer_awakable_list_->AwakeForStateChange(new_consumer_state); |
528 } | 543 } |
529 | 544 |
530 } // namespace system | 545 } // namespace system |
531 } // namespace mojo | 546 } // namespace mojo |
OLD | NEW |