OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe.h" | 5 #include "mojo/edk/system/data_pipe.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 #include <string.h> | 9 #include <string.h> |
10 | 10 |
11 #include <algorithm> | 11 #include <algorithm> |
12 #include <limits> | 12 #include <limits> |
13 | 13 |
14 #include "base/bind.h" | |
15 #include "mojo/edk/embedder/embedder_internal.h" | |
16 #include "mojo/edk/embedder/platform_support.h" | |
17 #include "mojo/edk/system/broker.h" | |
18 | |
14 #include "mojo/edk/system/configuration.h" | 19 #include "mojo/edk/system/configuration.h" |
15 #include "mojo/edk/system/options_validation.h" | 20 #include "mojo/edk/system/options_validation.h" |
16 #include "mojo/edk/system/raw_channel.h" | 21 #include "mojo/edk/system/raw_channel.h" |
17 #include "mojo/edk/system/transport_data.h" | 22 #include "mojo/edk/system/transport_data.h" |
18 | 23 |
19 namespace mojo { | 24 namespace mojo { |
20 namespace edk { | 25 namespace edk { |
21 | 26 |
22 namespace { | 27 namespace { |
23 | 28 |
24 const uint32_t kInvalidDataPipeHandleIndex = static_cast<uint32_t>(-1); | 29 const uint32_t kInvalidDataPipeHandleIndex = static_cast<uint32_t>(-1); |
25 | 30 |
26 struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { | 31 struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { |
27 uint32_t platform_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) | 32 uint32_t channel_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
28 | 33 |
29 // These are from MojoCreateDataPipeOptions | 34 // These are from MojoCreateDataPipeOptions |
30 MojoCreateDataPipeOptionsFlags flags; | 35 MojoCreateDataPipeOptionsFlags flags; |
31 uint32_t element_num_bytes; | 36 uint32_t element_num_bytes; |
32 uint32_t capacity_num_bytes; | 37 uint32_t capacity_num_bytes; |
33 | 38 |
34 uint32_t shared_memory_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) | 39 uint32_t channel_shared_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
35 uint32_t shared_memory_size; | 40 uint32_t channel_pending_read_size; |
41 uint32_t channel_pending_write_size; | |
42 | |
43 uint32_t shared_buffer_handle_index; | |
44 uint32_t ring_buffer_start; | |
45 uint32_t ring_buffer_size; | |
36 }; | 46 }; |
37 | 47 |
38 } // namespace | 48 } // namespace |
39 | 49 |
40 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { | 50 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { |
41 MojoCreateDataPipeOptions result = { | 51 MojoCreateDataPipeOptions result = { |
42 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), | 52 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), |
43 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, | 53 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, |
44 1u, | 54 1u, |
45 static_cast<uint32_t>( | 55 static_cast<uint32_t>( |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
93 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) | 103 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) |
94 return MOJO_RESULT_INVALID_ARGUMENT; | 104 return MOJO_RESULT_INVALID_ARGUMENT; |
95 if (reader.options().capacity_num_bytes > | 105 if (reader.options().capacity_num_bytes > |
96 GetConfiguration().max_data_pipe_capacity_bytes) | 106 GetConfiguration().max_data_pipe_capacity_bytes) |
97 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 107 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
98 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; | 108 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; |
99 | 109 |
100 return MOJO_RESULT_OK; | 110 return MOJO_RESULT_OK; |
101 } | 111 } |
102 | 112 |
103 void DataPipe::StartSerialize(bool have_channel_handle, | 113 DataPipe::DataPipe(const MojoCreateDataPipeOptions& options) |
104 bool have_shared_memory, | 114 : channel_(nullptr), |
105 size_t* max_size, | 115 options_(options), |
106 size_t* max_platform_handles) { | 116 channel_released_(false), |
117 shared_buffer_(nullptr), | |
118 mapping_(nullptr), | |
119 ring_buffer_start_(0u), | |
120 ring_buffer_size_(0u) {} | |
121 | |
122 DataPipe::~DataPipe() {} | |
123 | |
124 void DataPipe::Init(ScopedPlatformHandle message_pipe, | |
Anand Mistry (off Chromium)
2016/01/11 06:19:34
s/message_pipe/channel
where appropriate.
Eliot Courtney
2016/01/13 00:00:09
Done.
| |
125 char* serialized_write_buffer, | |
126 uint32_t serialized_write_buffer_size, | |
127 char* serialized_read_buffer, | |
128 uint32_t serialized_read_buffer_size, | |
129 ScopedPlatformHandle shared_buffer_handle, | |
130 uint32_t ring_buffer_start, | |
131 uint32_t ring_buffer_size, | |
132 bool is_producer, | |
133 const base::Closure& init_callback) { | |
134 is_producer_ = is_producer; | |
135 | |
136 ring_buffer_start_ = ring_buffer_start; | |
137 ring_buffer_size_ = ring_buffer_size; | |
138 | |
139 if (shared_buffer_handle.is_valid()) { | |
140 shared_buffer_ = internal::g_platform_support->CreateSharedBufferFromHandle( | |
141 options_.capacity_num_bytes, std::move(shared_buffer_handle)); | |
142 DCHECK(shared_buffer_); | |
143 } else if (is_producer_) { | |
144 #if defined(OS_WIN) | |
145 shared_buffer_ = internal::g_platform_support->CreateSharedBuffer( | |
146 options_.capacity_num_bytes); | |
147 #else | |
148 shared_buffer_ = | |
149 internal::g_broker->CreateSharedBuffer(options_.capacity_num_bytes); | |
150 #endif | |
151 CHECK(shared_buffer_); | |
152 } | |
153 | |
154 if (message_pipe.is_valid()) { | |
155 channel_ = RawChannel::Create(std::move(message_pipe)); | |
156 channel_->SetSerializedData(serialized_read_buffer, | |
157 serialized_read_buffer_size, | |
158 serialized_write_buffer, | |
159 serialized_write_buffer_size, nullptr, nullptr); | |
160 internal::g_io_thread_task_runner->PostTask(FROM_HERE, init_callback); | |
161 channel_->EnsureLazyInitialized(); | |
162 | |
163 if (is_producer_) { | |
164 internal::g_io_thread_task_runner->PostTask( | |
165 FROM_HERE, base::Bind(&DataPipe::NotifySharedBuffer, this)); | |
166 } | |
167 } | |
168 } | |
169 | |
170 void DataPipe::StartSerialize(size_t* max_size, size_t* max_platform_handles) { | |
171 // We need to release the channel and get its pending reads/writes. | |
172 if (!channel_released_) { | |
173 Serialize(); | |
174 } | |
175 | |
107 *max_size = sizeof(SerializedDataPipeHandleDispatcher); | 176 *max_size = sizeof(SerializedDataPipeHandleDispatcher); |
108 *max_platform_handles = 0; | 177 *max_platform_handles = 0; |
109 if (have_channel_handle) | 178 if (serialized_channel_handle_.is_valid()) |
110 (*max_platform_handles)++; | 179 (*max_platform_handles)++; |
111 if (have_shared_memory) | 180 // For shared buffer to transfer pending reads / writes. |
181 if (!serialized_write_buffer_.empty() || !serialized_read_buffer_.empty()) | |
182 (*max_platform_handles)++; | |
183 if (shared_buffer_) | |
112 (*max_platform_handles)++; | 184 (*max_platform_handles)++; |
113 } | 185 } |
114 | 186 |
115 void DataPipe::EndSerialize(const MojoCreateDataPipeOptions& options, | 187 void DataPipe::Serialize() { |
116 ScopedPlatformHandle channel_handle, | 188 // We need to stop watching handle immediately, even though not on IO thread, |
117 ScopedPlatformHandle shared_memory_handle, | 189 // so that other messages aren't read after this. |
118 size_t shared_memory_size, | 190 if (channel_) { |
119 void* destination, | 191 std::vector<int> fds; |
192 bool write_error = false; | |
193 serialized_channel_handle_ = channel_->ReleaseHandle( | |
Anand Mistry (off Chromium)
2016/01/11 06:19:34
From what I can tell, this is racy. Basically, thi
Eliot Courtney
2016/01/14 02:17:57
Discussed offline -- this is fairly impossible to
| |
194 &serialized_read_buffer_, &serialized_write_buffer_, &fds, &fds, | |
195 &write_error); | |
196 | |
197 CHECK(fds.empty()); | |
198 if (write_error) { | |
199 serialized_channel_handle_.reset(); | |
200 } | |
201 channel_ = nullptr; | |
202 } | |
203 channel_released_ = true; | |
204 } | |
205 | |
206 void DataPipe::EndSerialize(void* destination, | |
120 size_t* actual_size, | 207 size_t* actual_size, |
121 PlatformHandleVector* platform_handles) { | 208 PlatformHandleVector* platform_handles) { |
209 ScopedPlatformHandle channel_shared_handle; | |
210 size_t channel_shared_size = | |
211 serialized_read_buffer_.size() + serialized_write_buffer_.size(); | |
212 if (channel_shared_size) { | |
213 #if defined(OS_WIN) | |
214 scoped_refptr<PlatformSharedBuffer> shared_buffer( | |
215 internal::g_platform_support->CreateSharedBuffer(channel_shared_size)); | |
216 #else | |
217 scoped_refptr<PlatformSharedBuffer> shared_buffer( | |
218 internal::g_broker->CreateSharedBuffer(channel_shared_size)); | |
219 #endif | |
220 | |
221 scoped_ptr<PlatformSharedBufferMapping> mapping( | |
222 shared_buffer->Map(0, channel_shared_size)); | |
223 | |
224 void* base = mapping->GetBase(); | |
225 if (!serialized_read_buffer_.empty()) { | |
226 memcpy(base, &serialized_read_buffer_[0], serialized_read_buffer_.size()); | |
227 } | |
228 if (!serialized_write_buffer_.empty()) { | |
229 memcpy(static_cast<uint8_t*>(base) + serialized_read_buffer_.size(), | |
230 &serialized_write_buffer_[0], serialized_write_buffer_.size()); | |
231 } | |
232 | |
233 channel_shared_handle.reset(shared_buffer->PassPlatformHandle().release()); | |
234 } | |
235 | |
122 SerializedDataPipeHandleDispatcher* serialization = | 236 SerializedDataPipeHandleDispatcher* serialization = |
123 static_cast<SerializedDataPipeHandleDispatcher*>(destination); | 237 static_cast<SerializedDataPipeHandleDispatcher*>(destination); |
124 if (channel_handle.is_valid()) { | 238 if (serialized_channel_handle_.is_valid()) { |
125 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); | 239 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); |
126 serialization->platform_handle_index = | 240 serialization->channel_handle_index = |
127 static_cast<uint32_t>(platform_handles->size()); | 241 static_cast<uint32_t>(platform_handles->size()); |
128 platform_handles->push_back(channel_handle.release()); | 242 platform_handles->push_back(serialized_channel_handle_.release()); |
129 } else { | 243 } else { |
130 serialization->platform_handle_index = kInvalidDataPipeHandleIndex; | 244 serialization->channel_handle_index = kInvalidDataPipeHandleIndex; |
131 } | 245 } |
132 | 246 |
133 serialization->flags = options.flags; | 247 serialization->flags = options_.flags; |
134 serialization->element_num_bytes = options.element_num_bytes; | 248 serialization->element_num_bytes = options_.element_num_bytes; |
135 serialization->capacity_num_bytes = options.capacity_num_bytes; | 249 serialization->capacity_num_bytes = options_.capacity_num_bytes; |
136 | 250 |
137 serialization->shared_memory_size = static_cast<uint32_t>(shared_memory_size); | 251 serialization->channel_pending_read_size = |
138 if (serialization->shared_memory_size) { | 252 static_cast<uint32_t>(serialized_read_buffer_.size()); |
253 serialization->channel_pending_write_size = | |
254 static_cast<uint32_t>(serialized_write_buffer_.size()); | |
255 | |
256 if (channel_shared_handle.is_valid()) { | |
139 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); | 257 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); |
140 serialization->shared_memory_handle_index = | 258 serialization->channel_shared_handle_index = |
141 static_cast<uint32_t>(platform_handles->size()); | 259 static_cast<uint32_t>(platform_handles->size()); |
142 platform_handles->push_back(shared_memory_handle.release()); | 260 platform_handles->push_back(channel_shared_handle.release()); |
143 } else { | 261 } else { |
144 serialization->shared_memory_handle_index = kInvalidDataPipeHandleIndex; | 262 serialization->channel_shared_handle_index = kInvalidDataPipeHandleIndex; |
145 } | 263 } |
146 | 264 |
265 ScopedPlatformHandle shared_buffer_handle; | |
266 if (shared_buffer_) { | |
267 shared_buffer_handle.reset(shared_buffer_->PassPlatformHandle().release()); | |
268 } | |
269 | |
270 if (shared_buffer_handle.is_valid()) { | |
271 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); | |
272 serialization->shared_buffer_handle_index = | |
273 static_cast<uint32_t>(platform_handles->size()); | |
274 platform_handles->push_back(shared_buffer_handle.release()); | |
275 } else { | |
276 serialization->shared_buffer_handle_index = kInvalidDataPipeHandleIndex; | |
277 } | |
278 | |
279 serialization->ring_buffer_start = ring_buffer_start_; | |
280 serialization->ring_buffer_size = ring_buffer_size_; | |
281 | |
147 *actual_size = sizeof(SerializedDataPipeHandleDispatcher); | 282 *actual_size = sizeof(SerializedDataPipeHandleDispatcher); |
148 } | 283 } |
149 | 284 |
150 ScopedPlatformHandle DataPipe::Deserialize( | 285 ScopedPlatformHandle DataPipe::Deserialize( |
151 const void* source, | 286 const void* source, |
152 size_t size, | 287 size_t size, |
153 PlatformHandleVector* platform_handles, | 288 PlatformHandleVector* platform_handles, |
154 MojoCreateDataPipeOptions* options, | 289 MojoCreateDataPipeOptions* options, |
155 ScopedPlatformHandle* shared_memory_handle, | 290 ScopedPlatformHandle* channel_shared_handle, |
156 size_t* shared_memory_size) { | 291 uint32_t* serialized_read_buffer_size, |
292 uint32_t* serialized_write_buffer_size, | |
293 ScopedPlatformHandle* shared_buffer_handle, | |
294 uint32_t* ring_buffer_start, | |
295 uint32_t* ring_buffer_size) { | |
157 if (size != sizeof(SerializedDataPipeHandleDispatcher)) { | 296 if (size != sizeof(SerializedDataPipeHandleDispatcher)) { |
158 LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; | 297 LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; |
159 return ScopedPlatformHandle(); | 298 return ScopedPlatformHandle(); |
160 } | 299 } |
161 | 300 |
162 const SerializedDataPipeHandleDispatcher* serialization = | 301 const SerializedDataPipeHandleDispatcher* serialization = |
163 static_cast<const SerializedDataPipeHandleDispatcher*>(source); | 302 static_cast<const SerializedDataPipeHandleDispatcher*>(source); |
164 size_t platform_handle_index = serialization->platform_handle_index; | |
165 | 303 |
166 // Starts off invalid, which is what we want. | 304 // Starts off invalid, which is what we want. |
167 PlatformHandle platform_handle; | 305 PlatformHandle channel_handle; |
168 if (platform_handle_index != kInvalidDataPipeHandleIndex) { | 306 uint32_t channel_handle_index = serialization->channel_handle_index; |
169 if (!platform_handles || | 307 if (channel_handle_index != kInvalidDataPipeHandleIndex) { |
170 platform_handle_index >= platform_handles->size()) { | 308 if (!platform_handles || channel_handle_index >= platform_handles->size()) { |
171 LOG(ERROR) | 309 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
172 << "Invalid serialized data pipe dispatcher (missing handles)"; | |
173 return ScopedPlatformHandle(); | 310 return ScopedPlatformHandle(); |
174 } | 311 } |
175 | 312 |
176 // We take ownership of the handle, so we have to invalidate the one in | 313 // We take ownership of the handle, so we have to invalidate the one in |
177 // |platform_handles|. | 314 // |platform_handles|. |
178 std::swap(platform_handle, (*platform_handles)[platform_handle_index]); | 315 std::swap(channel_handle, (*platform_handles)[channel_handle_index]); |
179 } | 316 } |
180 | 317 |
181 options->struct_size = sizeof(MojoCreateDataPipeOptions); | 318 options->struct_size = sizeof(MojoCreateDataPipeOptions); |
182 options->flags = serialization->flags; | 319 options->flags = serialization->flags; |
183 options->element_num_bytes = serialization->element_num_bytes; | 320 options->element_num_bytes = serialization->element_num_bytes; |
184 options->capacity_num_bytes = serialization->capacity_num_bytes; | 321 options->capacity_num_bytes = serialization->capacity_num_bytes; |
185 | 322 *serialized_read_buffer_size = serialization->channel_pending_read_size; |
186 if (shared_memory_size) { | 323 *serialized_write_buffer_size = serialization->channel_pending_write_size; |
187 *shared_memory_size = serialization->shared_memory_size; | 324 *ring_buffer_start = serialization->ring_buffer_start; |
188 if (*shared_memory_size) { | 325 *ring_buffer_size = serialization->ring_buffer_size; |
189 DCHECK(serialization->shared_memory_handle_index != | 326 |
190 kInvalidDataPipeHandleIndex); | 327 uint32_t channel_shared_handle_index = |
191 if (!platform_handles || | 328 serialization->channel_shared_handle_index; |
192 serialization->shared_memory_handle_index >= | 329 if (*serialized_read_buffer_size || *serialized_write_buffer_size) { |
193 platform_handles->size()) { | 330 DCHECK_NE(channel_shared_handle_index, kInvalidDataPipeHandleIndex); |
194 LOG(ERROR) << "Invalid serialized data pipe dispatcher " | 331 if (!platform_handles || |
195 << "(missing handles)"; | 332 channel_shared_handle_index >= platform_handles->size()) { |
196 return ScopedPlatformHandle(); | 333 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
334 return ScopedPlatformHandle(); | |
335 } | |
336 | |
337 // Take ownership. | |
338 PlatformHandle temp_channel_shared_handle; | |
339 std::swap(temp_channel_shared_handle, | |
340 (*platform_handles)[channel_shared_handle_index]); | |
341 *channel_shared_handle = ScopedPlatformHandle(temp_channel_shared_handle); | |
342 } | |
343 | |
344 uint32_t shared_buffer_handle_index = | |
345 serialization->shared_buffer_handle_index; | |
346 if (shared_buffer_handle_index != kInvalidDataPipeHandleIndex) { | |
347 if (!platform_handles || | |
348 shared_buffer_handle_index >= platform_handles->size()) { | |
349 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; | |
350 return ScopedPlatformHandle(); | |
351 } | |
352 | |
353 // Take ownership. | |
354 PlatformHandle temp_shared_buffer_handle; | |
355 std::swap(temp_shared_buffer_handle, | |
356 (*platform_handles)[shared_buffer_handle_index]); | |
357 *shared_buffer_handle = ScopedPlatformHandle(temp_shared_buffer_handle); | |
358 } | |
359 | |
360 return ScopedPlatformHandle(channel_handle); | |
361 } | |
362 | |
363 uint8_t* DataPipe::GetSharedBufferBase() { | |
364 if (!mapping_) { | |
365 DCHECK(shared_buffer_); | |
366 mapping_ = shared_buffer_->Map(0, options_.capacity_num_bytes); | |
367 } | |
368 if (!mapping_) | |
369 return nullptr; | |
370 return static_cast<uint8_t*>(mapping_->GetBase()); | |
371 } | |
372 | |
373 bool DataPipe::WriteDataIntoSharedBuffer(const void* elements, | |
374 uint32_t num_bytes) { | |
375 uint32_t buffer_size = options_.capacity_num_bytes; | |
376 uint8_t* base = GetSharedBufferBase(); | |
377 if (!base) | |
378 return false; | |
379 | |
380 DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); | |
381 DCHECK_LE(num_bytes, buffer_size - ring_buffer_size_); | |
382 | |
383 uint32_t ring_buffer_end = | |
384 (ring_buffer_start_ + ring_buffer_size_) % buffer_size; | |
385 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
386 uint32_t bytes_until_end = buffer_size - ring_buffer_end; | |
387 memcpy(base + ring_buffer_end, elements, | |
388 std::min(num_bytes, bytes_until_end)); | |
389 | |
390 if (bytes_until_end < num_bytes) { | |
391 memcpy(base, static_cast<const uint8_t*>(elements) + bytes_until_end, | |
392 num_bytes - bytes_until_end); | |
393 } | |
394 } else { | |
395 memcpy(base + ring_buffer_end, elements, num_bytes); | |
396 } | |
397 | |
398 return true; | |
399 } | |
400 | |
401 bool DataPipe::ReadDataFromSharedBuffer(void* data, uint32_t num_bytes) { | |
402 DCHECK_LE(num_bytes, ring_buffer_size_); | |
403 DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); | |
404 | |
405 uint32_t buffer_size = options_.capacity_num_bytes; | |
406 uint8_t* base = GetSharedBufferBase(); | |
407 if (!base) | |
408 return false; | |
409 | |
410 uint32_t ring_buffer_end = | |
411 (ring_buffer_start_ + ring_buffer_size_) % buffer_size; | |
412 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
413 memcpy(data, base + ring_buffer_start_, num_bytes); | |
414 } else { | |
415 uint32_t bytes_until_end = buffer_size - ring_buffer_start_; | |
416 memcpy(data, base + ring_buffer_start_, | |
417 std::min(num_bytes, bytes_until_end)); | |
418 | |
419 if (bytes_until_end < num_bytes) { | |
420 memcpy(static_cast<uint8_t*>(data) + bytes_until_end, base, | |
421 num_bytes - bytes_until_end); | |
422 } | |
423 } | |
424 | |
425 return true; | |
426 } | |
427 | |
428 uint32_t DataPipe::GetReadableBytes() const { | |
429 return shared_buffer_ ? ring_buffer_size_ : 0; | |
430 } | |
431 | |
432 uint32_t DataPipe::GetWritableBytes() const { | |
433 return shared_buffer_ ? options_.capacity_num_bytes - ring_buffer_size_ : 0; | |
434 } | |
435 | |
436 void DataPipe::UpdateFromRead(uint32_t num_bytes) { | |
437 ring_buffer_start_ = | |
438 (ring_buffer_start_ + num_bytes) % options_.capacity_num_bytes; | |
439 ring_buffer_size_ -= num_bytes; | |
440 DCHECK_GE(ring_buffer_size_, 0u); | |
441 } | |
442 | |
443 void DataPipe::UpdateFromWrite(uint32_t num_bytes) { | |
444 ring_buffer_size_ += num_bytes; | |
445 DCHECK_LE(ring_buffer_size_, options_.capacity_num_bytes); | |
446 } | |
447 | |
448 bool DataPipe::NotifyWrite(uint32_t num_bytes) { | |
449 DCHECK(is_producer_); | |
450 | |
451 if (!channel_) | |
452 return false; | |
453 | |
454 DataPipeCommandHeader command = {DATA_WRITTEN, num_bytes}; | |
455 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
456 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
457 if (!channel_->WriteMessage(std::move(message))) { | |
458 return false; | |
459 } | |
460 return true; | |
461 } | |
462 | |
463 bool DataPipe::NotifyRead(uint32_t num_bytes) { | |
464 DCHECK(!is_producer_); | |
465 | |
466 if (!channel_) | |
467 return false; | |
468 DataPipeCommandHeader command = {DATA_READ, num_bytes}; | |
469 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
470 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
471 if (!channel_->WriteMessage(std::move(message))) { | |
472 return false; | |
473 } | |
474 return true; | |
475 } | |
476 | |
477 void DataPipe::NotifySharedBuffer() { | |
478 if (!channel_) { | |
479 DLOG(ERROR) << "NotifySharedBuffer: No channel"; | |
480 return; | |
481 } | |
482 DataPipeCommandHeader command = {NOTIFY_SHARED_BUFFER, | |
483 options_.capacity_num_bytes}; | |
484 ScopedPlatformHandle handle = shared_buffer_->DuplicatePlatformHandle(); | |
485 DCHECK(handle.is_valid()); | |
486 ScopedPlatformHandleVectorPtr fds(new PlatformHandleVector()); | |
487 fds->push_back(handle.release()); | |
488 | |
489 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
490 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
491 message->SetTransportData(make_scoped_ptr(new TransportData( | |
492 std::move(fds), channel_->GetSerializedPlatformHandleSize()))); | |
493 if (!channel_->WriteMessage(std::move(message))) { | |
494 DLOG(ERROR) << "NotifySharedBuffer: Can't write"; | |
495 } | |
496 } | |
497 | |
498 void DataPipe::UpdateSharedBuffer( | |
499 scoped_refptr<PlatformSharedBuffer> shared_buffer) { | |
500 DCHECK(shared_buffer); | |
501 shared_buffer_ = shared_buffer; | |
502 mapping_ = nullptr; | |
503 } | |
504 | |
505 void DataPipe::Shutdown() { | |
506 if (channel_) { | |
507 channel_->Shutdown(); | |
508 channel_ = nullptr; | |
509 } | |
510 } | |
511 | |
512 void DataPipe::CreateEquivalentAndClose(DataPipe* out) { | |
513 // Serialize, releasing the handle, otherwise we will get callbacks to the | |
514 // wrong delegate for RawChannel. | |
515 Serialize(); | |
516 | |
517 std::swap(options_, out->options_); | |
518 std::swap(channel_released_, out->channel_released_); | |
519 serialized_read_buffer_.swap(out->serialized_read_buffer_); | |
520 serialized_write_buffer_.swap(out->serialized_write_buffer_); | |
521 serialized_channel_handle_.swap(out->serialized_channel_handle_); | |
522 shared_buffer_.swap(out->shared_buffer_); | |
523 mapping_.swap(out->mapping_); | |
524 std::swap(ring_buffer_start_, out->ring_buffer_start_); | |
525 std::swap(ring_buffer_size_, out->ring_buffer_size_); | |
526 std::swap(is_producer_, out->is_producer_); | |
527 } | |
528 | |
529 void* DataPipe::GetWriteBuffer(uint32_t* num_bytes) { | |
530 if (!shared_buffer_) { | |
531 *num_bytes = 0; | |
532 return nullptr; | |
533 } | |
534 // Must provide sequential memory. | |
535 uint32_t buffer_size = options_.capacity_num_bytes; | |
536 uint32_t ring_buffer_end = | |
537 (ring_buffer_start_ + ring_buffer_size_) % buffer_size; | |
538 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
539 *num_bytes = buffer_size - ring_buffer_end; | |
540 } else { | |
541 *num_bytes = ring_buffer_start_ - ring_buffer_end; | |
542 } | |
543 | |
544 return GetSharedBufferBase() + ring_buffer_end; | |
545 } | |
546 | |
547 const void* DataPipe::GetReadBuffer(uint32_t* num_bytes) { | |
548 if (!shared_buffer_) { | |
549 *num_bytes = 0; | |
550 return nullptr; | |
551 } | |
552 // Must provide sequential memory. | |
553 uint32_t buffer_size = options_.capacity_num_bytes; | |
554 uint32_t ring_buffer_end = | |
555 (ring_buffer_start_ + ring_buffer_size_) % buffer_size; | |
556 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
557 *num_bytes = ring_buffer_size_; | |
558 } else { | |
559 *num_bytes = buffer_size - ring_buffer_start_; | |
560 } | |
561 | |
562 return GetSharedBufferBase() + ring_buffer_start_; | |
563 } | |
564 | |
565 bool DataPipe::ProcessCommand(const DataPipeCommandHeader& command, | |
566 ScopedPlatformHandleVectorPtr platform_handles) { | |
567 PlatformHandle handle; | |
568 ScopedPlatformHandle scoped_handle; | |
569 scoped_refptr<PlatformSharedBuffer> shared_buffer; | |
570 bool was_unreadable = GetReadableBytes() == 0; | |
571 bool was_unwritable = GetWritableBytes() == 0; | |
572 switch (command.command) { | |
573 case NOTIFY_SHARED_BUFFER: | |
574 CHECK_EQ(platform_handles->size(), 1u); | |
575 std::swap(handle, (*platform_handles.get())[0]); | |
576 scoped_handle.reset(handle); | |
577 UpdateSharedBuffer( | |
578 internal::g_platform_support->CreateSharedBufferFromHandle( | |
579 command.num_bytes, std::move(scoped_handle))); | |
580 break; | |
581 case DATA_WRITTEN: | |
582 if (!is_producer_) { | |
583 UpdateFromWrite(command.num_bytes); | |
584 } else { | |
585 LOG(ERROR) << "Producer was told of a write, which shouldn't happen."; | |
586 DCHECK(0); | |
Anand Mistry (off Chromium)
2016/01/11 06:19:34
NOTREACHED();
Eliot Courtney
2016/01/13 00:00:09
Done.
| |
197 } | 587 } |
198 | 588 break; |
199 PlatformHandle temp_shared_memory_handle; | 589 case DATA_READ: |
200 std::swap(temp_shared_memory_handle, | 590 if (is_producer_) { |
201 (*platform_handles)[serialization->shared_memory_handle_index]); | 591 UpdateFromRead(command.num_bytes); |
202 *shared_memory_handle = | 592 } else { |
203 ScopedPlatformHandle(temp_shared_memory_handle); | 593 LOG(ERROR) << "Consumer was told of a read, which shouldn't happen."; |
204 } | 594 DCHECK(0); |
205 } | 595 } |
206 | 596 break; |
207 size -= sizeof(SerializedDataPipeHandleDispatcher); | 597 default: |
208 | 598 LOG(ERROR) << "Shouldn't happen"; |
209 return ScopedPlatformHandle(platform_handle); | 599 DCHECK(0); |
600 } | |
601 | |
602 // Handles write/read case and shared buffer becoming available case. | |
603 return (was_unreadable && GetReadableBytes()) || | |
604 (was_unwritable && GetWritableBytes()); | |
210 } | 605 } |
211 | 606 |
212 } // namespace edk | 607 } // namespace edk |
213 } // namespace mojo | 608 } // namespace mojo |
OLD | NEW |