Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" | |
| 6 | |
| 7 #include "base/bind.h" | |
| 8 #include "base/logging.h" | |
| 9 #include "base/message_loop/message_loop.h" | |
| 10 #include "mojo/edk/embedder/embedder_internal.h" | |
| 11 #include "mojo/edk/system/configuration.h" | |
| 12 #include "mojo/edk/system/data_pipe.h" | |
| 13 #include "mojo/edk/system/memory.h" | |
| 14 | |
| 15 namespace mojo { | |
| 16 namespace edk { | |
| 17 | |
| 18 void DataPipeProducerDispatcher::Init(ScopedPlatformHandle message_pipe) { | |
| 19 if (message_pipe.is_valid()) { | |
| 20 channel_ = RawChannel::Create(message_pipe.Pass()); | |
| 21 internal::g_io_thread_task_runner->PostTask( | |
| 22 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); | |
| 23 } | |
| 24 } | |
| 25 | |
| 26 void DataPipeProducerDispatcher::InitOnIO() { | |
| 27 base::AutoLock locker(lock()); | |
| 28 if (channel_) | |
| 29 channel_->Init(this); | |
| 30 } | |
| 31 | |
| 32 void DataPipeProducerDispatcher::CloseOnIO() { | |
| 33 base::AutoLock locker(lock()); | |
| 34 if (channel_) { | |
| 35 channel_->Shutdown(); | |
| 36 channel_ = nullptr; | |
| 37 } | |
| 38 } | |
| 39 | |
| 40 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { | |
| 41 return Type::DATA_PIPE_PRODUCER; | |
| 42 } | |
| 43 | |
| 44 scoped_refptr<DataPipeProducerDispatcher> | |
| 45 DataPipeProducerDispatcher::Deserialize( | |
| 46 const void* source, | |
| 47 size_t size, | |
| 48 PlatformHandleVector* platform_handles) { | |
| 49 MojoCreateDataPipeOptions options; | |
| 50 ScopedPlatformHandle platform_handle = | |
| 51 DataPipe::Deserialize(source, size, platform_handles, &options, | |
| 52 nullptr, 0); | |
| 53 | |
| 54 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); | |
| 55 if (platform_handle.is_valid()) | |
| 56 rv->Init(platform_handle.Pass()); | |
| 57 return rv; | |
| 58 } | |
| 59 | |
| 60 DataPipeProducerDispatcher::DataPipeProducerDispatcher( | |
| 61 const MojoCreateDataPipeOptions& options) | |
| 62 : options_(options), channel_(nullptr), error_(false) { | |
| 63 } | |
| 64 | |
| 65 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { | |
| 66 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. | |
| 67 DCHECK(!channel_); | |
| 68 } | |
| 69 | |
| 70 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { | |
| 71 lock().AssertAcquired(); | |
| 72 awakable_list_.CancelAll(); | |
| 73 } | |
| 74 | |
| 75 void DataPipeProducerDispatcher::CloseImplNoLock() { | |
| 76 lock().AssertAcquired(); | |
| 77 internal::g_io_thread_task_runner->PostTask( | |
| 78 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); | |
| 79 } | |
| 80 | |
| 81 scoped_refptr<Dispatcher> | |
| 82 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | |
| 83 lock().AssertAcquired(); | |
| 84 | |
| 85 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); | |
| 86 rv->channel_ = channel_; | |
| 87 channel_ = nullptr; | |
| 88 rv->options_ = options_; | |
| 89 return scoped_refptr<Dispatcher>(rv.get()); | |
| 90 } | |
| 91 | |
| 92 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( | |
| 93 UserPointer<const void> elements, | |
| 94 UserPointer<uint32_t> num_bytes, | |
| 95 MojoWriteDataFlags flags) { | |
| 96 lock().AssertAcquired(); | |
| 97 if (InTwoPhaseWrite()) | |
| 98 return MOJO_RESULT_BUSY; | |
| 99 if (error_) | |
| 100 return MOJO_RESULT_FAILED_PRECONDITION; | |
| 101 if (num_bytes.Get() % options_.element_num_bytes != 0) | |
| 102 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 103 if (num_bytes.Get() == 0) | |
| 104 return MOJO_RESULT_OK; // Nothing to do. | |
| 105 | |
| 106 // For now, we ignore options.capacity_num_bytes as a total of all pending | |
| 107 // writes (and just treat it per message). We will implement that later if | |
| 108 // we need to. All current uses want all their data to be sent, and it's not | |
| 109 // clear that this backpressure should be done at the mojo layer or at a | |
| 110 // higher application layer. | |
| 111 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; | |
| 112 uint32_t min_num_bytes_to_write = all_or_none ? num_bytes.Get() : 0; | |
| 113 if (min_num_bytes_to_write > options_.capacity_num_bytes) { | |
| 114 // Don't return "should wait" since you can't wait for a specified amount of | |
| 115 // data. | |
| 116 return MOJO_RESULT_OUT_OF_RANGE; | |
| 117 } | |
| 118 | |
| 119 size_t num_bytes_to_write = | |
| 120 std::min(static_cast<size_t>(num_bytes.Get()), | |
|
Ken Rockot(use gerrit already)
2015/09/23 22:32:17
size_t should be uint32_t
| |
| 121 options_.capacity_num_bytes); | |
| 122 if (num_bytes_to_write == 0) | |
| 123 return MOJO_RESULT_SHOULD_WAIT; | |
| 124 | |
| 125 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); | |
| 126 | |
| 127 num_bytes.Put(num_bytes_to_write); | |
| 128 WriteDataIntoMessages(elements, num_bytes_to_write); | |
| 129 | |
| 130 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | |
| 131 if (!new_state.equals(old_state)) | |
| 132 awakable_list_.AwakeForStateChange(new_state); | |
| 133 return MOJO_RESULT_OK; | |
| 134 } | |
| 135 | |
| 136 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( | |
| 137 UserPointer<void*> buffer, | |
| 138 UserPointer<uint32_t> buffer_num_bytes, | |
| 139 MojoWriteDataFlags flags) { | |
| 140 lock().AssertAcquired(); | |
| 141 if (InTwoPhaseWrite()) | |
| 142 return MOJO_RESULT_BUSY; | |
| 143 if (error_) | |
| 144 return MOJO_RESULT_FAILED_PRECONDITION; | |
| 145 | |
| 146 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; | |
| 147 uint32_t min_num_bytes_to_write = 0; | |
| 148 if (all_or_none) { | |
| 149 min_num_bytes_to_write = buffer_num_bytes.Get(); | |
| 150 if (min_num_bytes_to_write % options_.element_num_bytes != 0) | |
| 151 return MOJO_RESULT_INVALID_ARGUMENT; | |
| 152 if (min_num_bytes_to_write > options_.capacity_num_bytes) { | |
| 153 // Don't return "should wait" since you can't wait for a specified amount | |
| 154 // of data. | |
| 155 return MOJO_RESULT_OUT_OF_RANGE; | |
| 156 } | |
| 157 } | |
| 158 | |
| 159 // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes. | |
| 160 if (buffer_num_bytes.Get() == 0) | |
| 161 buffer_num_bytes.Put(options_.capacity_num_bytes); | |
| 162 | |
| 163 two_phase_data_.resize(buffer_num_bytes.Get()); | |
| 164 buffer.Put(&two_phase_data_[0]); | |
| 165 | |
| 166 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes | |
| 167 // we can construct a MessageInTransit here. But then we need to make | |
| 168 // MessageInTransit support changing its data size later. | |
| 169 | |
| 170 return MOJO_RESULT_OK; | |
| 171 } | |
| 172 | |
| 173 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( | |
| 174 uint32_t num_bytes_written) { | |
| 175 lock().AssertAcquired(); | |
| 176 if (!InTwoPhaseWrite()) | |
| 177 return MOJO_RESULT_FAILED_PRECONDITION; | |
| 178 | |
| 179 // Note: Allow successful completion of the two-phase write even if the other | |
| 180 // side has been closed. | |
| 181 MojoResult rv = MOJO_RESULT_OK; | |
| 182 if (num_bytes_written > two_phase_data_.size() || | |
| 183 num_bytes_written % options_.element_num_bytes != 0) { | |
| 184 rv = MOJO_RESULT_INVALID_ARGUMENT; | |
| 185 } else if (channel_) { | |
| 186 WriteDataIntoMessages( | |
| 187 MakeUserPointer(static_cast<const void*>(&two_phase_data_[0])), | |
| 188 num_bytes_written); | |
| 189 } | |
| 190 | |
| 191 // Two-phase write ended even on failure. | |
| 192 two_phase_data_.clear(); | |
| 193 // If we're now writable, we *became* writable (since we weren't writable | |
| 194 // during the two-phase write), so awake producer awakables. | |
| 195 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | |
| 196 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) | |
| 197 awakable_list_.AwakeForStateChange(new_state); | |
| 198 | |
| 199 return rv; | |
| 200 } | |
| 201 | |
| 202 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() | |
| 203 const { | |
| 204 lock().AssertAcquired(); | |
| 205 | |
| 206 HandleSignalsState rv; | |
| 207 if (!error_) { | |
| 208 if (!InTwoPhaseWrite()) | |
| 209 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | |
| 210 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | |
| 211 } else { | |
| 212 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
| 213 } | |
| 214 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
| 215 return rv; | |
| 216 } | |
| 217 | |
| 218 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( | |
| 219 Awakable* awakable, | |
| 220 MojoHandleSignals signals, | |
| 221 uint32_t context, | |
| 222 HandleSignalsState* signals_state) { | |
| 223 lock().AssertAcquired(); | |
| 224 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | |
| 225 if (state.satisfies(signals)) { | |
| 226 if (signals_state) | |
| 227 *signals_state = state; | |
| 228 return MOJO_RESULT_ALREADY_EXISTS; | |
| 229 } | |
| 230 if (!state.can_satisfy(signals)) { | |
| 231 if (signals_state) | |
| 232 *signals_state = state; | |
| 233 return MOJO_RESULT_FAILED_PRECONDITION; | |
| 234 } | |
| 235 | |
| 236 awakable_list_.Add(awakable, signals, context); | |
| 237 return MOJO_RESULT_OK; | |
| 238 } | |
| 239 | |
| 240 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( | |
| 241 Awakable* awakable, | |
| 242 HandleSignalsState* signals_state) { | |
| 243 lock().AssertAcquired(); | |
| 244 awakable_list_.Remove(awakable); | |
| 245 if (signals_state) | |
| 246 *signals_state = GetHandleSignalsStateImplNoLock(); | |
| 247 } | |
| 248 | |
| 249 void DataPipeProducerDispatcher::StartSerializeImplNoLock( | |
| 250 size_t* max_size, | |
| 251 size_t* max_platform_handles) { | |
| 252 DCHECK(HasOneRef()); // Only one ref => no need to take the lock. | |
| 253 | |
| 254 if (channel_) { | |
| 255 std::vector<char> temp; | |
| 256 serialized_platform_handle_ = channel_->ReleaseHandle(&temp); | |
| 257 channel_ = nullptr; | |
| 258 DCHECK(temp.empty()); | |
| 259 } | |
| 260 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), | |
| 261 false, max_size, max_platform_handles); | |
| 262 } | |
| 263 | |
| 264 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( | |
| 265 void* destination, | |
| 266 size_t* actual_size, | |
| 267 PlatformHandleVector* platform_handles) { | |
| 268 DCHECK(HasOneRef()); // Only one ref => no need to take the lock. | |
| 269 | |
| 270 DataPipe::EndSerialize( | |
| 271 options_, | |
| 272 serialized_platform_handle_.Pass(), | |
| 273 ScopedPlatformHandle(), 0, | |
| 274 destination, actual_size, platform_handles); | |
| 275 CloseImplNoLock(); | |
| 276 return true; | |
| 277 } | |
| 278 | |
| 279 void DataPipeProducerDispatcher::TransportStarted() { | |
| 280 started_transport_.Acquire(); | |
| 281 } | |
| 282 | |
| 283 void DataPipeProducerDispatcher::TransportEnded() { | |
| 284 started_transport_.Release(); | |
| 285 } | |
| 286 | |
| 287 bool DataPipeProducerDispatcher::IsBusyNoLock() const { | |
| 288 lock().AssertAcquired(); | |
| 289 return InTwoPhaseWrite(); | |
| 290 } | |
| 291 | |
| 292 void DataPipeProducerDispatcher::OnReadMessage( | |
| 293 const MessageInTransit::View& message_view, | |
| 294 ScopedPlatformHandleVectorPtr platform_handles) { | |
| 295 NOTREACHED(); | |
| 296 } | |
| 297 | |
| 298 void DataPipeProducerDispatcher::OnError(Error error) { | |
| 299 switch (error) { | |
| 300 case ERROR_READ_SHUTDOWN: | |
| 301 case ERROR_READ_BROKEN: | |
| 302 case ERROR_READ_BAD_MESSAGE: | |
| 303 case ERROR_READ_UNKNOWN: | |
| 304 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't read messages"; | |
| 305 break; | |
| 306 case ERROR_WRITE: | |
| 307 // Write errors are slightly notable: they probably shouldn't happen under | |
| 308 // normal operation (but maybe the other side crashed). | |
| 309 LOG(WARNING) << "DataPipeProducerDispatcher write error"; | |
| 310 break; | |
| 311 } | |
| 312 | |
| 313 error_ = true; | |
| 314 if (started_transport_.Try()) { | |
| 315 base::AutoLock locker(lock()); | |
| 316 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
| 317 | |
| 318 base::MessageLoop::current()->PostTask( | |
| 319 FROM_HERE, | |
| 320 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_))); | |
| 321 channel_ = nullptr; | |
| 322 started_transport_.Release(); | |
| 323 } else { | |
| 324 // We must be waiting to call ReleaseHandle. It will call Shutdown. | |
| 325 } | |
| 326 } | |
| 327 | |
| 328 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const { | |
| 329 return !two_phase_data_.empty(); | |
| 330 } | |
| 331 | |
| 332 bool DataPipeProducerDispatcher::WriteDataIntoMessages( | |
| 333 UserPointer<const void> elements, | |
| 334 size_t num_bytes) { | |
| 335 // The maximum amount of data to send per message (make it a multiple of the | |
| 336 // element size. | |
| 337 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | |
| 338 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes; | |
| 339 DCHECK_GT(max_message_num_bytes, 0u); | |
| 340 | |
| 341 size_t offset = 0; | |
| 342 while (offset < num_bytes) { | |
| 343 size_t message_num_bytes = | |
| 344 std::min(max_message_num_bytes, num_bytes - offset); | |
| 345 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
| 346 MessageInTransit::Type::MESSAGE, message_num_bytes, | |
| 347 elements.At(offset))); | |
| 348 if (!channel_->WriteMessage(message.Pass())) { | |
| 349 error_ = true; | |
| 350 return false; | |
| 351 } | |
| 352 | |
| 353 offset += message_num_bytes; | |
| 354 } | |
| 355 | |
| 356 return true; | |
| 357 } | |
| 358 | |
| 359 } // namespace edk | |
| 360 } // namespace mojo | |
| OLD | NEW |