Index: mojo/edk/system/data_pipe.cc |
diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc |
index 54a6df45ae6bb2d434b1d28d2b314ac2cd2c8703..9371b25de78e701487851fa98b0a5af831f77b11 100644 |
--- a/mojo/edk/system/data_pipe.cc |
+++ b/mojo/edk/system/data_pipe.cc |
@@ -11,6 +11,11 @@ |
#include <algorithm> |
#include <limits> |
+#include "base/bind.h" |
+#include "mojo/edk/embedder/embedder_internal.h" |
+#include "mojo/edk/embedder/platform_support.h" |
+#include "mojo/edk/system/broker.h" |
+ |
#include "mojo/edk/system/configuration.h" |
#include "mojo/edk/system/options_validation.h" |
#include "mojo/edk/system/raw_channel.h" |
@@ -24,15 +29,20 @@ namespace { |
const uint32_t kInvalidDataPipeHandleIndex = static_cast<uint32_t>(-1); |
struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { |
- uint32_t platform_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
+ uint32_t channel_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
// These are from MojoCreateDataPipeOptions |
MojoCreateDataPipeOptionsFlags flags; |
uint32_t element_num_bytes; |
uint32_t capacity_num_bytes; |
- uint32_t shared_memory_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
- uint32_t shared_memory_size; |
+ uint32_t channel_shared_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
+ uint32_t channel_pending_read_size; |
+ uint32_t channel_pending_write_size; |
+ |
+ uint32_t shared_buffer_handle_index; |
+ uint32_t ring_buffer_start; |
+ uint32_t ring_buffer_size; |
}; |
} // namespace |
@@ -100,50 +110,175 @@ MojoResult DataPipe::ValidateCreateOptions( |
return MOJO_RESULT_OK; |
} |
-void DataPipe::StartSerialize(bool have_channel_handle, |
- bool have_shared_memory, |
- size_t* max_size, |
- size_t* max_platform_handles) { |
+DataPipe::DataPipe(const MojoCreateDataPipeOptions& options) |
+ : channel_(nullptr), |
+ options_(options), |
+ channel_released_(false), |
+ shared_buffer_(nullptr), |
+ mapping_(nullptr), |
+ ring_buffer_start_(0u), |
+ ring_buffer_size_(0u) {} |
+ |
+DataPipe::~DataPipe() {} |
+ |
+void DataPipe::Init(ScopedPlatformHandle message_pipe, |
Anand Mistry (off Chromium)
2016/01/11 06:19:34
s/message_pipe/channel
where appropriate.
Eliot Courtney
2016/01/13 00:00:09
Done.
|
+ char* serialized_write_buffer, |
+ uint32_t serialized_write_buffer_size, |
+ char* serialized_read_buffer, |
+ uint32_t serialized_read_buffer_size, |
+ ScopedPlatformHandle shared_buffer_handle, |
+ uint32_t ring_buffer_start, |
+ uint32_t ring_buffer_size, |
+ bool is_producer, |
+ const base::Closure& init_callback) { |
+ is_producer_ = is_producer; |
+ |
+ ring_buffer_start_ = ring_buffer_start; |
+ ring_buffer_size_ = ring_buffer_size; |
+ |
+ if (shared_buffer_handle.is_valid()) { |
+ shared_buffer_ = internal::g_platform_support->CreateSharedBufferFromHandle( |
+ options_.capacity_num_bytes, std::move(shared_buffer_handle)); |
+ DCHECK(shared_buffer_); |
+ } else if (is_producer_) { |
+#if defined(OS_WIN) |
+ shared_buffer_ = internal::g_platform_support->CreateSharedBuffer( |
+ options_.capacity_num_bytes); |
+#else |
+ shared_buffer_ = |
+ internal::g_broker->CreateSharedBuffer(options_.capacity_num_bytes); |
+#endif |
+ CHECK(shared_buffer_); |
+ } |
+ |
+ if (message_pipe.is_valid()) { |
+ channel_ = RawChannel::Create(std::move(message_pipe)); |
+ channel_->SetSerializedData(serialized_read_buffer, |
+ serialized_read_buffer_size, |
+ serialized_write_buffer, |
+ serialized_write_buffer_size, nullptr, nullptr); |
+ internal::g_io_thread_task_runner->PostTask(FROM_HERE, init_callback); |
+ channel_->EnsureLazyInitialized(); |
+ |
+ if (is_producer_) { |
+ internal::g_io_thread_task_runner->PostTask( |
+ FROM_HERE, base::Bind(&DataPipe::NotifySharedBuffer, this)); |
+ } |
+ } |
+} |
+ |
+void DataPipe::StartSerialize(size_t* max_size, size_t* max_platform_handles) { |
+ // We need to release the channel and get its pending reads/writes. |
+ if (!channel_released_) { |
+ Serialize(); |
+ } |
+ |
*max_size = sizeof(SerializedDataPipeHandleDispatcher); |
*max_platform_handles = 0; |
- if (have_channel_handle) |
+ if (serialized_channel_handle_.is_valid()) |
(*max_platform_handles)++; |
- if (have_shared_memory) |
+ // For shared buffer to transfer pending reads / writes. |
+ if (!serialized_write_buffer_.empty() || !serialized_read_buffer_.empty()) |
(*max_platform_handles)++; |
+ if (shared_buffer_) |
+ (*max_platform_handles)++; |
+} |
+ |
+void DataPipe::Serialize() { |
+ // We need to stop watching handle immediately, even though not on IO thread, |
+ // so that other messages aren't read after this. |
+ if (channel_) { |
+ std::vector<int> fds; |
+ bool write_error = false; |
+ serialized_channel_handle_ = channel_->ReleaseHandle( |
Anand Mistry (off Chromium)
2016/01/11 06:19:34
From what I can tell, this is racy. Basically, thi
Eliot Courtney
2016/01/14 02:17:57
Discussed offline -- this is fairly impossible to
|
+ &serialized_read_buffer_, &serialized_write_buffer_, &fds, &fds, |
+ &write_error); |
+ |
+ CHECK(fds.empty()); |
+ if (write_error) { |
+ serialized_channel_handle_.reset(); |
+ } |
+ channel_ = nullptr; |
+ } |
+ channel_released_ = true; |
} |
-void DataPipe::EndSerialize(const MojoCreateDataPipeOptions& options, |
- ScopedPlatformHandle channel_handle, |
- ScopedPlatformHandle shared_memory_handle, |
- size_t shared_memory_size, |
- void* destination, |
+void DataPipe::EndSerialize(void* destination, |
size_t* actual_size, |
PlatformHandleVector* platform_handles) { |
+ ScopedPlatformHandle channel_shared_handle; |
+ size_t channel_shared_size = |
+ serialized_read_buffer_.size() + serialized_write_buffer_.size(); |
+ if (channel_shared_size) { |
+#if defined(OS_WIN) |
+ scoped_refptr<PlatformSharedBuffer> shared_buffer( |
+ internal::g_platform_support->CreateSharedBuffer(channel_shared_size)); |
+#else |
+ scoped_refptr<PlatformSharedBuffer> shared_buffer( |
+ internal::g_broker->CreateSharedBuffer(channel_shared_size)); |
+#endif |
+ |
+ scoped_ptr<PlatformSharedBufferMapping> mapping( |
+ shared_buffer->Map(0, channel_shared_size)); |
+ |
+ void* base = mapping->GetBase(); |
+ if (!serialized_read_buffer_.empty()) { |
+ memcpy(base, &serialized_read_buffer_[0], serialized_read_buffer_.size()); |
+ } |
+ if (!serialized_write_buffer_.empty()) { |
+ memcpy(static_cast<uint8_t*>(base) + serialized_read_buffer_.size(), |
+ &serialized_write_buffer_[0], serialized_write_buffer_.size()); |
+ } |
+ |
+ channel_shared_handle.reset(shared_buffer->PassPlatformHandle().release()); |
+ } |
+ |
SerializedDataPipeHandleDispatcher* serialization = |
static_cast<SerializedDataPipeHandleDispatcher*>(destination); |
- if (channel_handle.is_valid()) { |
+ if (serialized_channel_handle_.is_valid()) { |
DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); |
- serialization->platform_handle_index = |
+ serialization->channel_handle_index = |
static_cast<uint32_t>(platform_handles->size()); |
- platform_handles->push_back(channel_handle.release()); |
+ platform_handles->push_back(serialized_channel_handle_.release()); |
} else { |
- serialization->platform_handle_index = kInvalidDataPipeHandleIndex; |
+ serialization->channel_handle_index = kInvalidDataPipeHandleIndex; |
} |
- serialization->flags = options.flags; |
- serialization->element_num_bytes = options.element_num_bytes; |
- serialization->capacity_num_bytes = options.capacity_num_bytes; |
+ serialization->flags = options_.flags; |
+ serialization->element_num_bytes = options_.element_num_bytes; |
+ serialization->capacity_num_bytes = options_.capacity_num_bytes; |
- serialization->shared_memory_size = static_cast<uint32_t>(shared_memory_size); |
- if (serialization->shared_memory_size) { |
+ serialization->channel_pending_read_size = |
+ static_cast<uint32_t>(serialized_read_buffer_.size()); |
+ serialization->channel_pending_write_size = |
+ static_cast<uint32_t>(serialized_write_buffer_.size()); |
+ |
+ if (channel_shared_handle.is_valid()) { |
DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); |
- serialization->shared_memory_handle_index = |
+ serialization->channel_shared_handle_index = |
static_cast<uint32_t>(platform_handles->size()); |
- platform_handles->push_back(shared_memory_handle.release()); |
+ platform_handles->push_back(channel_shared_handle.release()); |
} else { |
- serialization->shared_memory_handle_index = kInvalidDataPipeHandleIndex; |
+ serialization->channel_shared_handle_index = kInvalidDataPipeHandleIndex; |
+ } |
+ |
+ ScopedPlatformHandle shared_buffer_handle; |
+ if (shared_buffer_) { |
+ shared_buffer_handle.reset(shared_buffer_->PassPlatformHandle().release()); |
} |
+ if (shared_buffer_handle.is_valid()) { |
+ DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); |
+ serialization->shared_buffer_handle_index = |
+ static_cast<uint32_t>(platform_handles->size()); |
+ platform_handles->push_back(shared_buffer_handle.release()); |
+ } else { |
+ serialization->shared_buffer_handle_index = kInvalidDataPipeHandleIndex; |
+ } |
+ |
+ serialization->ring_buffer_start = ring_buffer_start_; |
+ serialization->ring_buffer_size = ring_buffer_size_; |
+ |
*actual_size = sizeof(SerializedDataPipeHandleDispatcher); |
} |
@@ -152,8 +287,12 @@ ScopedPlatformHandle DataPipe::Deserialize( |
size_t size, |
PlatformHandleVector* platform_handles, |
MojoCreateDataPipeOptions* options, |
- ScopedPlatformHandle* shared_memory_handle, |
- size_t* shared_memory_size) { |
+ ScopedPlatformHandle* channel_shared_handle, |
+ uint32_t* serialized_read_buffer_size, |
+ uint32_t* serialized_write_buffer_size, |
+ ScopedPlatformHandle* shared_buffer_handle, |
+ uint32_t* ring_buffer_start, |
+ uint32_t* ring_buffer_size) { |
if (size != sizeof(SerializedDataPipeHandleDispatcher)) { |
LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; |
return ScopedPlatformHandle(); |
@@ -161,52 +300,308 @@ ScopedPlatformHandle DataPipe::Deserialize( |
const SerializedDataPipeHandleDispatcher* serialization = |
static_cast<const SerializedDataPipeHandleDispatcher*>(source); |
- size_t platform_handle_index = serialization->platform_handle_index; |
// Starts off invalid, which is what we want. |
- PlatformHandle platform_handle; |
- if (platform_handle_index != kInvalidDataPipeHandleIndex) { |
- if (!platform_handles || |
- platform_handle_index >= platform_handles->size()) { |
- LOG(ERROR) |
- << "Invalid serialized data pipe dispatcher (missing handles)"; |
+ PlatformHandle channel_handle; |
+ uint32_t channel_handle_index = serialization->channel_handle_index; |
+ if (channel_handle_index != kInvalidDataPipeHandleIndex) { |
+ if (!platform_handles || channel_handle_index >= platform_handles->size()) { |
+ LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
return ScopedPlatformHandle(); |
} |
// We take ownership of the handle, so we have to invalidate the one in |
// |platform_handles|. |
- std::swap(platform_handle, (*platform_handles)[platform_handle_index]); |
+ std::swap(channel_handle, (*platform_handles)[channel_handle_index]); |
} |
options->struct_size = sizeof(MojoCreateDataPipeOptions); |
options->flags = serialization->flags; |
options->element_num_bytes = serialization->element_num_bytes; |
options->capacity_num_bytes = serialization->capacity_num_bytes; |
+ *serialized_read_buffer_size = serialization->channel_pending_read_size; |
+ *serialized_write_buffer_size = serialization->channel_pending_write_size; |
+ *ring_buffer_start = serialization->ring_buffer_start; |
+ *ring_buffer_size = serialization->ring_buffer_size; |
+ |
+ uint32_t channel_shared_handle_index = |
+ serialization->channel_shared_handle_index; |
+ if (*serialized_read_buffer_size || *serialized_write_buffer_size) { |
+ DCHECK_NE(channel_shared_handle_index, kInvalidDataPipeHandleIndex); |
+ if (!platform_handles || |
+ channel_shared_handle_index >= platform_handles->size()) { |
+ LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
+ return ScopedPlatformHandle(); |
+ } |
- if (shared_memory_size) { |
- *shared_memory_size = serialization->shared_memory_size; |
- if (*shared_memory_size) { |
- DCHECK(serialization->shared_memory_handle_index != |
- kInvalidDataPipeHandleIndex); |
- if (!platform_handles || |
- serialization->shared_memory_handle_index >= |
- platform_handles->size()) { |
- LOG(ERROR) << "Invalid serialized data pipe dispatcher " |
- << "(missing handles)"; |
- return ScopedPlatformHandle(); |
- } |
+ // Take ownership. |
+ PlatformHandle temp_channel_shared_handle; |
+ std::swap(temp_channel_shared_handle, |
+ (*platform_handles)[channel_shared_handle_index]); |
+ *channel_shared_handle = ScopedPlatformHandle(temp_channel_shared_handle); |
+ } |
- PlatformHandle temp_shared_memory_handle; |
- std::swap(temp_shared_memory_handle, |
- (*platform_handles)[serialization->shared_memory_handle_index]); |
- *shared_memory_handle = |
- ScopedPlatformHandle(temp_shared_memory_handle); |
+ uint32_t shared_buffer_handle_index = |
+ serialization->shared_buffer_handle_index; |
+ if (shared_buffer_handle_index != kInvalidDataPipeHandleIndex) { |
+ if (!platform_handles || |
+ shared_buffer_handle_index >= platform_handles->size()) { |
+ LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
+ return ScopedPlatformHandle(); |
} |
+ |
+ // Take ownership. |
+ PlatformHandle temp_shared_buffer_handle; |
+ std::swap(temp_shared_buffer_handle, |
+ (*platform_handles)[shared_buffer_handle_index]); |
+ *shared_buffer_handle = ScopedPlatformHandle(temp_shared_buffer_handle); |
} |
- size -= sizeof(SerializedDataPipeHandleDispatcher); |
+ return ScopedPlatformHandle(channel_handle); |
+} |
+ |
+uint8_t* DataPipe::GetSharedBufferBase() { |
+ if (!mapping_) { |
+ DCHECK(shared_buffer_); |
+ mapping_ = shared_buffer_->Map(0, options_.capacity_num_bytes); |
+ } |
+ if (!mapping_) |
+ return nullptr; |
+ return static_cast<uint8_t*>(mapping_->GetBase()); |
+} |
+ |
+bool DataPipe::WriteDataIntoSharedBuffer(const void* elements, |
+ uint32_t num_bytes) { |
+ uint32_t buffer_size = options_.capacity_num_bytes; |
+ uint8_t* base = GetSharedBufferBase(); |
+ if (!base) |
+ return false; |
+ |
+ DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); |
+ DCHECK_LE(num_bytes, buffer_size - ring_buffer_size_); |
+ |
+ uint32_t ring_buffer_end = |
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size; |
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { |
+ uint32_t bytes_until_end = buffer_size - ring_buffer_end; |
+ memcpy(base + ring_buffer_end, elements, |
+ std::min(num_bytes, bytes_until_end)); |
+ |
+ if (bytes_until_end < num_bytes) { |
+ memcpy(base, static_cast<const uint8_t*>(elements) + bytes_until_end, |
+ num_bytes - bytes_until_end); |
+ } |
+ } else { |
+ memcpy(base + ring_buffer_end, elements, num_bytes); |
+ } |
+ |
+ return true; |
+} |
+ |
+bool DataPipe::ReadDataFromSharedBuffer(void* data, uint32_t num_bytes) { |
+ DCHECK_LE(num_bytes, ring_buffer_size_); |
+ DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); |
+ |
+ uint32_t buffer_size = options_.capacity_num_bytes; |
+ uint8_t* base = GetSharedBufferBase(); |
+ if (!base) |
+ return false; |
+ |
+ uint32_t ring_buffer_end = |
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size; |
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { |
+ memcpy(data, base + ring_buffer_start_, num_bytes); |
+ } else { |
+ uint32_t bytes_until_end = buffer_size - ring_buffer_start_; |
+ memcpy(data, base + ring_buffer_start_, |
+ std::min(num_bytes, bytes_until_end)); |
+ |
+ if (bytes_until_end < num_bytes) { |
+ memcpy(static_cast<uint8_t*>(data) + bytes_until_end, base, |
+ num_bytes - bytes_until_end); |
+ } |
+ } |
+ |
+ return true; |
+} |
+ |
+uint32_t DataPipe::GetReadableBytes() const { |
+ return shared_buffer_ ? ring_buffer_size_ : 0; |
+} |
+ |
+uint32_t DataPipe::GetWritableBytes() const { |
+ return shared_buffer_ ? options_.capacity_num_bytes - ring_buffer_size_ : 0; |
+} |
+ |
+void DataPipe::UpdateFromRead(uint32_t num_bytes) { |
+ ring_buffer_start_ = |
+ (ring_buffer_start_ + num_bytes) % options_.capacity_num_bytes; |
+ ring_buffer_size_ -= num_bytes; |
+ DCHECK_GE(ring_buffer_size_, 0u); |
+} |
+ |
+void DataPipe::UpdateFromWrite(uint32_t num_bytes) { |
+ ring_buffer_size_ += num_bytes; |
+ DCHECK_LE(ring_buffer_size_, options_.capacity_num_bytes); |
+} |
+ |
+bool DataPipe::NotifyWrite(uint32_t num_bytes) { |
+ DCHECK(is_producer_); |
+ |
+ if (!channel_) |
+ return false; |
+ |
+ DataPipeCommandHeader command = {DATA_WRITTEN, num_bytes}; |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit( |
+ MessageInTransit::Type::MESSAGE, sizeof(command), &command)); |
+ if (!channel_->WriteMessage(std::move(message))) { |
+ return false; |
+ } |
+ return true; |
+} |
+ |
+bool DataPipe::NotifyRead(uint32_t num_bytes) { |
+ DCHECK(!is_producer_); |
+ |
+ if (!channel_) |
+ return false; |
+ DataPipeCommandHeader command = {DATA_READ, num_bytes}; |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit( |
+ MessageInTransit::Type::MESSAGE, sizeof(command), &command)); |
+ if (!channel_->WriteMessage(std::move(message))) { |
+ return false; |
+ } |
+ return true; |
+} |
+ |
+void DataPipe::NotifySharedBuffer() { |
+ if (!channel_) { |
+ DLOG(ERROR) << "NotifySharedBuffer: No channel"; |
+ return; |
+ } |
+ DataPipeCommandHeader command = {NOTIFY_SHARED_BUFFER, |
+ options_.capacity_num_bytes}; |
+ ScopedPlatformHandle handle = shared_buffer_->DuplicatePlatformHandle(); |
+ DCHECK(handle.is_valid()); |
+ ScopedPlatformHandleVectorPtr fds(new PlatformHandleVector()); |
+ fds->push_back(handle.release()); |
+ |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit( |
+ MessageInTransit::Type::MESSAGE, sizeof(command), &command)); |
+ message->SetTransportData(make_scoped_ptr(new TransportData( |
+ std::move(fds), channel_->GetSerializedPlatformHandleSize()))); |
+ if (!channel_->WriteMessage(std::move(message))) { |
+ DLOG(ERROR) << "NotifySharedBuffer: Can't write"; |
+ } |
+} |
+ |
+void DataPipe::UpdateSharedBuffer( |
+ scoped_refptr<PlatformSharedBuffer> shared_buffer) { |
+ DCHECK(shared_buffer); |
+ shared_buffer_ = shared_buffer; |
+ mapping_ = nullptr; |
+} |
+ |
+void DataPipe::Shutdown() { |
+ if (channel_) { |
+ channel_->Shutdown(); |
+ channel_ = nullptr; |
+ } |
+} |
+ |
+void DataPipe::CreateEquivalentAndClose(DataPipe* out) { |
+ // Serialize, releasing the handle, otherwise we will get callbacks to the |
+ // wrong delegate for RawChannel. |
+ Serialize(); |
+ |
+ std::swap(options_, out->options_); |
+ std::swap(channel_released_, out->channel_released_); |
+ serialized_read_buffer_.swap(out->serialized_read_buffer_); |
+ serialized_write_buffer_.swap(out->serialized_write_buffer_); |
+ serialized_channel_handle_.swap(out->serialized_channel_handle_); |
+ shared_buffer_.swap(out->shared_buffer_); |
+ mapping_.swap(out->mapping_); |
+ std::swap(ring_buffer_start_, out->ring_buffer_start_); |
+ std::swap(ring_buffer_size_, out->ring_buffer_size_); |
+ std::swap(is_producer_, out->is_producer_); |
+} |
+ |
+void* DataPipe::GetWriteBuffer(uint32_t* num_bytes) { |
+ if (!shared_buffer_) { |
+ *num_bytes = 0; |
+ return nullptr; |
+ } |
+ // Must provide sequential memory. |
+ uint32_t buffer_size = options_.capacity_num_bytes; |
+ uint32_t ring_buffer_end = |
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size; |
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { |
+ *num_bytes = buffer_size - ring_buffer_end; |
+ } else { |
+ *num_bytes = ring_buffer_start_ - ring_buffer_end; |
+ } |
+ |
+ return GetSharedBufferBase() + ring_buffer_end; |
+} |
+ |
+const void* DataPipe::GetReadBuffer(uint32_t* num_bytes) { |
+ if (!shared_buffer_) { |
+ *num_bytes = 0; |
+ return nullptr; |
+ } |
+ // Must provide sequential memory. |
+ uint32_t buffer_size = options_.capacity_num_bytes; |
+ uint32_t ring_buffer_end = |
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size; |
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { |
+ *num_bytes = ring_buffer_size_; |
+ } else { |
+ *num_bytes = buffer_size - ring_buffer_start_; |
+ } |
+ |
+ return GetSharedBufferBase() + ring_buffer_start_; |
+} |
+ |
+bool DataPipe::ProcessCommand(const DataPipeCommandHeader& command, |
+ ScopedPlatformHandleVectorPtr platform_handles) { |
+ PlatformHandle handle; |
+ ScopedPlatformHandle scoped_handle; |
+ scoped_refptr<PlatformSharedBuffer> shared_buffer; |
+ bool was_unreadable = GetReadableBytes() == 0; |
+ bool was_unwritable = GetWritableBytes() == 0; |
+ switch (command.command) { |
+ case NOTIFY_SHARED_BUFFER: |
+ CHECK_EQ(platform_handles->size(), 1u); |
+ std::swap(handle, (*platform_handles.get())[0]); |
+ scoped_handle.reset(handle); |
+ UpdateSharedBuffer( |
+ internal::g_platform_support->CreateSharedBufferFromHandle( |
+ command.num_bytes, std::move(scoped_handle))); |
+ break; |
+ case DATA_WRITTEN: |
+ if (!is_producer_) { |
+ UpdateFromWrite(command.num_bytes); |
+ } else { |
+ LOG(ERROR) << "Producer was told of a write, which shouldn't happen."; |
+ DCHECK(0); |
Anand Mistry (off Chromium)
2016/01/11 06:19:34
NOTREACHED();
Eliot Courtney
2016/01/13 00:00:09
Done.
|
+ } |
+ break; |
+ case DATA_READ: |
+ if (is_producer_) { |
+ UpdateFromRead(command.num_bytes); |
+ } else { |
+ LOG(ERROR) << "Consumer was told of a read, which shouldn't happen."; |
+ DCHECK(0); |
+ } |
+ break; |
+ default: |
+ LOG(ERROR) << "Shouldn't happen"; |
+ DCHECK(0); |
+ } |
- return ScopedPlatformHandle(platform_handle); |
+ // Handles write/read case and shared buffer becoming available case. |
+ return (was_unreadable && GetReadableBytes()) || |
+ (was_unwritable && GetWritableBytes()); |
} |
} // namespace edk |