Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc |
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
index 68c9e0e038875f75385c383b3bb018b985d020f2..386ab14b82e95ad1a7c2d12dfb1144ac763cc383 100644 |
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
@@ -8,158 +8,92 @@ |
#include <stdint.h> |
#include <algorithm> |
+#include <limits> |
#include <utility> |
#include "base/bind.h" |
#include "base/logging.h" |
+#include "base/memory/ref_counted.h" |
#include "base/message_loop/message_loop.h" |
#include "mojo/edk/embedder/embedder_internal.h" |
#include "mojo/edk/embedder/platform_shared_buffer.h" |
#include "mojo/edk/embedder/platform_support.h" |
-#include "mojo/edk/system/data_pipe.h" |
+#include "mojo/edk/system/core.h" |
+#include "mojo/edk/system/data_pipe_control_message.h" |
+#include "mojo/edk/system/node_controller.h" |
+#include "mojo/edk/system/ports_message.h" |
+#include "mojo/public/c/system/data_pipe.h" |
namespace mojo { |
namespace edk { |
-struct SharedMemoryHeader { |
- uint32_t data_size; |
- uint32_t read_buffer_size; |
+namespace { |
+ |
+struct MOJO_ALIGNAS(8) SerializedState { |
+ MojoCreateDataPipeOptions options; |
+ uint64_t pipe_id; |
+ bool peer_closed; |
+ uint32_t read_offset; |
+ uint32_t bytes_available; |
}; |
-void DataPipeConsumerDispatcher::Init( |
- ScopedPlatformHandle message_pipe, |
- char* serialized_read_buffer, size_t serialized_read_buffer_size) { |
- if (message_pipe.is_valid()) { |
- channel_ = RawChannel::Create(std::move(message_pipe)); |
- channel_->SetSerializedData( |
- serialized_read_buffer, serialized_read_buffer_size, nullptr, 0u, |
- nullptr, nullptr); |
- internal::g_io_thread_task_runner->PostTask( |
- FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this)); |
- } else { |
- // The data pipe consumer could have read all the data and the producer |
- // closed its end subsequently (before the consumer was sent). In that case |
- // when we deserialize the consumer we must make sure to set error_ or |
- // otherwise the peer-closed signal will never be satisfied. |
- error_ = true; |
- } |
-} |
+} // namespace |
-void DataPipeConsumerDispatcher::InitOnIO() { |
- base::AutoLock locker(lock()); |
- calling_init_ = true; |
- if (channel_) |
- channel_->Init(this); |
- calling_init_ = false; |
-} |
+// A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a |
+// reference to the dispatcher to ensure it lives as long as the observed port. |
+class DataPipeConsumerDispatcher::PortObserverThunk |
+ : public NodeController::PortObserver { |
+ public: |
+ explicit PortObserverThunk( |
+ scoped_refptr<DataPipeConsumerDispatcher> dispatcher) |
+ : dispatcher_(dispatcher) {} |
-void DataPipeConsumerDispatcher::CloseOnIO() { |
- base::AutoLock locker(lock()); |
- if (channel_) { |
- channel_->Shutdown(); |
- channel_ = nullptr; |
- } |
-} |
+ private: |
+ ~PortObserverThunk() override {} |
-Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { |
- return Type::DATA_PIPE_CONSUMER; |
-} |
+ // NodeController::PortObserver: |
+ void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } |
-scoped_refptr<DataPipeConsumerDispatcher> |
-DataPipeConsumerDispatcher::Deserialize( |
- const void* source, |
- size_t size, |
- PlatformHandleVector* platform_handles) { |
- MojoCreateDataPipeOptions options; |
- ScopedPlatformHandle shared_memory_handle; |
- size_t shared_memory_size = 0; |
- |
- ScopedPlatformHandle platform_handle = |
- DataPipe::Deserialize(source, size, platform_handles, &options, |
- &shared_memory_handle, &shared_memory_size); |
- |
- scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options)); |
- |
- char* serialized_read_buffer = nullptr; |
- size_t serialized_read_buffer_size = 0; |
- scoped_refptr<PlatformSharedBuffer> shared_buffer; |
- scoped_ptr<PlatformSharedBufferMapping> mapping; |
- if (shared_memory_size) { |
- shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( |
- shared_memory_size, std::move(shared_memory_handle)); |
- mapping = shared_buffer->Map(0, shared_memory_size); |
- char* buffer = static_cast<char*>(mapping->GetBase()); |
- SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer); |
- buffer += sizeof(SharedMemoryHeader); |
- if (header->data_size) { |
- rv->data_.assign(buffer, buffer + header->data_size); |
- buffer += header->data_size; |
- } |
+ scoped_refptr<DataPipeConsumerDispatcher> dispatcher_; |
- if (header->read_buffer_size) { |
- serialized_read_buffer = buffer; |
- serialized_read_buffer_size = header->read_buffer_size; |
- buffer += header->read_buffer_size; |
- } |
- } |
- |
- rv->Init(std::move(platform_handle), serialized_read_buffer, |
- serialized_read_buffer_size); |
- return rv; |
-} |
+ DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); |
+}; |
DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( |
- const MojoCreateDataPipeOptions& options) |
+ NodeController* node_controller, |
+ const ports::PortRef& control_port, |
+ scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, |
+ const MojoCreateDataPipeOptions& options, |
+ bool initialized, |
+ uint64_t pipe_id) |
: options_(options), |
- channel_(nullptr), |
- calling_init_(false), |
- in_two_phase_read_(false), |
- two_phase_max_bytes_read_(0), |
- error_(false), |
- serialized_(false) { |
-} |
- |
-DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { |
- // See comment in ~MessagePipeDispatcher. |
- if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
- channel_->Shutdown(); |
- else |
- DCHECK(!channel_); |
+ node_controller_(node_controller), |
+ control_port_(control_port), |
+ pipe_id_(pipe_id), |
+ shared_ring_buffer_(shared_ring_buffer) { |
+ if (initialized) { |
+ base::AutoLock lock(lock_); |
+ InitializeNoLock(); |
+ } |
} |
-void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() { |
- lock().AssertAcquired(); |
- awakable_list_.CancelAll(); |
+Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { |
+ return Type::DATA_PIPE_CONSUMER; |
} |
-void DataPipeConsumerDispatcher::CloseImplNoLock() { |
- lock().AssertAcquired(); |
- internal::g_io_thread_task_runner->PostTask( |
- FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this)); |
+MojoResult DataPipeConsumerDispatcher::Close() { |
+ base::AutoLock lock(lock_); |
+ DVLOG(1) << "Closing data pipe consumer " << pipe_id_; |
+ return CloseNoLock(); |
} |
-scoped_refptr<Dispatcher> |
-DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
- lock().AssertAcquired(); |
- |
- SerializeInternal(); |
- |
- scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_); |
- data_.swap(rv->data_); |
- serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
- rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); |
- rv->serialized_ = true; |
- |
- return scoped_refptr<Dispatcher>(rv.get()); |
-} |
+MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
+ uint32_t* num_bytes, |
+ MojoReadDataFlags flags) { |
+ base::AutoLock lock(lock_); |
+ if (!shared_ring_buffer_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
-MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock( |
- void* elements, |
- uint32_t* num_bytes, |
- MojoReadDataFlags flags) { |
- lock().AssertAcquired(); |
- if (channel_) |
- channel_->EnsureLazyInitialized(); |
if (in_two_phase_read_) |
return MOJO_RESULT_BUSY; |
@@ -170,7 +104,7 @@ MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock( |
DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. |
DVLOG_IF(2, elements) |
<< "Query mode: ignoring non-null |elements|"; |
- *num_bytes = static_cast<uint32_t>(data_.size()); |
+ *num_bytes = static_cast<uint32_t>(bytes_available_); |
return MOJO_RESULT_OK; |
} |
@@ -192,32 +126,54 @@ MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock( |
uint32_t min_num_bytes_to_read = |
all_or_none ? max_num_bytes_to_read : 0; |
- if (min_num_bytes_to_read > data_.size()) |
- return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE; |
+ if (min_num_bytes_to_read > bytes_available_) { |
+ return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
+ : MOJO_RESULT_OUT_OF_RANGE; |
+ } |
- uint32_t bytes_to_read = std::min(max_num_bytes_to_read, |
- static_cast<uint32_t>(data_.size())); |
- if (bytes_to_read == 0) |
- return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; |
+ uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); |
+ if (bytes_to_read == 0) { |
+ return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
+ : MOJO_RESULT_SHOULD_WAIT; |
+ } |
- if (!discard) |
- memcpy(elements, &data_[0], bytes_to_read); |
+ if (!discard) { |
+ uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); |
+ CHECK(data); |
+ |
+ uint8_t* destination = static_cast<uint8_t*>(elements); |
+ CHECK(destination); |
+ |
+ DCHECK_LE(read_offset_, options_.capacity_num_bytes); |
+ uint32_t tail_bytes_to_copy = |
+ std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read); |
+ uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy; |
+ if (tail_bytes_to_copy > 0) |
+ memcpy(destination, data + read_offset_, tail_bytes_to_copy); |
+ if (head_bytes_to_copy > 0) |
+ memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy); |
+ } |
*num_bytes = bytes_to_read; |
bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); |
- if (discard || !peek) |
- data_.erase(data_.begin(), data_.begin() + bytes_to_read); |
+ if (discard || !peek) { |
+ read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; |
+ bytes_available_ -= bytes_to_read; |
+ |
+ base::AutoUnlock unlock(lock_); |
+ NotifyRead(bytes_to_read); |
+ } |
return MOJO_RESULT_OK; |
} |
-MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock( |
- const void** buffer, |
- uint32_t* buffer_num_bytes, |
- MojoReadDataFlags flags) { |
- lock().AssertAcquired(); |
- if (channel_) |
- channel_->EnsureLazyInitialized(); |
+MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, |
+ uint32_t* buffer_num_bytes, |
+ MojoReadDataFlags flags) { |
+ base::AutoLock lock(lock_); |
+ if (!shared_ring_buffer_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
if (in_two_phase_read_) |
return MOJO_RESULT_BUSY; |
@@ -227,81 +183,82 @@ MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock( |
(flags & MOJO_READ_DATA_FLAG_PEEK)) |
return MOJO_RESULT_INVALID_ARGUMENT; |
- uint32_t max_num_bytes_to_read = static_cast<uint32_t>(data_.size()); |
- if (max_num_bytes_to_read == 0) |
- return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; |
+ if (bytes_available_ == 0) { |
+ return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
+ : MOJO_RESULT_SHOULD_WAIT; |
+ } |
+ |
+ DCHECK_LT(read_offset_, options_.capacity_num_bytes); |
+ uint32_t bytes_to_read = std::min(bytes_available_, |
+ options_.capacity_num_bytes - read_offset_); |
+ |
+ CHECK(ring_buffer_mapping_); |
+ uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); |
+ CHECK(data); |
in_two_phase_read_ = true; |
- *buffer = &data_[0]; |
- *buffer_num_bytes = max_num_bytes_to_read; |
- two_phase_max_bytes_read_ = max_num_bytes_to_read; |
+ *buffer = data + read_offset_; |
+ *buffer_num_bytes = bytes_to_read; |
+ two_phase_max_bytes_read_ = bytes_to_read; |
return MOJO_RESULT_OK; |
} |
-MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock( |
- uint32_t num_bytes_read) { |
- lock().AssertAcquired(); |
+MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { |
+ base::AutoLock lock(lock_); |
if (!in_two_phase_read_) |
return MOJO_RESULT_FAILED_PRECONDITION; |
- HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
+ if (in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ CHECK(shared_ring_buffer_); |
+ |
+ HandleSignalsState old_state = GetHandleSignalsStateNoLock(); |
MojoResult rv; |
if (num_bytes_read > two_phase_max_bytes_read_ || |
num_bytes_read % options_.element_num_bytes != 0) { |
rv = MOJO_RESULT_INVALID_ARGUMENT; |
} else { |
rv = MOJO_RESULT_OK; |
- data_.erase(data_.begin(), data_.begin() + num_bytes_read); |
+ read_offset_ = |
+ (read_offset_ + num_bytes_read) % options_.capacity_num_bytes; |
+ |
+ DCHECK_GE(bytes_available_, num_bytes_read); |
+ bytes_available_ -= num_bytes_read; |
+ |
+ base::AutoUnlock unlock(lock_); |
+ NotifyRead(num_bytes_read); |
} |
in_two_phase_read_ = false; |
two_phase_max_bytes_read_ = 0; |
- if (!data_received_during_two_phase_read_.empty()) { |
- if (data_.empty()) { |
- data_received_during_two_phase_read_.swap(data_); |
- } else { |
- data_.insert(data_.end(), data_received_during_two_phase_read_.begin(), |
- data_received_during_two_phase_read_.end()); |
- data_received_during_two_phase_read_.clear(); |
- } |
- } |
- HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
+ HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
if (!new_state.equals(old_state)) |
awakable_list_.AwakeForStateChange(new_state); |
return rv; |
} |
-HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock() |
- const { |
- lock().AssertAcquired(); |
- |
- HandleSignalsState rv; |
- if (!data_.empty()) { |
- if (!in_two_phase_read_) |
- rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
- } else if (!error_) { |
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
- } |
- |
- if (error_) |
- rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
- rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
- return rv; |
+HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { |
+ base::AutoLock lock(lock_); |
+ return GetHandleSignalsStateNoLock(); |
} |
-MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock( |
+MojoResult DataPipeConsumerDispatcher::AddAwakable( |
Awakable* awakable, |
MojoHandleSignals signals, |
uintptr_t context, |
HandleSignalsState* signals_state) { |
- lock().AssertAcquired(); |
- if (channel_) |
- channel_->EnsureLazyInitialized(); |
- HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
+ base::AutoLock lock(lock_); |
+ if (!shared_ring_buffer_ || in_transit_) { |
+ if (signals_state) |
+ *signals_state = HandleSignalsState(); |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ } |
+ UpdateSignalsStateNoLock(); |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
if (state.satisfies(signals)) { |
if (signals_state) |
*signals_state = state; |
@@ -317,186 +274,260 @@ MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock( |
return MOJO_RESULT_OK; |
} |
-void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock( |
+void DataPipeConsumerDispatcher::RemoveAwakable( |
Awakable* awakable, |
HandleSignalsState* signals_state) { |
- lock().AssertAcquired(); |
+ base::AutoLock lock(lock_); |
+ if ((!shared_ring_buffer_ || in_transit_) && signals_state) |
+ *signals_state = HandleSignalsState(); |
+ else if (signals_state) |
+ *signals_state = GetHandleSignalsStateNoLock(); |
awakable_list_.Remove(awakable); |
- if (signals_state) |
- *signals_state = GetHandleSignalsStateImplNoLock(); |
} |
-void DataPipeConsumerDispatcher::StartSerializeImplNoLock( |
- size_t* max_size, |
- size_t* max_platform_handles) { |
- if (!serialized_) { |
- // Handles the case where we have messages read off RawChannel but not ready |
- // by MojoReadMessage. |
- SerializeInternal(); |
- } |
- |
- DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), |
- !data_.empty() || !serialized_read_buffer_.empty(), |
- max_size, max_platform_handles); |
+void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes, |
+ uint32_t* num_ports, |
+ uint32_t* num_handles) { |
+ base::AutoLock lock(lock_); |
+ DCHECK(in_transit_); |
+ *num_bytes = static_cast<uint32_t>(sizeof(SerializedState)); |
+ *num_ports = 1; |
+ *num_handles = 1; |
} |
-bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock( |
+bool DataPipeConsumerDispatcher::EndSerialize( |
void* destination, |
- size_t* actual_size, |
- PlatformHandleVector* platform_handles) { |
- ScopedPlatformHandle shared_memory_handle; |
- size_t shared_memory_size = data_.size() + serialized_read_buffer_.size(); |
- if (shared_memory_size) { |
- shared_memory_size += sizeof(SharedMemoryHeader); |
- SharedMemoryHeader header; |
- header.data_size = static_cast<uint32_t>(data_.size()); |
- header.read_buffer_size = |
- static_cast<uint32_t>(serialized_read_buffer_.size()); |
- |
- scoped_refptr<PlatformSharedBuffer> shared_buffer( |
- internal::g_platform_support->CreateSharedBuffer( |
- shared_memory_size)); |
- scoped_ptr<PlatformSharedBufferMapping> mapping( |
- shared_buffer->Map(0, shared_memory_size)); |
- |
- char* start = static_cast<char*>(mapping->GetBase()); |
- memcpy(start, &header, sizeof(SharedMemoryHeader)); |
- start += sizeof(SharedMemoryHeader); |
- |
- if (!data_.empty()) { |
- memcpy(start, &data_[0], data_.size()); |
- start += data_.size(); |
- } |
+ ports::PortName* ports, |
+ PlatformHandle* platform_handles) { |
+ SerializedState* state = static_cast<SerializedState*>(destination); |
+ memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions)); |
- if (!serialized_read_buffer_.empty()) { |
- memcpy(start, &serialized_read_buffer_[0], |
- serialized_read_buffer_.size()); |
- start += serialized_read_buffer_.size(); |
- } |
+ base::AutoLock lock(lock_); |
+ DCHECK(in_transit_); |
+ state->pipe_id = pipe_id_; |
+ state->peer_closed = peer_closed_; |
+ state->read_offset = read_offset_; |
+ state->bytes_available = bytes_available_; |
- shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release()); |
- } |
+ ports[0] = control_port_.name(); |
+ |
+ buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle(); |
+ platform_handles[0] = buffer_handle_for_transit_.get(); |
- DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_), |
- std::move(shared_memory_handle), shared_memory_size, |
- destination, actual_size, platform_handles); |
- CloseImplNoLock(); |
return true; |
} |
-void DataPipeConsumerDispatcher::TransportStarted() { |
- started_transport_.Acquire(); |
+bool DataPipeConsumerDispatcher::BeginTransit() { |
+ base::AutoLock lock(lock_); |
+ if (in_transit_) |
+ return false; |
+ in_transit_ = !in_two_phase_read_; |
+ return in_transit_; |
} |
-void DataPipeConsumerDispatcher::TransportEnded() { |
- started_transport_.Release(); |
+void DataPipeConsumerDispatcher::CompleteTransitAndClose() { |
+ node_controller_->SetPortObserver(control_port_, nullptr); |
- base::AutoLock locker(lock()); |
+ base::AutoLock lock(lock_); |
+ DCHECK(in_transit_); |
+ in_transit_ = false; |
+ transferred_ = true; |
+ ignore_result(buffer_handle_for_transit_.release()); |
+ CloseNoLock(); |
+} |
- // If transporting of DP failed, we might have got more data and didn't awake |
- // for. |
- // TODO(jam): should we care about only alerting if it was empty before |
- // TransportStarted? |
- if (!data_.empty()) |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+void DataPipeConsumerDispatcher::CancelTransit() { |
+ base::AutoLock lock(lock_); |
+ DCHECK(in_transit_); |
+ in_transit_ = false; |
+ buffer_handle_for_transit_.reset(); |
+ UpdateSignalsStateNoLock(); |
} |
-bool DataPipeConsumerDispatcher::IsBusyNoLock() const { |
- lock().AssertAcquired(); |
- return in_two_phase_read_; |
+// static |
+scoped_refptr<DataPipeConsumerDispatcher> |
+DataPipeConsumerDispatcher::Deserialize(const void* data, |
+ size_t num_bytes, |
+ const ports::PortName* ports, |
+ size_t num_ports, |
+ PlatformHandle* handles, |
+ size_t num_handles) { |
+ if (num_ports != 1 || num_handles != 1 || |
+ num_bytes != sizeof(SerializedState)) { |
+ return nullptr; |
+ } |
+ |
+ const SerializedState* state = static_cast<const SerializedState*>(data); |
+ |
+ NodeController* node_controller = internal::g_core->GetNodeController(); |
+ ports::PortRef port; |
+ if (node_controller->node()->GetPort(ports[0], &port) != ports::OK) |
+ return nullptr; |
+ |
+ PlatformHandle buffer_handle; |
+ std::swap(buffer_handle, handles[0]); |
+ scoped_refptr<PlatformSharedBuffer> ring_buffer = |
+ internal::g_platform_support->CreateSharedBufferFromHandle( |
+ state->options.capacity_num_bytes, |
+ ScopedPlatformHandle(buffer_handle)); |
+ if (!ring_buffer) { |
+ DLOG(ERROR) << "Failed to deserialize shared buffer handle."; |
+ return nullptr; |
+ } |
+ |
+ scoped_refptr<DataPipeConsumerDispatcher> dispatcher = |
+ new DataPipeConsumerDispatcher(node_controller, port, ring_buffer, |
+ state->options, false /* initialized */, |
+ state->pipe_id); |
+ |
+ { |
+ base::AutoLock lock(dispatcher->lock_); |
+ dispatcher->peer_closed_ = state->peer_closed; |
+ dispatcher->read_offset_ = state->read_offset; |
+ dispatcher->bytes_available_ = state->bytes_available; |
+ dispatcher->InitializeNoLock(); |
+ } |
+ |
+ return dispatcher; |
} |
-void DataPipeConsumerDispatcher::OnReadMessage( |
- const MessageInTransit::View& message_view, |
- ScopedPlatformHandleVectorPtr platform_handles) { |
- const char* bytes_start = static_cast<const char*>(message_view.bytes()); |
- const char* bytes_end = bytes_start + message_view.num_bytes(); |
- if (started_transport_.Try()) { |
- // We're not in the middle of being sent. |
- |
- // Can get synchronously called back in Init if there was initial data. |
- scoped_ptr<base::AutoLock> locker; |
- if (!calling_init_) { |
- locker.reset(new base::AutoLock(lock())); |
- } |
+DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { |
+ DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ && |
+ !in_transit_); |
+} |
- if (in_two_phase_read_) { |
- data_received_during_two_phase_read_.insert( |
- data_received_during_two_phase_read_.end(), bytes_start, bytes_end); |
- } else { |
- bool was_empty = data_.empty(); |
- data_.insert(data_.end(), bytes_start, bytes_end); |
- if (was_empty) |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+void DataPipeConsumerDispatcher::InitializeNoLock() { |
+ lock_.AssertAcquired(); |
+ |
+ if (shared_ring_buffer_) { |
+ DCHECK(!ring_buffer_mapping_); |
+ ring_buffer_mapping_ = |
+ shared_ring_buffer_->Map(0, options_.capacity_num_bytes); |
+ if (!ring_buffer_mapping_) { |
+ DLOG(ERROR) << "Failed to map shared buffer."; |
+ shared_ring_buffer_ = nullptr; |
} |
- started_transport_.Release(); |
- } else { |
- // See comment in MessagePipeDispatcher about why we can't and don't need |
- // to lock here. |
- data_.insert(data_.end(), bytes_start, bytes_end); |
} |
+ |
+ base::AutoUnlock unlock(lock_); |
+ node_controller_->SetPortObserver( |
+ control_port_, |
+ make_scoped_refptr(new PortObserverThunk(this))); |
} |
-void DataPipeConsumerDispatcher::OnError(Error error) { |
- switch (error) { |
- case ERROR_READ_SHUTDOWN: |
- // The other side was cleanly closed, so this isn't actually an error. |
- DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)"; |
- break; |
- case ERROR_READ_BROKEN: |
- LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)"; |
- break; |
- case ERROR_READ_BAD_MESSAGE: |
- // Receiving a bad message means either a bug, data corruption, or |
- // malicious attack (probably due to some other bug). |
- LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad " |
- << "message)"; |
- break; |
- case ERROR_READ_UNKNOWN: |
- LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)"; |
- break; |
- case ERROR_WRITE: |
- LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages"; |
- break; |
+MojoResult DataPipeConsumerDispatcher::CloseNoLock() { |
+ lock_.AssertAcquired(); |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ is_closed_ = true; |
+ ring_buffer_mapping_.reset(); |
+ shared_ring_buffer_ = nullptr; |
+ |
+ awakable_list_.CancelAll(); |
+ if (!transferred_) { |
+ base::AutoUnlock unlock(lock_); |
+ node_controller_->ClosePort(control_port_); |
} |
- error_ = true; |
- if (started_transport_.Try()) { |
- base::AutoLock locker(lock()); |
- // We can get two OnError callbacks before the post task below completes. |
- // Although RawChannel still has a pointer to this object until Shutdown is |
- // called, that is safe since this class always does a PostTask to the IO |
- // thread to self destruct. |
- if (channel_) { |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
- channel_->Shutdown(); |
- channel_ = nullptr; |
- } |
- started_transport_.Release(); |
- } else { |
- // We must be waiting to call ReleaseHandle. It will call Shutdown. |
+ return MOJO_RESULT_OK; |
+} |
+ |
+HandleSignalsState |
+DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { |
+ lock_.AssertAcquired(); |
+ |
+ HandleSignalsState rv; |
+ if (shared_ring_buffer_ && bytes_available_) { |
+ if (!in_two_phase_read_) |
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
+ } else if (!peer_closed_ && shared_ring_buffer_) { |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
} |
+ |
+ if (peer_closed_) |
+ rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
+ rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
+ return rv; |
} |
-void DataPipeConsumerDispatcher::SerializeInternal() { |
- DCHECK(!in_two_phase_read_); |
- // We need to stop watching handle immediately, even though not on IO thread, |
- // so that other messages aren't read after this. |
- if (channel_) { |
- std::vector<char> serialized_write_buffer; |
- std::vector<int> fds; |
- bool write_error = false; |
- serialized_platform_handle_ = channel_->ReleaseHandle( |
- &serialized_read_buffer_, &serialized_write_buffer, &fds, &fds, |
- &write_error); |
- CHECK(serialized_write_buffer.empty()); |
- CHECK(fds.empty()); |
- CHECK(!write_error) << "DataPipeConsumerDispatcher doesn't write."; |
- |
- channel_ = nullptr; |
+void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) { |
+ DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: " |
+ << num_bytes << " bytes read. [control_port=" |
+ << control_port_.name() << "]"; |
+ |
+ SendDataPipeControlMessage(node_controller_, control_port_, |
+ DataPipeCommand::DATA_WAS_READ, num_bytes); |
+} |
+ |
+void DataPipeConsumerDispatcher::OnPortStatusChanged() { |
+ base::AutoLock lock(lock_); |
+ |
+ // We stop observing the control port as soon it's transferred, but this can |
+ // race with events which are raised right before that happens. This is fine |
+ // to ignore. |
+ if (transferred_) |
+ return; |
+ |
+ DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_; |
+ |
+ UpdateSignalsStateNoLock(); |
+} |
+ |
+void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { |
+ lock_.AssertAcquired(); |
+ |
+ bool was_peer_closed = peer_closed_; |
+ size_t previous_bytes_available = bytes_available_; |
+ |
+ ports::PortStatus port_status; |
+ if (node_controller_->node()->GetStatus(control_port_, &port_status) != |
+ ports::OK || |
+ !port_status.receiving_messages) { |
+ DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure" |
+ << " [control_port=" << control_port_.name() << "]"; |
+ |
+ peer_closed_ = true; |
} |
- serialized_ = true; |
+ if (port_status.has_messages && !in_transit_) { |
+ ports::ScopedMessage message; |
+ do { |
+ int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr, |
+ &message); |
+ if (rv != ports::OK) |
+ peer_closed_ = true; |
+ if (message) { |
+ const DataPipeControlMessage* m = |
+ static_cast<const DataPipeControlMessage*>( |
+ message->payload_bytes()); |
+ |
+ if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) { |
+ DLOG(ERROR) << "Unexpected control message from producer."; |
+ peer_closed_ = true; |
+ break; |
+ } |
+ |
+ if (static_cast<size_t>(bytes_available_) + m->num_bytes > |
+ options_.capacity_num_bytes) { |
+ DLOG(ERROR) << "Producer claims to have written too many bytes."; |
+ peer_closed_ = true; |
+ break; |
+ } |
+ |
+ DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that " |
+ << m->num_bytes << " bytes were written. [control_port=" |
+ << control_port_.name() << "]"; |
+ |
+ bytes_available_ += m->num_bytes; |
+ } |
+ } while (message); |
+ } |
+ |
+ if (peer_closed_ != was_peer_closed || |
+ bytes_available_ != previous_bytes_available) { |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ } |
} |
} // namespace edk |