Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(873)

Unified Diff: mojo/edk/system/data_pipe.cc

Issue 1526923006: [mojo] Implement data pipe using a shared buffer. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/data_pipe.cc
diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc
index 54a6df45ae6bb2d434b1d28d2b314ac2cd2c8703..9371b25de78e701487851fa98b0a5af831f77b11 100644
--- a/mojo/edk/system/data_pipe.cc
+++ b/mojo/edk/system/data_pipe.cc
@@ -11,6 +11,11 @@
#include <algorithm>
#include <limits>
+#include "base/bind.h"
+#include "mojo/edk/embedder/embedder_internal.h"
+#include "mojo/edk/embedder/platform_support.h"
+#include "mojo/edk/system/broker.h"
+
#include "mojo/edk/system/configuration.h"
#include "mojo/edk/system/options_validation.h"
#include "mojo/edk/system/raw_channel.h"
@@ -24,15 +29,20 @@ namespace {
const uint32_t kInvalidDataPipeHandleIndex = static_cast<uint32_t>(-1);
struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher {
- uint32_t platform_handle_index; // (Or |kInvalidDataPipeHandleIndex|.)
+ uint32_t channel_handle_index; // (Or |kInvalidDataPipeHandleIndex|.)
// These are from MojoCreateDataPipeOptions
MojoCreateDataPipeOptionsFlags flags;
uint32_t element_num_bytes;
uint32_t capacity_num_bytes;
- uint32_t shared_memory_handle_index; // (Or |kInvalidDataPipeHandleIndex|.)
- uint32_t shared_memory_size;
+ uint32_t channel_shared_handle_index; // (Or |kInvalidDataPipeHandleIndex|.)
+ uint32_t channel_pending_read_size;
+ uint32_t channel_pending_write_size;
+
+ uint32_t shared_buffer_handle_index;
+ uint32_t ring_buffer_start;
+ uint32_t ring_buffer_size;
};
} // namespace
@@ -100,50 +110,175 @@ MojoResult DataPipe::ValidateCreateOptions(
return MOJO_RESULT_OK;
}
-void DataPipe::StartSerialize(bool have_channel_handle,
- bool have_shared_memory,
- size_t* max_size,
- size_t* max_platform_handles) {
+DataPipe::DataPipe(const MojoCreateDataPipeOptions& options)
+ : channel_(nullptr),
+ options_(options),
+ channel_released_(false),
+ shared_buffer_(nullptr),
+ mapping_(nullptr),
+ ring_buffer_start_(0u),
+ ring_buffer_size_(0u) {}
+
+DataPipe::~DataPipe() {}
+
+void DataPipe::Init(ScopedPlatformHandle message_pipe,
Anand Mistry (off Chromium) 2016/01/11 06:19:34 s/message_pipe/channel where appropriate.
Eliot Courtney 2016/01/13 00:00:09 Done.
+ char* serialized_write_buffer,
+ uint32_t serialized_write_buffer_size,
+ char* serialized_read_buffer,
+ uint32_t serialized_read_buffer_size,
+ ScopedPlatformHandle shared_buffer_handle,
+ uint32_t ring_buffer_start,
+ uint32_t ring_buffer_size,
+ bool is_producer,
+ const base::Closure& init_callback) {
+ is_producer_ = is_producer;
+
+ ring_buffer_start_ = ring_buffer_start;
+ ring_buffer_size_ = ring_buffer_size;
+
+ if (shared_buffer_handle.is_valid()) {
+ shared_buffer_ = internal::g_platform_support->CreateSharedBufferFromHandle(
+ options_.capacity_num_bytes, std::move(shared_buffer_handle));
+ DCHECK(shared_buffer_);
+ } else if (is_producer_) {
+#if defined(OS_WIN)
+ shared_buffer_ = internal::g_platform_support->CreateSharedBuffer(
+ options_.capacity_num_bytes);
+#else
+ shared_buffer_ =
+ internal::g_broker->CreateSharedBuffer(options_.capacity_num_bytes);
+#endif
+ CHECK(shared_buffer_);
+ }
+
+ if (message_pipe.is_valid()) {
+ channel_ = RawChannel::Create(std::move(message_pipe));
+ channel_->SetSerializedData(serialized_read_buffer,
+ serialized_read_buffer_size,
+ serialized_write_buffer,
+ serialized_write_buffer_size, nullptr, nullptr);
+ internal::g_io_thread_task_runner->PostTask(FROM_HERE, init_callback);
+ channel_->EnsureLazyInitialized();
+
+ if (is_producer_) {
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE, base::Bind(&DataPipe::NotifySharedBuffer, this));
+ }
+ }
+}
+
+void DataPipe::StartSerialize(size_t* max_size, size_t* max_platform_handles) {
+ // We need to release the channel and get its pending reads/writes.
+ if (!channel_released_) {
+ Serialize();
+ }
+
*max_size = sizeof(SerializedDataPipeHandleDispatcher);
*max_platform_handles = 0;
- if (have_channel_handle)
+ if (serialized_channel_handle_.is_valid())
(*max_platform_handles)++;
- if (have_shared_memory)
+ // For shared buffer to transfer pending reads / writes.
+ if (!serialized_write_buffer_.empty() || !serialized_read_buffer_.empty())
(*max_platform_handles)++;
+ if (shared_buffer_)
+ (*max_platform_handles)++;
+}
+
+void DataPipe::Serialize() {
+ // We need to stop watching handle immediately, even though not on IO thread,
+ // so that other messages aren't read after this.
+ if (channel_) {
+ std::vector<int> fds;
+ bool write_error = false;
+ serialized_channel_handle_ = channel_->ReleaseHandle(
Anand Mistry (off Chromium) 2016/01/11 06:19:34 From what I can tell, this is racy. Basically, thi
Eliot Courtney 2016/01/14 02:17:57 Discussed offline -- this is fairly impossible to
+ &serialized_read_buffer_, &serialized_write_buffer_, &fds, &fds,
+ &write_error);
+
+ CHECK(fds.empty());
+ if (write_error) {
+ serialized_channel_handle_.reset();
+ }
+ channel_ = nullptr;
+ }
+ channel_released_ = true;
}
-void DataPipe::EndSerialize(const MojoCreateDataPipeOptions& options,
- ScopedPlatformHandle channel_handle,
- ScopedPlatformHandle shared_memory_handle,
- size_t shared_memory_size,
- void* destination,
+void DataPipe::EndSerialize(void* destination,
size_t* actual_size,
PlatformHandleVector* platform_handles) {
+ ScopedPlatformHandle channel_shared_handle;
+ size_t channel_shared_size =
+ serialized_read_buffer_.size() + serialized_write_buffer_.size();
+ if (channel_shared_size) {
+#if defined(OS_WIN)
+ scoped_refptr<PlatformSharedBuffer> shared_buffer(
+ internal::g_platform_support->CreateSharedBuffer(channel_shared_size));
+#else
+ scoped_refptr<PlatformSharedBuffer> shared_buffer(
+ internal::g_broker->CreateSharedBuffer(channel_shared_size));
+#endif
+
+ scoped_ptr<PlatformSharedBufferMapping> mapping(
+ shared_buffer->Map(0, channel_shared_size));
+
+ void* base = mapping->GetBase();
+ if (!serialized_read_buffer_.empty()) {
+ memcpy(base, &serialized_read_buffer_[0], serialized_read_buffer_.size());
+ }
+ if (!serialized_write_buffer_.empty()) {
+ memcpy(static_cast<uint8_t*>(base) + serialized_read_buffer_.size(),
+ &serialized_write_buffer_[0], serialized_write_buffer_.size());
+ }
+
+ channel_shared_handle.reset(shared_buffer->PassPlatformHandle().release());
+ }
+
SerializedDataPipeHandleDispatcher* serialization =
static_cast<SerializedDataPipeHandleDispatcher*>(destination);
- if (channel_handle.is_valid()) {
+ if (serialized_channel_handle_.is_valid()) {
DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max());
- serialization->platform_handle_index =
+ serialization->channel_handle_index =
static_cast<uint32_t>(platform_handles->size());
- platform_handles->push_back(channel_handle.release());
+ platform_handles->push_back(serialized_channel_handle_.release());
} else {
- serialization->platform_handle_index = kInvalidDataPipeHandleIndex;
+ serialization->channel_handle_index = kInvalidDataPipeHandleIndex;
}
- serialization->flags = options.flags;
- serialization->element_num_bytes = options.element_num_bytes;
- serialization->capacity_num_bytes = options.capacity_num_bytes;
+ serialization->flags = options_.flags;
+ serialization->element_num_bytes = options_.element_num_bytes;
+ serialization->capacity_num_bytes = options_.capacity_num_bytes;
- serialization->shared_memory_size = static_cast<uint32_t>(shared_memory_size);
- if (serialization->shared_memory_size) {
+ serialization->channel_pending_read_size =
+ static_cast<uint32_t>(serialized_read_buffer_.size());
+ serialization->channel_pending_write_size =
+ static_cast<uint32_t>(serialized_write_buffer_.size());
+
+ if (channel_shared_handle.is_valid()) {
DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max());
- serialization->shared_memory_handle_index =
+ serialization->channel_shared_handle_index =
static_cast<uint32_t>(platform_handles->size());
- platform_handles->push_back(shared_memory_handle.release());
+ platform_handles->push_back(channel_shared_handle.release());
} else {
- serialization->shared_memory_handle_index = kInvalidDataPipeHandleIndex;
+ serialization->channel_shared_handle_index = kInvalidDataPipeHandleIndex;
+ }
+
+ ScopedPlatformHandle shared_buffer_handle;
+ if (shared_buffer_) {
+ shared_buffer_handle.reset(shared_buffer_->PassPlatformHandle().release());
}
+ if (shared_buffer_handle.is_valid()) {
+ DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max());
+ serialization->shared_buffer_handle_index =
+ static_cast<uint32_t>(platform_handles->size());
+ platform_handles->push_back(shared_buffer_handle.release());
+ } else {
+ serialization->shared_buffer_handle_index = kInvalidDataPipeHandleIndex;
+ }
+
+ serialization->ring_buffer_start = ring_buffer_start_;
+ serialization->ring_buffer_size = ring_buffer_size_;
+
*actual_size = sizeof(SerializedDataPipeHandleDispatcher);
}
@@ -152,8 +287,12 @@ ScopedPlatformHandle DataPipe::Deserialize(
size_t size,
PlatformHandleVector* platform_handles,
MojoCreateDataPipeOptions* options,
- ScopedPlatformHandle* shared_memory_handle,
- size_t* shared_memory_size) {
+ ScopedPlatformHandle* channel_shared_handle,
+ uint32_t* serialized_read_buffer_size,
+ uint32_t* serialized_write_buffer_size,
+ ScopedPlatformHandle* shared_buffer_handle,
+ uint32_t* ring_buffer_start,
+ uint32_t* ring_buffer_size) {
if (size != sizeof(SerializedDataPipeHandleDispatcher)) {
LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)";
return ScopedPlatformHandle();
@@ -161,52 +300,308 @@ ScopedPlatformHandle DataPipe::Deserialize(
const SerializedDataPipeHandleDispatcher* serialization =
static_cast<const SerializedDataPipeHandleDispatcher*>(source);
- size_t platform_handle_index = serialization->platform_handle_index;
// Starts off invalid, which is what we want.
- PlatformHandle platform_handle;
- if (platform_handle_index != kInvalidDataPipeHandleIndex) {
- if (!platform_handles ||
- platform_handle_index >= platform_handles->size()) {
- LOG(ERROR)
- << "Invalid serialized data pipe dispatcher (missing handles)";
+ PlatformHandle channel_handle;
+ uint32_t channel_handle_index = serialization->channel_handle_index;
+ if (channel_handle_index != kInvalidDataPipeHandleIndex) {
+ if (!platform_handles || channel_handle_index >= platform_handles->size()) {
+ LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)";
return ScopedPlatformHandle();
}
// We take ownership of the handle, so we have to invalidate the one in
// |platform_handles|.
- std::swap(platform_handle, (*platform_handles)[platform_handle_index]);
+ std::swap(channel_handle, (*platform_handles)[channel_handle_index]);
}
options->struct_size = sizeof(MojoCreateDataPipeOptions);
options->flags = serialization->flags;
options->element_num_bytes = serialization->element_num_bytes;
options->capacity_num_bytes = serialization->capacity_num_bytes;
+ *serialized_read_buffer_size = serialization->channel_pending_read_size;
+ *serialized_write_buffer_size = serialization->channel_pending_write_size;
+ *ring_buffer_start = serialization->ring_buffer_start;
+ *ring_buffer_size = serialization->ring_buffer_size;
+
+ uint32_t channel_shared_handle_index =
+ serialization->channel_shared_handle_index;
+ if (*serialized_read_buffer_size || *serialized_write_buffer_size) {
+ DCHECK_NE(channel_shared_handle_index, kInvalidDataPipeHandleIndex);
+ if (!platform_handles ||
+ channel_shared_handle_index >= platform_handles->size()) {
+ LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)";
+ return ScopedPlatformHandle();
+ }
- if (shared_memory_size) {
- *shared_memory_size = serialization->shared_memory_size;
- if (*shared_memory_size) {
- DCHECK(serialization->shared_memory_handle_index !=
- kInvalidDataPipeHandleIndex);
- if (!platform_handles ||
- serialization->shared_memory_handle_index >=
- platform_handles->size()) {
- LOG(ERROR) << "Invalid serialized data pipe dispatcher "
- << "(missing handles)";
- return ScopedPlatformHandle();
- }
+ // Take ownership.
+ PlatformHandle temp_channel_shared_handle;
+ std::swap(temp_channel_shared_handle,
+ (*platform_handles)[channel_shared_handle_index]);
+ *channel_shared_handle = ScopedPlatformHandle(temp_channel_shared_handle);
+ }
- PlatformHandle temp_shared_memory_handle;
- std::swap(temp_shared_memory_handle,
- (*platform_handles)[serialization->shared_memory_handle_index]);
- *shared_memory_handle =
- ScopedPlatformHandle(temp_shared_memory_handle);
+ uint32_t shared_buffer_handle_index =
+ serialization->shared_buffer_handle_index;
+ if (shared_buffer_handle_index != kInvalidDataPipeHandleIndex) {
+ if (!platform_handles ||
+ shared_buffer_handle_index >= platform_handles->size()) {
+ LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)";
+ return ScopedPlatformHandle();
}
+
+ // Take ownership.
+ PlatformHandle temp_shared_buffer_handle;
+ std::swap(temp_shared_buffer_handle,
+ (*platform_handles)[shared_buffer_handle_index]);
+ *shared_buffer_handle = ScopedPlatformHandle(temp_shared_buffer_handle);
}
- size -= sizeof(SerializedDataPipeHandleDispatcher);
+ return ScopedPlatformHandle(channel_handle);
+}
+
+uint8_t* DataPipe::GetSharedBufferBase() {
+ if (!mapping_) {
+ DCHECK(shared_buffer_);
+ mapping_ = shared_buffer_->Map(0, options_.capacity_num_bytes);
+ }
+ if (!mapping_)
+ return nullptr;
+ return static_cast<uint8_t*>(mapping_->GetBase());
+}
+
+bool DataPipe::WriteDataIntoSharedBuffer(const void* elements,
+ uint32_t num_bytes) {
+ uint32_t buffer_size = options_.capacity_num_bytes;
+ uint8_t* base = GetSharedBufferBase();
+ if (!base)
+ return false;
+
+ DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u);
+ DCHECK_LE(num_bytes, buffer_size - ring_buffer_size_);
+
+ uint32_t ring_buffer_end =
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size;
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
+ uint32_t bytes_until_end = buffer_size - ring_buffer_end;
+ memcpy(base + ring_buffer_end, elements,
+ std::min(num_bytes, bytes_until_end));
+
+ if (bytes_until_end < num_bytes) {
+ memcpy(base, static_cast<const uint8_t*>(elements) + bytes_until_end,
+ num_bytes - bytes_until_end);
+ }
+ } else {
+ memcpy(base + ring_buffer_end, elements, num_bytes);
+ }
+
+ return true;
+}
+
+bool DataPipe::ReadDataFromSharedBuffer(void* data, uint32_t num_bytes) {
+ DCHECK_LE(num_bytes, ring_buffer_size_);
+ DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u);
+
+ uint32_t buffer_size = options_.capacity_num_bytes;
+ uint8_t* base = GetSharedBufferBase();
+ if (!base)
+ return false;
+
+ uint32_t ring_buffer_end =
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size;
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
+ memcpy(data, base + ring_buffer_start_, num_bytes);
+ } else {
+ uint32_t bytes_until_end = buffer_size - ring_buffer_start_;
+ memcpy(data, base + ring_buffer_start_,
+ std::min(num_bytes, bytes_until_end));
+
+ if (bytes_until_end < num_bytes) {
+ memcpy(static_cast<uint8_t*>(data) + bytes_until_end, base,
+ num_bytes - bytes_until_end);
+ }
+ }
+
+ return true;
+}
+
+uint32_t DataPipe::GetReadableBytes() const {
+ return shared_buffer_ ? ring_buffer_size_ : 0;
+}
+
+uint32_t DataPipe::GetWritableBytes() const {
+ return shared_buffer_ ? options_.capacity_num_bytes - ring_buffer_size_ : 0;
+}
+
+void DataPipe::UpdateFromRead(uint32_t num_bytes) {
+ ring_buffer_start_ =
+ (ring_buffer_start_ + num_bytes) % options_.capacity_num_bytes;
+ ring_buffer_size_ -= num_bytes;
+ DCHECK_GE(ring_buffer_size_, 0u);
+}
+
+void DataPipe::UpdateFromWrite(uint32_t num_bytes) {
+ ring_buffer_size_ += num_bytes;
+ DCHECK_LE(ring_buffer_size_, options_.capacity_num_bytes);
+}
+
+bool DataPipe::NotifyWrite(uint32_t num_bytes) {
+ DCHECK(is_producer_);
+
+ if (!channel_)
+ return false;
+
+ DataPipeCommandHeader command = {DATA_WRITTEN, num_bytes};
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::MESSAGE, sizeof(command), &command));
+ if (!channel_->WriteMessage(std::move(message))) {
+ return false;
+ }
+ return true;
+}
+
+bool DataPipe::NotifyRead(uint32_t num_bytes) {
+ DCHECK(!is_producer_);
+
+ if (!channel_)
+ return false;
+ DataPipeCommandHeader command = {DATA_READ, num_bytes};
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::MESSAGE, sizeof(command), &command));
+ if (!channel_->WriteMessage(std::move(message))) {
+ return false;
+ }
+ return true;
+}
+
+void DataPipe::NotifySharedBuffer() {
+ if (!channel_) {
+ DLOG(ERROR) << "NotifySharedBuffer: No channel";
+ return;
+ }
+ DataPipeCommandHeader command = {NOTIFY_SHARED_BUFFER,
+ options_.capacity_num_bytes};
+ ScopedPlatformHandle handle = shared_buffer_->DuplicatePlatformHandle();
+ DCHECK(handle.is_valid());
+ ScopedPlatformHandleVectorPtr fds(new PlatformHandleVector());
+ fds->push_back(handle.release());
+
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::MESSAGE, sizeof(command), &command));
+ message->SetTransportData(make_scoped_ptr(new TransportData(
+ std::move(fds), channel_->GetSerializedPlatformHandleSize())));
+ if (!channel_->WriteMessage(std::move(message))) {
+ DLOG(ERROR) << "NotifySharedBuffer: Can't write";
+ }
+}
+
+void DataPipe::UpdateSharedBuffer(
+ scoped_refptr<PlatformSharedBuffer> shared_buffer) {
+ DCHECK(shared_buffer);
+ shared_buffer_ = shared_buffer;
+ mapping_ = nullptr;
+}
+
+void DataPipe::Shutdown() {
+ if (channel_) {
+ channel_->Shutdown();
+ channel_ = nullptr;
+ }
+}
+
+void DataPipe::CreateEquivalentAndClose(DataPipe* out) {
+ // Serialize, releasing the handle, otherwise we will get callbacks to the
+ // wrong delegate for RawChannel.
+ Serialize();
+
+ std::swap(options_, out->options_);
+ std::swap(channel_released_, out->channel_released_);
+ serialized_read_buffer_.swap(out->serialized_read_buffer_);
+ serialized_write_buffer_.swap(out->serialized_write_buffer_);
+ serialized_channel_handle_.swap(out->serialized_channel_handle_);
+ shared_buffer_.swap(out->shared_buffer_);
+ mapping_.swap(out->mapping_);
+ std::swap(ring_buffer_start_, out->ring_buffer_start_);
+ std::swap(ring_buffer_size_, out->ring_buffer_size_);
+ std::swap(is_producer_, out->is_producer_);
+}
+
+void* DataPipe::GetWriteBuffer(uint32_t* num_bytes) {
+ if (!shared_buffer_) {
+ *num_bytes = 0;
+ return nullptr;
+ }
+ // Must provide sequential memory.
+ uint32_t buffer_size = options_.capacity_num_bytes;
+ uint32_t ring_buffer_end =
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size;
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
+ *num_bytes = buffer_size - ring_buffer_end;
+ } else {
+ *num_bytes = ring_buffer_start_ - ring_buffer_end;
+ }
+
+ return GetSharedBufferBase() + ring_buffer_end;
+}
+
+const void* DataPipe::GetReadBuffer(uint32_t* num_bytes) {
+ if (!shared_buffer_) {
+ *num_bytes = 0;
+ return nullptr;
+ }
+ // Must provide sequential memory.
+ uint32_t buffer_size = options_.capacity_num_bytes;
+ uint32_t ring_buffer_end =
+ (ring_buffer_start_ + ring_buffer_size_) % buffer_size;
+ if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
+ *num_bytes = ring_buffer_size_;
+ } else {
+ *num_bytes = buffer_size - ring_buffer_start_;
+ }
+
+ return GetSharedBufferBase() + ring_buffer_start_;
+}
+
+bool DataPipe::ProcessCommand(const DataPipeCommandHeader& command,
+ ScopedPlatformHandleVectorPtr platform_handles) {
+ PlatformHandle handle;
+ ScopedPlatformHandle scoped_handle;
+ scoped_refptr<PlatformSharedBuffer> shared_buffer;
+ bool was_unreadable = GetReadableBytes() == 0;
+ bool was_unwritable = GetWritableBytes() == 0;
+ switch (command.command) {
+ case NOTIFY_SHARED_BUFFER:
+ CHECK_EQ(platform_handles->size(), 1u);
+ std::swap(handle, (*platform_handles.get())[0]);
+ scoped_handle.reset(handle);
+ UpdateSharedBuffer(
+ internal::g_platform_support->CreateSharedBufferFromHandle(
+ command.num_bytes, std::move(scoped_handle)));
+ break;
+ case DATA_WRITTEN:
+ if (!is_producer_) {
+ UpdateFromWrite(command.num_bytes);
+ } else {
+ LOG(ERROR) << "Producer was told of a write, which shouldn't happen.";
+ DCHECK(0);
Anand Mistry (off Chromium) 2016/01/11 06:19:34 NOTREACHED();
Eliot Courtney 2016/01/13 00:00:09 Done.
+ }
+ break;
+ case DATA_READ:
+ if (is_producer_) {
+ UpdateFromRead(command.num_bytes);
+ } else {
+ LOG(ERROR) << "Consumer was told of a read, which shouldn't happen.";
+ DCHECK(0);
+ }
+ break;
+ default:
+ LOG(ERROR) << "Shouldn't happen";
+ DCHECK(0);
+ }
- return ScopedPlatformHandle(platform_handle);
+ // Handles write/read case and shared buffer becoming available case.
+ return (was_unreadable && GetReadableBytes()) ||
+ (was_unwritable && GetWritableBytes());
}
} // namespace edk

Powered by Google App Engine
This is Rietveld 408576698