Index: mojo/edk/system/data_pipe_consumer_dispatcher.h |
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.h b/mojo/edk/system/data_pipe_consumer_dispatcher.h |
index 5566d90e428cfb1b3532ed9c2d4f24875901eeac..c75833bbf85e7ec6ead0d961622452c9a4d96db1 100644 |
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.h |
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.h |
@@ -8,114 +8,112 @@ |
#include <stddef.h> |
#include <stdint.h> |
+#include "base/macros.h" |
#include "base/memory/ref_counted.h" |
+#include "base/memory/scoped_ptr.h" |
+#include "base/synchronization/lock.h" |
+#include "mojo/edk/embedder/platform_handle_vector.h" |
+#include "mojo/edk/embedder/platform_shared_buffer.h" |
+#include "mojo/edk/embedder/scoped_platform_handle.h" |
#include "mojo/edk/system/awakable_list.h" |
#include "mojo/edk/system/dispatcher.h" |
-#include "mojo/edk/system/raw_channel.h" |
+#include "mojo/edk/system/ports/port_ref.h" |
#include "mojo/edk/system/system_impl_export.h" |
-#include "mojo/public/cpp/system/macros.h" |
namespace mojo { |
namespace edk { |
-// This is the |Dispatcher| implementation for the consumer handle for data |
-// pipes (created by the Mojo primitive |MojoCreateDataPipe()|). This class is |
+struct DataPipeControlMessage; |
+class NodeController; |
+ |
+// This is the Dispatcher implementation for the consumer handle for data |
+// pipes created by the Mojo primitive MojoCreateDataPipe(). This class is |
// thread-safe. |
class MOJO_SYSTEM_IMPL_EXPORT DataPipeConsumerDispatcher final |
- : public Dispatcher, public RawChannel::Delegate { |
+ : public Dispatcher { |
public: |
- static scoped_refptr<DataPipeConsumerDispatcher> Create( |
- const MojoCreateDataPipeOptions& options) { |
- return make_scoped_refptr(new DataPipeConsumerDispatcher(options)); |
- } |
- |
- // Must be called before any other methods. |
- void Init(ScopedPlatformHandle message_pipe, |
- char* serialized_read_buffer, size_t serialized_read_buffer_size); |
- |
- // |Dispatcher| public methods: |
+ DataPipeConsumerDispatcher( |
+ NodeController* node_controller, |
+ const ports::PortRef& control_port, |
+ scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, |
+ const MojoCreateDataPipeOptions& options, |
+ bool initialized, |
+ uint64_t pipe_id); |
+ |
+ // Dispatcher: |
Type GetType() const override; |
+ MojoResult Close() override; |
+ MojoResult ReadData(void* elements, |
+ uint32_t* num_bytes, |
+ MojoReadDataFlags flags) override; |
+ MojoResult BeginReadData(const void** buffer, |
+ uint32_t* buffer_num_bytes, |
+ MojoReadDataFlags flags) override; |
+ MojoResult EndReadData(uint32_t num_bytes_read) override; |
+ HandleSignalsState GetHandleSignalsState() const override; |
+ MojoResult AddAwakable(Awakable* awakable, |
+ MojoHandleSignals signals, |
+ uintptr_t context, |
+ HandleSignalsState* signals_state) override; |
+ void RemoveAwakable(Awakable* awakable, |
+ HandleSignalsState* signals_state) override; |
+ void StartSerialize(uint32_t* num_bytes, |
+ uint32_t* num_ports, |
+ uint32_t* num_handles) override; |
+ bool EndSerialize(void* destination, |
+ ports::PortName* ports, |
+ PlatformHandle* handles) override; |
+ bool BeginTransit() override; |
+ void CompleteTransitAndClose() override; |
+ void CancelTransit() override; |
- // The "opposite" of |SerializeAndClose()|. (Typically this is called by |
- // |Dispatcher::Deserialize()|.) |
static scoped_refptr<DataPipeConsumerDispatcher> |
- Deserialize(const void* source, |
- size_t size, |
- PlatformHandleVector* platform_handles); |
+ Deserialize(const void* data, |
+ size_t num_bytes, |
+ const ports::PortName* ports, |
+ size_t num_ports, |
+ PlatformHandle* handles, |
+ size_t num_handles); |
private: |
- DataPipeConsumerDispatcher(const MojoCreateDataPipeOptions& options); |
+ class PortObserverThunk; |
+ friend class PortObserverThunk; |
+ |
~DataPipeConsumerDispatcher() override; |
- void InitOnIO(); |
- void CloseOnIO(); |
- |
- // |Dispatcher| protected methods: |
- void CancelAllAwakablesNoLock() override; |
- void CloseImplNoLock() override; |
- scoped_refptr<Dispatcher> CreateEquivalentDispatcherAndCloseImplNoLock() |
- override; |
- MojoResult ReadDataImplNoLock(void* elements, |
- uint32_t* num_bytes, |
- MojoReadDataFlags flags) override; |
- MojoResult BeginReadDataImplNoLock(const void** buffer, |
- uint32_t* buffer_num_bytes, |
- MojoReadDataFlags flags) override; |
- MojoResult EndReadDataImplNoLock(uint32_t num_bytes_read) override; |
- HandleSignalsState GetHandleSignalsStateImplNoLock() const override; |
- MojoResult AddAwakableImplNoLock(Awakable* awakable, |
- MojoHandleSignals signals, |
- uintptr_t context, |
- HandleSignalsState* signals_state) override; |
- void RemoveAwakableImplNoLock(Awakable* awakable, |
- HandleSignalsState* signals_state) override; |
- void StartSerializeImplNoLock(size_t* max_size, |
- size_t* max_platform_handles) override; |
- bool EndSerializeAndCloseImplNoLock( |
- void* destination, |
- size_t* actual_size, |
- PlatformHandleVector* platform_handles) override; |
- void TransportStarted() override; |
- void TransportEnded() override; |
- bool IsBusyNoLock() const override; |
- |
- // |RawChannel::Delegate methods: |
- void OnReadMessage( |
- const MessageInTransit::View& message_view, |
- ScopedPlatformHandleVectorPtr platform_handles) override; |
- void OnError(Error error) override; |
- |
- // See comment in MessagePipeDispatcher for this method. |
- void SerializeInternal(); |
- |
- MojoCreateDataPipeOptions options_; |
- |
- // Protected by |lock()|: |
- RawChannel* channel_; // This will be null if closed. |
- |
- // Queue of incoming messages. |
- std::vector<char> data_; |
- AwakableList awakable_list_; |
+ void InitializeNoLock(); |
+ MojoResult CloseNoLock(); |
+ HandleSignalsState GetHandleSignalsStateNoLock() const; |
+ void NotifyRead(uint32_t num_bytes); |
+ void OnPortStatusChanged(); |
+ void UpdateSignalsStateNoLock(); |
+ |
+ const MojoCreateDataPipeOptions options_; |
+ NodeController* const node_controller_; |
+ const ports::PortRef control_port_; |
+ const uint64_t pipe_id_; |
- // If DispatcherTransport is created. Must be set before lock() is called to |
- // avoid deadlocks with RawChannel calling us. |
- base::Lock started_transport_; |
+ // Guards access to the fields below. |
+ mutable base::Lock lock_; |
+ |
+ AwakableList awakable_list_; |
- bool calling_init_; |
+ scoped_refptr<PlatformSharedBuffer> shared_ring_buffer_; |
+ scoped_ptr<PlatformSharedBufferMapping> ring_buffer_mapping_; |
+ ScopedPlatformHandle buffer_handle_for_transit_; |
- bool in_two_phase_read_; |
- uint32_t two_phase_max_bytes_read_; |
- // If we get data from the channel while we're in two-phase read, we can't |
- // resize data_ since it's being used. So instead we store it temporarly. |
- std::vector<char> data_received_during_two_phase_read_; |
+ bool in_two_phase_read_ = false; |
+ uint32_t two_phase_max_bytes_read_ = 0; |
- bool error_; |
+ bool in_transit_ = false; |
+ bool is_closed_ = false; |
+ bool peer_closed_ = false; |
+ bool transferred_ = false; |
- bool serialized_; |
- std::vector<char> serialized_read_buffer_; |
- ScopedPlatformHandle serialized_platform_handle_; |
+ uint32_t read_offset_ = 0; |
+ uint32_t bytes_available_ = 0; |
- MOJO_DISALLOW_COPY_AND_ASSIGN(DataPipeConsumerDispatcher); |
+ DISALLOW_COPY_AND_ASSIGN(DataPipeConsumerDispatcher); |
}; |
} // namespace edk |