Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/data_pipe.h" | 5 #include "mojo/edk/system/data_pipe.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 #include <string.h> | 9 #include <string.h> |
| 10 | 10 |
| 11 #include <algorithm> | 11 #include <algorithm> |
| 12 #include <limits> | 12 #include <limits> |
| 13 | 13 |
| 14 #include "base/bind.h" | |
| 15 #include "mojo/edk/embedder/embedder_internal.h" | |
| 16 #include "mojo/edk/embedder/platform_support.h" | |
| 17 #include "mojo/edk/system/broker.h" | |
| 18 | |
| 14 #include "mojo/edk/system/configuration.h" | 19 #include "mojo/edk/system/configuration.h" |
| 15 #include "mojo/edk/system/options_validation.h" | 20 #include "mojo/edk/system/options_validation.h" |
| 16 #include "mojo/edk/system/raw_channel.h" | 21 #include "mojo/edk/system/raw_channel.h" |
| 17 #include "mojo/edk/system/transport_data.h" | 22 #include "mojo/edk/system/transport_data.h" |
| 18 | 23 |
| 19 namespace mojo { | 24 namespace mojo { |
| 20 namespace edk { | 25 namespace edk { |
| 21 | 26 |
| 22 namespace { | 27 namespace { |
| 23 | 28 |
| 24 const uint32_t kInvalidDataPipeHandleIndex = static_cast<uint32_t>(-1); | 29 const uint32_t kInvalidDataPipeHandleIndex = static_cast<uint32_t>(-1); |
| 25 | 30 |
| 26 struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { | 31 struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { |
| 27 uint32_t platform_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) | 32 uint32_t channel_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
| 28 | 33 |
| 29 // These are from MojoCreateDataPipeOptions | 34 // These are from MojoCreateDataPipeOptions |
| 30 MojoCreateDataPipeOptionsFlags flags; | 35 MojoCreateDataPipeOptionsFlags flags; |
| 31 uint32_t element_num_bytes; | 36 uint32_t element_num_bytes; |
| 32 uint32_t capacity_num_bytes; | 37 uint32_t capacity_num_bytes; |
| 33 | 38 |
| 34 uint32_t shared_memory_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) | 39 uint32_t channel_shared_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
| 35 uint32_t shared_memory_size; | 40 uint32_t channel_pending_read_size; |
| 41 uint32_t channel_pending_write_size; | |
| 42 | |
| 43 uint32_t shared_buffer_handle_index; | |
| 44 uint32_t ring_buffer_start; | |
| 45 uint32_t ring_buffer_size; | |
| 36 }; | 46 }; |
| 37 | 47 |
| 38 } // namespace | 48 } // namespace |
| 39 | 49 |
| 40 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { | 50 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { |
| 41 MojoCreateDataPipeOptions result = { | 51 MojoCreateDataPipeOptions result = { |
| 42 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), | 52 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), |
| 43 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, | 53 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, |
| 44 1u, | 54 1u, |
| 45 static_cast<uint32_t>( | 55 static_cast<uint32_t>( |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 93 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) | 103 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) |
| 94 return MOJO_RESULT_INVALID_ARGUMENT; | 104 return MOJO_RESULT_INVALID_ARGUMENT; |
| 95 if (reader.options().capacity_num_bytes > | 105 if (reader.options().capacity_num_bytes > |
| 96 GetConfiguration().max_data_pipe_capacity_bytes) | 106 GetConfiguration().max_data_pipe_capacity_bytes) |
| 97 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 107 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| 98 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; | 108 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; |
| 99 | 109 |
| 100 return MOJO_RESULT_OK; | 110 return MOJO_RESULT_OK; |
| 101 } | 111 } |
| 102 | 112 |
| 103 void DataPipe::StartSerialize(bool have_channel_handle, | 113 DataPipe::DataPipe(const MojoCreateDataPipeOptions& options) |
| 104 bool have_shared_memory, | 114 : channel_(nullptr), |
| 105 size_t* max_size, | 115 options_(options), |
| 106 size_t* max_platform_handles) { | 116 channel_released_(false), |
| 117 shared_buffer_(nullptr), | |
| 118 mapping_(nullptr), | |
| 119 ring_buffer_start_(0u), | |
| 120 ring_buffer_size_(0u) {} | |
| 121 | |
| 122 DataPipe::~DataPipe() {} | |
| 123 | |
| 124 void DataPipe::Init(ScopedPlatformHandle message_pipe, | |
|
Anand Mistry (off Chromium)
2016/01/11 06:19:34
s/message_pipe/channel
where appropriate.
Eliot Courtney
2016/01/13 00:00:09
Done.
| |
| 125 char* serialized_write_buffer, | |
| 126 uint32_t serialized_write_buffer_size, | |
| 127 char* serialized_read_buffer, | |
| 128 uint32_t serialized_read_buffer_size, | |
| 129 ScopedPlatformHandle shared_buffer_handle, | |
| 130 uint32_t ring_buffer_start, | |
| 131 uint32_t ring_buffer_size, | |
| 132 bool is_producer, | |
| 133 const base::Closure& init_callback) { | |
| 134 is_producer_ = is_producer; | |
| 135 | |
| 136 ring_buffer_start_ = ring_buffer_start; | |
| 137 ring_buffer_size_ = ring_buffer_size; | |
| 138 | |
| 139 if (shared_buffer_handle.is_valid()) { | |
| 140 shared_buffer_ = internal::g_platform_support->CreateSharedBufferFromHandle( | |
| 141 options_.capacity_num_bytes, std::move(shared_buffer_handle)); | |
| 142 DCHECK(shared_buffer_); | |
| 143 } else if (is_producer_) { | |
| 144 #if defined(OS_WIN) | |
| 145 shared_buffer_ = internal::g_platform_support->CreateSharedBuffer( | |
| 146 options_.capacity_num_bytes); | |
| 147 #else | |
| 148 shared_buffer_ = | |
| 149 internal::g_broker->CreateSharedBuffer(options_.capacity_num_bytes); | |
| 150 #endif | |
| 151 CHECK(shared_buffer_); | |
| 152 } | |
| 153 | |
| 154 if (message_pipe.is_valid()) { | |
| 155 channel_ = RawChannel::Create(std::move(message_pipe)); | |
| 156 channel_->SetSerializedData(serialized_read_buffer, | |
| 157 serialized_read_buffer_size, | |
| 158 serialized_write_buffer, | |
| 159 serialized_write_buffer_size, nullptr, nullptr); | |
| 160 internal::g_io_thread_task_runner->PostTask(FROM_HERE, init_callback); | |
| 161 channel_->EnsureLazyInitialized(); | |
| 162 | |
| 163 if (is_producer_) { | |
| 164 internal::g_io_thread_task_runner->PostTask( | |
| 165 FROM_HERE, base::Bind(&DataPipe::NotifySharedBuffer, this)); | |
| 166 } | |
| 167 } | |
| 168 } | |
| 169 | |
| 170 void DataPipe::StartSerialize(size_t* max_size, size_t* max_platform_handles) { | |
| 171 // We need to release the channel and get its pending reads/writes. | |
| 172 if (!channel_released_) { | |
| 173 Serialize(); | |
| 174 } | |
| 175 | |
| 107 *max_size = sizeof(SerializedDataPipeHandleDispatcher); | 176 *max_size = sizeof(SerializedDataPipeHandleDispatcher); |
| 108 *max_platform_handles = 0; | 177 *max_platform_handles = 0; |
| 109 if (have_channel_handle) | 178 if (serialized_channel_handle_.is_valid()) |
| 110 (*max_platform_handles)++; | 179 (*max_platform_handles)++; |
| 111 if (have_shared_memory) | 180 // For shared buffer to transfer pending reads / writes. |
| 181 if (!serialized_write_buffer_.empty() || !serialized_read_buffer_.empty()) | |
| 182 (*max_platform_handles)++; | |
| 183 if (shared_buffer_) | |
| 112 (*max_platform_handles)++; | 184 (*max_platform_handles)++; |
| 113 } | 185 } |
| 114 | 186 |
| 115 void DataPipe::EndSerialize(const MojoCreateDataPipeOptions& options, | 187 void DataPipe::Serialize() { |
| 116 ScopedPlatformHandle channel_handle, | 188 // We need to stop watching handle immediately, even though not on IO thread, |
| 117 ScopedPlatformHandle shared_memory_handle, | 189 // so that other messages aren't read after this. |
| 118 size_t shared_memory_size, | 190 if (channel_) { |
| 119 void* destination, | 191 std::vector<int> fds; |
| 192 bool write_error = false; | |
| 193 serialized_channel_handle_ = channel_->ReleaseHandle( | |
|
Anand Mistry (off Chromium)
2016/01/11 06:19:34
From what I can tell, this is racy. Basically, thi
Eliot Courtney
2016/01/14 02:17:57
Discussed offline -- this is fairly impossible to
| |
| 194 &serialized_read_buffer_, &serialized_write_buffer_, &fds, &fds, | |
| 195 &write_error); | |
| 196 | |
| 197 CHECK(fds.empty()); | |
| 198 if (write_error) { | |
| 199 serialized_channel_handle_.reset(); | |
| 200 } | |
| 201 channel_ = nullptr; | |
| 202 } | |
| 203 channel_released_ = true; | |
| 204 } | |
| 205 | |
| 206 void DataPipe::EndSerialize(void* destination, | |
| 120 size_t* actual_size, | 207 size_t* actual_size, |
| 121 PlatformHandleVector* platform_handles) { | 208 PlatformHandleVector* platform_handles) { |
| 209 ScopedPlatformHandle channel_shared_handle; | |
| 210 size_t channel_shared_size = | |
| 211 serialized_read_buffer_.size() + serialized_write_buffer_.size(); | |
| 212 if (channel_shared_size) { | |
| 213 #if defined(OS_WIN) | |
| 214 scoped_refptr<PlatformSharedBuffer> shared_buffer( | |
| 215 internal::g_platform_support->CreateSharedBuffer(channel_shared_size)); | |
| 216 #else | |
| 217 scoped_refptr<PlatformSharedBuffer> shared_buffer( | |
| 218 internal::g_broker->CreateSharedBuffer(channel_shared_size)); | |
| 219 #endif | |
| 220 | |
| 221 scoped_ptr<PlatformSharedBufferMapping> mapping( | |
| 222 shared_buffer->Map(0, channel_shared_size)); | |
| 223 | |
| 224 void* base = mapping->GetBase(); | |
| 225 if (!serialized_read_buffer_.empty()) { | |
| 226 memcpy(base, &serialized_read_buffer_[0], serialized_read_buffer_.size()); | |
| 227 } | |
| 228 if (!serialized_write_buffer_.empty()) { | |
| 229 memcpy(static_cast<uint8_t*>(base) + serialized_read_buffer_.size(), | |
| 230 &serialized_write_buffer_[0], serialized_write_buffer_.size()); | |
| 231 } | |
| 232 | |
| 233 channel_shared_handle.reset(shared_buffer->PassPlatformHandle().release()); | |
| 234 } | |
| 235 | |
| 122 SerializedDataPipeHandleDispatcher* serialization = | 236 SerializedDataPipeHandleDispatcher* serialization = |
| 123 static_cast<SerializedDataPipeHandleDispatcher*>(destination); | 237 static_cast<SerializedDataPipeHandleDispatcher*>(destination); |
| 124 if (channel_handle.is_valid()) { | 238 if (serialized_channel_handle_.is_valid()) { |
| 125 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); | 239 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); |
| 126 serialization->platform_handle_index = | 240 serialization->channel_handle_index = |
| 127 static_cast<uint32_t>(platform_handles->size()); | 241 static_cast<uint32_t>(platform_handles->size()); |
| 128 platform_handles->push_back(channel_handle.release()); | 242 platform_handles->push_back(serialized_channel_handle_.release()); |
| 129 } else { | 243 } else { |
| 130 serialization->platform_handle_index = kInvalidDataPipeHandleIndex; | 244 serialization->channel_handle_index = kInvalidDataPipeHandleIndex; |
| 131 } | 245 } |
| 132 | 246 |
| 133 serialization->flags = options.flags; | 247 serialization->flags = options_.flags; |
| 134 serialization->element_num_bytes = options.element_num_bytes; | 248 serialization->element_num_bytes = options_.element_num_bytes; |
| 135 serialization->capacity_num_bytes = options.capacity_num_bytes; | 249 serialization->capacity_num_bytes = options_.capacity_num_bytes; |
| 136 | 250 |
| 137 serialization->shared_memory_size = static_cast<uint32_t>(shared_memory_size); | 251 serialization->channel_pending_read_size = |
| 138 if (serialization->shared_memory_size) { | 252 static_cast<uint32_t>(serialized_read_buffer_.size()); |
| 253 serialization->channel_pending_write_size = | |
| 254 static_cast<uint32_t>(serialized_write_buffer_.size()); | |
| 255 | |
| 256 if (channel_shared_handle.is_valid()) { | |
| 139 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); | 257 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); |
| 140 serialization->shared_memory_handle_index = | 258 serialization->channel_shared_handle_index = |
| 141 static_cast<uint32_t>(platform_handles->size()); | 259 static_cast<uint32_t>(platform_handles->size()); |
| 142 platform_handles->push_back(shared_memory_handle.release()); | 260 platform_handles->push_back(channel_shared_handle.release()); |
| 143 } else { | 261 } else { |
| 144 serialization->shared_memory_handle_index = kInvalidDataPipeHandleIndex; | 262 serialization->channel_shared_handle_index = kInvalidDataPipeHandleIndex; |
| 145 } | 263 } |
| 146 | 264 |
| 265 ScopedPlatformHandle shared_buffer_handle; | |
| 266 if (shared_buffer_) { | |
| 267 shared_buffer_handle.reset(shared_buffer_->PassPlatformHandle().release()); | |
| 268 } | |
| 269 | |
| 270 if (shared_buffer_handle.is_valid()) { | |
| 271 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); | |
| 272 serialization->shared_buffer_handle_index = | |
| 273 static_cast<uint32_t>(platform_handles->size()); | |
| 274 platform_handles->push_back(shared_buffer_handle.release()); | |
| 275 } else { | |
| 276 serialization->shared_buffer_handle_index = kInvalidDataPipeHandleIndex; | |
| 277 } | |
| 278 | |
| 279 serialization->ring_buffer_start = ring_buffer_start_; | |
| 280 serialization->ring_buffer_size = ring_buffer_size_; | |
| 281 | |
| 147 *actual_size = sizeof(SerializedDataPipeHandleDispatcher); | 282 *actual_size = sizeof(SerializedDataPipeHandleDispatcher); |
| 148 } | 283 } |
| 149 | 284 |
| 150 ScopedPlatformHandle DataPipe::Deserialize( | 285 ScopedPlatformHandle DataPipe::Deserialize( |
| 151 const void* source, | 286 const void* source, |
| 152 size_t size, | 287 size_t size, |
| 153 PlatformHandleVector* platform_handles, | 288 PlatformHandleVector* platform_handles, |
| 154 MojoCreateDataPipeOptions* options, | 289 MojoCreateDataPipeOptions* options, |
| 155 ScopedPlatformHandle* shared_memory_handle, | 290 ScopedPlatformHandle* channel_shared_handle, |
| 156 size_t* shared_memory_size) { | 291 uint32_t* serialized_read_buffer_size, |
| 292 uint32_t* serialized_write_buffer_size, | |
| 293 ScopedPlatformHandle* shared_buffer_handle, | |
| 294 uint32_t* ring_buffer_start, | |
| 295 uint32_t* ring_buffer_size) { | |
| 157 if (size != sizeof(SerializedDataPipeHandleDispatcher)) { | 296 if (size != sizeof(SerializedDataPipeHandleDispatcher)) { |
| 158 LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; | 297 LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; |
| 159 return ScopedPlatformHandle(); | 298 return ScopedPlatformHandle(); |
| 160 } | 299 } |
| 161 | 300 |
| 162 const SerializedDataPipeHandleDispatcher* serialization = | 301 const SerializedDataPipeHandleDispatcher* serialization = |
| 163 static_cast<const SerializedDataPipeHandleDispatcher*>(source); | 302 static_cast<const SerializedDataPipeHandleDispatcher*>(source); |
| 164 size_t platform_handle_index = serialization->platform_handle_index; | |
| 165 | 303 |
| 166 // Starts off invalid, which is what we want. | 304 // Starts off invalid, which is what we want. |
| 167 PlatformHandle platform_handle; | 305 PlatformHandle channel_handle; |
| 168 if (platform_handle_index != kInvalidDataPipeHandleIndex) { | 306 uint32_t channel_handle_index = serialization->channel_handle_index; |
| 169 if (!platform_handles || | 307 if (channel_handle_index != kInvalidDataPipeHandleIndex) { |
| 170 platform_handle_index >= platform_handles->size()) { | 308 if (!platform_handles || channel_handle_index >= platform_handles->size()) { |
| 171 LOG(ERROR) | 309 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
| 172 << "Invalid serialized data pipe dispatcher (missing handles)"; | |
| 173 return ScopedPlatformHandle(); | 310 return ScopedPlatformHandle(); |
| 174 } | 311 } |
| 175 | 312 |
| 176 // We take ownership of the handle, so we have to invalidate the one in | 313 // We take ownership of the handle, so we have to invalidate the one in |
| 177 // |platform_handles|. | 314 // |platform_handles|. |
| 178 std::swap(platform_handle, (*platform_handles)[platform_handle_index]); | 315 std::swap(channel_handle, (*platform_handles)[channel_handle_index]); |
| 179 } | 316 } |
| 180 | 317 |
| 181 options->struct_size = sizeof(MojoCreateDataPipeOptions); | 318 options->struct_size = sizeof(MojoCreateDataPipeOptions); |
| 182 options->flags = serialization->flags; | 319 options->flags = serialization->flags; |
| 183 options->element_num_bytes = serialization->element_num_bytes; | 320 options->element_num_bytes = serialization->element_num_bytes; |
| 184 options->capacity_num_bytes = serialization->capacity_num_bytes; | 321 options->capacity_num_bytes = serialization->capacity_num_bytes; |
| 185 | 322 *serialized_read_buffer_size = serialization->channel_pending_read_size; |
| 186 if (shared_memory_size) { | 323 *serialized_write_buffer_size = serialization->channel_pending_write_size; |
| 187 *shared_memory_size = serialization->shared_memory_size; | 324 *ring_buffer_start = serialization->ring_buffer_start; |
| 188 if (*shared_memory_size) { | 325 *ring_buffer_size = serialization->ring_buffer_size; |
| 189 DCHECK(serialization->shared_memory_handle_index != | 326 |
| 190 kInvalidDataPipeHandleIndex); | 327 uint32_t channel_shared_handle_index = |
| 191 if (!platform_handles || | 328 serialization->channel_shared_handle_index; |
| 192 serialization->shared_memory_handle_index >= | 329 if (*serialized_read_buffer_size || *serialized_write_buffer_size) { |
| 193 platform_handles->size()) { | 330 DCHECK_NE(channel_shared_handle_index, kInvalidDataPipeHandleIndex); |
| 194 LOG(ERROR) << "Invalid serialized data pipe dispatcher " | 331 if (!platform_handles || |
| 195 << "(missing handles)"; | 332 channel_shared_handle_index >= platform_handles->size()) { |
| 196 return ScopedPlatformHandle(); | 333 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
| 334 return ScopedPlatformHandle(); | |
| 335 } | |
| 336 | |
| 337 // Take ownership. | |
| 338 PlatformHandle temp_channel_shared_handle; | |
| 339 std::swap(temp_channel_shared_handle, | |
| 340 (*platform_handles)[channel_shared_handle_index]); | |
| 341 *channel_shared_handle = ScopedPlatformHandle(temp_channel_shared_handle); | |
| 342 } | |
| 343 | |
| 344 uint32_t shared_buffer_handle_index = | |
| 345 serialization->shared_buffer_handle_index; | |
| 346 if (shared_buffer_handle_index != kInvalidDataPipeHandleIndex) { | |
| 347 if (!platform_handles || | |
| 348 shared_buffer_handle_index >= platform_handles->size()) { | |
| 349 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; | |
| 350 return ScopedPlatformHandle(); | |
| 351 } | |
| 352 | |
| 353 // Take ownership. | |
| 354 PlatformHandle temp_shared_buffer_handle; | |
| 355 std::swap(temp_shared_buffer_handle, | |
| 356 (*platform_handles)[shared_buffer_handle_index]); | |
| 357 *shared_buffer_handle = ScopedPlatformHandle(temp_shared_buffer_handle); | |
| 358 } | |
| 359 | |
| 360 return ScopedPlatformHandle(channel_handle); | |
| 361 } | |
| 362 | |
| 363 uint8_t* DataPipe::GetSharedBufferBase() { | |
| 364 if (!mapping_) { | |
| 365 DCHECK(shared_buffer_); | |
| 366 mapping_ = shared_buffer_->Map(0, options_.capacity_num_bytes); | |
| 367 } | |
| 368 if (!mapping_) | |
| 369 return nullptr; | |
| 370 return static_cast<uint8_t*>(mapping_->GetBase()); | |
| 371 } | |
| 372 | |
| 373 bool DataPipe::WriteDataIntoSharedBuffer(const void* elements, | |
| 374 uint32_t num_bytes) { | |
| 375 uint32_t buffer_size = options_.capacity_num_bytes; | |
| 376 uint8_t* base = GetSharedBufferBase(); | |
| 377 if (!base) | |
| 378 return false; | |
| 379 | |
| 380 DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); | |
| 381 DCHECK_LE(num_bytes, buffer_size - ring_buffer_size_); | |
| 382 | |
| 383 uint32_t ring_buffer_end = | |
| 384 (ring_buffer_start_ + ring_buffer_size_) % buffer_size; | |
| 385 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
| 386 uint32_t bytes_until_end = buffer_size - ring_buffer_end; | |
| 387 memcpy(base + ring_buffer_end, elements, | |
| 388 std::min(num_bytes, bytes_until_end)); | |
| 389 | |
| 390 if (bytes_until_end < num_bytes) { | |
| 391 memcpy(base, static_cast<const uint8_t*>(elements) + bytes_until_end, | |
| 392 num_bytes - bytes_until_end); | |
| 393 } | |
| 394 } else { | |
| 395 memcpy(base + ring_buffer_end, elements, num_bytes); | |
| 396 } | |
| 397 | |
| 398 return true; | |
| 399 } | |
| 400 | |
| 401 bool DataPipe::ReadDataFromSharedBuffer(void* data, uint32_t num_bytes) { | |
| 402 DCHECK_LE(num_bytes, ring_buffer_size_); | |
| 403 DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); | |
| 404 | |
| 405 uint32_t buffer_size = options_.capacity_num_bytes; | |
| 406 uint8_t* base = GetSharedBufferBase(); | |
| 407 if (!base) | |
| 408 return false; | |
| 409 | |
| 410 uint32_t ring_buffer_end = | |
| 411 (ring_buffer_start_ + ring_buffer_size_) % buffer_size; | |
| 412 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
| 413 memcpy(data, base + ring_buffer_start_, num_bytes); | |
| 414 } else { | |
| 415 uint32_t bytes_until_end = buffer_size - ring_buffer_start_; | |
| 416 memcpy(data, base + ring_buffer_start_, | |
| 417 std::min(num_bytes, bytes_until_end)); | |
| 418 | |
| 419 if (bytes_until_end < num_bytes) { | |
| 420 memcpy(static_cast<uint8_t*>(data) + bytes_until_end, base, | |
| 421 num_bytes - bytes_until_end); | |
| 422 } | |
| 423 } | |
| 424 | |
| 425 return true; | |
| 426 } | |
| 427 | |
| 428 uint32_t DataPipe::GetReadableBytes() const { | |
| 429 return shared_buffer_ ? ring_buffer_size_ : 0; | |
| 430 } | |
| 431 | |
| 432 uint32_t DataPipe::GetWritableBytes() const { | |
| 433 return shared_buffer_ ? options_.capacity_num_bytes - ring_buffer_size_ : 0; | |
| 434 } | |
| 435 | |
| 436 void DataPipe::UpdateFromRead(uint32_t num_bytes) { | |
| 437 ring_buffer_start_ = | |
| 438 (ring_buffer_start_ + num_bytes) % options_.capacity_num_bytes; | |
| 439 ring_buffer_size_ -= num_bytes; | |
| 440 DCHECK_GE(ring_buffer_size_, 0u); | |
| 441 } | |
| 442 | |
| 443 void DataPipe::UpdateFromWrite(uint32_t num_bytes) { | |
| 444 ring_buffer_size_ += num_bytes; | |
| 445 DCHECK_LE(ring_buffer_size_, options_.capacity_num_bytes); | |
| 446 } | |
| 447 | |
| 448 bool DataPipe::NotifyWrite(uint32_t num_bytes) { | |
| 449 DCHECK(is_producer_); | |
| 450 | |
| 451 if (!channel_) | |
| 452 return false; | |
| 453 | |
| 454 DataPipeCommandHeader command = {DATA_WRITTEN, num_bytes}; | |
| 455 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
| 456 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
| 457 if (!channel_->WriteMessage(std::move(message))) { | |
| 458 return false; | |
| 459 } | |
| 460 return true; | |
| 461 } | |
| 462 | |
| 463 bool DataPipe::NotifyRead(uint32_t num_bytes) { | |
| 464 DCHECK(!is_producer_); | |
| 465 | |
| 466 if (!channel_) | |
| 467 return false; | |
| 468 DataPipeCommandHeader command = {DATA_READ, num_bytes}; | |
| 469 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
| 470 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
| 471 if (!channel_->WriteMessage(std::move(message))) { | |
| 472 return false; | |
| 473 } | |
| 474 return true; | |
| 475 } | |
| 476 | |
| 477 void DataPipe::NotifySharedBuffer() { | |
| 478 if (!channel_) { | |
| 479 DLOG(ERROR) << "NotifySharedBuffer: No channel"; | |
| 480 return; | |
| 481 } | |
| 482 DataPipeCommandHeader command = {NOTIFY_SHARED_BUFFER, | |
| 483 options_.capacity_num_bytes}; | |
| 484 ScopedPlatformHandle handle = shared_buffer_->DuplicatePlatformHandle(); | |
| 485 DCHECK(handle.is_valid()); | |
| 486 ScopedPlatformHandleVectorPtr fds(new PlatformHandleVector()); | |
| 487 fds->push_back(handle.release()); | |
| 488 | |
| 489 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
| 490 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
| 491 message->SetTransportData(make_scoped_ptr(new TransportData( | |
| 492 std::move(fds), channel_->GetSerializedPlatformHandleSize()))); | |
| 493 if (!channel_->WriteMessage(std::move(message))) { | |
| 494 DLOG(ERROR) << "NotifySharedBuffer: Can't write"; | |
| 495 } | |
| 496 } | |
| 497 | |
| 498 void DataPipe::UpdateSharedBuffer( | |
| 499 scoped_refptr<PlatformSharedBuffer> shared_buffer) { | |
| 500 DCHECK(shared_buffer); | |
| 501 shared_buffer_ = shared_buffer; | |
| 502 mapping_ = nullptr; | |
| 503 } | |
| 504 | |
| 505 void DataPipe::Shutdown() { | |
| 506 if (channel_) { | |
| 507 channel_->Shutdown(); | |
| 508 channel_ = nullptr; | |
| 509 } | |
| 510 } | |
| 511 | |
| 512 void DataPipe::CreateEquivalentAndClose(DataPipe* out) { | |
| 513 // Serialize, releasing the handle, otherwise we will get callbacks to the | |
| 514 // wrong delegate for RawChannel. | |
| 515 Serialize(); | |
| 516 | |
| 517 std::swap(options_, out->options_); | |
| 518 std::swap(channel_released_, out->channel_released_); | |
| 519 serialized_read_buffer_.swap(out->serialized_read_buffer_); | |
| 520 serialized_write_buffer_.swap(out->serialized_write_buffer_); | |
| 521 serialized_channel_handle_.swap(out->serialized_channel_handle_); | |
| 522 shared_buffer_.swap(out->shared_buffer_); | |
| 523 mapping_.swap(out->mapping_); | |
| 524 std::swap(ring_buffer_start_, out->ring_buffer_start_); | |
| 525 std::swap(ring_buffer_size_, out->ring_buffer_size_); | |
| 526 std::swap(is_producer_, out->is_producer_); | |
| 527 } | |
| 528 | |
| 529 void* DataPipe::GetWriteBuffer(uint32_t* num_bytes) { | |
| 530 if (!shared_buffer_) { | |
| 531 *num_bytes = 0; | |
| 532 return nullptr; | |
| 533 } | |
| 534 // Must provide sequential memory. | |
| 535 uint32_t buffer_size = options_.capacity_num_bytes; | |
| 536 uint32_t ring_buffer_end = | |
| 537 (ring_buffer_start_ + ring_buffer_size_) % buffer_size; | |
| 538 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
| 539 *num_bytes = buffer_size - ring_buffer_end; | |
| 540 } else { | |
| 541 *num_bytes = ring_buffer_start_ - ring_buffer_end; | |
| 542 } | |
| 543 | |
| 544 return GetSharedBufferBase() + ring_buffer_end; | |
| 545 } | |
| 546 | |
| 547 const void* DataPipe::GetReadBuffer(uint32_t* num_bytes) { | |
| 548 if (!shared_buffer_) { | |
| 549 *num_bytes = 0; | |
| 550 return nullptr; | |
| 551 } | |
| 552 // Must provide sequential memory. | |
| 553 uint32_t buffer_size = options_.capacity_num_bytes; | |
| 554 uint32_t ring_buffer_end = | |
| 555 (ring_buffer_start_ + ring_buffer_size_) % buffer_size; | |
| 556 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
| 557 *num_bytes = ring_buffer_size_; | |
| 558 } else { | |
| 559 *num_bytes = buffer_size - ring_buffer_start_; | |
| 560 } | |
| 561 | |
| 562 return GetSharedBufferBase() + ring_buffer_start_; | |
| 563 } | |
| 564 | |
| 565 bool DataPipe::ProcessCommand(const DataPipeCommandHeader& command, | |
| 566 ScopedPlatformHandleVectorPtr platform_handles) { | |
| 567 PlatformHandle handle; | |
| 568 ScopedPlatformHandle scoped_handle; | |
| 569 scoped_refptr<PlatformSharedBuffer> shared_buffer; | |
| 570 bool was_unreadable = GetReadableBytes() == 0; | |
| 571 bool was_unwritable = GetWritableBytes() == 0; | |
| 572 switch (command.command) { | |
| 573 case NOTIFY_SHARED_BUFFER: | |
| 574 CHECK_EQ(platform_handles->size(), 1u); | |
| 575 std::swap(handle, (*platform_handles.get())[0]); | |
| 576 scoped_handle.reset(handle); | |
| 577 UpdateSharedBuffer( | |
| 578 internal::g_platform_support->CreateSharedBufferFromHandle( | |
| 579 command.num_bytes, std::move(scoped_handle))); | |
| 580 break; | |
| 581 case DATA_WRITTEN: | |
| 582 if (!is_producer_) { | |
| 583 UpdateFromWrite(command.num_bytes); | |
| 584 } else { | |
| 585 LOG(ERROR) << "Producer was told of a write, which shouldn't happen."; | |
| 586 DCHECK(0); | |
|
Anand Mistry (off Chromium)
2016/01/11 06:19:34
NOTREACHED();
Eliot Courtney
2016/01/13 00:00:09
Done.
| |
| 197 } | 587 } |
| 198 | 588 break; |
| 199 PlatformHandle temp_shared_memory_handle; | 589 case DATA_READ: |
| 200 std::swap(temp_shared_memory_handle, | 590 if (is_producer_) { |
| 201 (*platform_handles)[serialization->shared_memory_handle_index]); | 591 UpdateFromRead(command.num_bytes); |
| 202 *shared_memory_handle = | 592 } else { |
| 203 ScopedPlatformHandle(temp_shared_memory_handle); | 593 LOG(ERROR) << "Consumer was told of a read, which shouldn't happen."; |
| 204 } | 594 DCHECK(0); |
| 205 } | 595 } |
| 206 | 596 break; |
| 207 size -= sizeof(SerializedDataPipeHandleDispatcher); | 597 default: |
| 208 | 598 LOG(ERROR) << "Shouldn't happen"; |
| 209 return ScopedPlatformHandle(platform_handle); | 599 DCHECK(0); |
| 600 } | |
| 601 | |
| 602 // Handles write/read case and shared buffer becoming available case. | |
| 603 return (was_unreadable && GetReadableBytes()) || | |
| 604 (was_unwritable && GetWritableBytes()); | |
| 210 } | 605 } |
| 211 | 606 |
| 212 } // namespace edk | 607 } // namespace edk |
| 213 } // namespace mojo | 608 } // namespace mojo |
| OLD | NEW |