Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(979)

Unified Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/data_pipe_producer_dispatcher.cc
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
index b66ffe169b48d6b28621e0691bd0ffddc1d6f3d4..f26e7e28c38c5831b7ee84e29defe2b38b9d9cd0 100644
--- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
@@ -11,136 +11,99 @@
#include "base/bind.h"
#include "base/logging.h"
+#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop.h"
#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/embedder/platform_shared_buffer.h"
#include "mojo/edk/embedder/platform_support.h"
#include "mojo/edk/system/configuration.h"
-#include "mojo/edk/system/data_pipe.h"
+#include "mojo/edk/system/core.h"
+#include "mojo/edk/system/data_pipe_control_message.h"
+#include "mojo/edk/system/node_controller.h"
+#include "mojo/edk/system/ports_message.h"
namespace mojo {
namespace edk {
-void DataPipeProducerDispatcher::Init(
- ScopedPlatformHandle message_pipe,
- char* serialized_write_buffer, size_t serialized_write_buffer_size) {
- if (message_pipe.is_valid()) {
- channel_ = RawChannel::Create(std::move(message_pipe));
- channel_->SetSerializedData(
- nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size,
- nullptr, nullptr);
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
- } else {
- error_ = true;
- }
-}
+namespace {
-void DataPipeProducerDispatcher::InitOnIO() {
- base::AutoLock locker(lock());
- if (channel_)
- channel_->Init(this);
-}
+struct SerializedState {
+ MojoCreateDataPipeOptions options;
+ uint64_t pipe_id;
+ bool peer_closed;
+ uint32_t write_offset;
+ uint32_t available_capacity;
+};
-void DataPipeProducerDispatcher::CloseOnIO() {
- base::AutoLock locker(lock());
- if (channel_) {
- channel_->Shutdown();
- channel_ = nullptr;
- }
-}
+} // namespace
-Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
- return Type::DATA_PIPE_PRODUCER;
-}
+// A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a
+// reference to the dispatcher to ensure it lives as long as the observed port.
+class DataPipeProducerDispatcher::PortObserverThunk
+ : public NodeController::PortObserver {
+ public:
+ explicit PortObserverThunk(
+ scoped_refptr<DataPipeProducerDispatcher> dispatcher)
+ : dispatcher_(dispatcher) {}
-scoped_refptr<DataPipeProducerDispatcher>
-DataPipeProducerDispatcher::Deserialize(
- const void* source,
- size_t size,
- PlatformHandleVector* platform_handles) {
- MojoCreateDataPipeOptions options;
- ScopedPlatformHandle shared_memory_handle;
- size_t shared_memory_size = 0;
- ScopedPlatformHandle platform_handle =
- DataPipe::Deserialize(source, size, platform_handles, &options,
- &shared_memory_handle, &shared_memory_size);
-
- scoped_refptr<DataPipeProducerDispatcher> rv(Create(options));
-
- char* serialized_write_buffer = nullptr;
- size_t serialized_write_buffer_size = 0;
- scoped_refptr<PlatformSharedBuffer> shared_buffer;
- scoped_ptr<PlatformSharedBufferMapping> mapping;
- if (shared_memory_size) {
- shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
- shared_memory_size, std::move(shared_memory_handle));
- mapping = shared_buffer->Map(0, shared_memory_size);
- serialized_write_buffer = static_cast<char*>(mapping->GetBase());
- serialized_write_buffer_size = shared_memory_size;
- }
+ private:
+ ~PortObserverThunk() override {}
- rv->Init(std::move(platform_handle), serialized_write_buffer,
- serialized_write_buffer_size);
- return rv;
-}
+ // NodeController::PortObserver:
+ void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
-DataPipeProducerDispatcher::DataPipeProducerDispatcher(
- const MojoCreateDataPipeOptions& options)
- : options_(options), channel_(nullptr), error_(false), serialized_(false) {
-}
+ scoped_refptr<DataPipeProducerDispatcher> dispatcher_;
-DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
- // See comment in ~MessagePipeDispatcher.
- if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
- channel_->Shutdown();
- else
- DCHECK(!channel_);
-}
+ DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
+};
-void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() {
- lock().AssertAcquired();
- awakable_list_.CancelAll();
+DataPipeProducerDispatcher::DataPipeProducerDispatcher(
+ NodeController* node_controller,
+ const ports::PortRef& control_port,
+ scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
+ const MojoCreateDataPipeOptions& options,
+ bool initialized,
+ uint64_t pipe_id)
+ : options_(options),
+ node_controller_(node_controller),
+ control_port_(control_port),
+ pipe_id_(pipe_id),
+ shared_ring_buffer_(shared_ring_buffer),
+ available_capacity_(options_.capacity_num_bytes) {
+ if (initialized) {
+ base::AutoLock lock(lock_);
+ InitializeNoLock();
+ }
}
-void DataPipeProducerDispatcher::CloseImplNoLock() {
- lock().AssertAcquired();
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this));
+Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
+ return Type::DATA_PIPE_PRODUCER;
}
-scoped_refptr<Dispatcher>
-DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
- lock().AssertAcquired();
-
- SerializeInternal();
-
- scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_);
- serialized_write_buffer_.swap(rv->serialized_write_buffer_);
- rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
- rv->serialized_ = true;
- return scoped_refptr<Dispatcher>(rv.get());
+MojoResult DataPipeProducerDispatcher::Close() {
+ base::AutoLock lock(lock_);
+ DVLOG(1) << "Closing data pipe producer " << pipe_id_;
+ return CloseNoLock();
}
-MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
- const void* elements,
- uint32_t* num_bytes,
- MojoWriteDataFlags flags) {
- lock().AssertAcquired();
- if (InTwoPhaseWrite())
+MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
+ uint32_t* num_bytes,
+ MojoWriteDataFlags flags) {
+ base::AutoLock lock(lock_);
+ if (!shared_ring_buffer_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ if (in_two_phase_write_)
return MOJO_RESULT_BUSY;
- if (error_)
+
+ if (peer_closed_)
return MOJO_RESULT_FAILED_PRECONDITION;
+
if (*num_bytes % options_.element_num_bytes != 0)
return MOJO_RESULT_INVALID_ARGUMENT;
if (*num_bytes == 0)
return MOJO_RESULT_OK; // Nothing to do.
- // For now, we ignore options.capacity_num_bytes as a total of all pending
- // writes (and just treat it per message). We will implement that later if
- // we need to. All current uses want all their data to be sent, and it's not
- // clear that this backpressure should be done at the mojo layer or at a
- // higher application layer.
bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
if (min_num_bytes_to_write > options_.capacity_num_bytes) {
@@ -149,98 +112,135 @@ MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
return MOJO_RESULT_OUT_OF_RANGE;
}
- uint32_t num_bytes_to_write =
- std::min(*num_bytes, options_.capacity_num_bytes);
+ DCHECK_LE(available_capacity_, options_.capacity_num_bytes);
+ uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_);
if (num_bytes_to_write == 0)
return MOJO_RESULT_SHOULD_WAIT;
- HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
+ HandleSignalsState old_state = GetHandleSignalsStateNoLock();
*num_bytes = num_bytes_to_write;
- WriteDataIntoMessages(elements, num_bytes_to_write);
- HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
+ CHECK(ring_buffer_mapping_);
+ uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
+ CHECK(data);
+
+ const uint8_t* source = static_cast<const uint8_t*>(elements);
+ CHECK(source);
+
+ DCHECK_LE(write_offset_, options_.capacity_num_bytes);
+ uint32_t tail_bytes_to_write =
+ std::min(options_.capacity_num_bytes - write_offset_,
+ num_bytes_to_write);
+ uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write;
+
+ DCHECK_GT(tail_bytes_to_write, 0u);
+ memcpy(data + write_offset_, source, tail_bytes_to_write);
+ if (head_bytes_to_write > 0)
+ memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
+
+ DCHECK_LE(num_bytes_to_write, available_capacity_);
+ available_capacity_ -= num_bytes_to_write;
+ write_offset_ = (write_offset_ + num_bytes_to_write) %
+ options_.capacity_num_bytes;
+
+ HandleSignalsState new_state = GetHandleSignalsStateNoLock();
if (!new_state.equals(old_state))
awakable_list_.AwakeForStateChange(new_state);
+
+ base::AutoUnlock unlock(lock_);
+ NotifyWrite(num_bytes_to_write);
+
return MOJO_RESULT_OK;
}
-MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
+MojoResult DataPipeProducerDispatcher::BeginWriteData(
void** buffer,
uint32_t* buffer_num_bytes,
MojoWriteDataFlags flags) {
- lock().AssertAcquired();
- if (InTwoPhaseWrite())
+ base::AutoLock lock(lock_);
+ if (!shared_ring_buffer_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if (in_two_phase_write_)
return MOJO_RESULT_BUSY;
- if (error_)
+ if (peer_closed_)
return MOJO_RESULT_FAILED_PRECONDITION;
- // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes.
- if (*buffer_num_bytes == 0)
- *buffer_num_bytes = options_.capacity_num_bytes;
+ if (available_capacity_ == 0) {
+ return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
+ : MOJO_RESULT_SHOULD_WAIT;
+ }
- two_phase_data_.resize(*buffer_num_bytes);
- *buffer = &two_phase_data_[0];
+ in_two_phase_write_ = true;
+ *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_,
+ available_capacity_);
+ DCHECK_GT(*buffer_num_bytes, 0u);
- // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes
- // we can construct a MessageInTransit here. But then we need to make
- // MessageInTransit support changing its data size later.
+ CHECK(ring_buffer_mapping_);
+ uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
+ *buffer = data + write_offset_;
return MOJO_RESULT_OK;
}
-MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock(
+MojoResult DataPipeProducerDispatcher::EndWriteData(
uint32_t num_bytes_written) {
- lock().AssertAcquired();
- if (!InTwoPhaseWrite())
+ base::AutoLock lock(lock_);
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ if (!in_two_phase_write_)
return MOJO_RESULT_FAILED_PRECONDITION;
+ DCHECK(shared_ring_buffer_);
+ DCHECK(ring_buffer_mapping_);
+
// Note: Allow successful completion of the two-phase write even if the other
// side has been closed.
MojoResult rv = MOJO_RESULT_OK;
- if (num_bytes_written > two_phase_data_.size() ||
- num_bytes_written % options_.element_num_bytes != 0) {
+ if (num_bytes_written > available_capacity_ ||
+ num_bytes_written % options_.element_num_bytes != 0 ||
+ write_offset_ + num_bytes_written > options_.capacity_num_bytes) {
rv = MOJO_RESULT_INVALID_ARGUMENT;
- } else if (channel_) {
- WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written);
+ } else {
+ DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes);
+ available_capacity_ -= num_bytes_written;
+ write_offset_ = (write_offset_ + num_bytes_written) %
+ options_.capacity_num_bytes;
+
+ base::AutoUnlock unlock(lock_);
+ NotifyWrite(num_bytes_written);
}
- // Two-phase write ended even on failure.
- two_phase_data_.clear();
+ in_two_phase_write_ = false;
+
// If we're now writable, we *became* writable (since we weren't writable
// during the two-phase write), so awake producer awakables.
- HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
+ HandleSignalsState new_state = GetHandleSignalsStateNoLock();
if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
awakable_list_.AwakeForStateChange(new_state);
return rv;
}
-HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
- const {
- lock().AssertAcquired();
-
- HandleSignalsState rv;
- if (!error_) {
- if (!InTwoPhaseWrite())
- rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
- } else {
- rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
- }
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
- return rv;
+HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
+ base::AutoLock lock(lock_);
+ return GetHandleSignalsStateNoLock();
}
-MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
+MojoResult DataPipeProducerDispatcher::AddAwakable(
Awakable* awakable,
MojoHandleSignals signals,
uintptr_t context,
HandleSignalsState* signals_state) {
- lock().AssertAcquired();
- if (channel_)
- channel_->EnsureLazyInitialized();
- HandleSignalsState state = GetHandleSignalsStateImplNoLock();
+ base::AutoLock lock(lock_);
+ if (!shared_ring_buffer_ || in_transit_) {
+ if (signals_state)
+ *signals_state = HandleSignalsState();
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ }
+ UpdateSignalsStateNoLock();
+ HandleSignalsState state = GetHandleSignalsStateNoLock();
if (state.satisfies(signals)) {
if (signals_state)
*signals_state = state;
@@ -256,154 +256,256 @@ MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
return MOJO_RESULT_OK;
}
-void DataPipeProducerDispatcher::RemoveAwakableImplNoLock(
+void DataPipeProducerDispatcher::RemoveAwakable(
Awakable* awakable,
HandleSignalsState* signals_state) {
- lock().AssertAcquired();
+ base::AutoLock lock(lock_);
+ if ((!shared_ring_buffer_ || in_transit_) && signals_state)
+ *signals_state = HandleSignalsState();
+ else if (signals_state)
+ *signals_state = GetHandleSignalsStateNoLock();
awakable_list_.Remove(awakable);
- if (signals_state)
- *signals_state = GetHandleSignalsStateImplNoLock();
}
-void DataPipeProducerDispatcher::StartSerializeImplNoLock(
- size_t* max_size,
- size_t* max_platform_handles) {
- if (!serialized_)
- SerializeInternal();
-
- DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
- !serialized_write_buffer_.empty(), max_size,
- max_platform_handles);
+void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes,
+ uint32_t* num_ports,
+ uint32_t* num_handles) {
+ base::AutoLock lock(lock_);
+ DCHECK(in_transit_);
+ *num_bytes = sizeof(SerializedState);
+ *num_ports = 1;
+ *num_handles = 1;
}
-bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock(
+bool DataPipeProducerDispatcher::EndSerialize(
void* destination,
- size_t* actual_size,
- PlatformHandleVector* platform_handles) {
- ScopedPlatformHandle shared_memory_handle;
- size_t shared_memory_size = serialized_write_buffer_.size();
- if (shared_memory_size) {
- scoped_refptr<PlatformSharedBuffer> shared_buffer(
- internal::g_platform_support->CreateSharedBuffer(
- shared_memory_size));
- scoped_ptr<PlatformSharedBufferMapping> mapping(
- shared_buffer->Map(0, shared_memory_size));
- memcpy(mapping->GetBase(), &serialized_write_buffer_[0],
- shared_memory_size);
- shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
- }
+ ports::PortName* ports,
+ PlatformHandle* platform_handles) {
+ SerializedState* state = static_cast<SerializedState*>(destination);
+ memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
+
+ base::AutoLock lock(lock_);
+ DCHECK(in_transit_);
+ state->pipe_id = pipe_id_;
+ state->peer_closed = peer_closed_;
+ state->write_offset = write_offset_;
+ state->available_capacity = available_capacity_;
+
+ ports[0] = control_port_.name();
+
+ buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
+ platform_handles[0] = buffer_handle_for_transit_.get();
- DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_),
- std::move(shared_memory_handle), shared_memory_size,
- destination, actual_size, platform_handles);
- CloseImplNoLock();
return true;
}
-void DataPipeProducerDispatcher::TransportStarted() {
- started_transport_.Acquire();
+bool DataPipeProducerDispatcher::BeginTransit() {
+ base::AutoLock lock(lock_);
+ if (in_transit_)
+ return false;
+ in_transit_ = !in_two_phase_write_;
+ return in_transit_;
}
-void DataPipeProducerDispatcher::TransportEnded() {
- started_transport_.Release();
-}
+void DataPipeProducerDispatcher::CompleteTransitAndClose() {
+ node_controller_->SetPortObserver(control_port_, nullptr);
-bool DataPipeProducerDispatcher::IsBusyNoLock() const {
- lock().AssertAcquired();
- return InTwoPhaseWrite();
+ base::AutoLock lock(lock_);
+ DCHECK(in_transit_);
+ transferred_ = true;
+ in_transit_ = false;
+ ignore_result(buffer_handle_for_transit_.release());
+ CloseNoLock();
}
-void DataPipeProducerDispatcher::OnReadMessage(
- const MessageInTransit::View& message_view,
- ScopedPlatformHandleVectorPtr platform_handles) {
- CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages.";
+void DataPipeProducerDispatcher::CancelTransit() {
+ base::AutoLock lock(lock_);
+ DCHECK(in_transit_);
+ in_transit_ = false;
+ buffer_handle_for_transit_.reset();
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
}
-void DataPipeProducerDispatcher::OnError(Error error) {
- switch (error) {
- case ERROR_READ_BROKEN:
- case ERROR_READ_BAD_MESSAGE:
- case ERROR_READ_UNKNOWN:
- LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error.";
- break;
- case ERROR_READ_SHUTDOWN:
- // The other side was cleanly closed, so this isn't actually an error.
- DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)";
- break;
- case ERROR_WRITE:
- // Write errors are slightly notable: they probably shouldn't happen under
- // normal operation (but maybe the other side crashed).
- LOG(WARNING) << "DataPipeProducerDispatcher write error";
- break;
+// static
+scoped_refptr<DataPipeProducerDispatcher>
+DataPipeProducerDispatcher::Deserialize(const void* data,
+ size_t num_bytes,
+ const ports::PortName* ports,
+ size_t num_ports,
+ PlatformHandle* handles,
+ size_t num_handles) {
+ if (num_ports != 1 || num_handles != 1 ||
+ num_bytes != sizeof(SerializedState)) {
+ return nullptr;
}
- error_ = true;
- if (started_transport_.Try()) {
- base::AutoLock locker(lock());
- // We can get two OnError callbacks before the post task below completes.
- // Although RawChannel still has a pointer to this object until Shutdown is
- // called, that is safe since this class always does a PostTask to the IO
- // thread to self destruct.
- if (channel_) {
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
- channel_->Shutdown();
- channel_ = nullptr;
- }
- started_transport_.Release();
- } else {
- // We must be waiting to call ReleaseHandle. It will call Shutdown.
+ const SerializedState* state = static_cast<const SerializedState*>(data);
+
+ NodeController* node_controller = internal::g_core->GetNodeController();
+ ports::PortRef port;
+ if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
+ return nullptr;
+
+ PlatformHandle buffer_handle;
+ std::swap(buffer_handle, handles[0]);
+ scoped_refptr<PlatformSharedBuffer> ring_buffer =
+ internal::g_platform_support->CreateSharedBufferFromHandle(
+ state->options.capacity_num_bytes,
+ ScopedPlatformHandle(buffer_handle));
+ if (!ring_buffer) {
+ DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
+ return nullptr;
}
+
+ scoped_refptr<DataPipeProducerDispatcher> dispatcher =
+ new DataPipeProducerDispatcher(node_controller, port, ring_buffer,
+ state->options, false /* initialized */,
+ state->pipe_id);
+
+ {
+ base::AutoLock lock(dispatcher->lock_);
+ dispatcher->peer_closed_ = state->peer_closed;
+ dispatcher->write_offset_ = state->write_offset;
+ dispatcher->available_capacity_ = state->available_capacity;
+ dispatcher->InitializeNoLock();
+ }
+
+ return dispatcher;
}
-bool DataPipeProducerDispatcher::InTwoPhaseWrite() const {
- return !two_phase_data_.empty();
+DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
+ DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_ &&
+ !ring_buffer_mapping_);
}
-bool DataPipeProducerDispatcher::WriteDataIntoMessages(
- const void* elements,
- uint32_t num_bytes) {
- // The maximum amount of data to send per message (make it a multiple of the
- // element size.
- size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
- max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes;
- DCHECK_GT(max_message_num_bytes, 0u);
-
- uint32_t offset = 0;
- while (offset < num_bytes) {
- uint32_t message_num_bytes =
- std::min(static_cast<uint32_t>(max_message_num_bytes),
- num_bytes - offset);
- scoped_ptr<MessageInTransit> message(new MessageInTransit(
- MessageInTransit::Type::MESSAGE, message_num_bytes,
- static_cast<const char*>(elements) + offset));
- if (!channel_->WriteMessage(std::move(message))) {
- error_ = true;
- return false;
+void DataPipeProducerDispatcher::InitializeNoLock() {
+ lock_.AssertAcquired();
+
+ if (shared_ring_buffer_) {
+ ring_buffer_mapping_ =
+ shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
+ if (!ring_buffer_mapping_) {
+ DLOG(ERROR) << "Failed to map shared buffer.";
+ shared_ring_buffer_ = nullptr;
}
+ }
- offset += message_num_bytes;
+ base::AutoUnlock unlock(lock_);
+ node_controller_->SetPortObserver(
+ control_port_,
+ make_scoped_refptr(new PortObserverThunk(this)));
+}
+
+MojoResult DataPipeProducerDispatcher::CloseNoLock() {
+ lock_.AssertAcquired();
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ is_closed_ = true;
+ ring_buffer_mapping_.reset();
+ shared_ring_buffer_ = nullptr;
+
+ awakable_list_.CancelAll();
+ if (!transferred_) {
+ base::AutoUnlock unlock(lock_);
+ node_controller_->ClosePort(control_port_);
}
- return true;
+ return MOJO_RESULT_OK;
}
-void DataPipeProducerDispatcher::SerializeInternal() {
- // We need to stop watching handle immediately, even though not on IO thread,
- // so that other messages aren't read after this.
- if (channel_) {
- std::vector<char> serialized_read_buffer;
- std::vector<int> fds;
- bool write_error = false;
- serialized_platform_handle_ = channel_->ReleaseHandle(
- &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds,
- &write_error);
- CHECK(serialized_read_buffer.empty());
- CHECK(fds.empty());
- if (write_error)
- serialized_platform_handle_.reset();
- channel_ = nullptr;
+HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
+ const {
+ lock_.AssertAcquired();
+ HandleSignalsState rv;
+ if (!peer_closed_) {
+ if (!in_two_phase_write_ && shared_ring_buffer_ &&
+ available_capacity_ > 0)
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
+ } else {
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
+ }
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
+ return rv;
+}
+
+void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) {
+ DVLOG(1) << "Data pipe producer " << pipe_id_ << " notifying peer: "
+ << num_bytes << " bytes written. [control_port="
+ << control_port_.name() << "]";
+
+ SendDataPipeControlMessage(node_controller_, control_port_,
+ DataPipeCommand::DATA_WAS_WRITTEN, num_bytes);
+}
+
+void DataPipeProducerDispatcher::OnPortStatusChanged() {
+ base::AutoLock lock(lock_);
+
+ // We stop observing the control port as soon it's transferred, but this can
+ // race with events which are raised right before that happens. This is fine
+ // to ignore.
+ if (transferred_)
+ return;
+
+ DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
+
+ UpdateSignalsStateNoLock();
+}
+
+void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
+ lock_.AssertAcquired();
+
+ bool was_peer_closed = peer_closed_;
+ size_t previous_capacity = available_capacity_;
+
+ ports::PortStatus port_status;
+ if (node_controller_->node()->GetStatus(control_port_, &port_status) !=
+ ports::OK ||
+ !port_status.receiving_messages) {
+ DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure"
+ << " [control_port=" << control_port_.name() << "]";
+
+ peer_closed_ = true;
+ }
+
+ if (port_status.has_messages && !in_transit_) {
+ ports::ScopedMessage message;
+ do {
+ int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
+ &message);
+ if (rv != ports::OK)
+ peer_closed_ = true;
+ if (message) {
+ PortsMessage* ports_message = static_cast<PortsMessage*>(message.get());
+ const DataPipeControlMessage* m =
+ static_cast<const DataPipeControlMessage*>(
+ ports_message->payload_bytes());
+
+ if (m->command != DataPipeCommand::DATA_WAS_READ) {
+ DLOG(ERROR) << "Unexpected message from consumer.";
+ peer_closed_ = true;
+ break;
+ }
+
+ if (static_cast<size_t>(available_capacity_) + m->num_bytes >
+ options_.capacity_num_bytes) {
+ DLOG(ERROR) << "Consumer claims to have read too many bytes.";
+ break;
+ }
+
+ DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that "
+ << m->num_bytes << " bytes were read. [control_port="
+ << control_port_.name() << "]";
+
+ available_capacity_ += m->num_bytes;
+ }
+ } while (message);
+ }
+
+ if (peer_closed_ != was_peer_closed ||
+ available_capacity_ != previous_capacity) {
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
}
- serialized_ = true;
}
} // namespace edk

Powered by Google App Engine
This is Rietveld 408576698