Chromium Code Reviews| Index: mojo/edk/system/data_pipe.cc |
| diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc |
| index 3f53798093497e0729dac002a02a1081fd36bc7a..b1afff677d3e9fa4fe94c6954f3de85af68b1fc9 100644 |
| --- a/mojo/edk/system/data_pipe.cc |
| +++ b/mojo/edk/system/data_pipe.cc |
| @@ -8,6 +8,11 @@ |
| #include <stdint.h> |
| #include <string.h> |
| +#include <algorithm> |
| + |
| +#include "base/bind.h" |
| +#include "mojo/edk/embedder/embedder_internal.h" |
| +#include "mojo/edk/embedder/platform_support.h" |
| #include "mojo/edk/system/configuration.h" |
| #include "mojo/edk/system/options_validation.h" |
| #include "mojo/edk/system/raw_channel.h" |
| @@ -21,15 +26,20 @@ namespace { |
| const size_t kInvalidDataPipeHandleIndex = static_cast<size_t>(-1); |
| struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { |
| - size_t platform_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
| + size_t channel_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
| // These are from MojoCreateDataPipeOptions |
| MojoCreateDataPipeOptionsFlags flags; |
| uint32_t element_num_bytes; |
| uint32_t capacity_num_bytes; |
| - size_t shared_memory_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
| - uint32_t shared_memory_size; |
| + size_t channel_shared_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
| + uint32_t channel_pending_read_size; |
| + uint32_t channel_pending_write_size; |
| + |
| + size_t shared_buffer_handle_index; |
| + size_t ring_buffer_start; |
| + size_t ring_buffer_size; |
| }; |
| } // namespace |
| @@ -97,46 +107,162 @@ MojoResult DataPipe::ValidateCreateOptions( |
| return MOJO_RESULT_OK; |
| } |
| -void DataPipe::StartSerialize(bool have_channel_handle, |
| - bool have_shared_memory, |
| - size_t* max_size, |
| - size_t* max_platform_handles) { |
| +DataPipe::DataPipe(const MojoCreateDataPipeOptions& options) |
| + : channel_(nullptr), |
| + options_(options), |
| + channel_released_(false), |
| + shared_buffer_(nullptr), |
| + mapping_(nullptr), |
| + ring_buffer_start_(0), |
| + ring_buffer_size_(0) {} |
| + |
| +DataPipe::~DataPipe() {} |
| + |
| +void DataPipe::Init(ScopedPlatformHandle message_pipe, |
| + char* serialized_write_buffer, |
| + size_t serialized_write_buffer_size, |
| + char* serialized_read_buffer, |
| + size_t serialized_read_buffer_size, |
| + ScopedPlatformHandle shared_buffer_handle, |
| + size_t ring_buffer_start, |
| + size_t ring_buffer_size, |
| + bool is_producer, |
| + const base::Closure& init_callback) { |
| + is_producer_ = is_producer; |
| + |
| + ring_buffer_start_ = ring_buffer_start; |
| + ring_buffer_size_ = ring_buffer_size; |
| + |
| + if (shared_buffer_handle.is_valid()) { |
| + shared_buffer_ = internal::g_platform_support->CreateSharedBufferFromHandle( |
| + options_.capacity_num_bytes, std::move(shared_buffer_handle)); |
| + DCHECK(shared_buffer_); |
| + } else if (is_producer_) { |
| + shared_buffer_ = internal::g_platform_support->CreateSharedBuffer( |
| + options_.capacity_num_bytes); |
| + } |
| + |
| + if (message_pipe.is_valid()) { |
| + channel_ = RawChannel::Create(std::move(message_pipe)); |
| + channel_->SetSerializedData(serialized_read_buffer, |
| + serialized_read_buffer_size, |
| + serialized_write_buffer, |
| + serialized_write_buffer_size, nullptr, nullptr); |
| + internal::g_io_thread_task_runner->PostTask(FROM_HERE, init_callback); |
| + channel_->EnsureLazyInitialized(); |
| + |
| + if (is_producer_) { |
| + if (shared_buffer_) { |
| + internal::g_io_thread_task_runner->PostTask( |
| + FROM_HERE, base::Bind(&DataPipe::NotifySharedBuffer, this)); |
| + } else { |
| + internal::g_io_thread_task_runner->PostTask( |
| + FROM_HERE, base::Bind(&DataPipe::RequestSharedBuffer, this)); |
| + } |
| + } |
| + } |
| +} |
| + |
| +void DataPipe::StartSerialize(size_t* max_size, size_t* max_platform_handles) { |
| + // We need to release the channel and get its pending reads/writes. |
| + if (!channel_released_) { |
| + Serialize(); |
| + } |
| + |
| *max_size = sizeof(SerializedDataPipeHandleDispatcher); |
| *max_platform_handles = 0; |
| - if (have_channel_handle) |
| + if (serialized_channel_handle_.is_valid()) |
| (*max_platform_handles)++; |
| - if (have_shared_memory) |
| + // For shared buffer to transfer pending reads / writes. |
| + if (!serialized_write_buffer_.empty() || !serialized_read_buffer_.empty()) |
| (*max_platform_handles)++; |
| + if (shared_buffer_) |
| + (*max_platform_handles)++; |
| +} |
| + |
| +void DataPipe::Serialize() { |
| + // We need to stop watching handle immediately, even though not on IO thread, |
| + // so that other messages aren't read after this. |
| + if (channel_) { |
| + std::vector<int> fds; |
| + bool write_error = false; |
| + serialized_channel_handle_ = channel_->ReleaseHandle( |
| + &serialized_read_buffer_, &serialized_write_buffer_, &fds, &fds, |
| + &write_error); |
| + |
| + CHECK(fds.empty()); |
| + if (write_error) { |
| + serialized_channel_handle_.reset(); |
| + } |
| + channel_ = nullptr; |
| + } |
| + channel_released_ = true; |
| } |
| -void DataPipe::EndSerialize(const MojoCreateDataPipeOptions& options, |
| - ScopedPlatformHandle channel_handle, |
| - ScopedPlatformHandle shared_memory_handle, |
| - size_t shared_memory_size, |
| - void* destination, |
| +void DataPipe::EndSerialize(void* destination, |
| size_t* actual_size, |
| PlatformHandleVector* platform_handles) { |
| + ScopedPlatformHandle channel_shared_handle; |
| + size_t channel_shared_size = |
| + serialized_read_buffer_.size() + serialized_write_buffer_.size(); |
| + if (channel_shared_size) { |
| + scoped_refptr<PlatformSharedBuffer> shared_buffer( |
| + internal::g_platform_support->CreateSharedBuffer(channel_shared_size)); |
| + scoped_ptr<PlatformSharedBufferMapping> mapping( |
| + shared_buffer->Map(0, channel_shared_size)); |
| + |
| + void* base = mapping->GetBase(); |
| + if (!serialized_read_buffer_.empty()) { |
| + memcpy(base, &serialized_read_buffer_[0], serialized_read_buffer_.size()); |
| + } |
| + if (!serialized_write_buffer_.empty()) { |
| + memcpy(static_cast<uint8_t*>(base) + serialized_read_buffer_.size(), |
| + &serialized_write_buffer_[0], serialized_write_buffer_.size()); |
| + } |
| + |
| + channel_shared_handle.reset(shared_buffer->PassPlatformHandle().release()); |
| + } |
| + |
| SerializedDataPipeHandleDispatcher* serialization = |
| static_cast<SerializedDataPipeHandleDispatcher*>(destination); |
| - if (channel_handle.is_valid()) { |
| - serialization->platform_handle_index = platform_handles->size(); |
| - platform_handles->push_back(channel_handle.release()); |
| + if (serialized_channel_handle_.is_valid()) { |
| + serialization->channel_handle_index = platform_handles->size(); |
| + platform_handles->push_back(serialized_channel_handle_.release()); |
| } else { |
| - serialization->platform_handle_index = kInvalidDataPipeHandleIndex; |
| + serialization->channel_handle_index = kInvalidDataPipeHandleIndex; |
| } |
| - serialization->flags = options.flags; |
| - serialization->element_num_bytes = options.element_num_bytes; |
| - serialization->capacity_num_bytes = options.capacity_num_bytes; |
| + serialization->flags = options_.flags; |
| + serialization->element_num_bytes = options_.element_num_bytes; |
| + serialization->capacity_num_bytes = options_.capacity_num_bytes; |
| - serialization->shared_memory_size = static_cast<uint32_t>(shared_memory_size); |
| - if (serialization->shared_memory_size) { |
| - serialization->shared_memory_handle_index = platform_handles->size(); |
| - platform_handles->push_back(shared_memory_handle.release()); |
| + serialization->channel_pending_read_size = |
| + static_cast<uint32_t>(serialized_read_buffer_.size()); |
| + serialization->channel_pending_write_size = |
| + static_cast<uint32_t>(serialized_write_buffer_.size()); |
| + |
| + if (channel_shared_handle.is_valid()) { |
| + serialization->channel_shared_handle_index = platform_handles->size(); |
| + platform_handles->push_back(channel_shared_handle.release()); |
| } else { |
| - serialization->shared_memory_handle_index = kInvalidDataPipeHandleIndex; |
| + serialization->channel_shared_handle_index = kInvalidDataPipeHandleIndex; |
| + } |
| + |
| + ScopedPlatformHandle shared_buffer_handle; |
| + if (shared_buffer_) { |
| + shared_buffer_handle.reset(shared_buffer_->PassPlatformHandle().release()); |
| } |
| + if (shared_buffer_handle.is_valid()) { |
| + serialization->shared_buffer_handle_index = platform_handles->size(); |
| + platform_handles->push_back(shared_buffer_handle.release()); |
| + } else { |
| + serialization->shared_buffer_handle_index = kInvalidDataPipeHandleIndex; |
| + } |
| + |
| + serialization->ring_buffer_start = ring_buffer_start_; |
| + serialization->ring_buffer_size = ring_buffer_size_; |
| + |
| *actual_size = sizeof(SerializedDataPipeHandleDispatcher); |
| } |
| @@ -145,8 +271,12 @@ ScopedPlatformHandle DataPipe::Deserialize( |
| size_t size, |
| PlatformHandleVector* platform_handles, |
| MojoCreateDataPipeOptions* options, |
| - ScopedPlatformHandle* shared_memory_handle, |
| - size_t* shared_memory_size) { |
| + ScopedPlatformHandle* channel_shared_handle, |
| + size_t* serialized_read_buffer_size, |
| + size_t* serialized_write_buffer_size, |
| + ScopedPlatformHandle* shared_buffer_handle, |
| + size_t* ring_buffer_start, |
| + size_t* ring_buffer_size) { |
| if (size != sizeof(SerializedDataPipeHandleDispatcher)) { |
| LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; |
| return ScopedPlatformHandle(); |
| @@ -154,52 +284,332 @@ ScopedPlatformHandle DataPipe::Deserialize( |
| const SerializedDataPipeHandleDispatcher* serialization = |
| static_cast<const SerializedDataPipeHandleDispatcher*>(source); |
| - size_t platform_handle_index = serialization->platform_handle_index; |
| // Starts off invalid, which is what we want. |
| - PlatformHandle platform_handle; |
| - if (platform_handle_index != kInvalidDataPipeHandleIndex) { |
| - if (!platform_handles || |
| - platform_handle_index >= platform_handles->size()) { |
| - LOG(ERROR) |
| - << "Invalid serialized data pipe dispatcher (missing handles)"; |
| + PlatformHandle channel_handle; |
| + size_t channel_handle_index = serialization->channel_handle_index; |
| + if (channel_handle_index != kInvalidDataPipeHandleIndex) { |
| + if (!platform_handles || channel_handle_index >= platform_handles->size()) { |
| + LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
| return ScopedPlatformHandle(); |
| } |
| // We take ownership of the handle, so we have to invalidate the one in |
| // |platform_handles|. |
| - std::swap(platform_handle, (*platform_handles)[platform_handle_index]); |
| + std::swap(channel_handle, (*platform_handles)[channel_handle_index]); |
| } |
| options->struct_size = sizeof(MojoCreateDataPipeOptions); |
| options->flags = serialization->flags; |
| options->element_num_bytes = serialization->element_num_bytes; |
| options->capacity_num_bytes = serialization->capacity_num_bytes; |
| + *serialized_read_buffer_size = serialization->channel_pending_read_size; |
| + *serialized_write_buffer_size = serialization->channel_pending_write_size; |
| + *ring_buffer_start = serialization->ring_buffer_start; |
| + *ring_buffer_size = serialization->ring_buffer_size; |
| + |
| + size_t channel_shared_handle_index = |
| + serialization->channel_shared_handle_index; |
| + if (*serialized_read_buffer_size || *serialized_write_buffer_size) { |
| + DCHECK_NE(channel_shared_handle_index, kInvalidDataPipeHandleIndex); |
| + if (!platform_handles || |
| + channel_shared_handle_index >= platform_handles->size()) { |
| + LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
| + return ScopedPlatformHandle(); |
| + } |
| - if (shared_memory_size) { |
| - *shared_memory_size = serialization->shared_memory_size; |
| - if (*shared_memory_size) { |
| - DCHECK(serialization->shared_memory_handle_index != |
| - kInvalidDataPipeHandleIndex); |
| - if (!platform_handles || |
| - serialization->shared_memory_handle_index >= |
| - platform_handles->size()) { |
| - LOG(ERROR) << "Invalid serialized data pipe dispatcher " |
| - << "(missing handles)"; |
| - return ScopedPlatformHandle(); |
| - } |
| + // Take ownership. |
| + PlatformHandle temp_channel_shared_handle; |
| + std::swap(temp_channel_shared_handle, |
| + (*platform_handles)[channel_shared_handle_index]); |
| + *channel_shared_handle = ScopedPlatformHandle(temp_channel_shared_handle); |
| + } |
| + |
| + size_t shared_buffer_handle_index = serialization->shared_buffer_handle_index; |
| + if (shared_buffer_handle_index != kInvalidDataPipeHandleIndex) { |
| + if (!platform_handles || |
| + shared_buffer_handle_index >= platform_handles->size()) { |
| + LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
| + return ScopedPlatformHandle(); |
| + } |
| + |
| + // Take ownership. |
| + PlatformHandle temp_shared_buffer_handle; |
| + std::swap(temp_shared_buffer_handle, |
| + (*platform_handles)[shared_buffer_handle_index]); |
| + *shared_buffer_handle = ScopedPlatformHandle(temp_shared_buffer_handle); |
| + } |
| + |
| + return ScopedPlatformHandle(channel_handle); |
| +} |
| + |
| +void* DataPipe::GetSharedBufferBase() { |
| + if (!mapping_) { |
| + DCHECK(shared_buffer_); |
| + mapping_ = shared_buffer_->Map(0, options_.capacity_num_bytes); |
| + } |
| + if (!mapping_) { |
| + return nullptr; |
| + } |
| + return mapping_->GetBase(); |
| +} |
| + |
| +bool DataPipe::WriteDataIntoSharedBuffer(const void* elements, |
| + size_t num_bytes) { |
| + size_t bufsz = options_.capacity_num_bytes; |
|
Anand Mistry (off Chromium)
2016/01/08 03:28:02
This isn't Go. Don't abbreviate unnecessarily.
Eliot Courtney
2016/01/08 04:44:39
Done.
|
| + uint8_t* base = static_cast<uint8_t*>(GetSharedBufferBase()); |
| + if (base == nullptr) { |
|
Anand Mistry (off Chromium)
2016/01/08 03:28:02
generally prefer:
if (!base)
Eliot Courtney
2016/01/08 04:44:39
Done.
|
| + return false; |
| + } |
| + |
| + DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); |
| + DCHECK_LE(num_bytes, bufsz - ring_buffer_size_); |
| + |
| + size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz; |
| + if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { |
| + size_t bytes_until_end = bufsz - ring_buffer_end; |
| + memcpy(base + ring_buffer_end, elements, |
| + std::min(num_bytes, bytes_until_end)); |
| + |
| + if (bytes_until_end < num_bytes) { |
| + memcpy(base, static_cast<const uint8_t*>(elements) + bytes_until_end, |
| + num_bytes - bytes_until_end); |
| + } |
| + } else { |
| + memcpy(base + ring_buffer_end, elements, num_bytes); |
| + } |
| + |
| + return true; |
| +} |
| - PlatformHandle temp_shared_memory_handle; |
| - std::swap(temp_shared_memory_handle, |
| - (*platform_handles)[serialization->shared_memory_handle_index]); |
| - *shared_memory_handle = |
| - ScopedPlatformHandle(temp_shared_memory_handle); |
| +bool DataPipe::ReadDataFromSharedBuffer(void* data, size_t num_bytes) { |
| + DCHECK_LE(num_bytes, ring_buffer_size_); |
| + DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); |
| + |
| + size_t bufsz = options_.capacity_num_bytes; |
| + uint8_t* base = static_cast<uint8_t*>(GetSharedBufferBase()); |
| + if (base == nullptr) { |
| + return false; |
| + } |
| + |
| + size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz; |
| + if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { |
| + memcpy(data, base + ring_buffer_start_, num_bytes); |
| + } else { |
| + size_t bytes_until_end = bufsz - ring_buffer_start_; |
| + memcpy(data, base + ring_buffer_start_, |
| + std::min(num_bytes, bytes_until_end)); |
| + |
| + if (bytes_until_end < num_bytes) { |
| + memcpy(static_cast<uint8_t*>(data) + bytes_until_end, base, |
| + num_bytes - bytes_until_end); |
| } |
| } |
| - size -= sizeof(SerializedDataPipeHandleDispatcher); |
| + return true; |
| +} |
| + |
| +size_t DataPipe::GetReadableBytes() const { |
| + return shared_buffer_ ? ring_buffer_size_ : 0; |
| +} |
| + |
| +size_t DataPipe::GetWritableBytes() const { |
| + return shared_buffer_ ? options_.capacity_num_bytes - ring_buffer_size_ : 0; |
| +} |
| + |
| +void DataPipe::UpdateFromRead(size_t num_bytes) { |
| + ring_buffer_start_ = |
| + (ring_buffer_start_ + num_bytes) % options_.capacity_num_bytes; |
| + ring_buffer_size_ -= num_bytes; |
| + DCHECK_GE(ring_buffer_size_, 0u); |
| +} |
| + |
| +void DataPipe::UpdateFromWrite(size_t num_bytes) { |
| + ring_buffer_size_ += num_bytes; |
| + DCHECK_LE(ring_buffer_size_, options_.capacity_num_bytes); |
| +} |
| + |
| +bool DataPipe::NotifyWrite(size_t num_bytes) { |
| + DCHECK(is_producer_); |
| + |
| + if (!channel_) |
| + return false; |
| + |
| + DataPipeCommandHeader command = {DATA_WRITTEN, num_bytes}; |
| + scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| + MessageInTransit::Type::MESSAGE, sizeof(command), &command)); |
| + if (!channel_->WriteMessage(std::move(message))) { |
| + return false; |
| + } |
| + return true; |
| +} |
| + |
| +bool DataPipe::NotifyRead(size_t num_bytes) { |
| + DCHECK(!is_producer_); |
| + |
| + if (!channel_) |
| + return false; |
| + DataPipeCommandHeader command = {DATA_READ, num_bytes}; |
| + scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| + MessageInTransit::Type::MESSAGE, sizeof(command), &command)); |
| + if (!channel_->WriteMessage(std::move(message))) { |
| + return false; |
| + } |
| + return true; |
| +} |
| + |
| +void DataPipe::NotifySharedBuffer() { |
| + if (!channel_) { |
| + DLOG(ERROR) << "NotifySharedBuffer: No channel"; |
| + return; |
| + } |
| + DataPipeCommandHeader command = {NOTIFY_SHARED_BUFFER, |
| + options_.capacity_num_bytes}; |
| + ScopedPlatformHandle handle = shared_buffer_->DuplicatePlatformHandle(); |
| + DCHECK(handle.is_valid()); |
| + ScopedPlatformHandleVectorPtr fds(new PlatformHandleVector()); |
| + fds->push_back(handle.release()); |
| + |
| + scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| + MessageInTransit::Type::MESSAGE, sizeof(command), &command)); |
| + message->SetTransportData(make_scoped_ptr(new TransportData( |
| + std::move(fds), channel_->GetSerializedPlatformHandleSize()))); |
| + if (!channel_->WriteMessage(std::move(message))) { |
| + DLOG(ERROR) << "NotifySharedBuffer: Can't write"; |
| + } |
| +} |
| + |
| +void DataPipe::RequestSharedBuffer() { |
| + if (!channel_) { |
| + DLOG(ERROR) << "NotifySharedBuffer: No channel"; |
| + return; |
| + } |
| + |
| + DataPipeCommandHeader command = {PLEASE_CREATE_SHARED_BUFFER, |
| + options_.capacity_num_bytes}; |
| + |
| + scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| + MessageInTransit::Type::MESSAGE, sizeof(command), &command)); |
| + if (!channel_->WriteMessage(std::move(message))) { |
| + DLOG(ERROR) << "RequestSharedBuffer: Can't write"; |
| + } |
| +} |
| + |
| +void DataPipe::UpdateSharedBuffer( |
| + scoped_refptr<PlatformSharedBuffer> shared_buffer) { |
| + shared_buffer_ = shared_buffer; |
| + mapping_ = nullptr; |
| +} |
| + |
| +void DataPipe::Shutdown() { |
| + if (channel_) { |
| + channel_->Shutdown(); |
| + channel_ = nullptr; |
| + } |
| +} |
| + |
| +void DataPipe::CreateEquivalentAndClose(DataPipe* out) { |
| + // Serialize, releasing the handle, otherwise we will get callbacks to the |
| + // wrong delegate for RawChannel. |
| + Serialize(); |
| + |
| + std::swap(options_, out->options_); |
| + std::swap(channel_released_, out->channel_released_); |
| + serialized_read_buffer_.swap(out->serialized_read_buffer_); |
| + serialized_write_buffer_.swap(out->serialized_write_buffer_); |
| + serialized_channel_handle_.swap(out->serialized_channel_handle_); |
| + shared_buffer_.swap(out->shared_buffer_); |
| + mapping_.swap(out->mapping_); |
| + std::swap(ring_buffer_start_, out->ring_buffer_start_); |
| + std::swap(ring_buffer_size_, out->ring_buffer_size_); |
| + std::swap(is_producer_, out->is_producer_); |
| +} |
| + |
| +void* DataPipe::GetWriteBuffer(size_t* num_bytes) { |
| + if (!shared_buffer_) { |
| + *num_bytes = 0; |
| + return nullptr; |
| + } |
| + // Must provide sequential memory. |
| + size_t bufsz = options_.capacity_num_bytes; |
| + size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz; |
| + if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { |
| + *num_bytes = bufsz - ring_buffer_end; |
| + } else { |
| + *num_bytes = ring_buffer_start_ - ring_buffer_end; |
| + } |
| + |
| + return static_cast<uint8_t*>(GetSharedBufferBase()) + ring_buffer_end; |
| +} |
| + |
| +const void* DataPipe::GetReadBuffer(size_t* num_bytes) { |
| + if (!shared_buffer_) { |
| + *num_bytes = 0; |
| + return nullptr; |
| + } |
| + // Must provide sequential memory. |
| + size_t bufsz = options_.capacity_num_bytes; |
| + size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz; |
| + if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { |
| + *num_bytes = ring_buffer_size_; |
| + } else { |
| + *num_bytes = bufsz - ring_buffer_start_; |
| + } |
| + |
| + return static_cast<uint8_t*>(GetSharedBufferBase()) + ring_buffer_start_; |
| +} |
| + |
| +bool DataPipe::ProcessCommand(const DataPipeCommandHeader& command, |
| + ScopedPlatformHandleVectorPtr platform_handles) { |
| + PlatformHandle handle; |
| + ScopedPlatformHandle scoped_handle; |
| + scoped_refptr<PlatformSharedBuffer> shared_buffer; |
| + bool was_unreadable = GetReadableBytes() == 0; |
| + bool was_unwritable = GetWritableBytes() == 0; |
| + switch (command.command) { |
| + case PLEASE_CREATE_SHARED_BUFFER: |
| + // Other end wasn't able to create the shared buffer. |
| + shared_buffer = |
| + internal::g_platform_support->CreateSharedBuffer(command.num_bytes); |
| + CHECK(shared_buffer); |
| + UpdateSharedBuffer(shared_buffer); |
| + |
| + // Can't call back into RawChannel because of locking, so do this later. |
| + internal::g_io_thread_task_runner->PostTask( |
| + FROM_HERE, base::Bind(&DataPipe::NotifySharedBuffer, this)); |
| + break; |
| + case NOTIFY_SHARED_BUFFER: |
| + CHECK_EQ(platform_handles->size(), 1u); |
| + std::swap(handle, (*platform_handles.get())[0]); |
| + scoped_handle.reset(handle); |
| + UpdateSharedBuffer( |
| + internal::g_platform_support->CreateSharedBufferFromHandle( |
| + command.num_bytes, std::move(scoped_handle))); |
| + break; |
| + case DATA_WRITTEN: |
| + if (!is_producer_) { |
| + UpdateFromWrite(command.num_bytes); |
| + } else { |
| + LOG(ERROR) << "Producer was told of a write, which shouldn't happen."; |
| + DCHECK(0); |
| + } |
| + break; |
| + case DATA_READ: |
| + if (is_producer_) { |
| + UpdateFromRead(command.num_bytes); |
| + } else { |
| + LOG(ERROR) << "Consumer was told of a read, which shouldn't happen."; |
| + DCHECK(0); |
| + } |
| + break; |
| + default: |
| + LOG(ERROR) << "Shouldn't happen"; |
| + DCHECK(0); |
| + } |
| - return ScopedPlatformHandle(platform_handle); |
| + // Handles write/read case and shared buffer becoming available case. |
| + return (was_unreadable && GetReadableBytes()) || |
| + (was_unwritable && GetWritableBytes()); |
| } |
| } // namespace edk |