Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1078)

Unified Diff: mojo/edk/system/data_pipe.cc

Issue 1526923006: [mojo] Implement data pipe using a shared buffer. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/data_pipe.h ('k') | mojo/edk/system/data_pipe_consumer_dispatcher.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/data_pipe.cc
diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc
index 54a6df45ae6bb2d434b1d28d2b314ac2cd2c8703..756ed64a42ac6f0e66f157fba3edc108d1cd02d5 100644
--- a/mojo/edk/system/data_pipe.cc
+++ b/mojo/edk/system/data_pipe.cc
@@ -11,6 +11,11 @@
#include <algorithm>
#include <limits>
+#include "base/bind.h"
+#include "mojo/edk/embedder/embedder_internal.h"
+#include "mojo/edk/embedder/platform_support.h"
+#include "mojo/edk/system/broker.h"
+
#include "mojo/edk/system/configuration.h"
#include "mojo/edk/system/options_validation.h"
#include "mojo/edk/system/raw_channel.h"
@@ -24,15 +29,20 @@ namespace {
const uint32_t kInvalidDataPipeHandleIndex = static_cast<uint32_t>(-1);
struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher {
- uint32_t platform_handle_index; // (Or |kInvalidDataPipeHandleIndex|.)
+ uint32_t channel_handle_index; // (Or |kInvalidDataPipeHandleIndex|.)
// These are from MojoCreateDataPipeOptions
MojoCreateDataPipeOptionsFlags flags;
uint32_t element_num_bytes;
uint32_t capacity_num_bytes;
- uint32_t shared_memory_handle_index; // (Or |kInvalidDataPipeHandleIndex|.)
- uint32_t shared_memory_size;
+ uint32_t channel_shared_handle_index; // (Or |kInvalidDataPipeHandleIndex|.)
+ uint32_t channel_pending_read_size;
+ uint32_t channel_pending_write_size;
+
+ uint32_t shared_buffer_handle_index;
+ uint32_t ring_buffer_start;
+ uint32_t ring_buffer_size;
};
} // namespace
@@ -100,113 +110,421 @@ MojoResult DataPipe::ValidateCreateOptions(
return MOJO_RESULT_OK;
}
-void DataPipe::StartSerialize(bool have_channel_handle,
- bool have_shared_memory,
- size_t* max_size,
- size_t* max_platform_handles) {
+DataPipe::DataPipe(const MojoCreateDataPipeOptions& options)
+ : channel_(nullptr),
+ options_(options),
+ channel_released_(false),
+ shared_buffer_(nullptr),
+ mapping_(nullptr),
+ ring_buffer_start_(0u),
+ ring_buffer_size_(0u) {}
+
+DataPipe::~DataPipe() {}
+
+void DataPipe::Init() {
+ CHECK(shared_buffer_);
+ if (channel_)
+ channel_->EnsureLazyInitialized();
+}
+
+void DataPipe::StartSerialize(size_t* max_size, size_t* max_platform_handles) {
+ // We need to release the channel and get its pending reads/writes.
+ if (!channel_released_) {
+ Serialize();
+ }
+
*max_size = sizeof(SerializedDataPipeHandleDispatcher);
*max_platform_handles = 0;
- if (have_channel_handle)
+ if (serialized_channel_handle_.is_valid())
(*max_platform_handles)++;
- if (have_shared_memory)
+ // For shared buffer to transfer pending reads / writes.
+ if (!serialized_write_buffer_.empty() || !serialized_read_buffer_.empty())
(*max_platform_handles)++;
+ if (shared_buffer_)
+ (*max_platform_handles)++;
+}
+
+void DataPipe::Serialize() {
+ // We need to stop watching handle immediately, even though not on IO thread,
+ // so that other messages aren't read after this.
+ if (channel_) {
+ std::vector<int> fds;
+ bool write_error = false;
+ serialized_channel_handle_ = channel_->ReleaseHandle(
+ &serialized_read_buffer_, &serialized_write_buffer_, &fds, &fds,
+ &write_error);
+
+ CHECK(fds.empty());
+ if (write_error) {
+ serialized_channel_handle_.reset();
+ }
+ channel_ = nullptr;
+ }
+ channel_released_ = true;
}
-void DataPipe::EndSerialize(const MojoCreateDataPipeOptions& options,
- ScopedPlatformHandle channel_handle,
- ScopedPlatformHandle shared_memory_handle,
- size_t shared_memory_size,
- void* destination,
+void DataPipe::EndSerialize(void* destination,
size_t* actual_size,
PlatformHandleVector* platform_handles) {
+ ScopedPlatformHandle channel_shared_handle;
+ size_t channel_shared_size =
+ serialized_read_buffer_.size() + serialized_write_buffer_.size();
+ if (channel_shared_size) {
+#if defined(OS_WIN)
+ scoped_refptr<PlatformSharedBuffer> shared_buffer(
+ internal::g_platform_support->CreateSharedBuffer(channel_shared_size));
+#else
+ scoped_refptr<PlatformSharedBuffer> shared_buffer(
+ internal::g_broker->CreateSharedBuffer(channel_shared_size));
+#endif
+ CHECK(shared_buffer);
+
+ scoped_ptr<PlatformSharedBufferMapping> mapping(
+ shared_buffer->Map(0, channel_shared_size));
+
+ void* base = mapping->GetBase();
+ if (!serialized_read_buffer_.empty()) {
+ memcpy(base, &serialized_read_buffer_[0], serialized_read_buffer_.size());
+ }
+ if (!serialized_write_buffer_.empty()) {
+ memcpy(static_cast<uint8_t*>(base) + serialized_read_buffer_.size(),
+ &serialized_write_buffer_[0], serialized_write_buffer_.size());
+ }
+
+ channel_shared_handle.reset(shared_buffer->PassPlatformHandle().release());
+ }
+
SerializedDataPipeHandleDispatcher* serialization =
static_cast<SerializedDataPipeHandleDispatcher*>(destination);
- if (channel_handle.is_valid()) {
+ if (serialized_channel_handle_.is_valid()) {
DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max());
- serialization->platform_handle_index =
+ serialization->channel_handle_index =
static_cast<uint32_t>(platform_handles->size());
- platform_handles->push_back(channel_handle.release());
+ platform_handles->push_back(serialized_channel_handle_.release());
} else {
- serialization->platform_handle_index = kInvalidDataPipeHandleIndex;
+ serialization->channel_handle_index = kInvalidDataPipeHandleIndex;
}
- serialization->flags = options.flags;
- serialization->element_num_bytes = options.element_num_bytes;
- serialization->capacity_num_bytes = options.capacity_num_bytes;
+ serialization->flags = options_.flags;
+ serialization->element_num_bytes = options_.element_num_bytes;
+ serialization->capacity_num_bytes = options_.capacity_num_bytes;
+
+ serialization->channel_pending_read_size =
+ static_cast<uint32_t>(serialized_read_buffer_.size());
+ serialization->channel_pending_write_size =
+ static_cast<uint32_t>(serialized_write_buffer_.size());
- serialization->shared_memory_size = static_cast<uint32_t>(shared_memory_size);
- if (serialization->shared_memory_size) {
+ if (channel_shared_handle.is_valid()) {
DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max());
- serialization->shared_memory_handle_index =
+ serialization->channel_shared_handle_index =
static_cast<uint32_t>(platform_handles->size());
- platform_handles->push_back(shared_memory_handle.release());
+ platform_handles->push_back(channel_shared_handle.release());
} else {
- serialization->shared_memory_handle_index = kInvalidDataPipeHandleIndex;
+ serialization->channel_shared_handle_index = kInvalidDataPipeHandleIndex;
}
+ ScopedPlatformHandle shared_buffer_handle;
+ if (shared_buffer_)
+ shared_buffer_handle.reset(shared_buffer_->PassPlatformHandle().release());
+
+ if (shared_buffer_handle.is_valid()) {
+ DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max());
+ serialization->shared_buffer_handle_index =
+ static_cast<uint32_t>(platform_handles->size());
+ platform_handles->push_back(shared_buffer_handle.release());
+ } else {
+ serialization->shared_buffer_handle_index = kInvalidDataPipeHandleIndex;
+ }
+
+ serialization->ring_buffer_start = ring_buffer_start_;
+ serialization->ring_buffer_size = ring_buffer_size_;
+
*actual_size = sizeof(SerializedDataPipeHandleDispatcher);
}
-ScopedPlatformHandle DataPipe::Deserialize(
+scoped_refptr<DataPipe> DataPipe::Deserialize(
const void* source,
size_t size,
- PlatformHandleVector* platform_handles,
- MojoCreateDataPipeOptions* options,
- ScopedPlatformHandle* shared_memory_handle,
- size_t* shared_memory_size) {
+ PlatformHandleVector* platform_handles) {
if (size != sizeof(SerializedDataPipeHandleDispatcher)) {
LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)";
- return ScopedPlatformHandle();
+ return nullptr;
}
const SerializedDataPipeHandleDispatcher* serialization =
static_cast<const SerializedDataPipeHandleDispatcher*>(source);
- size_t platform_handle_index = serialization->platform_handle_index;
- // Starts off invalid, which is what we want.
- PlatformHandle platform_handle;
- if (platform_handle_index != kInvalidDataPipeHandleIndex) {
+ MojoCreateDataPipeOptions options = {
+ sizeof(MojoCreateDataPipeOptions), serialization->flags,
+ serialization->element_num_bytes, serialization->capacity_num_bytes};
+
+ scoped_refptr<DataPipe> data_pipe(new DataPipe(options));
+ data_pipe->ring_buffer_start_ = serialization->ring_buffer_start;
+ data_pipe->ring_buffer_size_ = serialization->ring_buffer_size;
+
+ // See if there's any serialised data for the RawChannel.
+ uint32_t serialized_read_buffer_size =
+ serialization->channel_pending_read_size;
+ uint32_t serialized_write_buffer_size =
+ serialization->channel_pending_write_size;
+ uint32_t buffer_size =
+ serialized_write_buffer_size + serialized_read_buffer_size;
+ char* serialized_read_buffer = nullptr;
+ char* serialized_write_buffer = nullptr;
+ // There's serialized data to be read and put back into the RawChannel.
+ if (buffer_size > 0) {
+ uint32_t channel_shared_handle_index =
+ serialization->channel_shared_handle_index;
+ DCHECK_NE(channel_shared_handle_index, kInvalidDataPipeHandleIndex);
if (!platform_handles ||
- platform_handle_index >= platform_handles->size()) {
- LOG(ERROR)
- << "Invalid serialized data pipe dispatcher (missing handles)";
- return ScopedPlatformHandle();
+ channel_shared_handle_index >= platform_handles->size()) {
+ LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)";
+ return nullptr;
+ }
+
+ PlatformHandle channel_shared_handle;
+ std::swap(channel_shared_handle,
+ (*platform_handles)[channel_shared_handle_index]);
+ scoped_refptr<PlatformSharedBuffer> channel_shared_buffer =
+ internal::g_platform_support->CreateSharedBufferFromHandle(
+ buffer_size, ScopedPlatformHandle(channel_shared_handle));
+ scoped_ptr<PlatformSharedBufferMapping> mapping =
+ channel_shared_buffer->Map(0, buffer_size);
+
+ serialized_read_buffer = static_cast<char*>(mapping->GetBase());
+ serialized_write_buffer =
+ static_cast<char*>(mapping->GetBase()) + serialized_read_buffer_size;
+ }
+
+ // Get the channel handle for RawChannel.
+ // Starts off invalid, which is what we want.
+ uint32_t channel_handle_index = serialization->channel_handle_index;
+ if (channel_handle_index != kInvalidDataPipeHandleIndex) {
+ if (!platform_handles || channel_handle_index >= platform_handles->size()) {
+ LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)";
+ return nullptr;
}
// We take ownership of the handle, so we have to invalidate the one in
// |platform_handles|.
- std::swap(platform_handle, (*platform_handles)[platform_handle_index]);
- }
-
- options->struct_size = sizeof(MojoCreateDataPipeOptions);
- options->flags = serialization->flags;
- options->element_num_bytes = serialization->element_num_bytes;
- options->capacity_num_bytes = serialization->capacity_num_bytes;
-
- if (shared_memory_size) {
- *shared_memory_size = serialization->shared_memory_size;
- if (*shared_memory_size) {
- DCHECK(serialization->shared_memory_handle_index !=
- kInvalidDataPipeHandleIndex);
- if (!platform_handles ||
- serialization->shared_memory_handle_index >=
- platform_handles->size()) {
- LOG(ERROR) << "Invalid serialized data pipe dispatcher "
- << "(missing handles)";
- return ScopedPlatformHandle();
- }
-
- PlatformHandle temp_shared_memory_handle;
- std::swap(temp_shared_memory_handle,
- (*platform_handles)[serialization->shared_memory_handle_index]);
- *shared_memory_handle =
- ScopedPlatformHandle(temp_shared_memory_handle);
+ PlatformHandle channel_handle;
+ std::swap(channel_handle, (*platform_handles)[channel_handle_index]);
+ if (channel_handle.is_valid()) {
+ RawChannel* channel =
+ RawChannel::Create(ScopedPlatformHandle(channel_handle));
+ channel->SetSerializedData(
+ serialized_read_buffer, serialized_read_buffer_size,
+ serialized_write_buffer, serialized_write_buffer_size, nullptr,
+ nullptr);
+ data_pipe->set_channel(channel);
}
}
- size -= sizeof(SerializedDataPipeHandleDispatcher);
+ // Get the shared memory.
+ uint32_t shared_buffer_handle_index =
+ serialization->shared_buffer_handle_index;
+ scoped_refptr<PlatformSharedBuffer> shared_buffer;
+ if (shared_buffer_handle_index != kInvalidDataPipeHandleIndex) {
+ if (!platform_handles ||
+ shared_buffer_handle_index >= platform_handles->size()) {
+ LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)";
+ return nullptr;
+ }
+
+ // Take ownership.
+ PlatformHandle shared_buffer_handle;
+ std::swap(shared_buffer_handle,
+ (*platform_handles)[shared_buffer_handle_index]);
+ shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
+ serialization->capacity_num_bytes,
+ ScopedPlatformHandle(shared_buffer_handle));
+
+ data_pipe->set_shared_buffer(shared_buffer);
+ }
+
+ return data_pipe;
+}
+
+uint8_t* DataPipe::GetSharedBufferBase() {
+ DCHECK(shared_buffer_);
+ if (!mapping_)
+ mapping_ = shared_buffer_->Map(0, options_.capacity_num_bytes);
+ if (!mapping_)
+ return nullptr;
+ return static_cast<uint8_t*>(mapping_->GetBase());
+}
+
+bool DataPipe::WriteDataIntoSharedBuffer(const void* elements,
+ uint32_t num_bytes) {
+ uint32_t buffer_size = options_.capacity_num_bytes;
+ uint8_t* base = GetSharedBufferBase();
+ if (!base)
+ return false;
+
+ DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u);
+ DCHECK_LE(num_bytes, buffer_size - ring_buffer_size_);
+
+ uint32_t ring_buffer_end =
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size;
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
+ uint32_t bytes_until_end = buffer_size - ring_buffer_end;
+ memcpy(base + ring_buffer_end, elements,
+ std::min(num_bytes, bytes_until_end));
+
+ if (bytes_until_end < num_bytes) {
+ memcpy(base, static_cast<const uint8_t*>(elements) + bytes_until_end,
+ num_bytes - bytes_until_end);
+ }
+ } else {
+ memcpy(base + ring_buffer_end, elements, num_bytes);
+ }
+
+ return true;
+}
+
+bool DataPipe::ReadDataFromSharedBuffer(void* data, uint32_t num_bytes) {
+ DCHECK_LE(num_bytes, ring_buffer_size_);
+ DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u);
+
+ uint32_t buffer_size = options_.capacity_num_bytes;
+ uint8_t* base = GetSharedBufferBase();
+ if (!base)
+ return false;
+
+ uint32_t ring_buffer_end =
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size;
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
+ memcpy(data, base + ring_buffer_start_, num_bytes);
+ } else {
+ uint32_t bytes_until_end = buffer_size - ring_buffer_start_;
+ memcpy(data, base + ring_buffer_start_,
+ std::min(num_bytes, bytes_until_end));
+
+ if (bytes_until_end < num_bytes) {
+ memcpy(static_cast<uint8_t*>(data) + bytes_until_end, base,
+ num_bytes - bytes_until_end);
+ }
+ }
+
+ return true;
+}
+
+uint32_t DataPipe::GetReadableBytes() const {
+ return ring_buffer_size_;
+}
+
+uint32_t DataPipe::GetWritableBytes() const {
+ return options_.capacity_num_bytes - ring_buffer_size_;
+}
+
+void DataPipe::UpdateFromRead(uint32_t num_bytes) {
+ ring_buffer_start_ =
+ (ring_buffer_start_ + num_bytes) % options_.capacity_num_bytes;
+ ring_buffer_size_ -= num_bytes;
+ DCHECK_GE(ring_buffer_size_, 0u);
+}
+
+void DataPipe::UpdateFromWrite(uint32_t num_bytes) {
+ ring_buffer_size_ += num_bytes;
+ DCHECK_LE(ring_buffer_size_, options_.capacity_num_bytes);
+}
+
+bool DataPipe::NotifyWrite(uint32_t num_bytes) {
+ if (!channel_)
+ return false;
+ DataPipeCommandHeader command = {DATA_WRITTEN, num_bytes};
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::MESSAGE, sizeof(command), &command));
+ if (!channel_->WriteMessage(std::move(message))) {
+ return false;
+ }
+ return true;
+}
+
+bool DataPipe::NotifyRead(uint32_t num_bytes) {
+ if (!channel_)
+ return false;
+ DataPipeCommandHeader command = {DATA_READ, num_bytes};
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::MESSAGE, sizeof(command), &command));
+ if (!channel_->WriteMessage(std::move(message))) {
+ return false;
+ }
+ return true;
+}
+
+void DataPipe::Shutdown() {
+ if (channel_) {
+ channel_->Shutdown();
+ channel_ = nullptr;
+ }
+}
+
+void DataPipe::CreateEquivalentAndClose(DataPipe* out) {
+ // Serialize, releasing the handle, otherwise we will get callbacks to the
+ // wrong delegate for RawChannel.
+ Serialize();
+
+ std::swap(options_, out->options_);
+ std::swap(channel_released_, out->channel_released_);
+ serialized_read_buffer_.swap(out->serialized_read_buffer_);
+ serialized_write_buffer_.swap(out->serialized_write_buffer_);
+ serialized_channel_handle_.swap(out->serialized_channel_handle_);
+ shared_buffer_.swap(out->shared_buffer_);
+ mapping_.swap(out->mapping_);
+ std::swap(ring_buffer_start_, out->ring_buffer_start_);
+ std::swap(ring_buffer_size_, out->ring_buffer_size_);
+}
+
+void* DataPipe::GetWriteBuffer(uint32_t* num_bytes) {
+ DCHECK(shared_buffer_);
+ // Must provide sequential memory.
+ uint32_t buffer_size = options_.capacity_num_bytes;
+ uint32_t ring_buffer_end =
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size;
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
+ *num_bytes = buffer_size - ring_buffer_end;
+ } else {
+ *num_bytes = ring_buffer_start_ - ring_buffer_end;
+ }
+
+ return GetSharedBufferBase() + ring_buffer_end;
+}
+
+const void* DataPipe::GetReadBuffer(uint32_t* num_bytes) {
+ DCHECK(shared_buffer_);
+ // Must provide sequential memory.
+ uint32_t buffer_size = options_.capacity_num_bytes;
+ uint32_t ring_buffer_end =
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size;
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
+ *num_bytes = ring_buffer_size_;
+ } else {
+ *num_bytes = buffer_size - ring_buffer_start_;
+ }
+
+ return GetSharedBufferBase() + ring_buffer_start_;
+}
+
+bool DataPipe::ProcessCommand(const DataPipeCommandHeader& command,
+ ScopedPlatformHandleVectorPtr platform_handles) {
+ bool was_unreadable = GetReadableBytes() == 0;
+ bool was_unwritable = GetWritableBytes() == 0;
+ switch (command.command) {
+ case DATA_WRITTEN:
+ UpdateFromWrite(command.num_bytes);
+ break;
+ case DATA_READ:
+ UpdateFromRead(command.num_bytes);
+ break;
+ default:
+ LOG(ERROR) << "Unknown DataPipe command";
+ NOTREACHED();
+ }
- return ScopedPlatformHandle(platform_handle);
+ // Handles write/read case and shared buffer becoming available case.
+ return (was_unreadable && GetReadableBytes()) ||
+ (was_unwritable && GetWritableBytes());
}
} // namespace edk
« no previous file with comments | « mojo/edk/system/data_pipe.h ('k') | mojo/edk/system/data_pipe_consumer_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698