Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/data_pipe.h" | 5 #include "mojo/edk/system/data_pipe.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 #include <string.h> | 9 #include <string.h> |
| 10 | 10 |
| 11 #include <algorithm> | |
| 12 | |
| 13 #include "base/bind.h" | |
| 14 #include "mojo/edk/embedder/embedder_internal.h" | |
| 15 #include "mojo/edk/embedder/platform_support.h" | |
| 11 #include "mojo/edk/system/configuration.h" | 16 #include "mojo/edk/system/configuration.h" |
| 12 #include "mojo/edk/system/options_validation.h" | 17 #include "mojo/edk/system/options_validation.h" |
| 13 #include "mojo/edk/system/raw_channel.h" | 18 #include "mojo/edk/system/raw_channel.h" |
| 14 #include "mojo/edk/system/transport_data.h" | 19 #include "mojo/edk/system/transport_data.h" |
| 15 | 20 |
| 16 namespace mojo { | 21 namespace mojo { |
| 17 namespace edk { | 22 namespace edk { |
| 18 | 23 |
| 19 namespace { | 24 namespace { |
| 20 | 25 |
| 21 const size_t kInvalidDataPipeHandleIndex = static_cast<size_t>(-1); | 26 const size_t kInvalidDataPipeHandleIndex = static_cast<size_t>(-1); |
| 22 | 27 |
| 23 struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { | 28 struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { |
| 24 size_t platform_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) | 29 size_t channel_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
| 25 | 30 |
| 26 // These are from MojoCreateDataPipeOptions | 31 // These are from MojoCreateDataPipeOptions |
| 27 MojoCreateDataPipeOptionsFlags flags; | 32 MojoCreateDataPipeOptionsFlags flags; |
| 28 uint32_t element_num_bytes; | 33 uint32_t element_num_bytes; |
| 29 uint32_t capacity_num_bytes; | 34 uint32_t capacity_num_bytes; |
| 30 | 35 |
| 31 size_t shared_memory_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) | 36 size_t channel_shared_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
| 32 uint32_t shared_memory_size; | 37 uint32_t channel_pending_read_size; |
| 38 uint32_t channel_pending_write_size; | |
| 39 | |
| 40 size_t shared_buffer_handle_index; | |
| 41 size_t ring_buffer_start; | |
| 42 size_t ring_buffer_size; | |
| 33 }; | 43 }; |
| 34 | 44 |
| 35 } // namespace | 45 } // namespace |
| 36 | 46 |
| 37 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { | 47 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { |
| 38 MojoCreateDataPipeOptions result = { | 48 MojoCreateDataPipeOptions result = { |
| 39 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), | 49 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), |
| 40 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, | 50 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, |
| 41 1u, | 51 1u, |
| 42 static_cast<uint32_t>( | 52 static_cast<uint32_t>( |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 90 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) | 100 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) |
| 91 return MOJO_RESULT_INVALID_ARGUMENT; | 101 return MOJO_RESULT_INVALID_ARGUMENT; |
| 92 if (reader.options().capacity_num_bytes > | 102 if (reader.options().capacity_num_bytes > |
| 93 GetConfiguration().max_data_pipe_capacity_bytes) | 103 GetConfiguration().max_data_pipe_capacity_bytes) |
| 94 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 104 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| 95 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; | 105 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; |
| 96 | 106 |
| 97 return MOJO_RESULT_OK; | 107 return MOJO_RESULT_OK; |
| 98 } | 108 } |
| 99 | 109 |
| 100 void DataPipe::StartSerialize(bool have_channel_handle, | 110 DataPipe::DataPipe(const MojoCreateDataPipeOptions& options) |
| 101 bool have_shared_memory, | 111 : channel_(nullptr), |
| 102 size_t* max_size, | 112 options_(options), |
| 103 size_t* max_platform_handles) { | 113 channel_released_(false), |
| 114 shared_buffer_(nullptr), | |
| 115 mapping_(nullptr), | |
| 116 ring_buffer_start_(0), | |
| 117 ring_buffer_size_(0) {} | |
| 118 | |
| 119 DataPipe::~DataPipe() {} | |
| 120 | |
| 121 void DataPipe::Init(ScopedPlatformHandle message_pipe, | |
| 122 char* serialized_write_buffer, | |
| 123 size_t serialized_write_buffer_size, | |
| 124 char* serialized_read_buffer, | |
| 125 size_t serialized_read_buffer_size, | |
| 126 ScopedPlatformHandle shared_buffer_handle, | |
| 127 size_t ring_buffer_start, | |
| 128 size_t ring_buffer_size, | |
| 129 bool is_producer, | |
| 130 const base::Closure& init_callback) { | |
| 131 is_producer_ = is_producer; | |
| 132 | |
| 133 ring_buffer_start_ = ring_buffer_start; | |
| 134 ring_buffer_size_ = ring_buffer_size; | |
| 135 | |
| 136 if (shared_buffer_handle.is_valid()) { | |
| 137 shared_buffer_ = internal::g_platform_support->CreateSharedBufferFromHandle( | |
| 138 options_.capacity_num_bytes, std::move(shared_buffer_handle)); | |
| 139 DCHECK(shared_buffer_); | |
| 140 } else if (is_producer_) { | |
| 141 shared_buffer_ = internal::g_platform_support->CreateSharedBuffer( | |
| 142 options_.capacity_num_bytes); | |
| 143 } | |
| 144 | |
| 145 if (message_pipe.is_valid()) { | |
| 146 channel_ = RawChannel::Create(std::move(message_pipe)); | |
| 147 channel_->SetSerializedData(serialized_read_buffer, | |
| 148 serialized_read_buffer_size, | |
| 149 serialized_write_buffer, | |
| 150 serialized_write_buffer_size, nullptr, nullptr); | |
| 151 internal::g_io_thread_task_runner->PostTask(FROM_HERE, init_callback); | |
| 152 channel_->EnsureLazyInitialized(); | |
| 153 | |
| 154 if (is_producer_) { | |
| 155 if (shared_buffer_) { | |
| 156 internal::g_io_thread_task_runner->PostTask( | |
| 157 FROM_HERE, base::Bind(&DataPipe::NotifySharedBuffer, this)); | |
| 158 } else { | |
| 159 internal::g_io_thread_task_runner->PostTask( | |
| 160 FROM_HERE, base::Bind(&DataPipe::RequestSharedBuffer, this)); | |
| 161 } | |
| 162 } | |
| 163 } | |
| 164 } | |
| 165 | |
| 166 void DataPipe::StartSerialize(size_t* max_size, size_t* max_platform_handles) { | |
| 167 // We need to release the channel and get its pending reads/writes. | |
| 168 if (!channel_released_) { | |
| 169 Serialize(); | |
| 170 } | |
| 171 | |
| 104 *max_size = sizeof(SerializedDataPipeHandleDispatcher); | 172 *max_size = sizeof(SerializedDataPipeHandleDispatcher); |
| 105 *max_platform_handles = 0; | 173 *max_platform_handles = 0; |
| 106 if (have_channel_handle) | 174 if (serialized_channel_handle_.is_valid()) |
| 107 (*max_platform_handles)++; | 175 (*max_platform_handles)++; |
| 108 if (have_shared_memory) | 176 // For shared buffer to transfer pending reads / writes. |
| 177 if (!serialized_write_buffer_.empty() || !serialized_read_buffer_.empty()) | |
| 178 (*max_platform_handles)++; | |
| 179 if (shared_buffer_) | |
| 109 (*max_platform_handles)++; | 180 (*max_platform_handles)++; |
| 110 } | 181 } |
| 111 | 182 |
| 112 void DataPipe::EndSerialize(const MojoCreateDataPipeOptions& options, | 183 void DataPipe::Serialize() { |
| 113 ScopedPlatformHandle channel_handle, | 184 // We need to stop watching handle immediately, even though not on IO thread, |
| 114 ScopedPlatformHandle shared_memory_handle, | 185 // so that other messages aren't read after this. |
| 115 size_t shared_memory_size, | 186 if (channel_) { |
| 116 void* destination, | 187 std::vector<int> fds; |
| 188 bool write_error = false; | |
| 189 serialized_channel_handle_ = channel_->ReleaseHandle( | |
| 190 &serialized_read_buffer_, &serialized_write_buffer_, &fds, &fds, | |
| 191 &write_error); | |
| 192 | |
| 193 CHECK(fds.empty()); | |
| 194 if (write_error) { | |
| 195 serialized_channel_handle_.reset(); | |
| 196 } | |
| 197 channel_ = nullptr; | |
| 198 } | |
| 199 channel_released_ = true; | |
| 200 } | |
| 201 | |
| 202 void DataPipe::EndSerialize(void* destination, | |
| 117 size_t* actual_size, | 203 size_t* actual_size, |
| 118 PlatformHandleVector* platform_handles) { | 204 PlatformHandleVector* platform_handles) { |
| 205 ScopedPlatformHandle channel_shared_handle; | |
| 206 size_t channel_shared_size = | |
| 207 serialized_read_buffer_.size() + serialized_write_buffer_.size(); | |
| 208 if (channel_shared_size) { | |
| 209 scoped_refptr<PlatformSharedBuffer> shared_buffer( | |
| 210 internal::g_platform_support->CreateSharedBuffer(channel_shared_size)); | |
| 211 scoped_ptr<PlatformSharedBufferMapping> mapping( | |
| 212 shared_buffer->Map(0, channel_shared_size)); | |
| 213 | |
| 214 void* base = mapping->GetBase(); | |
| 215 if (!serialized_read_buffer_.empty()) { | |
| 216 memcpy(base, &serialized_read_buffer_[0], serialized_read_buffer_.size()); | |
| 217 } | |
| 218 if (!serialized_write_buffer_.empty()) { | |
| 219 memcpy(static_cast<uint8_t*>(base) + serialized_read_buffer_.size(), | |
| 220 &serialized_write_buffer_[0], serialized_write_buffer_.size()); | |
| 221 } | |
| 222 | |
| 223 channel_shared_handle.reset(shared_buffer->PassPlatformHandle().release()); | |
| 224 } | |
| 225 | |
| 119 SerializedDataPipeHandleDispatcher* serialization = | 226 SerializedDataPipeHandleDispatcher* serialization = |
| 120 static_cast<SerializedDataPipeHandleDispatcher*>(destination); | 227 static_cast<SerializedDataPipeHandleDispatcher*>(destination); |
| 121 if (channel_handle.is_valid()) { | 228 if (serialized_channel_handle_.is_valid()) { |
| 122 serialization->platform_handle_index = platform_handles->size(); | 229 serialization->channel_handle_index = platform_handles->size(); |
| 123 platform_handles->push_back(channel_handle.release()); | 230 platform_handles->push_back(serialized_channel_handle_.release()); |
| 124 } else { | 231 } else { |
| 125 serialization->platform_handle_index = kInvalidDataPipeHandleIndex; | 232 serialization->channel_handle_index = kInvalidDataPipeHandleIndex; |
| 126 } | 233 } |
| 127 | 234 |
| 128 serialization->flags = options.flags; | 235 serialization->flags = options_.flags; |
| 129 serialization->element_num_bytes = options.element_num_bytes; | 236 serialization->element_num_bytes = options_.element_num_bytes; |
| 130 serialization->capacity_num_bytes = options.capacity_num_bytes; | 237 serialization->capacity_num_bytes = options_.capacity_num_bytes; |
| 131 | 238 |
| 132 serialization->shared_memory_size = static_cast<uint32_t>(shared_memory_size); | 239 serialization->channel_pending_read_size = |
| 133 if (serialization->shared_memory_size) { | 240 static_cast<uint32_t>(serialized_read_buffer_.size()); |
| 134 serialization->shared_memory_handle_index = platform_handles->size(); | 241 serialization->channel_pending_write_size = |
| 135 platform_handles->push_back(shared_memory_handle.release()); | 242 static_cast<uint32_t>(serialized_write_buffer_.size()); |
| 243 | |
| 244 if (channel_shared_handle.is_valid()) { | |
| 245 serialization->channel_shared_handle_index = platform_handles->size(); | |
| 246 platform_handles->push_back(channel_shared_handle.release()); | |
| 136 } else { | 247 } else { |
| 137 serialization->shared_memory_handle_index = kInvalidDataPipeHandleIndex; | 248 serialization->channel_shared_handle_index = kInvalidDataPipeHandleIndex; |
| 138 } | 249 } |
| 139 | 250 |
| 251 ScopedPlatformHandle shared_buffer_handle; | |
| 252 if (shared_buffer_) { | |
| 253 shared_buffer_handle.reset(shared_buffer_->PassPlatformHandle().release()); | |
| 254 } | |
| 255 | |
| 256 if (shared_buffer_handle.is_valid()) { | |
| 257 serialization->shared_buffer_handle_index = platform_handles->size(); | |
| 258 platform_handles->push_back(shared_buffer_handle.release()); | |
| 259 } else { | |
| 260 serialization->shared_buffer_handle_index = kInvalidDataPipeHandleIndex; | |
| 261 } | |
| 262 | |
| 263 serialization->ring_buffer_start = ring_buffer_start_; | |
| 264 serialization->ring_buffer_size = ring_buffer_size_; | |
| 265 | |
| 140 *actual_size = sizeof(SerializedDataPipeHandleDispatcher); | 266 *actual_size = sizeof(SerializedDataPipeHandleDispatcher); |
| 141 } | 267 } |
| 142 | 268 |
| 143 ScopedPlatformHandle DataPipe::Deserialize( | 269 ScopedPlatformHandle DataPipe::Deserialize( |
| 144 const void* source, | 270 const void* source, |
| 145 size_t size, | 271 size_t size, |
| 146 PlatformHandleVector* platform_handles, | 272 PlatformHandleVector* platform_handles, |
| 147 MojoCreateDataPipeOptions* options, | 273 MojoCreateDataPipeOptions* options, |
| 148 ScopedPlatformHandle* shared_memory_handle, | 274 ScopedPlatformHandle* channel_shared_handle, |
| 149 size_t* shared_memory_size) { | 275 size_t* serialized_read_buffer_size, |
| 276 size_t* serialized_write_buffer_size, | |
| 277 ScopedPlatformHandle* shared_buffer_handle, | |
| 278 size_t* ring_buffer_start, | |
| 279 size_t* ring_buffer_size) { | |
| 150 if (size != sizeof(SerializedDataPipeHandleDispatcher)) { | 280 if (size != sizeof(SerializedDataPipeHandleDispatcher)) { |
| 151 LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; | 281 LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; |
| 152 return ScopedPlatformHandle(); | 282 return ScopedPlatformHandle(); |
| 153 } | 283 } |
| 154 | 284 |
| 155 const SerializedDataPipeHandleDispatcher* serialization = | 285 const SerializedDataPipeHandleDispatcher* serialization = |
| 156 static_cast<const SerializedDataPipeHandleDispatcher*>(source); | 286 static_cast<const SerializedDataPipeHandleDispatcher*>(source); |
| 157 size_t platform_handle_index = serialization->platform_handle_index; | |
| 158 | 287 |
| 159 // Starts off invalid, which is what we want. | 288 // Starts off invalid, which is what we want. |
| 160 PlatformHandle platform_handle; | 289 PlatformHandle channel_handle; |
| 161 if (platform_handle_index != kInvalidDataPipeHandleIndex) { | 290 size_t channel_handle_index = serialization->channel_handle_index; |
| 162 if (!platform_handles || | 291 if (channel_handle_index != kInvalidDataPipeHandleIndex) { |
| 163 platform_handle_index >= platform_handles->size()) { | 292 if (!platform_handles || channel_handle_index >= platform_handles->size()) { |
| 164 LOG(ERROR) | 293 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
| 165 << "Invalid serialized data pipe dispatcher (missing handles)"; | |
| 166 return ScopedPlatformHandle(); | 294 return ScopedPlatformHandle(); |
| 167 } | 295 } |
| 168 | 296 |
| 169 // We take ownership of the handle, so we have to invalidate the one in | 297 // We take ownership of the handle, so we have to invalidate the one in |
| 170 // |platform_handles|. | 298 // |platform_handles|. |
| 171 std::swap(platform_handle, (*platform_handles)[platform_handle_index]); | 299 std::swap(channel_handle, (*platform_handles)[channel_handle_index]); |
| 172 } | 300 } |
| 173 | 301 |
| 174 options->struct_size = sizeof(MojoCreateDataPipeOptions); | 302 options->struct_size = sizeof(MojoCreateDataPipeOptions); |
| 175 options->flags = serialization->flags; | 303 options->flags = serialization->flags; |
| 176 options->element_num_bytes = serialization->element_num_bytes; | 304 options->element_num_bytes = serialization->element_num_bytes; |
| 177 options->capacity_num_bytes = serialization->capacity_num_bytes; | 305 options->capacity_num_bytes = serialization->capacity_num_bytes; |
| 178 | 306 *serialized_read_buffer_size = serialization->channel_pending_read_size; |
| 179 if (shared_memory_size) { | 307 *serialized_write_buffer_size = serialization->channel_pending_write_size; |
| 180 *shared_memory_size = serialization->shared_memory_size; | 308 *ring_buffer_start = serialization->ring_buffer_start; |
| 181 if (*shared_memory_size) { | 309 *ring_buffer_size = serialization->ring_buffer_size; |
| 182 DCHECK(serialization->shared_memory_handle_index != | 310 |
| 183 kInvalidDataPipeHandleIndex); | 311 size_t channel_shared_handle_index = |
| 184 if (!platform_handles || | 312 serialization->channel_shared_handle_index; |
| 185 serialization->shared_memory_handle_index >= | 313 if (*serialized_read_buffer_size || *serialized_write_buffer_size) { |
| 186 platform_handles->size()) { | 314 DCHECK_NE(channel_shared_handle_index, kInvalidDataPipeHandleIndex); |
| 187 LOG(ERROR) << "Invalid serialized data pipe dispatcher " | 315 if (!platform_handles || |
| 188 << "(missing handles)"; | 316 channel_shared_handle_index >= platform_handles->size()) { |
| 189 return ScopedPlatformHandle(); | 317 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
| 318 return ScopedPlatformHandle(); | |
| 319 } | |
| 320 | |
| 321 // Take ownership. | |
| 322 PlatformHandle temp_channel_shared_handle; | |
| 323 std::swap(temp_channel_shared_handle, | |
| 324 (*platform_handles)[channel_shared_handle_index]); | |
| 325 *channel_shared_handle = ScopedPlatformHandle(temp_channel_shared_handle); | |
| 326 } | |
| 327 | |
| 328 size_t shared_buffer_handle_index = serialization->shared_buffer_handle_index; | |
| 329 if (shared_buffer_handle_index != kInvalidDataPipeHandleIndex) { | |
| 330 if (!platform_handles || | |
| 331 shared_buffer_handle_index >= platform_handles->size()) { | |
| 332 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; | |
| 333 return ScopedPlatformHandle(); | |
| 334 } | |
| 335 | |
| 336 // Take ownership. | |
| 337 PlatformHandle temp_shared_buffer_handle; | |
| 338 std::swap(temp_shared_buffer_handle, | |
| 339 (*platform_handles)[shared_buffer_handle_index]); | |
| 340 *shared_buffer_handle = ScopedPlatformHandle(temp_shared_buffer_handle); | |
| 341 } | |
| 342 | |
| 343 return ScopedPlatformHandle(channel_handle); | |
| 344 } | |
| 345 | |
| 346 void* DataPipe::GetSharedBufferBase() { | |
| 347 if (!mapping_) { | |
| 348 DCHECK(shared_buffer_); | |
| 349 mapping_ = shared_buffer_->Map(0, options_.capacity_num_bytes); | |
| 350 } | |
| 351 if (!mapping_) { | |
| 352 return nullptr; | |
| 353 } | |
| 354 return mapping_->GetBase(); | |
| 355 } | |
| 356 | |
| 357 bool DataPipe::WriteDataIntoSharedBuffer(const void* elements, | |
| 358 size_t num_bytes) { | |
| 359 size_t bufsz = options_.capacity_num_bytes; | |
|
Anand Mistry (off Chromium)
2016/01/08 03:28:02
This isn't Go. Don't abbreviate unnecessarily.
Eliot Courtney
2016/01/08 04:44:39
Done.
| |
| 360 uint8_t* base = static_cast<uint8_t*>(GetSharedBufferBase()); | |
| 361 if (base == nullptr) { | |
|
Anand Mistry (off Chromium)
2016/01/08 03:28:02
generally prefer:
if (!base)
Eliot Courtney
2016/01/08 04:44:39
Done.
| |
| 362 return false; | |
| 363 } | |
| 364 | |
| 365 DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); | |
| 366 DCHECK_LE(num_bytes, bufsz - ring_buffer_size_); | |
| 367 | |
| 368 size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz; | |
| 369 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
| 370 size_t bytes_until_end = bufsz - ring_buffer_end; | |
| 371 memcpy(base + ring_buffer_end, elements, | |
| 372 std::min(num_bytes, bytes_until_end)); | |
| 373 | |
| 374 if (bytes_until_end < num_bytes) { | |
| 375 memcpy(base, static_cast<const uint8_t*>(elements) + bytes_until_end, | |
| 376 num_bytes - bytes_until_end); | |
| 377 } | |
| 378 } else { | |
| 379 memcpy(base + ring_buffer_end, elements, num_bytes); | |
| 380 } | |
| 381 | |
| 382 return true; | |
| 383 } | |
| 384 | |
| 385 bool DataPipe::ReadDataFromSharedBuffer(void* data, size_t num_bytes) { | |
| 386 DCHECK_LE(num_bytes, ring_buffer_size_); | |
| 387 DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); | |
| 388 | |
| 389 size_t bufsz = options_.capacity_num_bytes; | |
| 390 uint8_t* base = static_cast<uint8_t*>(GetSharedBufferBase()); | |
| 391 if (base == nullptr) { | |
| 392 return false; | |
| 393 } | |
| 394 | |
| 395 size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz; | |
| 396 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
| 397 memcpy(data, base + ring_buffer_start_, num_bytes); | |
| 398 } else { | |
| 399 size_t bytes_until_end = bufsz - ring_buffer_start_; | |
| 400 memcpy(data, base + ring_buffer_start_, | |
| 401 std::min(num_bytes, bytes_until_end)); | |
| 402 | |
| 403 if (bytes_until_end < num_bytes) { | |
| 404 memcpy(static_cast<uint8_t*>(data) + bytes_until_end, base, | |
| 405 num_bytes - bytes_until_end); | |
| 406 } | |
| 407 } | |
| 408 | |
| 409 return true; | |
| 410 } | |
| 411 | |
| 412 size_t DataPipe::GetReadableBytes() const { | |
| 413 return shared_buffer_ ? ring_buffer_size_ : 0; | |
| 414 } | |
| 415 | |
| 416 size_t DataPipe::GetWritableBytes() const { | |
| 417 return shared_buffer_ ? options_.capacity_num_bytes - ring_buffer_size_ : 0; | |
| 418 } | |
| 419 | |
| 420 void DataPipe::UpdateFromRead(size_t num_bytes) { | |
| 421 ring_buffer_start_ = | |
| 422 (ring_buffer_start_ + num_bytes) % options_.capacity_num_bytes; | |
| 423 ring_buffer_size_ -= num_bytes; | |
| 424 DCHECK_GE(ring_buffer_size_, 0u); | |
| 425 } | |
| 426 | |
| 427 void DataPipe::UpdateFromWrite(size_t num_bytes) { | |
| 428 ring_buffer_size_ += num_bytes; | |
| 429 DCHECK_LE(ring_buffer_size_, options_.capacity_num_bytes); | |
| 430 } | |
| 431 | |
| 432 bool DataPipe::NotifyWrite(size_t num_bytes) { | |
| 433 DCHECK(is_producer_); | |
| 434 | |
| 435 if (!channel_) | |
| 436 return false; | |
| 437 | |
| 438 DataPipeCommandHeader command = {DATA_WRITTEN, num_bytes}; | |
| 439 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
| 440 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
| 441 if (!channel_->WriteMessage(std::move(message))) { | |
| 442 return false; | |
| 443 } | |
| 444 return true; | |
| 445 } | |
| 446 | |
| 447 bool DataPipe::NotifyRead(size_t num_bytes) { | |
| 448 DCHECK(!is_producer_); | |
| 449 | |
| 450 if (!channel_) | |
| 451 return false; | |
| 452 DataPipeCommandHeader command = {DATA_READ, num_bytes}; | |
| 453 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
| 454 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
| 455 if (!channel_->WriteMessage(std::move(message))) { | |
| 456 return false; | |
| 457 } | |
| 458 return true; | |
| 459 } | |
| 460 | |
| 461 void DataPipe::NotifySharedBuffer() { | |
| 462 if (!channel_) { | |
| 463 DLOG(ERROR) << "NotifySharedBuffer: No channel"; | |
| 464 return; | |
| 465 } | |
| 466 DataPipeCommandHeader command = {NOTIFY_SHARED_BUFFER, | |
| 467 options_.capacity_num_bytes}; | |
| 468 ScopedPlatformHandle handle = shared_buffer_->DuplicatePlatformHandle(); | |
| 469 DCHECK(handle.is_valid()); | |
| 470 ScopedPlatformHandleVectorPtr fds(new PlatformHandleVector()); | |
| 471 fds->push_back(handle.release()); | |
| 472 | |
| 473 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
| 474 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
| 475 message->SetTransportData(make_scoped_ptr(new TransportData( | |
| 476 std::move(fds), channel_->GetSerializedPlatformHandleSize()))); | |
| 477 if (!channel_->WriteMessage(std::move(message))) { | |
| 478 DLOG(ERROR) << "NotifySharedBuffer: Can't write"; | |
| 479 } | |
| 480 } | |
| 481 | |
| 482 void DataPipe::RequestSharedBuffer() { | |
| 483 if (!channel_) { | |
| 484 DLOG(ERROR) << "NotifySharedBuffer: No channel"; | |
| 485 return; | |
| 486 } | |
| 487 | |
| 488 DataPipeCommandHeader command = {PLEASE_CREATE_SHARED_BUFFER, | |
| 489 options_.capacity_num_bytes}; | |
| 490 | |
| 491 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
| 492 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
| 493 if (!channel_->WriteMessage(std::move(message))) { | |
| 494 DLOG(ERROR) << "RequestSharedBuffer: Can't write"; | |
| 495 } | |
| 496 } | |
| 497 | |
| 498 void DataPipe::UpdateSharedBuffer( | |
| 499 scoped_refptr<PlatformSharedBuffer> shared_buffer) { | |
| 500 shared_buffer_ = shared_buffer; | |
| 501 mapping_ = nullptr; | |
| 502 } | |
| 503 | |
| 504 void DataPipe::Shutdown() { | |
| 505 if (channel_) { | |
| 506 channel_->Shutdown(); | |
| 507 channel_ = nullptr; | |
| 508 } | |
| 509 } | |
| 510 | |
| 511 void DataPipe::CreateEquivalentAndClose(DataPipe* out) { | |
| 512 // Serialize, releasing the handle, otherwise we will get callbacks to the | |
| 513 // wrong delegate for RawChannel. | |
| 514 Serialize(); | |
| 515 | |
| 516 std::swap(options_, out->options_); | |
| 517 std::swap(channel_released_, out->channel_released_); | |
| 518 serialized_read_buffer_.swap(out->serialized_read_buffer_); | |
| 519 serialized_write_buffer_.swap(out->serialized_write_buffer_); | |
| 520 serialized_channel_handle_.swap(out->serialized_channel_handle_); | |
| 521 shared_buffer_.swap(out->shared_buffer_); | |
| 522 mapping_.swap(out->mapping_); | |
| 523 std::swap(ring_buffer_start_, out->ring_buffer_start_); | |
| 524 std::swap(ring_buffer_size_, out->ring_buffer_size_); | |
| 525 std::swap(is_producer_, out->is_producer_); | |
| 526 } | |
| 527 | |
| 528 void* DataPipe::GetWriteBuffer(size_t* num_bytes) { | |
| 529 if (!shared_buffer_) { | |
| 530 *num_bytes = 0; | |
| 531 return nullptr; | |
| 532 } | |
| 533 // Must provide sequential memory. | |
| 534 size_t bufsz = options_.capacity_num_bytes; | |
| 535 size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz; | |
| 536 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
| 537 *num_bytes = bufsz - ring_buffer_end; | |
| 538 } else { | |
| 539 *num_bytes = ring_buffer_start_ - ring_buffer_end; | |
| 540 } | |
| 541 | |
| 542 return static_cast<uint8_t*>(GetSharedBufferBase()) + ring_buffer_end; | |
| 543 } | |
| 544 | |
| 545 const void* DataPipe::GetReadBuffer(size_t* num_bytes) { | |
| 546 if (!shared_buffer_) { | |
| 547 *num_bytes = 0; | |
| 548 return nullptr; | |
| 549 } | |
| 550 // Must provide sequential memory. | |
| 551 size_t bufsz = options_.capacity_num_bytes; | |
| 552 size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz; | |
| 553 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
| 554 *num_bytes = ring_buffer_size_; | |
| 555 } else { | |
| 556 *num_bytes = bufsz - ring_buffer_start_; | |
| 557 } | |
| 558 | |
| 559 return static_cast<uint8_t*>(GetSharedBufferBase()) + ring_buffer_start_; | |
| 560 } | |
| 561 | |
| 562 bool DataPipe::ProcessCommand(const DataPipeCommandHeader& command, | |
| 563 ScopedPlatformHandleVectorPtr platform_handles) { | |
| 564 PlatformHandle handle; | |
| 565 ScopedPlatformHandle scoped_handle; | |
| 566 scoped_refptr<PlatformSharedBuffer> shared_buffer; | |
| 567 bool was_unreadable = GetReadableBytes() == 0; | |
| 568 bool was_unwritable = GetWritableBytes() == 0; | |
| 569 switch (command.command) { | |
| 570 case PLEASE_CREATE_SHARED_BUFFER: | |
| 571 // Other end wasn't able to create the shared buffer. | |
| 572 shared_buffer = | |
| 573 internal::g_platform_support->CreateSharedBuffer(command.num_bytes); | |
| 574 CHECK(shared_buffer); | |
| 575 UpdateSharedBuffer(shared_buffer); | |
| 576 | |
| 577 // Can't call back into RawChannel because of locking, so do this later. | |
| 578 internal::g_io_thread_task_runner->PostTask( | |
| 579 FROM_HERE, base::Bind(&DataPipe::NotifySharedBuffer, this)); | |
| 580 break; | |
| 581 case NOTIFY_SHARED_BUFFER: | |
| 582 CHECK_EQ(platform_handles->size(), 1u); | |
| 583 std::swap(handle, (*platform_handles.get())[0]); | |
| 584 scoped_handle.reset(handle); | |
| 585 UpdateSharedBuffer( | |
| 586 internal::g_platform_support->CreateSharedBufferFromHandle( | |
| 587 command.num_bytes, std::move(scoped_handle))); | |
| 588 break; | |
| 589 case DATA_WRITTEN: | |
| 590 if (!is_producer_) { | |
| 591 UpdateFromWrite(command.num_bytes); | |
| 592 } else { | |
| 593 LOG(ERROR) << "Producer was told of a write, which shouldn't happen."; | |
| 594 DCHECK(0); | |
| 190 } | 595 } |
| 191 | 596 break; |
| 192 PlatformHandle temp_shared_memory_handle; | 597 case DATA_READ: |
| 193 std::swap(temp_shared_memory_handle, | 598 if (is_producer_) { |
| 194 (*platform_handles)[serialization->shared_memory_handle_index]); | 599 UpdateFromRead(command.num_bytes); |
| 195 *shared_memory_handle = | 600 } else { |
| 196 ScopedPlatformHandle(temp_shared_memory_handle); | 601 LOG(ERROR) << "Consumer was told of a read, which shouldn't happen."; |
| 197 } | 602 DCHECK(0); |
| 198 } | 603 } |
| 199 | 604 break; |
| 200 size -= sizeof(SerializedDataPipeHandleDispatcher); | 605 default: |
| 201 | 606 LOG(ERROR) << "Shouldn't happen"; |
| 202 return ScopedPlatformHandle(platform_handle); | 607 DCHECK(0); |
| 608 } | |
| 609 | |
| 610 // Handles write/read case and shared buffer becoming available case. | |
| 611 return (was_unreadable && GetReadableBytes()) || | |
| 612 (was_unwritable && GetWritableBytes()); | |
| 203 } | 613 } |
| 204 | 614 |
| 205 } // namespace edk | 615 } // namespace edk |
| 206 } // namespace mojo | 616 } // namespace mojo |
| OLD | NEW |