| OLD | NEW |
| (Empty) |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "mojo/edk/system/message_pipe_dispatcher.h" | |
| 6 | |
| 7 #include "base/logging.h" | |
| 8 #include "mojo/edk/system/channel.h" | |
| 9 #include "mojo/edk/system/channel_endpoint.h" | |
| 10 #include "mojo/edk/system/constants.h" | |
| 11 #include "mojo/edk/system/local_message_pipe_endpoint.h" | |
| 12 #include "mojo/edk/system/memory.h" | |
| 13 #include "mojo/edk/system/message_in_transit.h" | |
| 14 #include "mojo/edk/system/message_pipe.h" | |
| 15 #include "mojo/edk/system/options_validation.h" | |
| 16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h" | |
| 17 | |
| 18 namespace mojo { | |
| 19 namespace system { | |
| 20 | |
| 21 namespace { | |
| 22 | |
| 23 const unsigned kInvalidPort = static_cast<unsigned>(-1); | |
| 24 | |
| 25 struct SerializedMessagePipeDispatcher { | |
| 26 MessageInTransit::EndpointId endpoint_id; | |
| 27 }; | |
| 28 | |
| 29 } // namespace | |
| 30 | |
| 31 // MessagePipeDispatcher ------------------------------------------------------- | |
| 32 | |
| 33 // static | |
| 34 const MojoCreateMessagePipeOptions | |
| 35 MessagePipeDispatcher::kDefaultCreateOptions = { | |
| 36 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), | |
| 37 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; | |
| 38 | |
| 39 MessagePipeDispatcher::MessagePipeDispatcher( | |
| 40 const MojoCreateMessagePipeOptions& /*validated_options*/) | |
| 41 : port_(kInvalidPort) { | |
| 42 } | |
| 43 | |
| 44 // static | |
| 45 MojoResult MessagePipeDispatcher::ValidateCreateOptions( | |
| 46 UserPointer<const MojoCreateMessagePipeOptions> in_options, | |
| 47 MojoCreateMessagePipeOptions* out_options) { | |
| 48 const MojoCreateMessagePipeOptionsFlags kKnownFlags = | |
| 49 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; | |
| 50 | |
| 51 *out_options = kDefaultCreateOptions; | |
| 52 if (in_options.IsNull()) | |
| 53 return MOJO_RESULT_OK; | |
| 54 | |
| 55 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); | |
| 56 if (!reader.is_valid()) | |
| 57 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 58 | |
| 59 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) | |
| 60 return MOJO_RESULT_OK; | |
| 61 if ((reader.options().flags & ~kKnownFlags)) | |
| 62 return MOJO_RESULT_UNIMPLEMENTED; | |
| 63 out_options->flags = reader.options().flags; | |
| 64 | |
| 65 // Checks for fields beyond |flags|: | |
| 66 | |
| 67 // (Nothing here yet.) | |
| 68 | |
| 69 return MOJO_RESULT_OK; | |
| 70 } | |
| 71 | |
| 72 void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe, | |
| 73 unsigned port) { | |
| 74 DCHECK(message_pipe.get()); | |
| 75 DCHECK(port == 0 || port == 1); | |
| 76 | |
| 77 message_pipe_ = message_pipe; | |
| 78 port_ = port; | |
| 79 } | |
| 80 | |
| 81 Dispatcher::Type MessagePipeDispatcher::GetType() const { | |
| 82 return kTypeMessagePipe; | |
| 83 } | |
| 84 | |
| 85 // static | |
| 86 scoped_refptr<MessagePipeDispatcher> | |
| 87 MessagePipeDispatcher::CreateRemoteMessagePipe( | |
| 88 scoped_refptr<ChannelEndpoint>* channel_endpoint) { | |
| 89 scoped_refptr<MessagePipe> message_pipe( | |
| 90 MessagePipe::CreateLocalProxy(channel_endpoint)); | |
| 91 scoped_refptr<MessagePipeDispatcher> dispatcher( | |
| 92 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
| 93 dispatcher->Init(message_pipe, 0); | |
| 94 return dispatcher; | |
| 95 } | |
| 96 | |
| 97 // static | |
| 98 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( | |
| 99 Channel* channel, | |
| 100 const void* source, | |
| 101 size_t size) { | |
| 102 if (size != sizeof(SerializedMessagePipeDispatcher)) { | |
| 103 LOG(ERROR) << "Invalid serialized message pipe dispatcher"; | |
| 104 return scoped_refptr<MessagePipeDispatcher>(); | |
| 105 } | |
| 106 | |
| 107 scoped_refptr<ChannelEndpoint> channel_endpoint; | |
| 108 scoped_refptr<MessagePipeDispatcher> dispatcher = | |
| 109 CreateRemoteMessagePipe(&channel_endpoint); | |
| 110 | |
| 111 MessageInTransit::EndpointId remote_id = | |
| 112 static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id; | |
| 113 if (remote_id == MessageInTransit::kInvalidEndpointId) { | |
| 114 // This means that the other end was closed, and there were no messages | |
| 115 // enqueued for us. | |
| 116 // TODO(vtl): This is wrong. We should produce a "dead" message pipe | |
| 117 // dispatcher. | |
| 118 NOTIMPLEMENTED(); | |
| 119 return scoped_refptr<MessagePipeDispatcher>(); | |
| 120 } | |
| 121 MessageInTransit::EndpointId local_id = | |
| 122 channel->AttachEndpoint(channel_endpoint); | |
| 123 if (local_id == MessageInTransit::kInvalidEndpointId) { | |
| 124 LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to " | |
| 125 "attach; remote ID = " << remote_id << ")"; | |
| 126 return scoped_refptr<MessagePipeDispatcher>(); | |
| 127 } | |
| 128 DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = " << remote_id | |
| 129 << ", new local ID = " << local_id << ")"; | |
| 130 | |
| 131 if (!channel->RunMessagePipeEndpoint(local_id, remote_id)) { | |
| 132 // In general, this shouldn't fail, since we generated |local_id| locally. | |
| 133 NOTREACHED(); | |
| 134 return scoped_refptr<MessagePipeDispatcher>(); | |
| 135 } | |
| 136 | |
| 137 // TODO(vtl): FIXME -- Need some error handling here. | |
| 138 channel->RunRemoteMessagePipeEndpoint(local_id, remote_id); | |
| 139 return dispatcher; | |
| 140 } | |
| 141 | |
| 142 MessagePipeDispatcher::~MessagePipeDispatcher() { | |
| 143 // |Close()|/|CloseImplNoLock()| should have taken care of the pipe. | |
| 144 DCHECK(!message_pipe_.get()); | |
| 145 } | |
| 146 | |
| 147 MessagePipe* MessagePipeDispatcher::GetMessagePipeNoLock() const { | |
| 148 lock().AssertAcquired(); | |
| 149 return message_pipe_.get(); | |
| 150 } | |
| 151 | |
| 152 unsigned MessagePipeDispatcher::GetPortNoLock() const { | |
| 153 lock().AssertAcquired(); | |
| 154 return port_; | |
| 155 } | |
| 156 | |
| 157 void MessagePipeDispatcher::CancelAllWaitersNoLock() { | |
| 158 lock().AssertAcquired(); | |
| 159 message_pipe_->CancelAllWaiters(port_); | |
| 160 } | |
| 161 | |
| 162 void MessagePipeDispatcher::CloseImplNoLock() { | |
| 163 lock().AssertAcquired(); | |
| 164 message_pipe_->Close(port_); | |
| 165 message_pipe_ = nullptr; | |
| 166 port_ = kInvalidPort; | |
| 167 } | |
| 168 | |
| 169 scoped_refptr<Dispatcher> | |
| 170 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | |
| 171 lock().AssertAcquired(); | |
| 172 | |
| 173 // TODO(vtl): Currently, there are no options, so we just use | |
| 174 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options | |
| 175 // too. | |
| 176 scoped_refptr<MessagePipeDispatcher> rv = | |
| 177 new MessagePipeDispatcher(kDefaultCreateOptions); | |
| 178 rv->Init(message_pipe_, port_); | |
| 179 message_pipe_ = nullptr; | |
| 180 port_ = kInvalidPort; | |
| 181 return scoped_refptr<Dispatcher>(rv.get()); | |
| 182 } | |
| 183 | |
| 184 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( | |
| 185 UserPointer<const void> bytes, | |
| 186 uint32_t num_bytes, | |
| 187 std::vector<DispatcherTransport>* transports, | |
| 188 MojoWriteMessageFlags flags) { | |
| 189 DCHECK(!transports || (transports->size() > 0 && | |
| 190 transports->size() <= kMaxMessageNumHandles)); | |
| 191 | |
| 192 lock().AssertAcquired(); | |
| 193 | |
| 194 if (num_bytes > kMaxMessageNumBytes) | |
| 195 return MOJO_RESULT_RESOURCE_EXHAUSTED; | |
| 196 | |
| 197 return message_pipe_->WriteMessage( | |
| 198 port_, bytes, num_bytes, transports, flags); | |
| 199 } | |
| 200 | |
| 201 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( | |
| 202 UserPointer<void> bytes, | |
| 203 UserPointer<uint32_t> num_bytes, | |
| 204 DispatcherVector* dispatchers, | |
| 205 uint32_t* num_dispatchers, | |
| 206 MojoReadMessageFlags flags) { | |
| 207 lock().AssertAcquired(); | |
| 208 return message_pipe_->ReadMessage( | |
| 209 port_, bytes, num_bytes, dispatchers, num_dispatchers, flags); | |
| 210 } | |
| 211 | |
| 212 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() | |
| 213 const { | |
| 214 lock().AssertAcquired(); | |
| 215 return message_pipe_->GetHandleSignalsState(port_); | |
| 216 } | |
| 217 | |
| 218 MojoResult MessagePipeDispatcher::AddWaiterImplNoLock( | |
| 219 Waiter* waiter, | |
| 220 MojoHandleSignals signals, | |
| 221 uint32_t context, | |
| 222 HandleSignalsState* signals_state) { | |
| 223 lock().AssertAcquired(); | |
| 224 return message_pipe_->AddWaiter( | |
| 225 port_, waiter, signals, context, signals_state); | |
| 226 } | |
| 227 | |
| 228 void MessagePipeDispatcher::RemoveWaiterImplNoLock( | |
| 229 Waiter* waiter, | |
| 230 HandleSignalsState* signals_state) { | |
| 231 lock().AssertAcquired(); | |
| 232 message_pipe_->RemoveWaiter(port_, waiter, signals_state); | |
| 233 } | |
| 234 | |
| 235 void MessagePipeDispatcher::StartSerializeImplNoLock( | |
| 236 Channel* /*channel*/, | |
| 237 size_t* max_size, | |
| 238 size_t* max_platform_handles) { | |
| 239 DCHECK(HasOneRef()); // Only one ref => no need to take the lock. | |
| 240 *max_size = sizeof(SerializedMessagePipeDispatcher); | |
| 241 *max_platform_handles = 0; | |
| 242 } | |
| 243 | |
| 244 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( | |
| 245 Channel* channel, | |
| 246 void* destination, | |
| 247 size_t* actual_size, | |
| 248 embedder::PlatformHandleVector* /*platform_handles*/) { | |
| 249 DCHECK(HasOneRef()); // Only one ref => no need to take the lock. | |
| 250 | |
| 251 // Convert the local endpoint to a proxy endpoint (moving the message queue) | |
| 252 // and attach it to the channel. | |
| 253 MessageInTransit::EndpointId endpoint_id = | |
| 254 channel->AttachEndpoint(message_pipe_->ConvertLocalToProxy(port_)); | |
| 255 // Note: It's okay to get an endpoint ID of |kInvalidEndpointId|. (It's | |
| 256 // possible that the other endpoint -- the one that we're not sending -- was | |
| 257 // closed in the intervening time.) In that case, we need to deserialize a | |
| 258 // "dead" message pipe dispatcher on the other end. (Note that this is | |
| 259 // different from just producing |MOJO_HANDLE_INVALID|.) | |
| 260 DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << endpoint_id | |
| 261 << ")"; | |
| 262 | |
| 263 // We now have a local ID. Before we can run the proxy endpoint, we need to | |
| 264 // get an ack back from the other side with the remote ID. | |
| 265 static_cast<SerializedMessagePipeDispatcher*>(destination)->endpoint_id = | |
| 266 endpoint_id; | |
| 267 | |
| 268 message_pipe_ = nullptr; | |
| 269 port_ = kInvalidPort; | |
| 270 | |
| 271 *actual_size = sizeof(SerializedMessagePipeDispatcher); | |
| 272 return true; | |
| 273 } | |
| 274 | |
| 275 // MessagePipeDispatcherTransport ---------------------------------------------- | |
| 276 | |
| 277 MessagePipeDispatcherTransport::MessagePipeDispatcherTransport( | |
| 278 DispatcherTransport transport) | |
| 279 : DispatcherTransport(transport) { | |
| 280 DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe); | |
| 281 } | |
| 282 | |
| 283 } // namespace system | |
| 284 } // namespace mojo | |
| OLD | NEW |