Index: mojo/edk/system/data_pipe_producer_dispatcher.cc |
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc |
index b66ffe169b48d6b28621e0691bd0ffddc1d6f3d4..f26e7e28c38c5831b7ee84e29defe2b38b9d9cd0 100644 |
--- a/mojo/edk/system/data_pipe_producer_dispatcher.cc |
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc |
@@ -11,136 +11,99 @@ |
#include "base/bind.h" |
#include "base/logging.h" |
+#include "base/memory/ref_counted.h" |
#include "base/message_loop/message_loop.h" |
#include "mojo/edk/embedder/embedder_internal.h" |
#include "mojo/edk/embedder/platform_shared_buffer.h" |
#include "mojo/edk/embedder/platform_support.h" |
#include "mojo/edk/system/configuration.h" |
-#include "mojo/edk/system/data_pipe.h" |
+#include "mojo/edk/system/core.h" |
+#include "mojo/edk/system/data_pipe_control_message.h" |
+#include "mojo/edk/system/node_controller.h" |
+#include "mojo/edk/system/ports_message.h" |
namespace mojo { |
namespace edk { |
-void DataPipeProducerDispatcher::Init( |
- ScopedPlatformHandle message_pipe, |
- char* serialized_write_buffer, size_t serialized_write_buffer_size) { |
- if (message_pipe.is_valid()) { |
- channel_ = RawChannel::Create(std::move(message_pipe)); |
- channel_->SetSerializedData( |
- nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size, |
- nullptr, nullptr); |
- internal::g_io_thread_task_runner->PostTask( |
- FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); |
- } else { |
- error_ = true; |
- } |
-} |
+namespace { |
-void DataPipeProducerDispatcher::InitOnIO() { |
- base::AutoLock locker(lock()); |
- if (channel_) |
- channel_->Init(this); |
-} |
+struct SerializedState { |
+ MojoCreateDataPipeOptions options; |
+ uint64_t pipe_id; |
+ bool peer_closed; |
+ uint32_t write_offset; |
+ uint32_t available_capacity; |
+}; |
-void DataPipeProducerDispatcher::CloseOnIO() { |
- base::AutoLock locker(lock()); |
- if (channel_) { |
- channel_->Shutdown(); |
- channel_ = nullptr; |
- } |
-} |
+} // namespace |
-Dispatcher::Type DataPipeProducerDispatcher::GetType() const { |
- return Type::DATA_PIPE_PRODUCER; |
-} |
+// A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a |
+// reference to the dispatcher to ensure it lives as long as the observed port. |
+class DataPipeProducerDispatcher::PortObserverThunk |
+ : public NodeController::PortObserver { |
+ public: |
+ explicit PortObserverThunk( |
+ scoped_refptr<DataPipeProducerDispatcher> dispatcher) |
+ : dispatcher_(dispatcher) {} |
-scoped_refptr<DataPipeProducerDispatcher> |
-DataPipeProducerDispatcher::Deserialize( |
- const void* source, |
- size_t size, |
- PlatformHandleVector* platform_handles) { |
- MojoCreateDataPipeOptions options; |
- ScopedPlatformHandle shared_memory_handle; |
- size_t shared_memory_size = 0; |
- ScopedPlatformHandle platform_handle = |
- DataPipe::Deserialize(source, size, platform_handles, &options, |
- &shared_memory_handle, &shared_memory_size); |
- |
- scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); |
- |
- char* serialized_write_buffer = nullptr; |
- size_t serialized_write_buffer_size = 0; |
- scoped_refptr<PlatformSharedBuffer> shared_buffer; |
- scoped_ptr<PlatformSharedBufferMapping> mapping; |
- if (shared_memory_size) { |
- shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( |
- shared_memory_size, std::move(shared_memory_handle)); |
- mapping = shared_buffer->Map(0, shared_memory_size); |
- serialized_write_buffer = static_cast<char*>(mapping->GetBase()); |
- serialized_write_buffer_size = shared_memory_size; |
- } |
+ private: |
+ ~PortObserverThunk() override {} |
- rv->Init(std::move(platform_handle), serialized_write_buffer, |
- serialized_write_buffer_size); |
- return rv; |
-} |
+ // NodeController::PortObserver: |
+ void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } |
-DataPipeProducerDispatcher::DataPipeProducerDispatcher( |
- const MojoCreateDataPipeOptions& options) |
- : options_(options), channel_(nullptr), error_(false), serialized_(false) { |
-} |
+ scoped_refptr<DataPipeProducerDispatcher> dispatcher_; |
-DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { |
- // See comment in ~MessagePipeDispatcher. |
- if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
- channel_->Shutdown(); |
- else |
- DCHECK(!channel_); |
-} |
+ DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); |
+}; |
-void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { |
- lock().AssertAcquired(); |
- awakable_list_.CancelAll(); |
+DataPipeProducerDispatcher::DataPipeProducerDispatcher( |
+ NodeController* node_controller, |
+ const ports::PortRef& control_port, |
+ scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, |
+ const MojoCreateDataPipeOptions& options, |
+ bool initialized, |
+ uint64_t pipe_id) |
+ : options_(options), |
+ node_controller_(node_controller), |
+ control_port_(control_port), |
+ pipe_id_(pipe_id), |
+ shared_ring_buffer_(shared_ring_buffer), |
+ available_capacity_(options_.capacity_num_bytes) { |
+ if (initialized) { |
+ base::AutoLock lock(lock_); |
+ InitializeNoLock(); |
+ } |
} |
-void DataPipeProducerDispatcher::CloseImplNoLock() { |
- lock().AssertAcquired(); |
- internal::g_io_thread_task_runner->PostTask( |
- FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); |
+Dispatcher::Type DataPipeProducerDispatcher::GetType() const { |
+ return Type::DATA_PIPE_PRODUCER; |
} |
-scoped_refptr<Dispatcher> |
-DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
- lock().AssertAcquired(); |
- |
- SerializeInternal(); |
- |
- scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); |
- serialized_write_buffer_.swap(rv->serialized_write_buffer_); |
- rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); |
- rv->serialized_ = true; |
- return scoped_refptr<Dispatcher>(rv.get()); |
+MojoResult DataPipeProducerDispatcher::Close() { |
+ base::AutoLock lock(lock_); |
+ DVLOG(1) << "Closing data pipe producer " << pipe_id_; |
+ return CloseNoLock(); |
} |
-MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( |
- const void* elements, |
- uint32_t* num_bytes, |
- MojoWriteDataFlags flags) { |
- lock().AssertAcquired(); |
- if (InTwoPhaseWrite()) |
+MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, |
+ uint32_t* num_bytes, |
+ MojoWriteDataFlags flags) { |
+ base::AutoLock lock(lock_); |
+ if (!shared_ring_buffer_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ if (in_two_phase_write_) |
return MOJO_RESULT_BUSY; |
- if (error_) |
+ |
+ if (peer_closed_) |
return MOJO_RESULT_FAILED_PRECONDITION; |
+ |
if (*num_bytes % options_.element_num_bytes != 0) |
return MOJO_RESULT_INVALID_ARGUMENT; |
if (*num_bytes == 0) |
return MOJO_RESULT_OK; // Nothing to do. |
- // For now, we ignore options.capacity_num_bytes as a total of all pending |
- // writes (and just treat it per message). We will implement that later if |
- // we need to. All current uses want all their data to be sent, and it's not |
- // clear that this backpressure should be done at the mojo layer or at a |
- // higher application layer. |
bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; |
uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; |
if (min_num_bytes_to_write > options_.capacity_num_bytes) { |
@@ -149,98 +112,135 @@ MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( |
return MOJO_RESULT_OUT_OF_RANGE; |
} |
- uint32_t num_bytes_to_write = |
- std::min(*num_bytes, options_.capacity_num_bytes); |
+ DCHECK_LE(available_capacity_, options_.capacity_num_bytes); |
+ uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); |
if (num_bytes_to_write == 0) |
return MOJO_RESULT_SHOULD_WAIT; |
- HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
+ HandleSignalsState old_state = GetHandleSignalsStateNoLock(); |
*num_bytes = num_bytes_to_write; |
- WriteDataIntoMessages(elements, num_bytes_to_write); |
- HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
+ CHECK(ring_buffer_mapping_); |
+ uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); |
+ CHECK(data); |
+ |
+ const uint8_t* source = static_cast<const uint8_t*>(elements); |
+ CHECK(source); |
+ |
+ DCHECK_LE(write_offset_, options_.capacity_num_bytes); |
+ uint32_t tail_bytes_to_write = |
+ std::min(options_.capacity_num_bytes - write_offset_, |
+ num_bytes_to_write); |
+ uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write; |
+ |
+ DCHECK_GT(tail_bytes_to_write, 0u); |
+ memcpy(data + write_offset_, source, tail_bytes_to_write); |
+ if (head_bytes_to_write > 0) |
+ memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); |
+ |
+ DCHECK_LE(num_bytes_to_write, available_capacity_); |
+ available_capacity_ -= num_bytes_to_write; |
+ write_offset_ = (write_offset_ + num_bytes_to_write) % |
+ options_.capacity_num_bytes; |
+ |
+ HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
if (!new_state.equals(old_state)) |
awakable_list_.AwakeForStateChange(new_state); |
+ |
+ base::AutoUnlock unlock(lock_); |
+ NotifyWrite(num_bytes_to_write); |
+ |
return MOJO_RESULT_OK; |
} |
-MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( |
+MojoResult DataPipeProducerDispatcher::BeginWriteData( |
void** buffer, |
uint32_t* buffer_num_bytes, |
MojoWriteDataFlags flags) { |
- lock().AssertAcquired(); |
- if (InTwoPhaseWrite()) |
+ base::AutoLock lock(lock_); |
+ if (!shared_ring_buffer_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ if (in_two_phase_write_) |
return MOJO_RESULT_BUSY; |
- if (error_) |
+ if (peer_closed_) |
return MOJO_RESULT_FAILED_PRECONDITION; |
- // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes. |
- if (*buffer_num_bytes == 0) |
- *buffer_num_bytes = options_.capacity_num_bytes; |
+ if (available_capacity_ == 0) { |
+ return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
+ : MOJO_RESULT_SHOULD_WAIT; |
+ } |
- two_phase_data_.resize(*buffer_num_bytes); |
- *buffer = &two_phase_data_[0]; |
+ in_two_phase_write_ = true; |
+ *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_, |
+ available_capacity_); |
+ DCHECK_GT(*buffer_num_bytes, 0u); |
- // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes |
- // we can construct a MessageInTransit here. But then we need to make |
- // MessageInTransit support changing its data size later. |
+ CHECK(ring_buffer_mapping_); |
+ uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); |
+ *buffer = data + write_offset_; |
return MOJO_RESULT_OK; |
} |
-MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( |
+MojoResult DataPipeProducerDispatcher::EndWriteData( |
uint32_t num_bytes_written) { |
- lock().AssertAcquired(); |
- if (!InTwoPhaseWrite()) |
+ base::AutoLock lock(lock_); |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ if (!in_two_phase_write_) |
return MOJO_RESULT_FAILED_PRECONDITION; |
+ DCHECK(shared_ring_buffer_); |
+ DCHECK(ring_buffer_mapping_); |
+ |
// Note: Allow successful completion of the two-phase write even if the other |
// side has been closed. |
MojoResult rv = MOJO_RESULT_OK; |
- if (num_bytes_written > two_phase_data_.size() || |
- num_bytes_written % options_.element_num_bytes != 0) { |
+ if (num_bytes_written > available_capacity_ || |
+ num_bytes_written % options_.element_num_bytes != 0 || |
+ write_offset_ + num_bytes_written > options_.capacity_num_bytes) { |
rv = MOJO_RESULT_INVALID_ARGUMENT; |
- } else if (channel_) { |
- WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written); |
+ } else { |
+ DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes); |
+ available_capacity_ -= num_bytes_written; |
+ write_offset_ = (write_offset_ + num_bytes_written) % |
+ options_.capacity_num_bytes; |
+ |
+ base::AutoUnlock unlock(lock_); |
+ NotifyWrite(num_bytes_written); |
} |
- // Two-phase write ended even on failure. |
- two_phase_data_.clear(); |
+ in_two_phase_write_ = false; |
+ |
// If we're now writable, we *became* writable (since we weren't writable |
// during the two-phase write), so awake producer awakables. |
- HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
+ HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
awakable_list_.AwakeForStateChange(new_state); |
return rv; |
} |
-HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() |
- const { |
- lock().AssertAcquired(); |
- |
- HandleSignalsState rv; |
- if (!error_) { |
- if (!InTwoPhaseWrite()) |
- rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
- } else { |
- rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
- } |
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
- return rv; |
+HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { |
+ base::AutoLock lock(lock_); |
+ return GetHandleSignalsStateNoLock(); |
} |
-MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( |
+MojoResult DataPipeProducerDispatcher::AddAwakable( |
Awakable* awakable, |
MojoHandleSignals signals, |
uintptr_t context, |
HandleSignalsState* signals_state) { |
- lock().AssertAcquired(); |
- if (channel_) |
- channel_->EnsureLazyInitialized(); |
- HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
+ base::AutoLock lock(lock_); |
+ if (!shared_ring_buffer_ || in_transit_) { |
+ if (signals_state) |
+ *signals_state = HandleSignalsState(); |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ } |
+ UpdateSignalsStateNoLock(); |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
if (state.satisfies(signals)) { |
if (signals_state) |
*signals_state = state; |
@@ -256,154 +256,256 @@ MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( |
return MOJO_RESULT_OK; |
} |
-void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( |
+void DataPipeProducerDispatcher::RemoveAwakable( |
Awakable* awakable, |
HandleSignalsState* signals_state) { |
- lock().AssertAcquired(); |
+ base::AutoLock lock(lock_); |
+ if ((!shared_ring_buffer_ || in_transit_) && signals_state) |
+ *signals_state = HandleSignalsState(); |
+ else if (signals_state) |
+ *signals_state = GetHandleSignalsStateNoLock(); |
awakable_list_.Remove(awakable); |
- if (signals_state) |
- *signals_state = GetHandleSignalsStateImplNoLock(); |
} |
-void DataPipeProducerDispatcher::StartSerializeImplNoLock( |
- size_t* max_size, |
- size_t* max_platform_handles) { |
- if (!serialized_) |
- SerializeInternal(); |
- |
- DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), |
- !serialized_write_buffer_.empty(), max_size, |
- max_platform_handles); |
+void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes, |
+ uint32_t* num_ports, |
+ uint32_t* num_handles) { |
+ base::AutoLock lock(lock_); |
+ DCHECK(in_transit_); |
+ *num_bytes = sizeof(SerializedState); |
+ *num_ports = 1; |
+ *num_handles = 1; |
} |
-bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( |
+bool DataPipeProducerDispatcher::EndSerialize( |
void* destination, |
- size_t* actual_size, |
- PlatformHandleVector* platform_handles) { |
- ScopedPlatformHandle shared_memory_handle; |
- size_t shared_memory_size = serialized_write_buffer_.size(); |
- if (shared_memory_size) { |
- scoped_refptr<PlatformSharedBuffer> shared_buffer( |
- internal::g_platform_support->CreateSharedBuffer( |
- shared_memory_size)); |
- scoped_ptr<PlatformSharedBufferMapping> mapping( |
- shared_buffer->Map(0, shared_memory_size)); |
- memcpy(mapping->GetBase(), &serialized_write_buffer_[0], |
- shared_memory_size); |
- shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release()); |
- } |
+ ports::PortName* ports, |
+ PlatformHandle* platform_handles) { |
+ SerializedState* state = static_cast<SerializedState*>(destination); |
+ memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions)); |
+ |
+ base::AutoLock lock(lock_); |
+ DCHECK(in_transit_); |
+ state->pipe_id = pipe_id_; |
+ state->peer_closed = peer_closed_; |
+ state->write_offset = write_offset_; |
+ state->available_capacity = available_capacity_; |
+ |
+ ports[0] = control_port_.name(); |
+ |
+ buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle(); |
+ platform_handles[0] = buffer_handle_for_transit_.get(); |
- DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_), |
- std::move(shared_memory_handle), shared_memory_size, |
- destination, actual_size, platform_handles); |
- CloseImplNoLock(); |
return true; |
} |
-void DataPipeProducerDispatcher::TransportStarted() { |
- started_transport_.Acquire(); |
+bool DataPipeProducerDispatcher::BeginTransit() { |
+ base::AutoLock lock(lock_); |
+ if (in_transit_) |
+ return false; |
+ in_transit_ = !in_two_phase_write_; |
+ return in_transit_; |
} |
-void DataPipeProducerDispatcher::TransportEnded() { |
- started_transport_.Release(); |
-} |
+void DataPipeProducerDispatcher::CompleteTransitAndClose() { |
+ node_controller_->SetPortObserver(control_port_, nullptr); |
-bool DataPipeProducerDispatcher::IsBusyNoLock() const { |
- lock().AssertAcquired(); |
- return InTwoPhaseWrite(); |
+ base::AutoLock lock(lock_); |
+ DCHECK(in_transit_); |
+ transferred_ = true; |
+ in_transit_ = false; |
+ ignore_result(buffer_handle_for_transit_.release()); |
+ CloseNoLock(); |
} |
-void DataPipeProducerDispatcher::OnReadMessage( |
- const MessageInTransit::View& message_view, |
- ScopedPlatformHandleVectorPtr platform_handles) { |
- CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages."; |
+void DataPipeProducerDispatcher::CancelTransit() { |
+ base::AutoLock lock(lock_); |
+ DCHECK(in_transit_); |
+ in_transit_ = false; |
+ buffer_handle_for_transit_.reset(); |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
} |
-void DataPipeProducerDispatcher::OnError(Error error) { |
- switch (error) { |
- case ERROR_READ_BROKEN: |
- case ERROR_READ_BAD_MESSAGE: |
- case ERROR_READ_UNKNOWN: |
- LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error."; |
- break; |
- case ERROR_READ_SHUTDOWN: |
- // The other side was cleanly closed, so this isn't actually an error. |
- DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)"; |
- break; |
- case ERROR_WRITE: |
- // Write errors are slightly notable: they probably shouldn't happen under |
- // normal operation (but maybe the other side crashed). |
- LOG(WARNING) << "DataPipeProducerDispatcher write error"; |
- break; |
+// static |
+scoped_refptr<DataPipeProducerDispatcher> |
+DataPipeProducerDispatcher::Deserialize(const void* data, |
+ size_t num_bytes, |
+ const ports::PortName* ports, |
+ size_t num_ports, |
+ PlatformHandle* handles, |
+ size_t num_handles) { |
+ if (num_ports != 1 || num_handles != 1 || |
+ num_bytes != sizeof(SerializedState)) { |
+ return nullptr; |
} |
- error_ = true; |
- if (started_transport_.Try()) { |
- base::AutoLock locker(lock()); |
- // We can get two OnError callbacks before the post task below completes. |
- // Although RawChannel still has a pointer to this object until Shutdown is |
- // called, that is safe since this class always does a PostTask to the IO |
- // thread to self destruct. |
- if (channel_) { |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
- channel_->Shutdown(); |
- channel_ = nullptr; |
- } |
- started_transport_.Release(); |
- } else { |
- // We must be waiting to call ReleaseHandle. It will call Shutdown. |
+ const SerializedState* state = static_cast<const SerializedState*>(data); |
+ |
+ NodeController* node_controller = internal::g_core->GetNodeController(); |
+ ports::PortRef port; |
+ if (node_controller->node()->GetPort(ports[0], &port) != ports::OK) |
+ return nullptr; |
+ |
+ PlatformHandle buffer_handle; |
+ std::swap(buffer_handle, handles[0]); |
+ scoped_refptr<PlatformSharedBuffer> ring_buffer = |
+ internal::g_platform_support->CreateSharedBufferFromHandle( |
+ state->options.capacity_num_bytes, |
+ ScopedPlatformHandle(buffer_handle)); |
+ if (!ring_buffer) { |
+ DLOG(ERROR) << "Failed to deserialize shared buffer handle."; |
+ return nullptr; |
} |
+ |
+ scoped_refptr<DataPipeProducerDispatcher> dispatcher = |
+ new DataPipeProducerDispatcher(node_controller, port, ring_buffer, |
+ state->options, false /* initialized */, |
+ state->pipe_id); |
+ |
+ { |
+ base::AutoLock lock(dispatcher->lock_); |
+ dispatcher->peer_closed_ = state->peer_closed; |
+ dispatcher->write_offset_ = state->write_offset; |
+ dispatcher->available_capacity_ = state->available_capacity; |
+ dispatcher->InitializeNoLock(); |
+ } |
+ |
+ return dispatcher; |
} |
-bool DataPipeProducerDispatcher::InTwoPhaseWrite() const { |
- return !two_phase_data_.empty(); |
+DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { |
+ DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_ && |
+ !ring_buffer_mapping_); |
} |
-bool DataPipeProducerDispatcher::WriteDataIntoMessages( |
- const void* elements, |
- uint32_t num_bytes) { |
- // The maximum amount of data to send per message (make it a multiple of the |
- // element size. |
- size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; |
- max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes; |
- DCHECK_GT(max_message_num_bytes, 0u); |
- |
- uint32_t offset = 0; |
- while (offset < num_bytes) { |
- uint32_t message_num_bytes = |
- std::min(static_cast<uint32_t>(max_message_num_bytes), |
- num_bytes - offset); |
- scoped_ptr<MessageInTransit> message(new MessageInTransit( |
- MessageInTransit::Type::MESSAGE, message_num_bytes, |
- static_cast<const char*>(elements) + offset)); |
- if (!channel_->WriteMessage(std::move(message))) { |
- error_ = true; |
- return false; |
+void DataPipeProducerDispatcher::InitializeNoLock() { |
+ lock_.AssertAcquired(); |
+ |
+ if (shared_ring_buffer_) { |
+ ring_buffer_mapping_ = |
+ shared_ring_buffer_->Map(0, options_.capacity_num_bytes); |
+ if (!ring_buffer_mapping_) { |
+ DLOG(ERROR) << "Failed to map shared buffer."; |
+ shared_ring_buffer_ = nullptr; |
} |
+ } |
- offset += message_num_bytes; |
+ base::AutoUnlock unlock(lock_); |
+ node_controller_->SetPortObserver( |
+ control_port_, |
+ make_scoped_refptr(new PortObserverThunk(this))); |
+} |
+ |
+MojoResult DataPipeProducerDispatcher::CloseNoLock() { |
+ lock_.AssertAcquired(); |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ is_closed_ = true; |
+ ring_buffer_mapping_.reset(); |
+ shared_ring_buffer_ = nullptr; |
+ |
+ awakable_list_.CancelAll(); |
+ if (!transferred_) { |
+ base::AutoUnlock unlock(lock_); |
+ node_controller_->ClosePort(control_port_); |
} |
- return true; |
+ return MOJO_RESULT_OK; |
} |
-void DataPipeProducerDispatcher::SerializeInternal() { |
- // We need to stop watching handle immediately, even though not on IO thread, |
- // so that other messages aren't read after this. |
- if (channel_) { |
- std::vector<char> serialized_read_buffer; |
- std::vector<int> fds; |
- bool write_error = false; |
- serialized_platform_handle_ = channel_->ReleaseHandle( |
- &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds, |
- &write_error); |
- CHECK(serialized_read_buffer.empty()); |
- CHECK(fds.empty()); |
- if (write_error) |
- serialized_platform_handle_.reset(); |
- channel_ = nullptr; |
+HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() |
+ const { |
+ lock_.AssertAcquired(); |
+ HandleSignalsState rv; |
+ if (!peer_closed_) { |
+ if (!in_two_phase_write_ && shared_ring_buffer_ && |
+ available_capacity_ > 0) |
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
+ } else { |
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
+ } |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
+ return rv; |
+} |
+ |
+void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { |
+ DVLOG(1) << "Data pipe producer " << pipe_id_ << " notifying peer: " |
+ << num_bytes << " bytes written. [control_port=" |
+ << control_port_.name() << "]"; |
+ |
+ SendDataPipeControlMessage(node_controller_, control_port_, |
+ DataPipeCommand::DATA_WAS_WRITTEN, num_bytes); |
+} |
+ |
+void DataPipeProducerDispatcher::OnPortStatusChanged() { |
+ base::AutoLock lock(lock_); |
+ |
+ // We stop observing the control port as soon it's transferred, but this can |
+ // race with events which are raised right before that happens. This is fine |
+ // to ignore. |
+ if (transferred_) |
+ return; |
+ |
+ DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_; |
+ |
+ UpdateSignalsStateNoLock(); |
+} |
+ |
+void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() { |
+ lock_.AssertAcquired(); |
+ |
+ bool was_peer_closed = peer_closed_; |
+ size_t previous_capacity = available_capacity_; |
+ |
+ ports::PortStatus port_status; |
+ if (node_controller_->node()->GetStatus(control_port_, &port_status) != |
+ ports::OK || |
+ !port_status.receiving_messages) { |
+ DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure" |
+ << " [control_port=" << control_port_.name() << "]"; |
+ |
+ peer_closed_ = true; |
+ } |
+ |
+ if (port_status.has_messages && !in_transit_) { |
+ ports::ScopedMessage message; |
+ do { |
+ int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr, |
+ &message); |
+ if (rv != ports::OK) |
+ peer_closed_ = true; |
+ if (message) { |
+ PortsMessage* ports_message = static_cast<PortsMessage*>(message.get()); |
+ const DataPipeControlMessage* m = |
+ static_cast<const DataPipeControlMessage*>( |
+ ports_message->payload_bytes()); |
+ |
+ if (m->command != DataPipeCommand::DATA_WAS_READ) { |
+ DLOG(ERROR) << "Unexpected message from consumer."; |
+ peer_closed_ = true; |
+ break; |
+ } |
+ |
+ if (static_cast<size_t>(available_capacity_) + m->num_bytes > |
+ options_.capacity_num_bytes) { |
+ DLOG(ERROR) << "Consumer claims to have read too many bytes."; |
+ break; |
+ } |
+ |
+ DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that " |
+ << m->num_bytes << " bytes were read. [control_port=" |
+ << control_port_.name() << "]"; |
+ |
+ available_capacity_ += m->num_bytes; |
+ } |
+ } while (message); |
+ } |
+ |
+ if (peer_closed_ != was_peer_closed || |
+ available_capacity_ != previous_capacity) { |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
} |
- serialized_ = true; |
} |
} // namespace edk |