Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1299)

Unified Diff: mojo/edk/system/data_pipe_consumer_dispatcher.h

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/data_pipe_consumer_dispatcher.h
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.h b/mojo/edk/system/data_pipe_consumer_dispatcher.h
index 5566d90e428cfb1b3532ed9c2d4f24875901eeac..c75833bbf85e7ec6ead0d961622452c9a4d96db1 100644
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.h
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.h
@@ -8,114 +8,112 @@
#include <stddef.h>
#include <stdint.h>
+#include "base/macros.h"
#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/lock.h"
+#include "mojo/edk/embedder/platform_handle_vector.h"
+#include "mojo/edk/embedder/platform_shared_buffer.h"
+#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/awakable_list.h"
#include "mojo/edk/system/dispatcher.h"
-#include "mojo/edk/system/raw_channel.h"
+#include "mojo/edk/system/ports/port_ref.h"
#include "mojo/edk/system/system_impl_export.h"
-#include "mojo/public/cpp/system/macros.h"
namespace mojo {
namespace edk {
-// This is the |Dispatcher| implementation for the consumer handle for data
-// pipes (created by the Mojo primitive |MojoCreateDataPipe()|). This class is
+struct DataPipeControlMessage;
+class NodeController;
+
+// This is the Dispatcher implementation for the consumer handle for data
+// pipes created by the Mojo primitive MojoCreateDataPipe(). This class is
// thread-safe.
class MOJO_SYSTEM_IMPL_EXPORT DataPipeConsumerDispatcher final
- : public Dispatcher, public RawChannel::Delegate {
+ : public Dispatcher {
public:
- static scoped_refptr<DataPipeConsumerDispatcher> Create(
- const MojoCreateDataPipeOptions& options) {
- return make_scoped_refptr(new DataPipeConsumerDispatcher(options));
- }
-
- // Must be called before any other methods.
- void Init(ScopedPlatformHandle message_pipe,
- char* serialized_read_buffer, size_t serialized_read_buffer_size);
-
- // |Dispatcher| public methods:
+ DataPipeConsumerDispatcher(
+ NodeController* node_controller,
+ const ports::PortRef& control_port,
+ scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
+ const MojoCreateDataPipeOptions& options,
+ bool initialized,
+ uint64_t pipe_id);
+
+ // Dispatcher:
Type GetType() const override;
+ MojoResult Close() override;
+ MojoResult ReadData(void* elements,
+ uint32_t* num_bytes,
+ MojoReadDataFlags flags) override;
+ MojoResult BeginReadData(const void** buffer,
+ uint32_t* buffer_num_bytes,
+ MojoReadDataFlags flags) override;
+ MojoResult EndReadData(uint32_t num_bytes_read) override;
+ HandleSignalsState GetHandleSignalsState() const override;
+ MojoResult AddAwakable(Awakable* awakable,
+ MojoHandleSignals signals,
+ uintptr_t context,
+ HandleSignalsState* signals_state) override;
+ void RemoveAwakable(Awakable* awakable,
+ HandleSignalsState* signals_state) override;
+ void StartSerialize(uint32_t* num_bytes,
+ uint32_t* num_ports,
+ uint32_t* num_handles) override;
+ bool EndSerialize(void* destination,
+ ports::PortName* ports,
+ PlatformHandle* handles) override;
+ bool BeginTransit() override;
+ void CompleteTransitAndClose() override;
+ void CancelTransit() override;
- // The "opposite" of |SerializeAndClose()|. (Typically this is called by
- // |Dispatcher::Deserialize()|.)
static scoped_refptr<DataPipeConsumerDispatcher>
- Deserialize(const void* source,
- size_t size,
- PlatformHandleVector* platform_handles);
+ Deserialize(const void* data,
+ size_t num_bytes,
+ const ports::PortName* ports,
+ size_t num_ports,
+ PlatformHandle* handles,
+ size_t num_handles);
private:
- DataPipeConsumerDispatcher(const MojoCreateDataPipeOptions& options);
+ class PortObserverThunk;
+ friend class PortObserverThunk;
+
~DataPipeConsumerDispatcher() override;
- void InitOnIO();
- void CloseOnIO();
-
- // |Dispatcher| protected methods:
- void CancelAllAwakablesNoLock() override;
- void CloseImplNoLock() override;
- scoped_refptr<Dispatcher> CreateEquivalentDispatcherAndCloseImplNoLock()
- override;
- MojoResult ReadDataImplNoLock(void* elements,
- uint32_t* num_bytes,
- MojoReadDataFlags flags) override;
- MojoResult BeginReadDataImplNoLock(const void** buffer,
- uint32_t* buffer_num_bytes,
- MojoReadDataFlags flags) override;
- MojoResult EndReadDataImplNoLock(uint32_t num_bytes_read) override;
- HandleSignalsState GetHandleSignalsStateImplNoLock() const override;
- MojoResult AddAwakableImplNoLock(Awakable* awakable,
- MojoHandleSignals signals,
- uintptr_t context,
- HandleSignalsState* signals_state) override;
- void RemoveAwakableImplNoLock(Awakable* awakable,
- HandleSignalsState* signals_state) override;
- void StartSerializeImplNoLock(size_t* max_size,
- size_t* max_platform_handles) override;
- bool EndSerializeAndCloseImplNoLock(
- void* destination,
- size_t* actual_size,
- PlatformHandleVector* platform_handles) override;
- void TransportStarted() override;
- void TransportEnded() override;
- bool IsBusyNoLock() const override;
-
- // |RawChannel::Delegate methods:
- void OnReadMessage(
- const MessageInTransit::View& message_view,
- ScopedPlatformHandleVectorPtr platform_handles) override;
- void OnError(Error error) override;
-
- // See comment in MessagePipeDispatcher for this method.
- void SerializeInternal();
-
- MojoCreateDataPipeOptions options_;
-
- // Protected by |lock()|:
- RawChannel* channel_; // This will be null if closed.
-
- // Queue of incoming messages.
- std::vector<char> data_;
- AwakableList awakable_list_;
+ void InitializeNoLock();
+ MojoResult CloseNoLock();
+ HandleSignalsState GetHandleSignalsStateNoLock() const;
+ void NotifyRead(uint32_t num_bytes);
+ void OnPortStatusChanged();
+ void UpdateSignalsStateNoLock();
+
+ const MojoCreateDataPipeOptions options_;
+ NodeController* const node_controller_;
+ const ports::PortRef control_port_;
+ const uint64_t pipe_id_;
- // If DispatcherTransport is created. Must be set before lock() is called to
- // avoid deadlocks with RawChannel calling us.
- base::Lock started_transport_;
+ // Guards access to the fields below.
+ mutable base::Lock lock_;
+
+ AwakableList awakable_list_;
- bool calling_init_;
+ scoped_refptr<PlatformSharedBuffer> shared_ring_buffer_;
+ scoped_ptr<PlatformSharedBufferMapping> ring_buffer_mapping_;
+ ScopedPlatformHandle buffer_handle_for_transit_;
- bool in_two_phase_read_;
- uint32_t two_phase_max_bytes_read_;
- // If we get data from the channel while we're in two-phase read, we can't
- // resize data_ since it's being used. So instead we store it temporarly.
- std::vector<char> data_received_during_two_phase_read_;
+ bool in_two_phase_read_ = false;
+ uint32_t two_phase_max_bytes_read_ = 0;
- bool error_;
+ bool in_transit_ = false;
+ bool is_closed_ = false;
+ bool peer_closed_ = false;
+ bool transferred_ = false;
- bool serialized_;
- std::vector<char> serialized_read_buffer_;
- ScopedPlatformHandle serialized_platform_handle_;
+ uint32_t read_offset_ = 0;
+ uint32_t bytes_available_ = 0;
- MOJO_DISALLOW_COPY_AND_ASSIGN(DataPipeConsumerDispatcher);
+ DISALLOW_COPY_AND_ASSIGN(DataPipeConsumerDispatcher);
};
} // namespace edk

Powered by Google App Engine
This is Rietveld 408576698