Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1207)

Unified Diff: mojo/edk/system/data_pipe_producer_dispatcher.h

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/data_pipe_producer_dispatcher.h
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.h b/mojo/edk/system/data_pipe_producer_dispatcher.h
index d6920a3ce79ac44e8a776032f92fc7e64d28723e..c1020b0f5b560ec0ddc1f320c1a8d1867169ba80 100644
--- a/mojo/edk/system/data_pipe_producer_dispatcher.h
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.h
@@ -8,108 +8,114 @@
#include <stddef.h>
#include <stdint.h>
+#include "base/macros.h"
#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/lock.h"
+#include "mojo/edk/embedder/platform_handle_vector.h"
+#include "mojo/edk/embedder/platform_shared_buffer.h"
#include "mojo/edk/system/awakable_list.h"
#include "mojo/edk/system/dispatcher.h"
-#include "mojo/edk/system/raw_channel.h"
+#include "mojo/edk/system/ports/port_ref.h"
#include "mojo/edk/system/system_impl_export.h"
-#include "mojo/public/cpp/system/macros.h"
namespace mojo {
namespace edk {
-// This is the |Dispatcher| implementation for the producer handle for data
-// pipes (created by the Mojo primitive |MojoCreateDataPipe()|). This class is
+struct DataPipeControlMessage;
+class NodeController;
+
+// This is the Dispatcher implementation for the producer handle for data
+// pipes created by the Mojo primitive MojoCreateDataPipe(). This class is
// thread-safe.
class MOJO_SYSTEM_IMPL_EXPORT DataPipeProducerDispatcher final
- : public Dispatcher, public RawChannel::Delegate {
+ : public Dispatcher {
public:
- static scoped_refptr<DataPipeProducerDispatcher> Create(
- const MojoCreateDataPipeOptions& options) {
- return make_scoped_refptr(new DataPipeProducerDispatcher(options));
- }
-
- // Must be called before any other methods.
- void Init(ScopedPlatformHandle message_pipe,
- char* serialized_write_buffer, size_t serialized_write_buffer_size);
-
- // |Dispatcher| public methods:
+ DataPipeProducerDispatcher(
+ NodeController* node_controller,
+ const ports::PortRef& port,
+ scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
+ const MojoCreateDataPipeOptions& options,
+ bool initialized,
+ uint64_t pipe_id);
+
+ // Dispatcher:
Type GetType() const override;
+ MojoResult Close() override;
+ MojoResult WriteData(const void* elements,
+ uint32_t* num_bytes,
+ MojoReadDataFlags flags) override;
+ MojoResult BeginWriteData(void** buffer,
+ uint32_t* buffer_num_bytes,
+ MojoWriteDataFlags flags) override;
+ MojoResult EndWriteData(uint32_t num_bytes_written) override;
+ HandleSignalsState GetHandleSignalsState() const override;
+ MojoResult AddAwakable(Awakable* awakable,
+ MojoHandleSignals signals,
+ uintptr_t context,
+ HandleSignalsState* signals_state) override;
+ void RemoveAwakable(Awakable* awakable,
+ HandleSignalsState* signals_state) override;
+ void StartSerialize(uint32_t* num_bytes,
+ uint32_t* num_ports,
+ uint32_t* num_handles) override;
+ bool EndSerialize(void* destination,
+ ports::PortName* ports,
+ PlatformHandle* handles) override;
+ bool BeginTransit() override;
+ void CompleteTransitAndClose() override;
+ void CancelTransit() override;
- // The "opposite" of |SerializeAndClose()|. (Typically this is called by
- // |Dispatcher::Deserialize()|.)
static scoped_refptr<DataPipeProducerDispatcher>
- Deserialize(const void* source,
- size_t size,
- PlatformHandleVector* platform_handles);
+ Deserialize(const void* data,
+ size_t num_bytes,
+ const ports::PortName* ports,
+ size_t num_ports,
+ PlatformHandle* handles,
+ size_t num_handles);
private:
- DataPipeProducerDispatcher(const MojoCreateDataPipeOptions& options);
+ class PortObserverThunk;
+ friend class PortObserverThunk;
+
~DataPipeProducerDispatcher() override;
- void InitOnIO();
- void CloseOnIO();
-
- // |Dispatcher| protected methods:
- void CancelAllAwakablesNoLock() override;
- void CloseImplNoLock() override;
- scoped_refptr<Dispatcher> CreateEquivalentDispatcherAndCloseImplNoLock()
- override;
- MojoResult WriteDataImplNoLock(const void* elements,
- uint32_t* num_bytes,
- MojoWriteDataFlags flags) override;
- MojoResult BeginWriteDataImplNoLock(void** buffer,
- uint32_t* buffer_num_bytes,
- MojoWriteDataFlags flags) override;
- MojoResult EndWriteDataImplNoLock(uint32_t num_bytes_written) override;
- HandleSignalsState GetHandleSignalsStateImplNoLock() const override;
- MojoResult AddAwakableImplNoLock(Awakable* awakable,
- MojoHandleSignals signals,
- uintptr_t context,
- HandleSignalsState* signals_state) override;
- void RemoveAwakableImplNoLock(Awakable* awakable,
- HandleSignalsState* signals_state) override;
- void StartSerializeImplNoLock(size_t* max_size,
- size_t* max_platform_handles) override;
- bool EndSerializeAndCloseImplNoLock(
- void* destination,
- size_t* actual_size,
- PlatformHandleVector* platform_handles) override;
- void TransportStarted() override;
- void TransportEnded() override;
- bool IsBusyNoLock() const override;
-
- // |RawChannel::Delegate methods:
- void OnReadMessage(
- const MessageInTransit::View& message_view,
- ScopedPlatformHandleVectorPtr platform_handles) override;
- void OnError(Error error) override;
-
- bool InTwoPhaseWrite() const;
- bool WriteDataIntoMessages(const void* elements, uint32_t num_bytes);
-
- // See comment in MessagePipeDispatcher for this method.
- void SerializeInternal();
-
- MojoCreateDataPipeOptions options_;
-
- // Protected by |lock()|:
- RawChannel* channel_; // This will be null if closed.
+ void OnSharedBufferCreated(const scoped_refptr<PlatformSharedBuffer>& buffer);
+ void InitializeNoLock();
+ MojoResult CloseNoLock();
+ HandleSignalsState GetHandleSignalsStateNoLock() const;
+ void NotifyWrite(uint32_t num_bytes);
+ void OnPortStatusChanged();
+ void UpdateSignalsStateNoLock();
+ bool ProcessMessageNoLock(const DataPipeControlMessage& message,
+ ScopedPlatformHandleVectorPtr handles);
+
+ const MojoCreateDataPipeOptions options_;
+ NodeController* const node_controller_;
+ const ports::PortRef control_port_;
+ const uint64_t pipe_id_;
+
+ // Guards access to the fields below.
+ mutable base::Lock lock_;
AwakableList awakable_list_;
- // If DispatcherTransport is created. Must be set before lock() is called to
- // avoid deadlocks with RawChannel calling us.
- base::Lock started_transport_;
+ bool buffer_requested_ = false;
+
+ scoped_refptr<PlatformSharedBuffer> shared_ring_buffer_;
+ scoped_ptr<PlatformSharedBufferMapping> ring_buffer_mapping_;
+ ScopedPlatformHandle buffer_handle_for_transit_;
- bool error_;
+ bool in_transit_ = false;
+ bool is_closed_ = false;
+ bool peer_closed_ = false;
+ bool transferred_ = false;
+ bool in_two_phase_write_ = false;
- bool serialized_;
- ScopedPlatformHandle serialized_platform_handle_;
- std::vector<char> serialized_write_buffer_;
- std::vector<char> two_phase_data_;
+ uint32_t write_offset_ = 0;
+ uint32_t available_capacity_;
- MOJO_DISALLOW_COPY_AND_ASSIGN(DataPipeProducerDispatcher);
+ DISALLOW_COPY_AND_ASSIGN(DataPipeProducerDispatcher);
};
} // namespace edk

Powered by Google App Engine
This is Rietveld 408576698