Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(305)

Unified Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
index 68c9e0e038875f75385c383b3bb018b985d020f2..386ab14b82e95ad1a7c2d12dfb1144ac763cc383 100644
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
@@ -8,158 +8,92 @@
#include <stdint.h>
#include <algorithm>
+#include <limits>
#include <utility>
#include "base/bind.h"
#include "base/logging.h"
+#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop.h"
#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/embedder/platform_shared_buffer.h"
#include "mojo/edk/embedder/platform_support.h"
-#include "mojo/edk/system/data_pipe.h"
+#include "mojo/edk/system/core.h"
+#include "mojo/edk/system/data_pipe_control_message.h"
+#include "mojo/edk/system/node_controller.h"
+#include "mojo/edk/system/ports_message.h"
+#include "mojo/public/c/system/data_pipe.h"
namespace mojo {
namespace edk {
-struct SharedMemoryHeader {
- uint32_t data_size;
- uint32_t read_buffer_size;
+namespace {
+
+struct MOJO_ALIGNAS(8) SerializedState {
+ MojoCreateDataPipeOptions options;
+ uint64_t pipe_id;
+ bool peer_closed;
+ uint32_t read_offset;
+ uint32_t bytes_available;
};
-void DataPipeConsumerDispatcher::Init(
- ScopedPlatformHandle message_pipe,
- char* serialized_read_buffer, size_t serialized_read_buffer_size) {
- if (message_pipe.is_valid()) {
- channel_ = RawChannel::Create(std::move(message_pipe));
- channel_->SetSerializedData(
- serialized_read_buffer, serialized_read_buffer_size, nullptr, 0u,
- nullptr, nullptr);
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this));
- } else {
- // The data pipe consumer could have read all the data and the producer
- // closed its end subsequently (before the consumer was sent). In that case
- // when we deserialize the consumer we must make sure to set error_ or
- // otherwise the peer-closed signal will never be satisfied.
- error_ = true;
- }
-}
+} // namespace
-void DataPipeConsumerDispatcher::InitOnIO() {
- base::AutoLock locker(lock());
- calling_init_ = true;
- if (channel_)
- channel_->Init(this);
- calling_init_ = false;
-}
+// A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a
+// reference to the dispatcher to ensure it lives as long as the observed port.
+class DataPipeConsumerDispatcher::PortObserverThunk
+ : public NodeController::PortObserver {
+ public:
+ explicit PortObserverThunk(
+ scoped_refptr<DataPipeConsumerDispatcher> dispatcher)
+ : dispatcher_(dispatcher) {}
-void DataPipeConsumerDispatcher::CloseOnIO() {
- base::AutoLock locker(lock());
- if (channel_) {
- channel_->Shutdown();
- channel_ = nullptr;
- }
-}
+ private:
+ ~PortObserverThunk() override {}
-Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
- return Type::DATA_PIPE_CONSUMER;
-}
+ // NodeController::PortObserver:
+ void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
-scoped_refptr<DataPipeConsumerDispatcher>
-DataPipeConsumerDispatcher::Deserialize(
- const void* source,
- size_t size,
- PlatformHandleVector* platform_handles) {
- MojoCreateDataPipeOptions options;
- ScopedPlatformHandle shared_memory_handle;
- size_t shared_memory_size = 0;
-
- ScopedPlatformHandle platform_handle =
- DataPipe::Deserialize(source, size, platform_handles, &options,
- &shared_memory_handle, &shared_memory_size);
-
- scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options));
-
- char* serialized_read_buffer = nullptr;
- size_t serialized_read_buffer_size = 0;
- scoped_refptr<PlatformSharedBuffer> shared_buffer;
- scoped_ptr<PlatformSharedBufferMapping> mapping;
- if (shared_memory_size) {
- shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
- shared_memory_size, std::move(shared_memory_handle));
- mapping = shared_buffer->Map(0, shared_memory_size);
- char* buffer = static_cast<char*>(mapping->GetBase());
- SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer);
- buffer += sizeof(SharedMemoryHeader);
- if (header->data_size) {
- rv->data_.assign(buffer, buffer + header->data_size);
- buffer += header->data_size;
- }
+ scoped_refptr<DataPipeConsumerDispatcher> dispatcher_;
- if (header->read_buffer_size) {
- serialized_read_buffer = buffer;
- serialized_read_buffer_size = header->read_buffer_size;
- buffer += header->read_buffer_size;
- }
- }
-
- rv->Init(std::move(platform_handle), serialized_read_buffer,
- serialized_read_buffer_size);
- return rv;
-}
+ DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
+};
DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
- const MojoCreateDataPipeOptions& options)
+ NodeController* node_controller,
+ const ports::PortRef& control_port,
+ scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
+ const MojoCreateDataPipeOptions& options,
+ bool initialized,
+ uint64_t pipe_id)
: options_(options),
- channel_(nullptr),
- calling_init_(false),
- in_two_phase_read_(false),
- two_phase_max_bytes_read_(0),
- error_(false),
- serialized_(false) {
-}
-
-DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
- // See comment in ~MessagePipeDispatcher.
- if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
- channel_->Shutdown();
- else
- DCHECK(!channel_);
+ node_controller_(node_controller),
+ control_port_(control_port),
+ pipe_id_(pipe_id),
+ shared_ring_buffer_(shared_ring_buffer) {
+ if (initialized) {
+ base::AutoLock lock(lock_);
+ InitializeNoLock();
+ }
}
-void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() {
- lock().AssertAcquired();
- awakable_list_.CancelAll();
+Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
+ return Type::DATA_PIPE_CONSUMER;
}
-void DataPipeConsumerDispatcher::CloseImplNoLock() {
- lock().AssertAcquired();
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this));
+MojoResult DataPipeConsumerDispatcher::Close() {
+ base::AutoLock lock(lock_);
+ DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
+ return CloseNoLock();
}
-scoped_refptr<Dispatcher>
-DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
- lock().AssertAcquired();
-
- SerializeInternal();
-
- scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_);
- data_.swap(rv->data_);
- serialized_read_buffer_.swap(rv->serialized_read_buffer_);
- rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
- rv->serialized_ = true;
-
- return scoped_refptr<Dispatcher>(rv.get());
-}
+MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
+ uint32_t* num_bytes,
+ MojoReadDataFlags flags) {
+ base::AutoLock lock(lock_);
+ if (!shared_ring_buffer_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
-MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
- void* elements,
- uint32_t* num_bytes,
- MojoReadDataFlags flags) {
- lock().AssertAcquired();
- if (channel_)
- channel_->EnsureLazyInitialized();
if (in_two_phase_read_)
return MOJO_RESULT_BUSY;
@@ -170,7 +104,7 @@ MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
DVLOG_IF(2, elements)
<< "Query mode: ignoring non-null |elements|";
- *num_bytes = static_cast<uint32_t>(data_.size());
+ *num_bytes = static_cast<uint32_t>(bytes_available_);
return MOJO_RESULT_OK;
}
@@ -192,32 +126,54 @@ MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
uint32_t min_num_bytes_to_read =
all_or_none ? max_num_bytes_to_read : 0;
- if (min_num_bytes_to_read > data_.size())
- return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE;
+ if (min_num_bytes_to_read > bytes_available_) {
+ return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
+ : MOJO_RESULT_OUT_OF_RANGE;
+ }
- uint32_t bytes_to_read = std::min(max_num_bytes_to_read,
- static_cast<uint32_t>(data_.size()));
- if (bytes_to_read == 0)
- return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
+ uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
+ if (bytes_to_read == 0) {
+ return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
+ : MOJO_RESULT_SHOULD_WAIT;
+ }
- if (!discard)
- memcpy(elements, &data_[0], bytes_to_read);
+ if (!discard) {
+ uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
+ CHECK(data);
+
+ uint8_t* destination = static_cast<uint8_t*>(elements);
+ CHECK(destination);
+
+ DCHECK_LE(read_offset_, options_.capacity_num_bytes);
+ uint32_t tail_bytes_to_copy =
+ std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read);
+ uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy;
+ if (tail_bytes_to_copy > 0)
+ memcpy(destination, data + read_offset_, tail_bytes_to_copy);
+ if (head_bytes_to_copy > 0)
+ memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy);
+ }
*num_bytes = bytes_to_read;
bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
- if (discard || !peek)
- data_.erase(data_.begin(), data_.begin() + bytes_to_read);
+ if (discard || !peek) {
+ read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
+ bytes_available_ -= bytes_to_read;
+
+ base::AutoUnlock unlock(lock_);
+ NotifyRead(bytes_to_read);
+ }
return MOJO_RESULT_OK;
}
-MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
- const void** buffer,
- uint32_t* buffer_num_bytes,
- MojoReadDataFlags flags) {
- lock().AssertAcquired();
- if (channel_)
- channel_->EnsureLazyInitialized();
+MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
+ uint32_t* buffer_num_bytes,
+ MojoReadDataFlags flags) {
+ base::AutoLock lock(lock_);
+ if (!shared_ring_buffer_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
if (in_two_phase_read_)
return MOJO_RESULT_BUSY;
@@ -227,81 +183,82 @@ MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
(flags & MOJO_READ_DATA_FLAG_PEEK))
return MOJO_RESULT_INVALID_ARGUMENT;
- uint32_t max_num_bytes_to_read = static_cast<uint32_t>(data_.size());
- if (max_num_bytes_to_read == 0)
- return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
+ if (bytes_available_ == 0) {
+ return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
+ : MOJO_RESULT_SHOULD_WAIT;
+ }
+
+ DCHECK_LT(read_offset_, options_.capacity_num_bytes);
+ uint32_t bytes_to_read = std::min(bytes_available_,
+ options_.capacity_num_bytes - read_offset_);
+
+ CHECK(ring_buffer_mapping_);
+ uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
+ CHECK(data);
in_two_phase_read_ = true;
- *buffer = &data_[0];
- *buffer_num_bytes = max_num_bytes_to_read;
- two_phase_max_bytes_read_ = max_num_bytes_to_read;
+ *buffer = data + read_offset_;
+ *buffer_num_bytes = bytes_to_read;
+ two_phase_max_bytes_read_ = bytes_to_read;
return MOJO_RESULT_OK;
}
-MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock(
- uint32_t num_bytes_read) {
- lock().AssertAcquired();
+MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
+ base::AutoLock lock(lock_);
if (!in_two_phase_read_)
return MOJO_RESULT_FAILED_PRECONDITION;
- HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
+ if (in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ CHECK(shared_ring_buffer_);
+
+ HandleSignalsState old_state = GetHandleSignalsStateNoLock();
MojoResult rv;
if (num_bytes_read > two_phase_max_bytes_read_ ||
num_bytes_read % options_.element_num_bytes != 0) {
rv = MOJO_RESULT_INVALID_ARGUMENT;
} else {
rv = MOJO_RESULT_OK;
- data_.erase(data_.begin(), data_.begin() + num_bytes_read);
+ read_offset_ =
+ (read_offset_ + num_bytes_read) % options_.capacity_num_bytes;
+
+ DCHECK_GE(bytes_available_, num_bytes_read);
+ bytes_available_ -= num_bytes_read;
+
+ base::AutoUnlock unlock(lock_);
+ NotifyRead(num_bytes_read);
}
in_two_phase_read_ = false;
two_phase_max_bytes_read_ = 0;
- if (!data_received_during_two_phase_read_.empty()) {
- if (data_.empty()) {
- data_received_during_two_phase_read_.swap(data_);
- } else {
- data_.insert(data_.end(), data_received_during_two_phase_read_.begin(),
- data_received_during_two_phase_read_.end());
- data_received_during_two_phase_read_.clear();
- }
- }
- HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
+ HandleSignalsState new_state = GetHandleSignalsStateNoLock();
if (!new_state.equals(old_state))
awakable_list_.AwakeForStateChange(new_state);
return rv;
}
-HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock()
- const {
- lock().AssertAcquired();
-
- HandleSignalsState rv;
- if (!data_.empty()) {
- if (!in_two_phase_read_)
- rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
- } else if (!error_) {
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
- }
-
- if (error_)
- rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
- return rv;
+HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
+ base::AutoLock lock(lock_);
+ return GetHandleSignalsStateNoLock();
}
-MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock(
+MojoResult DataPipeConsumerDispatcher::AddAwakable(
Awakable* awakable,
MojoHandleSignals signals,
uintptr_t context,
HandleSignalsState* signals_state) {
- lock().AssertAcquired();
- if (channel_)
- channel_->EnsureLazyInitialized();
- HandleSignalsState state = GetHandleSignalsStateImplNoLock();
+ base::AutoLock lock(lock_);
+ if (!shared_ring_buffer_ || in_transit_) {
+ if (signals_state)
+ *signals_state = HandleSignalsState();
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ }
+ UpdateSignalsStateNoLock();
+ HandleSignalsState state = GetHandleSignalsStateNoLock();
if (state.satisfies(signals)) {
if (signals_state)
*signals_state = state;
@@ -317,186 +274,260 @@ MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock(
return MOJO_RESULT_OK;
}
-void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock(
+void DataPipeConsumerDispatcher::RemoveAwakable(
Awakable* awakable,
HandleSignalsState* signals_state) {
- lock().AssertAcquired();
+ base::AutoLock lock(lock_);
+ if ((!shared_ring_buffer_ || in_transit_) && signals_state)
+ *signals_state = HandleSignalsState();
+ else if (signals_state)
+ *signals_state = GetHandleSignalsStateNoLock();
awakable_list_.Remove(awakable);
- if (signals_state)
- *signals_state = GetHandleSignalsStateImplNoLock();
}
-void DataPipeConsumerDispatcher::StartSerializeImplNoLock(
- size_t* max_size,
- size_t* max_platform_handles) {
- if (!serialized_) {
- // Handles the case where we have messages read off RawChannel but not ready
- // by MojoReadMessage.
- SerializeInternal();
- }
-
- DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
- !data_.empty() || !serialized_read_buffer_.empty(),
- max_size, max_platform_handles);
+void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes,
+ uint32_t* num_ports,
+ uint32_t* num_handles) {
+ base::AutoLock lock(lock_);
+ DCHECK(in_transit_);
+ *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
+ *num_ports = 1;
+ *num_handles = 1;
}
-bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock(
+bool DataPipeConsumerDispatcher::EndSerialize(
void* destination,
- size_t* actual_size,
- PlatformHandleVector* platform_handles) {
- ScopedPlatformHandle shared_memory_handle;
- size_t shared_memory_size = data_.size() + serialized_read_buffer_.size();
- if (shared_memory_size) {
- shared_memory_size += sizeof(SharedMemoryHeader);
- SharedMemoryHeader header;
- header.data_size = static_cast<uint32_t>(data_.size());
- header.read_buffer_size =
- static_cast<uint32_t>(serialized_read_buffer_.size());
-
- scoped_refptr<PlatformSharedBuffer> shared_buffer(
- internal::g_platform_support->CreateSharedBuffer(
- shared_memory_size));
- scoped_ptr<PlatformSharedBufferMapping> mapping(
- shared_buffer->Map(0, shared_memory_size));
-
- char* start = static_cast<char*>(mapping->GetBase());
- memcpy(start, &header, sizeof(SharedMemoryHeader));
- start += sizeof(SharedMemoryHeader);
-
- if (!data_.empty()) {
- memcpy(start, &data_[0], data_.size());
- start += data_.size();
- }
+ ports::PortName* ports,
+ PlatformHandle* platform_handles) {
+ SerializedState* state = static_cast<SerializedState*>(destination);
+ memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
- if (!serialized_read_buffer_.empty()) {
- memcpy(start, &serialized_read_buffer_[0],
- serialized_read_buffer_.size());
- start += serialized_read_buffer_.size();
- }
+ base::AutoLock lock(lock_);
+ DCHECK(in_transit_);
+ state->pipe_id = pipe_id_;
+ state->peer_closed = peer_closed_;
+ state->read_offset = read_offset_;
+ state->bytes_available = bytes_available_;
- shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
- }
+ ports[0] = control_port_.name();
+
+ buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
+ platform_handles[0] = buffer_handle_for_transit_.get();
- DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_),
- std::move(shared_memory_handle), shared_memory_size,
- destination, actual_size, platform_handles);
- CloseImplNoLock();
return true;
}
-void DataPipeConsumerDispatcher::TransportStarted() {
- started_transport_.Acquire();
+bool DataPipeConsumerDispatcher::BeginTransit() {
+ base::AutoLock lock(lock_);
+ if (in_transit_)
+ return false;
+ in_transit_ = !in_two_phase_read_;
+ return in_transit_;
}
-void DataPipeConsumerDispatcher::TransportEnded() {
- started_transport_.Release();
+void DataPipeConsumerDispatcher::CompleteTransitAndClose() {
+ node_controller_->SetPortObserver(control_port_, nullptr);
- base::AutoLock locker(lock());
+ base::AutoLock lock(lock_);
+ DCHECK(in_transit_);
+ in_transit_ = false;
+ transferred_ = true;
+ ignore_result(buffer_handle_for_transit_.release());
+ CloseNoLock();
+}
- // If transporting of DP failed, we might have got more data and didn't awake
- // for.
- // TODO(jam): should we care about only alerting if it was empty before
- // TransportStarted?
- if (!data_.empty())
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+void DataPipeConsumerDispatcher::CancelTransit() {
+ base::AutoLock lock(lock_);
+ DCHECK(in_transit_);
+ in_transit_ = false;
+ buffer_handle_for_transit_.reset();
+ UpdateSignalsStateNoLock();
}
-bool DataPipeConsumerDispatcher::IsBusyNoLock() const {
- lock().AssertAcquired();
- return in_two_phase_read_;
+// static
+scoped_refptr<DataPipeConsumerDispatcher>
+DataPipeConsumerDispatcher::Deserialize(const void* data,
+ size_t num_bytes,
+ const ports::PortName* ports,
+ size_t num_ports,
+ PlatformHandle* handles,
+ size_t num_handles) {
+ if (num_ports != 1 || num_handles != 1 ||
+ num_bytes != sizeof(SerializedState)) {
+ return nullptr;
+ }
+
+ const SerializedState* state = static_cast<const SerializedState*>(data);
+
+ NodeController* node_controller = internal::g_core->GetNodeController();
+ ports::PortRef port;
+ if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
+ return nullptr;
+
+ PlatformHandle buffer_handle;
+ std::swap(buffer_handle, handles[0]);
+ scoped_refptr<PlatformSharedBuffer> ring_buffer =
+ internal::g_platform_support->CreateSharedBufferFromHandle(
+ state->options.capacity_num_bytes,
+ ScopedPlatformHandle(buffer_handle));
+ if (!ring_buffer) {
+ DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
+ return nullptr;
+ }
+
+ scoped_refptr<DataPipeConsumerDispatcher> dispatcher =
+ new DataPipeConsumerDispatcher(node_controller, port, ring_buffer,
+ state->options, false /* initialized */,
+ state->pipe_id);
+
+ {
+ base::AutoLock lock(dispatcher->lock_);
+ dispatcher->peer_closed_ = state->peer_closed;
+ dispatcher->read_offset_ = state->read_offset;
+ dispatcher->bytes_available_ = state->bytes_available;
+ dispatcher->InitializeNoLock();
+ }
+
+ return dispatcher;
}
-void DataPipeConsumerDispatcher::OnReadMessage(
- const MessageInTransit::View& message_view,
- ScopedPlatformHandleVectorPtr platform_handles) {
- const char* bytes_start = static_cast<const char*>(message_view.bytes());
- const char* bytes_end = bytes_start + message_view.num_bytes();
- if (started_transport_.Try()) {
- // We're not in the middle of being sent.
-
- // Can get synchronously called back in Init if there was initial data.
- scoped_ptr<base::AutoLock> locker;
- if (!calling_init_) {
- locker.reset(new base::AutoLock(lock()));
- }
+DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
+ DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ &&
+ !in_transit_);
+}
- if (in_two_phase_read_) {
- data_received_during_two_phase_read_.insert(
- data_received_during_two_phase_read_.end(), bytes_start, bytes_end);
- } else {
- bool was_empty = data_.empty();
- data_.insert(data_.end(), bytes_start, bytes_end);
- if (was_empty)
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+void DataPipeConsumerDispatcher::InitializeNoLock() {
+ lock_.AssertAcquired();
+
+ if (shared_ring_buffer_) {
+ DCHECK(!ring_buffer_mapping_);
+ ring_buffer_mapping_ =
+ shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
+ if (!ring_buffer_mapping_) {
+ DLOG(ERROR) << "Failed to map shared buffer.";
+ shared_ring_buffer_ = nullptr;
}
- started_transport_.Release();
- } else {
- // See comment in MessagePipeDispatcher about why we can't and don't need
- // to lock here.
- data_.insert(data_.end(), bytes_start, bytes_end);
}
+
+ base::AutoUnlock unlock(lock_);
+ node_controller_->SetPortObserver(
+ control_port_,
+ make_scoped_refptr(new PortObserverThunk(this)));
}
-void DataPipeConsumerDispatcher::OnError(Error error) {
- switch (error) {
- case ERROR_READ_SHUTDOWN:
- // The other side was cleanly closed, so this isn't actually an error.
- DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)";
- break;
- case ERROR_READ_BROKEN:
- LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)";
- break;
- case ERROR_READ_BAD_MESSAGE:
- // Receiving a bad message means either a bug, data corruption, or
- // malicious attack (probably due to some other bug).
- LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad "
- << "message)";
- break;
- case ERROR_READ_UNKNOWN:
- LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)";
- break;
- case ERROR_WRITE:
- LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages";
- break;
+MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
+ lock_.AssertAcquired();
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ is_closed_ = true;
+ ring_buffer_mapping_.reset();
+ shared_ring_buffer_ = nullptr;
+
+ awakable_list_.CancelAll();
+ if (!transferred_) {
+ base::AutoUnlock unlock(lock_);
+ node_controller_->ClosePort(control_port_);
}
- error_ = true;
- if (started_transport_.Try()) {
- base::AutoLock locker(lock());
- // We can get two OnError callbacks before the post task below completes.
- // Although RawChannel still has a pointer to this object until Shutdown is
- // called, that is safe since this class always does a PostTask to the IO
- // thread to self destruct.
- if (channel_) {
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
- channel_->Shutdown();
- channel_ = nullptr;
- }
- started_transport_.Release();
- } else {
- // We must be waiting to call ReleaseHandle. It will call Shutdown.
+ return MOJO_RESULT_OK;
+}
+
+HandleSignalsState
+DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
+ lock_.AssertAcquired();
+
+ HandleSignalsState rv;
+ if (shared_ring_buffer_ && bytes_available_) {
+ if (!in_two_phase_read_)
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
+ } else if (!peer_closed_ && shared_ring_buffer_) {
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
}
+
+ if (peer_closed_)
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
+ return rv;
}
-void DataPipeConsumerDispatcher::SerializeInternal() {
- DCHECK(!in_two_phase_read_);
- // We need to stop watching handle immediately, even though not on IO thread,
- // so that other messages aren't read after this.
- if (channel_) {
- std::vector<char> serialized_write_buffer;
- std::vector<int> fds;
- bool write_error = false;
- serialized_platform_handle_ = channel_->ReleaseHandle(
- &serialized_read_buffer_, &serialized_write_buffer, &fds, &fds,
- &write_error);
- CHECK(serialized_write_buffer.empty());
- CHECK(fds.empty());
- CHECK(!write_error) << "DataPipeConsumerDispatcher doesn't write.";
-
- channel_ = nullptr;
+void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) {
+ DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: "
+ << num_bytes << " bytes read. [control_port="
+ << control_port_.name() << "]";
+
+ SendDataPipeControlMessage(node_controller_, control_port_,
+ DataPipeCommand::DATA_WAS_READ, num_bytes);
+}
+
+void DataPipeConsumerDispatcher::OnPortStatusChanged() {
+ base::AutoLock lock(lock_);
+
+ // We stop observing the control port as soon it's transferred, but this can
+ // race with events which are raised right before that happens. This is fine
+ // to ignore.
+ if (transferred_)
+ return;
+
+ DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
+
+ UpdateSignalsStateNoLock();
+}
+
+void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
+ lock_.AssertAcquired();
+
+ bool was_peer_closed = peer_closed_;
+ size_t previous_bytes_available = bytes_available_;
+
+ ports::PortStatus port_status;
+ if (node_controller_->node()->GetStatus(control_port_, &port_status) !=
+ ports::OK ||
+ !port_status.receiving_messages) {
+ DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure"
+ << " [control_port=" << control_port_.name() << "]";
+
+ peer_closed_ = true;
}
- serialized_ = true;
+ if (port_status.has_messages && !in_transit_) {
+ ports::ScopedMessage message;
+ do {
+ int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
+ &message);
+ if (rv != ports::OK)
+ peer_closed_ = true;
+ if (message) {
+ const DataPipeControlMessage* m =
+ static_cast<const DataPipeControlMessage*>(
+ message->payload_bytes());
+
+ if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) {
+ DLOG(ERROR) << "Unexpected control message from producer.";
+ peer_closed_ = true;
+ break;
+ }
+
+ if (static_cast<size_t>(bytes_available_) + m->num_bytes >
+ options_.capacity_num_bytes) {
+ DLOG(ERROR) << "Producer claims to have written too many bytes.";
+ peer_closed_ = true;
+ break;
+ }
+
+ DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that "
+ << m->num_bytes << " bytes were written. [control_port="
+ << control_port_.name() << "]";
+
+ bytes_available_ += m->num_bytes;
+ }
+ } while (message);
+ }
+
+ if (peer_closed_ != was_peer_closed ||
+ bytes_available_ != previous_bytes_available) {
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
+ }
}
} // namespace edk

Powered by Google App Engine
This is Rietveld 408576698