Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(768)

Unified Diff: mojo/edk/system/message_pipe_dispatcher.h

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/message_pipe_dispatcher.h
diff --git a/mojo/edk/system/message_pipe_dispatcher.h b/mojo/edk/system/message_pipe_dispatcher.h
index a49a45d6abd26658ea79a910377434996f723172..f898ea8a46fad05cd12b4981beac21cda110f9e1 100644
--- a/mojo/edk/system/message_pipe_dispatcher.h
+++ b/mojo/edk/system/message_pipe_dispatcher.h
@@ -5,216 +5,106 @@
#ifndef MOJO_EDK_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_
#define MOJO_EDK_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_
-#include <stddef.h>
#include <stdint.h>
-#include "base/memory/ref_counted.h"
-#include "mojo/edk/embedder/platform_channel_pair.h"
+#include <queue>
+
+#include "base/macros.h"
+#include "base/memory/scoped_ptr.h"
#include "mojo/edk/system/awakable_list.h"
#include "mojo/edk/system/dispatcher.h"
-#include "mojo/edk/system/message_in_transit_queue.h"
-#include "mojo/edk/system/raw_channel.h"
-#include "mojo/edk/system/system_impl_export.h"
-#include "mojo/public/cpp/system/macros.h"
-
-namespace base {
-namespace debug {
-class StackTrace;
-}
-}
+#include "mojo/edk/system/ports/port_ref.h"
namespace mojo {
namespace edk {
-// This is the |Dispatcher| implementation for message pipes (created by the
-// Mojo primitive |MojoCreateMessagePipe()|). This class is thread-safe.
-class MOJO_SYSTEM_IMPL_EXPORT MessagePipeDispatcher final
- : public Dispatcher, public RawChannel::Delegate {
+class NodeController;
+class PortsMessage;
+
+class MessagePipeDispatcher : public Dispatcher {
public:
- // The default options to use for |MojoCreateMessagePipe()|. (Real uses
- // should obtain this via |ValidateCreateOptions()| with a null |in_options|;
- // this is exposed directly for testing convenience.)
- static const MojoCreateMessagePipeOptions kDefaultCreateOptions;
-
- static scoped_refptr<MessagePipeDispatcher> Create(
- const MojoCreateMessagePipeOptions& validated_options) {
- return make_scoped_refptr(new MessagePipeDispatcher(
- !!(validated_options.flags &
- MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE)));
- }
-
- // Validates and/or sets default options for |MojoCreateMessagePipeOptions|.
- // If non-null, |in_options| must point to a struct of at least
- // |in_options->struct_size| bytes. |out_options| must point to a (current)
- // |MojoCreateMessagePipeOptions| and will be entirely overwritten on success
- // (it may be partly overwritten on failure).
- static MojoResult ValidateCreateOptions(
- const MojoCreateMessagePipeOptions* in_options,
- MojoCreateMessagePipeOptions* out_options);
-
- // Initializes a transferable message pipe.
- // Must be called before any other methods. (This method is not thread-safe.)
- void Init(
- ScopedPlatformHandle message_pipe,
- char* serialized_read_buffer, size_t serialized_read_buffer_size,
- char* serialized_write_buffer, size_t serialized_write_buffer_size,
- std::vector<int>* serialized_read_fds,
- std::vector<int>* serialized_write_fds);
-
- // Initializes a nontransferable message pipe.
- void InitNonTransferable(uint64_t pipe_id);
-
- // |Dispatcher| public methods:
+ // Constructs a MessagePipeDispatcher permanently tied to a specific port.
+ // |connected| must indicate the state of the port at construction time; if
+ // the port is initialized with a peer, |connected| must be true. Otherwise it
+ // must be false.
+ //
+ // A MessagePipeDispatcher may not be transferred while in a disconnected
+ // state, and one can never return to a disconnected once connected.
+ //
+ // |pipe_id| is a unique identifier which can be used to track pipe endpoints
+ // as they're passed around. |endpoint| is either 0 or 1 and again is only
+ // used for tracking pipes (one side is always 0, the other is always 1.)
+ MessagePipeDispatcher(NodeController* node_controller,
+ const ports::PortRef& port,
+ uint64_t pipe_id,
+ int endpoint);
+
+ // Dispatcher:
Type GetType() const override;
-
- // RawChannel::Delegate methods:
- void OnReadMessage(
- const MessageInTransit::View& message_view,
- ScopedPlatformHandleVectorPtr platform_handles) override;
- void OnError(Error error) override;
-
- // Called by broker when a route is established between this
- // MessagePipeDispatcher and another one. This object will receive messages
- // sent to its pipe_id. It should tag all outgoing messages by calling
- // MessageInTransit::set_route_id with pipe_id_.
- void GotNonTransferableChannel(RawChannel* channel);
-
- // The "opposite" of |SerializeAndClose()|. (Typically this is called by
- // |Dispatcher::Deserialize()|.)
- static scoped_refptr<MessagePipeDispatcher> Deserialize(
- const void* source,
- size_t size,
- PlatformHandleVector* platform_handles);
+ MojoResult Close() override;
+ MojoResult WriteMessage(const void* bytes,
+ uint32_t num_bytes,
+ const DispatcherInTransit* dispatchers,
+ uint32_t num_dispatchers,
+ MojoWriteMessageFlags flags) override;
+ MojoResult ReadMessage(void* bytes,
+ uint32_t* num_bytes,
+ MojoHandle* handles,
+ uint32_t* num_handles,
+ MojoReadMessageFlags flags) override;
+ HandleSignalsState GetHandleSignalsState() const override;
+ MojoResult AddAwakable(Awakable* awakable,
+ MojoHandleSignals signals,
+ uintptr_t context,
+ HandleSignalsState* signals_state) override;
+ void RemoveAwakable(Awakable* awakable,
+ HandleSignalsState* signals_state) override;
+ void StartSerialize(uint32_t* num_bytes,
+ uint32_t* num_ports,
+ uint32_t* num_handles) override;
+ bool EndSerialize(void* destination,
+ ports::PortName* ports,
+ PlatformHandle* handles) override;
+ bool BeginTransit() override;
+ void CompleteTransitAndClose() override;
+ void CancelTransit() override;
+
+ static scoped_refptr<Dispatcher> Deserialize(
+ const void* data,
+ size_t num_bytes,
+ const ports::PortName* ports,
+ size_t num_ports,
+ PlatformHandle* handles,
+ size_t num_handles);
private:
- // See MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE's definition for an
- // explanation of what is a transferable pipe.
- explicit MessagePipeDispatcher(bool transferable);
+ class PortObserverThunk;
+ friend class PortObserverThunk;
+
~MessagePipeDispatcher() override;
- void InitOnIO();
- void CloseOnIOAndRelease();
- void CloseOnIO();
-
- // |Dispatcher| protected methods:
- void CancelAllAwakablesNoLock() override;
- void CloseImplNoLock() override;
- scoped_refptr<Dispatcher> CreateEquivalentDispatcherAndCloseImplNoLock()
- override;
- MojoResult WriteMessageImplNoLock(
- const void* bytes,
- uint32_t num_bytes,
- std::vector<DispatcherTransport>* transports,
- MojoWriteMessageFlags flags) override;
- MojoResult ReadMessageImplNoLock(void* bytes,
- uint32_t* num_bytes,
- DispatcherVector* dispatchers,
- uint32_t* num_dispatchers,
- MojoReadMessageFlags flags) override;
- HandleSignalsState GetHandleSignalsStateImplNoLock() const override;
- MojoResult AddAwakableImplNoLock(Awakable* awakable,
- MojoHandleSignals signals,
- uintptr_t context,
- HandleSignalsState* signals_state) override;
- void RemoveAwakableImplNoLock(Awakable* awakable,
- HandleSignalsState* signals_state) override;
- void StartSerializeImplNoLock(size_t* max_size,
- size_t* max_platform_handles) override;
- bool EndSerializeAndCloseImplNoLock(
- void* destination,
- size_t* actual_size,
- PlatformHandleVector* platform_handles) override;
- void TransportStarted() override;
- void TransportEnded() override;
-
- // Calls ReleaseHandle and serializes the raw channel. This is split into a
- // function because it's called in two different ways:
- // 1) When serializing "live" dispatchers that are passed to MojoWriteMessage,
- // CreateEquivalentDispatcherAndCloseImplNoLock calls this.
- // 2) When serializing dispatchers that are attached to deserialized messages
- // which haven't been consumed by MojoReadMessage, StartSerializeImplNoLock
- // calls this.
- void SerializeInternal();
-
- MojoResult AttachTransportsNoLock(
- MessageInTransit* message,
- std::vector<DispatcherTransport>* transports);
-
- // Called whenever a read or write is done on a non-transferable pipe, which
- // "binds" the pipe id to this object.
- void RequestNontransferableChannel();
-
- // Protected by |lock()|:
- RawChannel* channel_;
-
- // Queue of incoming messages that we read from RawChannel but haven't been
- // consumed through MojoReadMessage yet.
- MessageInTransitQueue message_queue_;
-
- // The following members are only used when transferable_ is true;
-
- // When sending MP, contains serialized message_queue_.
- std::vector<char> serialized_message_queue_;
- std::vector<char> serialized_read_buffer_;
- std::vector<char> serialized_write_buffer_;
- // Contains FDs from (in this order): the read buffer, the write buffer, and
- // message queue.
- std::vector<int> serialized_fds_;
- size_t serialized_read_fds_length_;
- size_t serialized_write_fds_length_;
- size_t serialized_message_fds_length_;
- ScopedPlatformHandle serialized_platform_handle_;
-
- // The following members are only used when transferable_ is false;
-
- // The unique id shared by both ends of a non-transferable message pipe. This
- // is held on until a read or write are done, and at that point it's used to
- // get a RawChannel.
- uint64_t pipe_id_;
- enum NonTransferableState {
- // The pipe_id hasn't been bound to this object yet until it's read,
- // written, or waited on.
- WAITING_FOR_READ_OR_WRITE,
- // This object was interacted with, so the pipe_id has been bound and we are
- // waiting for the broker to connect both sides.
- CONNECT_CALLED,
- // We have a connection to the other end of the message pipe.
- CONNECTED,
- // This object has been closed before it's connected. To ensure that the
- // other end receives a closed message from this end, we've initiated
- // connecting and will close after it succeeds.
- WAITING_FOR_CONNECT_TO_CLOSE,
- // The message pipe is closed.
- CLOSED,
- // The message pipe has been transferred.
- SERIALISED,
- };
-
- NonTransferableState non_transferable_state_;
- // Messages that were written while we were waiting to get a RawChannel.
- MessageInTransitQueue non_transferable_outgoing_message_queue_;
- scoped_ptr<base::debug::StackTrace> non_transferable_bound_stack_;
-
-
- // The following members are used for both modes of transferable_.
-
- AwakableList awakable_list_;
-
- // If DispatcherTransport is created. Must be set before lock() is called to
- // avoid deadlocks with RawChannel calling us.
- base::Lock started_transport_;
-
- bool serialized_;
- bool calling_init_;
- bool write_error_;
- // Whether it can be sent after read or write.
- bool transferable_;
- // When this object is closed, it has to wait to flush any pending messages
- // from the other side to ensure that any in-queue message pipes are closed.
- // If this is true, we have already sent the other side the request.
- bool close_requested_;
-
- MOJO_DISALLOW_COPY_AND_ASSIGN(MessagePipeDispatcher);
+ MojoResult CloseNoLock();
+ HandleSignalsState GetHandleSignalsStateNoLock() const;
+ void OnPortStatusChanged();
+
+ // These are safe to access from any thread without locking.
+ NodeController* const node_controller_;
+ const ports::PortRef port_;
+ const uint64_t pipe_id_;
+ const int endpoint_;
+
+ // Guards access to all the fields below.
+ mutable base::Lock signal_lock_;
+
+ // This is not the same is |port_transferred_|. It's only held true between
+ // BeginTransit() and Complete/CancelTransit().
+ bool in_transit_ = false;
+
+ bool port_transferred_ = false;
+ bool port_closed_ = false;
+ AwakableList awakables_;
+
+ DISALLOW_COPY_AND_ASSIGN(MessagePipeDispatcher);
};
} // namespace edk

Powered by Google App Engine
This is Rietveld 408576698