Index: mojo/edk/system/data_pipe_producer_dispatcher.h |
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.h b/mojo/edk/system/data_pipe_producer_dispatcher.h |
index d6920a3ce79ac44e8a776032f92fc7e64d28723e..c1020b0f5b560ec0ddc1f320c1a8d1867169ba80 100644 |
--- a/mojo/edk/system/data_pipe_producer_dispatcher.h |
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.h |
@@ -8,108 +8,114 @@ |
#include <stddef.h> |
#include <stdint.h> |
+#include "base/macros.h" |
#include "base/memory/ref_counted.h" |
+#include "base/memory/scoped_ptr.h" |
+#include "base/synchronization/lock.h" |
+#include "mojo/edk/embedder/platform_handle_vector.h" |
+#include "mojo/edk/embedder/platform_shared_buffer.h" |
#include "mojo/edk/system/awakable_list.h" |
#include "mojo/edk/system/dispatcher.h" |
-#include "mojo/edk/system/raw_channel.h" |
+#include "mojo/edk/system/ports/port_ref.h" |
#include "mojo/edk/system/system_impl_export.h" |
-#include "mojo/public/cpp/system/macros.h" |
namespace mojo { |
namespace edk { |
-// This is the |Dispatcher| implementation for the producer handle for data |
-// pipes (created by the Mojo primitive |MojoCreateDataPipe()|). This class is |
+struct DataPipeControlMessage; |
+class NodeController; |
+ |
+// This is the Dispatcher implementation for the producer handle for data |
+// pipes created by the Mojo primitive MojoCreateDataPipe(). This class is |
// thread-safe. |
class MOJO_SYSTEM_IMPL_EXPORT DataPipeProducerDispatcher final |
- : public Dispatcher, public RawChannel::Delegate { |
+ : public Dispatcher { |
public: |
- static scoped_refptr<DataPipeProducerDispatcher> Create( |
- const MojoCreateDataPipeOptions& options) { |
- return make_scoped_refptr(new DataPipeProducerDispatcher(options)); |
- } |
- |
- // Must be called before any other methods. |
- void Init(ScopedPlatformHandle message_pipe, |
- char* serialized_write_buffer, size_t serialized_write_buffer_size); |
- |
- // |Dispatcher| public methods: |
+ DataPipeProducerDispatcher( |
+ NodeController* node_controller, |
+ const ports::PortRef& port, |
+ scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, |
+ const MojoCreateDataPipeOptions& options, |
+ bool initialized, |
+ uint64_t pipe_id); |
+ |
+ // Dispatcher: |
Type GetType() const override; |
+ MojoResult Close() override; |
+ MojoResult WriteData(const void* elements, |
+ uint32_t* num_bytes, |
+ MojoReadDataFlags flags) override; |
+ MojoResult BeginWriteData(void** buffer, |
+ uint32_t* buffer_num_bytes, |
+ MojoWriteDataFlags flags) override; |
+ MojoResult EndWriteData(uint32_t num_bytes_written) override; |
+ HandleSignalsState GetHandleSignalsState() const override; |
+ MojoResult AddAwakable(Awakable* awakable, |
+ MojoHandleSignals signals, |
+ uintptr_t context, |
+ HandleSignalsState* signals_state) override; |
+ void RemoveAwakable(Awakable* awakable, |
+ HandleSignalsState* signals_state) override; |
+ void StartSerialize(uint32_t* num_bytes, |
+ uint32_t* num_ports, |
+ uint32_t* num_handles) override; |
+ bool EndSerialize(void* destination, |
+ ports::PortName* ports, |
+ PlatformHandle* handles) override; |
+ bool BeginTransit() override; |
+ void CompleteTransitAndClose() override; |
+ void CancelTransit() override; |
- // The "opposite" of |SerializeAndClose()|. (Typically this is called by |
- // |Dispatcher::Deserialize()|.) |
static scoped_refptr<DataPipeProducerDispatcher> |
- Deserialize(const void* source, |
- size_t size, |
- PlatformHandleVector* platform_handles); |
+ Deserialize(const void* data, |
+ size_t num_bytes, |
+ const ports::PortName* ports, |
+ size_t num_ports, |
+ PlatformHandle* handles, |
+ size_t num_handles); |
private: |
- DataPipeProducerDispatcher(const MojoCreateDataPipeOptions& options); |
+ class PortObserverThunk; |
+ friend class PortObserverThunk; |
+ |
~DataPipeProducerDispatcher() override; |
- void InitOnIO(); |
- void CloseOnIO(); |
- |
- // |Dispatcher| protected methods: |
- void CancelAllAwakablesNoLock() override; |
- void CloseImplNoLock() override; |
- scoped_refptr<Dispatcher> CreateEquivalentDispatcherAndCloseImplNoLock() |
- override; |
- MojoResult WriteDataImplNoLock(const void* elements, |
- uint32_t* num_bytes, |
- MojoWriteDataFlags flags) override; |
- MojoResult BeginWriteDataImplNoLock(void** buffer, |
- uint32_t* buffer_num_bytes, |
- MojoWriteDataFlags flags) override; |
- MojoResult EndWriteDataImplNoLock(uint32_t num_bytes_written) override; |
- HandleSignalsState GetHandleSignalsStateImplNoLock() const override; |
- MojoResult AddAwakableImplNoLock(Awakable* awakable, |
- MojoHandleSignals signals, |
- uintptr_t context, |
- HandleSignalsState* signals_state) override; |
- void RemoveAwakableImplNoLock(Awakable* awakable, |
- HandleSignalsState* signals_state) override; |
- void StartSerializeImplNoLock(size_t* max_size, |
- size_t* max_platform_handles) override; |
- bool EndSerializeAndCloseImplNoLock( |
- void* destination, |
- size_t* actual_size, |
- PlatformHandleVector* platform_handles) override; |
- void TransportStarted() override; |
- void TransportEnded() override; |
- bool IsBusyNoLock() const override; |
- |
- // |RawChannel::Delegate methods: |
- void OnReadMessage( |
- const MessageInTransit::View& message_view, |
- ScopedPlatformHandleVectorPtr platform_handles) override; |
- void OnError(Error error) override; |
- |
- bool InTwoPhaseWrite() const; |
- bool WriteDataIntoMessages(const void* elements, uint32_t num_bytes); |
- |
- // See comment in MessagePipeDispatcher for this method. |
- void SerializeInternal(); |
- |
- MojoCreateDataPipeOptions options_; |
- |
- // Protected by |lock()|: |
- RawChannel* channel_; // This will be null if closed. |
+ void OnSharedBufferCreated(const scoped_refptr<PlatformSharedBuffer>& buffer); |
+ void InitializeNoLock(); |
+ MojoResult CloseNoLock(); |
+ HandleSignalsState GetHandleSignalsStateNoLock() const; |
+ void NotifyWrite(uint32_t num_bytes); |
+ void OnPortStatusChanged(); |
+ void UpdateSignalsStateNoLock(); |
+ bool ProcessMessageNoLock(const DataPipeControlMessage& message, |
+ ScopedPlatformHandleVectorPtr handles); |
+ |
+ const MojoCreateDataPipeOptions options_; |
+ NodeController* const node_controller_; |
+ const ports::PortRef control_port_; |
+ const uint64_t pipe_id_; |
+ |
+ // Guards access to the fields below. |
+ mutable base::Lock lock_; |
AwakableList awakable_list_; |
- // If DispatcherTransport is created. Must be set before lock() is called to |
- // avoid deadlocks with RawChannel calling us. |
- base::Lock started_transport_; |
+ bool buffer_requested_ = false; |
+ |
+ scoped_refptr<PlatformSharedBuffer> shared_ring_buffer_; |
+ scoped_ptr<PlatformSharedBufferMapping> ring_buffer_mapping_; |
+ ScopedPlatformHandle buffer_handle_for_transit_; |
- bool error_; |
+ bool in_transit_ = false; |
+ bool is_closed_ = false; |
+ bool peer_closed_ = false; |
+ bool transferred_ = false; |
+ bool in_two_phase_write_ = false; |
- bool serialized_; |
- ScopedPlatformHandle serialized_platform_handle_; |
- std::vector<char> serialized_write_buffer_; |
- std::vector<char> two_phase_data_; |
+ uint32_t write_offset_ = 0; |
+ uint32_t available_capacity_; |
- MOJO_DISALLOW_COPY_AND_ASSIGN(DataPipeProducerDispatcher); |
+ DISALLOW_COPY_AND_ASSIGN(DataPipeProducerDispatcher); |
}; |
} // namespace edk |