Index: mojo/edk/system/data_pipe.cc |
diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc |
index 54a6df45ae6bb2d434b1d28d2b314ac2cd2c8703..756ed64a42ac6f0e66f157fba3edc108d1cd02d5 100644 |
--- a/mojo/edk/system/data_pipe.cc |
+++ b/mojo/edk/system/data_pipe.cc |
@@ -11,6 +11,11 @@ |
#include <algorithm> |
#include <limits> |
+#include "base/bind.h" |
+#include "mojo/edk/embedder/embedder_internal.h" |
+#include "mojo/edk/embedder/platform_support.h" |
+#include "mojo/edk/system/broker.h" |
+ |
#include "mojo/edk/system/configuration.h" |
#include "mojo/edk/system/options_validation.h" |
#include "mojo/edk/system/raw_channel.h" |
@@ -24,15 +29,20 @@ namespace { |
const uint32_t kInvalidDataPipeHandleIndex = static_cast<uint32_t>(-1); |
struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { |
- uint32_t platform_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
+ uint32_t channel_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
// These are from MojoCreateDataPipeOptions |
MojoCreateDataPipeOptionsFlags flags; |
uint32_t element_num_bytes; |
uint32_t capacity_num_bytes; |
- uint32_t shared_memory_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
- uint32_t shared_memory_size; |
+ uint32_t channel_shared_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
+ uint32_t channel_pending_read_size; |
+ uint32_t channel_pending_write_size; |
+ |
+ uint32_t shared_buffer_handle_index; |
+ uint32_t ring_buffer_start; |
+ uint32_t ring_buffer_size; |
}; |
} // namespace |
@@ -100,113 +110,421 @@ MojoResult DataPipe::ValidateCreateOptions( |
return MOJO_RESULT_OK; |
} |
-void DataPipe::StartSerialize(bool have_channel_handle, |
- bool have_shared_memory, |
- size_t* max_size, |
- size_t* max_platform_handles) { |
+DataPipe::DataPipe(const MojoCreateDataPipeOptions& options) |
+ : channel_(nullptr), |
+ options_(options), |
+ channel_released_(false), |
+ shared_buffer_(nullptr), |
+ mapping_(nullptr), |
+ ring_buffer_start_(0u), |
+ ring_buffer_size_(0u) {} |
+ |
+DataPipe::~DataPipe() {} |
+ |
+void DataPipe::Init() { |
+ CHECK(shared_buffer_); |
+ if (channel_) |
+ channel_->EnsureLazyInitialized(); |
+} |
+ |
+void DataPipe::StartSerialize(size_t* max_size, size_t* max_platform_handles) { |
+ // We need to release the channel and get its pending reads/writes. |
+ if (!channel_released_) { |
+ Serialize(); |
+ } |
+ |
*max_size = sizeof(SerializedDataPipeHandleDispatcher); |
*max_platform_handles = 0; |
- if (have_channel_handle) |
+ if (serialized_channel_handle_.is_valid()) |
(*max_platform_handles)++; |
- if (have_shared_memory) |
+ // For shared buffer to transfer pending reads / writes. |
+ if (!serialized_write_buffer_.empty() || !serialized_read_buffer_.empty()) |
(*max_platform_handles)++; |
+ if (shared_buffer_) |
+ (*max_platform_handles)++; |
+} |
+ |
+void DataPipe::Serialize() { |
+ // We need to stop watching handle immediately, even though not on IO thread, |
+ // so that other messages aren't read after this. |
+ if (channel_) { |
+ std::vector<int> fds; |
+ bool write_error = false; |
+ serialized_channel_handle_ = channel_->ReleaseHandle( |
+ &serialized_read_buffer_, &serialized_write_buffer_, &fds, &fds, |
+ &write_error); |
+ |
+ CHECK(fds.empty()); |
+ if (write_error) { |
+ serialized_channel_handle_.reset(); |
+ } |
+ channel_ = nullptr; |
+ } |
+ channel_released_ = true; |
} |
-void DataPipe::EndSerialize(const MojoCreateDataPipeOptions& options, |
- ScopedPlatformHandle channel_handle, |
- ScopedPlatformHandle shared_memory_handle, |
- size_t shared_memory_size, |
- void* destination, |
+void DataPipe::EndSerialize(void* destination, |
size_t* actual_size, |
PlatformHandleVector* platform_handles) { |
+ ScopedPlatformHandle channel_shared_handle; |
+ size_t channel_shared_size = |
+ serialized_read_buffer_.size() + serialized_write_buffer_.size(); |
+ if (channel_shared_size) { |
+#if defined(OS_WIN) |
+ scoped_refptr<PlatformSharedBuffer> shared_buffer( |
+ internal::g_platform_support->CreateSharedBuffer(channel_shared_size)); |
+#else |
+ scoped_refptr<PlatformSharedBuffer> shared_buffer( |
+ internal::g_broker->CreateSharedBuffer(channel_shared_size)); |
+#endif |
+ CHECK(shared_buffer); |
+ |
+ scoped_ptr<PlatformSharedBufferMapping> mapping( |
+ shared_buffer->Map(0, channel_shared_size)); |
+ |
+ void* base = mapping->GetBase(); |
+ if (!serialized_read_buffer_.empty()) { |
+ memcpy(base, &serialized_read_buffer_[0], serialized_read_buffer_.size()); |
+ } |
+ if (!serialized_write_buffer_.empty()) { |
+ memcpy(static_cast<uint8_t*>(base) + serialized_read_buffer_.size(), |
+ &serialized_write_buffer_[0], serialized_write_buffer_.size()); |
+ } |
+ |
+ channel_shared_handle.reset(shared_buffer->PassPlatformHandle().release()); |
+ } |
+ |
SerializedDataPipeHandleDispatcher* serialization = |
static_cast<SerializedDataPipeHandleDispatcher*>(destination); |
- if (channel_handle.is_valid()) { |
+ if (serialized_channel_handle_.is_valid()) { |
DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); |
- serialization->platform_handle_index = |
+ serialization->channel_handle_index = |
static_cast<uint32_t>(platform_handles->size()); |
- platform_handles->push_back(channel_handle.release()); |
+ platform_handles->push_back(serialized_channel_handle_.release()); |
} else { |
- serialization->platform_handle_index = kInvalidDataPipeHandleIndex; |
+ serialization->channel_handle_index = kInvalidDataPipeHandleIndex; |
} |
- serialization->flags = options.flags; |
- serialization->element_num_bytes = options.element_num_bytes; |
- serialization->capacity_num_bytes = options.capacity_num_bytes; |
+ serialization->flags = options_.flags; |
+ serialization->element_num_bytes = options_.element_num_bytes; |
+ serialization->capacity_num_bytes = options_.capacity_num_bytes; |
+ |
+ serialization->channel_pending_read_size = |
+ static_cast<uint32_t>(serialized_read_buffer_.size()); |
+ serialization->channel_pending_write_size = |
+ static_cast<uint32_t>(serialized_write_buffer_.size()); |
- serialization->shared_memory_size = static_cast<uint32_t>(shared_memory_size); |
- if (serialization->shared_memory_size) { |
+ if (channel_shared_handle.is_valid()) { |
DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); |
- serialization->shared_memory_handle_index = |
+ serialization->channel_shared_handle_index = |
static_cast<uint32_t>(platform_handles->size()); |
- platform_handles->push_back(shared_memory_handle.release()); |
+ platform_handles->push_back(channel_shared_handle.release()); |
} else { |
- serialization->shared_memory_handle_index = kInvalidDataPipeHandleIndex; |
+ serialization->channel_shared_handle_index = kInvalidDataPipeHandleIndex; |
} |
+ ScopedPlatformHandle shared_buffer_handle; |
+ if (shared_buffer_) |
+ shared_buffer_handle.reset(shared_buffer_->PassPlatformHandle().release()); |
+ |
+ if (shared_buffer_handle.is_valid()) { |
+ DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); |
+ serialization->shared_buffer_handle_index = |
+ static_cast<uint32_t>(platform_handles->size()); |
+ platform_handles->push_back(shared_buffer_handle.release()); |
+ } else { |
+ serialization->shared_buffer_handle_index = kInvalidDataPipeHandleIndex; |
+ } |
+ |
+ serialization->ring_buffer_start = ring_buffer_start_; |
+ serialization->ring_buffer_size = ring_buffer_size_; |
+ |
*actual_size = sizeof(SerializedDataPipeHandleDispatcher); |
} |
-ScopedPlatformHandle DataPipe::Deserialize( |
+scoped_refptr<DataPipe> DataPipe::Deserialize( |
const void* source, |
size_t size, |
- PlatformHandleVector* platform_handles, |
- MojoCreateDataPipeOptions* options, |
- ScopedPlatformHandle* shared_memory_handle, |
- size_t* shared_memory_size) { |
+ PlatformHandleVector* platform_handles) { |
if (size != sizeof(SerializedDataPipeHandleDispatcher)) { |
LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; |
- return ScopedPlatformHandle(); |
+ return nullptr; |
} |
const SerializedDataPipeHandleDispatcher* serialization = |
static_cast<const SerializedDataPipeHandleDispatcher*>(source); |
- size_t platform_handle_index = serialization->platform_handle_index; |
- // Starts off invalid, which is what we want. |
- PlatformHandle platform_handle; |
- if (platform_handle_index != kInvalidDataPipeHandleIndex) { |
+ MojoCreateDataPipeOptions options = { |
+ sizeof(MojoCreateDataPipeOptions), serialization->flags, |
+ serialization->element_num_bytes, serialization->capacity_num_bytes}; |
+ |
+ scoped_refptr<DataPipe> data_pipe(new DataPipe(options)); |
+ data_pipe->ring_buffer_start_ = serialization->ring_buffer_start; |
+ data_pipe->ring_buffer_size_ = serialization->ring_buffer_size; |
+ |
+ // See if there's any serialised data for the RawChannel. |
+ uint32_t serialized_read_buffer_size = |
+ serialization->channel_pending_read_size; |
+ uint32_t serialized_write_buffer_size = |
+ serialization->channel_pending_write_size; |
+ uint32_t buffer_size = |
+ serialized_write_buffer_size + serialized_read_buffer_size; |
+ char* serialized_read_buffer = nullptr; |
+ char* serialized_write_buffer = nullptr; |
+ // There's serialized data to be read and put back into the RawChannel. |
+ if (buffer_size > 0) { |
+ uint32_t channel_shared_handle_index = |
+ serialization->channel_shared_handle_index; |
+ DCHECK_NE(channel_shared_handle_index, kInvalidDataPipeHandleIndex); |
if (!platform_handles || |
- platform_handle_index >= platform_handles->size()) { |
- LOG(ERROR) |
- << "Invalid serialized data pipe dispatcher (missing handles)"; |
- return ScopedPlatformHandle(); |
+ channel_shared_handle_index >= platform_handles->size()) { |
+ LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
+ return nullptr; |
+ } |
+ |
+ PlatformHandle channel_shared_handle; |
+ std::swap(channel_shared_handle, |
+ (*platform_handles)[channel_shared_handle_index]); |
+ scoped_refptr<PlatformSharedBuffer> channel_shared_buffer = |
+ internal::g_platform_support->CreateSharedBufferFromHandle( |
+ buffer_size, ScopedPlatformHandle(channel_shared_handle)); |
+ scoped_ptr<PlatformSharedBufferMapping> mapping = |
+ channel_shared_buffer->Map(0, buffer_size); |
+ |
+ serialized_read_buffer = static_cast<char*>(mapping->GetBase()); |
+ serialized_write_buffer = |
+ static_cast<char*>(mapping->GetBase()) + serialized_read_buffer_size; |
+ } |
+ |
+ // Get the channel handle for RawChannel. |
+ // Starts off invalid, which is what we want. |
+ uint32_t channel_handle_index = serialization->channel_handle_index; |
+ if (channel_handle_index != kInvalidDataPipeHandleIndex) { |
+ if (!platform_handles || channel_handle_index >= platform_handles->size()) { |
+ LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
+ return nullptr; |
} |
// We take ownership of the handle, so we have to invalidate the one in |
// |platform_handles|. |
- std::swap(platform_handle, (*platform_handles)[platform_handle_index]); |
- } |
- |
- options->struct_size = sizeof(MojoCreateDataPipeOptions); |
- options->flags = serialization->flags; |
- options->element_num_bytes = serialization->element_num_bytes; |
- options->capacity_num_bytes = serialization->capacity_num_bytes; |
- |
- if (shared_memory_size) { |
- *shared_memory_size = serialization->shared_memory_size; |
- if (*shared_memory_size) { |
- DCHECK(serialization->shared_memory_handle_index != |
- kInvalidDataPipeHandleIndex); |
- if (!platform_handles || |
- serialization->shared_memory_handle_index >= |
- platform_handles->size()) { |
- LOG(ERROR) << "Invalid serialized data pipe dispatcher " |
- << "(missing handles)"; |
- return ScopedPlatformHandle(); |
- } |
- |
- PlatformHandle temp_shared_memory_handle; |
- std::swap(temp_shared_memory_handle, |
- (*platform_handles)[serialization->shared_memory_handle_index]); |
- *shared_memory_handle = |
- ScopedPlatformHandle(temp_shared_memory_handle); |
+ PlatformHandle channel_handle; |
+ std::swap(channel_handle, (*platform_handles)[channel_handle_index]); |
+ if (channel_handle.is_valid()) { |
+ RawChannel* channel = |
+ RawChannel::Create(ScopedPlatformHandle(channel_handle)); |
+ channel->SetSerializedData( |
+ serialized_read_buffer, serialized_read_buffer_size, |
+ serialized_write_buffer, serialized_write_buffer_size, nullptr, |
+ nullptr); |
+ data_pipe->set_channel(channel); |
} |
} |
- size -= sizeof(SerializedDataPipeHandleDispatcher); |
+ // Get the shared memory. |
+ uint32_t shared_buffer_handle_index = |
+ serialization->shared_buffer_handle_index; |
+ scoped_refptr<PlatformSharedBuffer> shared_buffer; |
+ if (shared_buffer_handle_index != kInvalidDataPipeHandleIndex) { |
+ if (!platform_handles || |
+ shared_buffer_handle_index >= platform_handles->size()) { |
+ LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
+ return nullptr; |
+ } |
+ |
+ // Take ownership. |
+ PlatformHandle shared_buffer_handle; |
+ std::swap(shared_buffer_handle, |
+ (*platform_handles)[shared_buffer_handle_index]); |
+ shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( |
+ serialization->capacity_num_bytes, |
+ ScopedPlatformHandle(shared_buffer_handle)); |
+ |
+ data_pipe->set_shared_buffer(shared_buffer); |
+ } |
+ |
+ return data_pipe; |
+} |
+ |
+uint8_t* DataPipe::GetSharedBufferBase() { |
+ DCHECK(shared_buffer_); |
+ if (!mapping_) |
+ mapping_ = shared_buffer_->Map(0, options_.capacity_num_bytes); |
+ if (!mapping_) |
+ return nullptr; |
+ return static_cast<uint8_t*>(mapping_->GetBase()); |
+} |
+ |
+bool DataPipe::WriteDataIntoSharedBuffer(const void* elements, |
+ uint32_t num_bytes) { |
+ uint32_t buffer_size = options_.capacity_num_bytes; |
+ uint8_t* base = GetSharedBufferBase(); |
+ if (!base) |
+ return false; |
+ |
+ DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); |
+ DCHECK_LE(num_bytes, buffer_size - ring_buffer_size_); |
+ |
+ uint32_t ring_buffer_end = |
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size; |
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { |
+ uint32_t bytes_until_end = buffer_size - ring_buffer_end; |
+ memcpy(base + ring_buffer_end, elements, |
+ std::min(num_bytes, bytes_until_end)); |
+ |
+ if (bytes_until_end < num_bytes) { |
+ memcpy(base, static_cast<const uint8_t*>(elements) + bytes_until_end, |
+ num_bytes - bytes_until_end); |
+ } |
+ } else { |
+ memcpy(base + ring_buffer_end, elements, num_bytes); |
+ } |
+ |
+ return true; |
+} |
+ |
+bool DataPipe::ReadDataFromSharedBuffer(void* data, uint32_t num_bytes) { |
+ DCHECK_LE(num_bytes, ring_buffer_size_); |
+ DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); |
+ |
+ uint32_t buffer_size = options_.capacity_num_bytes; |
+ uint8_t* base = GetSharedBufferBase(); |
+ if (!base) |
+ return false; |
+ |
+ uint32_t ring_buffer_end = |
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size; |
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { |
+ memcpy(data, base + ring_buffer_start_, num_bytes); |
+ } else { |
+ uint32_t bytes_until_end = buffer_size - ring_buffer_start_; |
+ memcpy(data, base + ring_buffer_start_, |
+ std::min(num_bytes, bytes_until_end)); |
+ |
+ if (bytes_until_end < num_bytes) { |
+ memcpy(static_cast<uint8_t*>(data) + bytes_until_end, base, |
+ num_bytes - bytes_until_end); |
+ } |
+ } |
+ |
+ return true; |
+} |
+ |
+uint32_t DataPipe::GetReadableBytes() const { |
+ return ring_buffer_size_; |
+} |
+ |
+uint32_t DataPipe::GetWritableBytes() const { |
+ return options_.capacity_num_bytes - ring_buffer_size_; |
+} |
+ |
+void DataPipe::UpdateFromRead(uint32_t num_bytes) { |
+ ring_buffer_start_ = |
+ (ring_buffer_start_ + num_bytes) % options_.capacity_num_bytes; |
+ ring_buffer_size_ -= num_bytes; |
+ DCHECK_GE(ring_buffer_size_, 0u); |
+} |
+ |
+void DataPipe::UpdateFromWrite(uint32_t num_bytes) { |
+ ring_buffer_size_ += num_bytes; |
+ DCHECK_LE(ring_buffer_size_, options_.capacity_num_bytes); |
+} |
+ |
+bool DataPipe::NotifyWrite(uint32_t num_bytes) { |
+ if (!channel_) |
+ return false; |
+ DataPipeCommandHeader command = {DATA_WRITTEN, num_bytes}; |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit( |
+ MessageInTransit::Type::MESSAGE, sizeof(command), &command)); |
+ if (!channel_->WriteMessage(std::move(message))) { |
+ return false; |
+ } |
+ return true; |
+} |
+ |
+bool DataPipe::NotifyRead(uint32_t num_bytes) { |
+ if (!channel_) |
+ return false; |
+ DataPipeCommandHeader command = {DATA_READ, num_bytes}; |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit( |
+ MessageInTransit::Type::MESSAGE, sizeof(command), &command)); |
+ if (!channel_->WriteMessage(std::move(message))) { |
+ return false; |
+ } |
+ return true; |
+} |
+ |
+void DataPipe::Shutdown() { |
+ if (channel_) { |
+ channel_->Shutdown(); |
+ channel_ = nullptr; |
+ } |
+} |
+ |
+void DataPipe::CreateEquivalentAndClose(DataPipe* out) { |
+ // Serialize, releasing the handle, otherwise we will get callbacks to the |
+ // wrong delegate for RawChannel. |
+ Serialize(); |
+ |
+ std::swap(options_, out->options_); |
+ std::swap(channel_released_, out->channel_released_); |
+ serialized_read_buffer_.swap(out->serialized_read_buffer_); |
+ serialized_write_buffer_.swap(out->serialized_write_buffer_); |
+ serialized_channel_handle_.swap(out->serialized_channel_handle_); |
+ shared_buffer_.swap(out->shared_buffer_); |
+ mapping_.swap(out->mapping_); |
+ std::swap(ring_buffer_start_, out->ring_buffer_start_); |
+ std::swap(ring_buffer_size_, out->ring_buffer_size_); |
+} |
+ |
+void* DataPipe::GetWriteBuffer(uint32_t* num_bytes) { |
+ DCHECK(shared_buffer_); |
+ // Must provide sequential memory. |
+ uint32_t buffer_size = options_.capacity_num_bytes; |
+ uint32_t ring_buffer_end = |
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size; |
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { |
+ *num_bytes = buffer_size - ring_buffer_end; |
+ } else { |
+ *num_bytes = ring_buffer_start_ - ring_buffer_end; |
+ } |
+ |
+ return GetSharedBufferBase() + ring_buffer_end; |
+} |
+ |
+const void* DataPipe::GetReadBuffer(uint32_t* num_bytes) { |
+ DCHECK(shared_buffer_); |
+ // Must provide sequential memory. |
+ uint32_t buffer_size = options_.capacity_num_bytes; |
+ uint32_t ring_buffer_end = |
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size; |
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { |
+ *num_bytes = ring_buffer_size_; |
+ } else { |
+ *num_bytes = buffer_size - ring_buffer_start_; |
+ } |
+ |
+ return GetSharedBufferBase() + ring_buffer_start_; |
+} |
+ |
+bool DataPipe::ProcessCommand(const DataPipeCommandHeader& command, |
+ ScopedPlatformHandleVectorPtr platform_handles) { |
+ bool was_unreadable = GetReadableBytes() == 0; |
+ bool was_unwritable = GetWritableBytes() == 0; |
+ switch (command.command) { |
+ case DATA_WRITTEN: |
+ UpdateFromWrite(command.num_bytes); |
+ break; |
+ case DATA_READ: |
+ UpdateFromRead(command.num_bytes); |
+ break; |
+ default: |
+ LOG(ERROR) << "Unknown DataPipe command"; |
+ NOTREACHED(); |
+ } |
- return ScopedPlatformHandle(platform_handle); |
+ // Handles write/read case and shared buffer becoming available case. |
+ return (was_unreadable && GetReadableBytes()) || |
+ (was_unwritable && GetWritableBytes()); |
} |
} // namespace edk |