| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #ifndef MOJO_EDK_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_ | 5 #ifndef MOJO_EDK_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_ |
| 6 #define MOJO_EDK_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_ | 6 #define MOJO_EDK_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_ |
| 7 | 7 |
| 8 #include <stddef.h> | |
| 9 #include <stdint.h> | 8 #include <stdint.h> |
| 10 | 9 |
| 11 #include "base/memory/ref_counted.h" | 10 #include <queue> |
| 12 #include "mojo/edk/embedder/platform_channel_pair.h" | 11 |
| 12 #include "base/macros.h" |
| 13 #include "base/memory/scoped_ptr.h" |
| 13 #include "mojo/edk/system/awakable_list.h" | 14 #include "mojo/edk/system/awakable_list.h" |
| 14 #include "mojo/edk/system/dispatcher.h" | 15 #include "mojo/edk/system/dispatcher.h" |
| 15 #include "mojo/edk/system/message_in_transit_queue.h" | 16 #include "mojo/edk/system/ports/port_ref.h" |
| 16 #include "mojo/edk/system/raw_channel.h" | |
| 17 #include "mojo/edk/system/system_impl_export.h" | |
| 18 #include "mojo/public/cpp/system/macros.h" | |
| 19 | |
| 20 namespace base { | |
| 21 namespace debug { | |
| 22 class StackTrace; | |
| 23 } | |
| 24 } | |
| 25 | 17 |
| 26 namespace mojo { | 18 namespace mojo { |
| 27 namespace edk { | 19 namespace edk { |
| 28 | 20 |
| 29 // This is the |Dispatcher| implementation for message pipes (created by the | 21 class NodeController; |
| 30 // Mojo primitive |MojoCreateMessagePipe()|). This class is thread-safe. | 22 class PortsMessage; |
| 31 class MOJO_SYSTEM_IMPL_EXPORT MessagePipeDispatcher final | 23 |
| 32 : public Dispatcher, public RawChannel::Delegate { | 24 class MessagePipeDispatcher : public Dispatcher { |
| 33 public: | 25 public: |
| 34 // The default options to use for |MojoCreateMessagePipe()|. (Real uses | 26 // Constructs a MessagePipeDispatcher permanently tied to a specific port. |
| 35 // should obtain this via |ValidateCreateOptions()| with a null |in_options|; | 27 // |connected| must indicate the state of the port at construction time; if |
| 36 // this is exposed directly for testing convenience.) | 28 // the port is initialized with a peer, |connected| must be true. Otherwise it |
| 37 static const MojoCreateMessagePipeOptions kDefaultCreateOptions; | 29 // must be false. |
| 30 // |
| 31 // A MessagePipeDispatcher may not be transferred while in a disconnected |
| 32 // state, and one can never return to a disconnected once connected. |
| 33 // |
| 34 // |pipe_id| is a unique identifier which can be used to track pipe endpoints |
| 35 // as they're passed around. |endpoint| is either 0 or 1 and again is only |
| 36 // used for tracking pipes (one side is always 0, the other is always 1.) |
| 37 MessagePipeDispatcher(NodeController* node_controller, |
| 38 const ports::PortRef& port, |
| 39 uint64_t pipe_id, |
| 40 int endpoint); |
| 38 | 41 |
| 39 static scoped_refptr<MessagePipeDispatcher> Create( | 42 // Dispatcher: |
| 40 const MojoCreateMessagePipeOptions& validated_options) { | 43 Type GetType() const override; |
| 41 return make_scoped_refptr(new MessagePipeDispatcher( | 44 MojoResult Close() override; |
| 42 !!(validated_options.flags & | 45 MojoResult WriteMessage(const void* bytes, |
| 43 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE))); | 46 uint32_t num_bytes, |
| 44 } | 47 const DispatcherInTransit* dispatchers, |
| 48 uint32_t num_dispatchers, |
| 49 MojoWriteMessageFlags flags) override; |
| 50 MojoResult ReadMessage(void* bytes, |
| 51 uint32_t* num_bytes, |
| 52 MojoHandle* handles, |
| 53 uint32_t* num_handles, |
| 54 MojoReadMessageFlags flags) override; |
| 55 HandleSignalsState GetHandleSignalsState() const override; |
| 56 MojoResult AddAwakable(Awakable* awakable, |
| 57 MojoHandleSignals signals, |
| 58 uintptr_t context, |
| 59 HandleSignalsState* signals_state) override; |
| 60 void RemoveAwakable(Awakable* awakable, |
| 61 HandleSignalsState* signals_state) override; |
| 62 void StartSerialize(uint32_t* num_bytes, |
| 63 uint32_t* num_ports, |
| 64 uint32_t* num_handles) override; |
| 65 bool EndSerialize(void* destination, |
| 66 ports::PortName* ports, |
| 67 PlatformHandle* handles) override; |
| 68 bool BeginTransit() override; |
| 69 void CompleteTransitAndClose() override; |
| 70 void CancelTransit() override; |
| 45 | 71 |
| 46 // Validates and/or sets default options for |MojoCreateMessagePipeOptions|. | 72 static scoped_refptr<Dispatcher> Deserialize( |
| 47 // If non-null, |in_options| must point to a struct of at least | 73 const void* data, |
| 48 // |in_options->struct_size| bytes. |out_options| must point to a (current) | 74 size_t num_bytes, |
| 49 // |MojoCreateMessagePipeOptions| and will be entirely overwritten on success | 75 const ports::PortName* ports, |
| 50 // (it may be partly overwritten on failure). | 76 size_t num_ports, |
| 51 static MojoResult ValidateCreateOptions( | 77 PlatformHandle* handles, |
| 52 const MojoCreateMessagePipeOptions* in_options, | 78 size_t num_handles); |
| 53 MojoCreateMessagePipeOptions* out_options); | |
| 54 | |
| 55 // Initializes a transferable message pipe. | |
| 56 // Must be called before any other methods. (This method is not thread-safe.) | |
| 57 void Init( | |
| 58 ScopedPlatformHandle message_pipe, | |
| 59 char* serialized_read_buffer, size_t serialized_read_buffer_size, | |
| 60 char* serialized_write_buffer, size_t serialized_write_buffer_size, | |
| 61 std::vector<int>* serialized_read_fds, | |
| 62 std::vector<int>* serialized_write_fds); | |
| 63 | |
| 64 // Initializes a nontransferable message pipe. | |
| 65 void InitNonTransferable(uint64_t pipe_id); | |
| 66 | |
| 67 // |Dispatcher| public methods: | |
| 68 Type GetType() const override; | |
| 69 | |
| 70 // RawChannel::Delegate methods: | |
| 71 void OnReadMessage( | |
| 72 const MessageInTransit::View& message_view, | |
| 73 ScopedPlatformHandleVectorPtr platform_handles) override; | |
| 74 void OnError(Error error) override; | |
| 75 | |
| 76 // Called by broker when a route is established between this | |
| 77 // MessagePipeDispatcher and another one. This object will receive messages | |
| 78 // sent to its pipe_id. It should tag all outgoing messages by calling | |
| 79 // MessageInTransit::set_route_id with pipe_id_. | |
| 80 void GotNonTransferableChannel(RawChannel* channel); | |
| 81 | |
| 82 // The "opposite" of |SerializeAndClose()|. (Typically this is called by | |
| 83 // |Dispatcher::Deserialize()|.) | |
| 84 static scoped_refptr<MessagePipeDispatcher> Deserialize( | |
| 85 const void* source, | |
| 86 size_t size, | |
| 87 PlatformHandleVector* platform_handles); | |
| 88 | 79 |
| 89 private: | 80 private: |
| 90 // See MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE's definition for an | 81 class PortObserverThunk; |
| 91 // explanation of what is a transferable pipe. | 82 friend class PortObserverThunk; |
| 92 explicit MessagePipeDispatcher(bool transferable); | 83 |
| 93 ~MessagePipeDispatcher() override; | 84 ~MessagePipeDispatcher() override; |
| 94 | 85 |
| 95 void InitOnIO(); | 86 MojoResult CloseNoLock(); |
| 96 void CloseOnIOAndRelease(); | 87 HandleSignalsState GetHandleSignalsStateNoLock() const; |
| 97 void CloseOnIO(); | 88 void OnPortStatusChanged(); |
| 98 | 89 |
| 99 // |Dispatcher| protected methods: | 90 // These are safe to access from any thread without locking. |
| 100 void CancelAllAwakablesNoLock() override; | 91 NodeController* const node_controller_; |
| 101 void CloseImplNoLock() override; | 92 const ports::PortRef port_; |
| 102 scoped_refptr<Dispatcher> CreateEquivalentDispatcherAndCloseImplNoLock() | 93 const uint64_t pipe_id_; |
| 103 override; | 94 const int endpoint_; |
| 104 MojoResult WriteMessageImplNoLock( | |
| 105 const void* bytes, | |
| 106 uint32_t num_bytes, | |
| 107 std::vector<DispatcherTransport>* transports, | |
| 108 MojoWriteMessageFlags flags) override; | |
| 109 MojoResult ReadMessageImplNoLock(void* bytes, | |
| 110 uint32_t* num_bytes, | |
| 111 DispatcherVector* dispatchers, | |
| 112 uint32_t* num_dispatchers, | |
| 113 MojoReadMessageFlags flags) override; | |
| 114 HandleSignalsState GetHandleSignalsStateImplNoLock() const override; | |
| 115 MojoResult AddAwakableImplNoLock(Awakable* awakable, | |
| 116 MojoHandleSignals signals, | |
| 117 uintptr_t context, | |
| 118 HandleSignalsState* signals_state) override; | |
| 119 void RemoveAwakableImplNoLock(Awakable* awakable, | |
| 120 HandleSignalsState* signals_state) override; | |
| 121 void StartSerializeImplNoLock(size_t* max_size, | |
| 122 size_t* max_platform_handles) override; | |
| 123 bool EndSerializeAndCloseImplNoLock( | |
| 124 void* destination, | |
| 125 size_t* actual_size, | |
| 126 PlatformHandleVector* platform_handles) override; | |
| 127 void TransportStarted() override; | |
| 128 void TransportEnded() override; | |
| 129 | 95 |
| 130 // Calls ReleaseHandle and serializes the raw channel. This is split into a | 96 // Guards access to all the fields below. |
| 131 // function because it's called in two different ways: | 97 mutable base::Lock signal_lock_; |
| 132 // 1) When serializing "live" dispatchers that are passed to MojoWriteMessage, | |
| 133 // CreateEquivalentDispatcherAndCloseImplNoLock calls this. | |
| 134 // 2) When serializing dispatchers that are attached to deserialized messages | |
| 135 // which haven't been consumed by MojoReadMessage, StartSerializeImplNoLock | |
| 136 // calls this. | |
| 137 void SerializeInternal(); | |
| 138 | 98 |
| 139 MojoResult AttachTransportsNoLock( | 99 // This is not the same is |port_transferred_|. It's only held true between |
| 140 MessageInTransit* message, | 100 // BeginTransit() and Complete/CancelTransit(). |
| 141 std::vector<DispatcherTransport>* transports); | 101 bool in_transit_ = false; |
| 142 | 102 |
| 143 // Called whenever a read or write is done on a non-transferable pipe, which | 103 bool port_transferred_ = false; |
| 144 // "binds" the pipe id to this object. | 104 bool port_closed_ = false; |
| 145 void RequestNontransferableChannel(); | 105 AwakableList awakables_; |
| 146 | 106 |
| 147 // Protected by |lock()|: | 107 DISALLOW_COPY_AND_ASSIGN(MessagePipeDispatcher); |
| 148 RawChannel* channel_; | |
| 149 | |
| 150 // Queue of incoming messages that we read from RawChannel but haven't been | |
| 151 // consumed through MojoReadMessage yet. | |
| 152 MessageInTransitQueue message_queue_; | |
| 153 | |
| 154 // The following members are only used when transferable_ is true; | |
| 155 | |
| 156 // When sending MP, contains serialized message_queue_. | |
| 157 std::vector<char> serialized_message_queue_; | |
| 158 std::vector<char> serialized_read_buffer_; | |
| 159 std::vector<char> serialized_write_buffer_; | |
| 160 // Contains FDs from (in this order): the read buffer, the write buffer, and | |
| 161 // message queue. | |
| 162 std::vector<int> serialized_fds_; | |
| 163 size_t serialized_read_fds_length_; | |
| 164 size_t serialized_write_fds_length_; | |
| 165 size_t serialized_message_fds_length_; | |
| 166 ScopedPlatformHandle serialized_platform_handle_; | |
| 167 | |
| 168 // The following members are only used when transferable_ is false; | |
| 169 | |
| 170 // The unique id shared by both ends of a non-transferable message pipe. This | |
| 171 // is held on until a read or write are done, and at that point it's used to | |
| 172 // get a RawChannel. | |
| 173 uint64_t pipe_id_; | |
| 174 enum NonTransferableState { | |
| 175 // The pipe_id hasn't been bound to this object yet until it's read, | |
| 176 // written, or waited on. | |
| 177 WAITING_FOR_READ_OR_WRITE, | |
| 178 // This object was interacted with, so the pipe_id has been bound and we are | |
| 179 // waiting for the broker to connect both sides. | |
| 180 CONNECT_CALLED, | |
| 181 // We have a connection to the other end of the message pipe. | |
| 182 CONNECTED, | |
| 183 // This object has been closed before it's connected. To ensure that the | |
| 184 // other end receives a closed message from this end, we've initiated | |
| 185 // connecting and will close after it succeeds. | |
| 186 WAITING_FOR_CONNECT_TO_CLOSE, | |
| 187 // The message pipe is closed. | |
| 188 CLOSED, | |
| 189 // The message pipe has been transferred. | |
| 190 SERIALISED, | |
| 191 }; | |
| 192 | |
| 193 NonTransferableState non_transferable_state_; | |
| 194 // Messages that were written while we were waiting to get a RawChannel. | |
| 195 MessageInTransitQueue non_transferable_outgoing_message_queue_; | |
| 196 scoped_ptr<base::debug::StackTrace> non_transferable_bound_stack_; | |
| 197 | |
| 198 | |
| 199 // The following members are used for both modes of transferable_. | |
| 200 | |
| 201 AwakableList awakable_list_; | |
| 202 | |
| 203 // If DispatcherTransport is created. Must be set before lock() is called to | |
| 204 // avoid deadlocks with RawChannel calling us. | |
| 205 base::Lock started_transport_; | |
| 206 | |
| 207 bool serialized_; | |
| 208 bool calling_init_; | |
| 209 bool write_error_; | |
| 210 // Whether it can be sent after read or write. | |
| 211 bool transferable_; | |
| 212 // When this object is closed, it has to wait to flush any pending messages | |
| 213 // from the other side to ensure that any in-queue message pipes are closed. | |
| 214 // If this is true, we have already sent the other side the request. | |
| 215 bool close_requested_; | |
| 216 | |
| 217 MOJO_DISALLOW_COPY_AND_ASSIGN(MessagePipeDispatcher); | |
| 218 }; | 108 }; |
| 219 | 109 |
| 220 } // namespace edk | 110 } // namespace edk |
| 221 } // namespace mojo | 111 } // namespace mojo |
| 222 | 112 |
| 223 #endif // MOJO_EDK_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_ | 113 #endif // MOJO_EDK_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_ |
| OLD | NEW |