| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
| 6 |
| 7 #include "base/bind.h" |
| 8 #include "base/logging.h" |
| 9 #include "base/message_loop/message_loop.h" |
| 10 #include "mojo/edk/embedder/embedder_internal.h" |
| 11 #include "mojo/edk/system/configuration.h" |
| 12 #include "mojo/edk/system/data_pipe.h" |
| 13 #include "mojo/edk/system/memory.h" |
| 14 |
| 15 namespace mojo { |
| 16 namespace system { |
| 17 |
| 18 void DataPipeProducerDispatcher::Init( |
| 19 embedder::ScopedPlatformHandle message_pipe) { |
| 20 if (message_pipe.is_valid()) { |
| 21 channel_ = RawChannel::Create(message_pipe.Pass()); |
| 22 mojo::embedder::internal::g_io_thread_task_runner->PostTask( |
| 23 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); |
| 24 } |
| 25 } |
| 26 |
| 27 void DataPipeProducerDispatcher::InitOnIO() { |
| 28 base::AutoLock locker(lock()); |
| 29 if (channel_) |
| 30 channel_->Init(this); |
| 31 } |
| 32 |
| 33 void DataPipeProducerDispatcher::CloseOnIO() { |
| 34 base::AutoLock locker(lock()); |
| 35 if (channel_) { |
| 36 channel_->Shutdown(); |
| 37 channel_ = nullptr; |
| 38 } |
| 39 } |
| 40 |
| 41 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { |
| 42 return Type::DATA_PIPE_PRODUCER; |
| 43 } |
| 44 |
| 45 scoped_refptr<DataPipeProducerDispatcher> |
| 46 DataPipeProducerDispatcher::Deserialize( |
| 47 const void* source, |
| 48 size_t size, |
| 49 embedder::PlatformHandleVector* platform_handles) { |
| 50 MojoCreateDataPipeOptions options; |
| 51 embedder::ScopedPlatformHandle platform_handle = |
| 52 DataPipe::Deserialize(source, size, platform_handles, &options, |
| 53 nullptr, 0); |
| 54 |
| 55 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); |
| 56 if (platform_handle.is_valid()) |
| 57 rv->Init(platform_handle.Pass()); |
| 58 return rv; |
| 59 } |
| 60 |
| 61 DataPipeProducerDispatcher::DataPipeProducerDispatcher( |
| 62 const MojoCreateDataPipeOptions& options) |
| 63 : options_(options), channel_(nullptr), error_(false) { |
| 64 } |
| 65 |
| 66 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { |
| 67 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. |
| 68 DCHECK(!channel_); |
| 69 } |
| 70 |
| 71 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { |
| 72 lock().AssertAcquired(); |
| 73 awakable_list_.CancelAll(); |
| 74 } |
| 75 |
| 76 void DataPipeProducerDispatcher::CloseImplNoLock() { |
| 77 lock().AssertAcquired(); |
| 78 mojo::embedder::internal::g_io_thread_task_runner->PostTask( |
| 79 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); |
| 80 } |
| 81 |
| 82 scoped_refptr<Dispatcher> |
| 83 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
| 84 lock().AssertAcquired(); |
| 85 |
| 86 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); |
| 87 rv->channel_ = channel_; |
| 88 channel_ = nullptr; |
| 89 rv->options_ = options_; |
| 90 return scoped_refptr<Dispatcher>(rv.get()); |
| 91 } |
| 92 |
| 93 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( |
| 94 UserPointer<const void> elements, |
| 95 UserPointer<uint32_t> num_bytes, |
| 96 MojoWriteDataFlags flags) { |
| 97 lock().AssertAcquired(); |
| 98 if (InTwoPhaseWrite()) |
| 99 return MOJO_RESULT_BUSY; |
| 100 if (error_) |
| 101 return MOJO_RESULT_FAILED_PRECONDITION; |
| 102 if (num_bytes.Get() % options_.element_num_bytes != 0) |
| 103 return MOJO_RESULT_INVALID_ARGUMENT; |
| 104 if (num_bytes.Get() == 0) |
| 105 return MOJO_RESULT_OK; // Nothing to do. |
| 106 |
| 107 // For now, we ignore options.capacity_num_bytes as a total of all pending |
| 108 // writes (and just treat this per message). We will implement that later if |
| 109 // we need to. All current uses want all their data to be sent, and it's not |
| 110 // clear that this backpressure should be done at the mojo layer or at a |
| 111 // higher application layer. |
| 112 if (num_bytes.Get() > options_.capacity_num_bytes) |
| 113 return MOJO_RESULT_OUT_OF_RANGE; |
| 114 |
| 115 // Ignore MOJO_WRITE_DATA_FLAG_ALL_OR_NONE for now, it's also not clear we |
| 116 // need/want the functionality to only write a subset at this layer. |
| 117 |
| 118 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
| 119 |
| 120 WriteDataIntoMessages(elements, num_bytes); |
| 121 |
| 122 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
| 123 if (!new_state.equals(old_state)) |
| 124 awakable_list_.AwakeForStateChange(new_state); |
| 125 return MOJO_RESULT_OK; |
| 126 } |
| 127 |
| 128 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( |
| 129 UserPointer<void*> buffer, |
| 130 UserPointer<uint32_t> buffer_num_bytes, |
| 131 MojoWriteDataFlags flags) { |
| 132 lock().AssertAcquired(); |
| 133 if (InTwoPhaseWrite()) |
| 134 return MOJO_RESULT_BUSY; |
| 135 if (error_) |
| 136 return MOJO_RESULT_FAILED_PRECONDITION; |
| 137 if (buffer_num_bytes.Get() % options_.element_num_bytes != 0) |
| 138 return MOJO_RESULT_INVALID_ARGUMENT; |
| 139 if (buffer_num_bytes.Get() > options_.capacity_num_bytes) |
| 140 return MOJO_RESULT_OUT_OF_RANGE; |
| 141 |
| 142 if (buffer_num_bytes.Get() == 0) |
| 143 buffer_num_bytes.Put(options_.capacity_num_bytes); |
| 144 |
| 145 two_phase_data_.resize(buffer_num_bytes.Get()); |
| 146 buffer.Put(&two_phase_data_[0]); |
| 147 |
| 148 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes |
| 149 // we can construct a MessageInTransit here. But then we need to make |
| 150 // MessageInTransit support changing its data size later. |
| 151 |
| 152 return MOJO_RESULT_OK; |
| 153 } |
| 154 |
| 155 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( |
| 156 uint32_t num_bytes_written) { |
| 157 lock().AssertAcquired(); |
| 158 if (!InTwoPhaseWrite()) |
| 159 return MOJO_RESULT_FAILED_PRECONDITION; |
| 160 |
| 161 // Note: Allow successful completion of the two-phase write even if the other |
| 162 // side has been closed. |
| 163 MojoResult rv; |
| 164 if (num_bytes_written > two_phase_data_.size() || |
| 165 num_bytes_written % options_.element_num_bytes != 0) { |
| 166 rv = MOJO_RESULT_INVALID_ARGUMENT; |
| 167 } else { |
| 168 WriteDataIntoMessages( |
| 169 MakeUserPointer(static_cast<const void*>(&two_phase_data_[0])), |
| 170 MakeUserPointer(&num_bytes_written)); |
| 171 rv = MOJO_RESULT_OK; |
| 172 } |
| 173 |
| 174 // Two-phase write ended even on failure. |
| 175 two_phase_data_.clear(); |
| 176 // If we're now writable, we *became* writable (since we weren't writable |
| 177 // during the two-phase write), so awake producer awakables. |
| 178 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
| 179 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
| 180 awakable_list_.AwakeForStateChange(new_state); |
| 181 |
| 182 return rv; |
| 183 } |
| 184 |
| 185 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() |
| 186 const { |
| 187 lock().AssertAcquired(); |
| 188 |
| 189 HandleSignalsState rv; |
| 190 if (!error_) { |
| 191 if (!InTwoPhaseWrite()) |
| 192 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 193 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 194 } else { |
| 195 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 196 } |
| 197 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 198 return rv; |
| 199 } |
| 200 |
| 201 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( |
| 202 Awakable* awakable, |
| 203 MojoHandleSignals signals, |
| 204 uint32_t context, |
| 205 HandleSignalsState* signals_state) { |
| 206 lock().AssertAcquired(); |
| 207 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
| 208 if (state.satisfies(signals)) { |
| 209 if (signals_state) |
| 210 *signals_state = state; |
| 211 return MOJO_RESULT_ALREADY_EXISTS; |
| 212 } |
| 213 if (!state.can_satisfy(signals)) { |
| 214 if (signals_state) |
| 215 *signals_state = state; |
| 216 return MOJO_RESULT_FAILED_PRECONDITION; |
| 217 } |
| 218 |
| 219 awakable_list_.Add(awakable, signals, context); |
| 220 return MOJO_RESULT_OK; |
| 221 } |
| 222 |
| 223 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( |
| 224 Awakable* awakable, |
| 225 HandleSignalsState* signals_state) { |
| 226 lock().AssertAcquired(); |
| 227 awakable_list_.Remove(awakable); |
| 228 if (signals_state) |
| 229 *signals_state = GetHandleSignalsStateImplNoLock(); |
| 230 } |
| 231 |
| 232 void DataPipeProducerDispatcher::StartSerializeImplNoLock( |
| 233 size_t* max_size, |
| 234 size_t* max_platform_handles) { |
| 235 DCHECK(HasOneRef()); // Only one ref => no need to take the lock. |
| 236 |
| 237 if (channel_) { |
| 238 std::vector<char> temp; |
| 239 serialized_platform_handle_ = channel_->ReleaseHandle(&temp); |
| 240 channel_ = nullptr; |
| 241 DCHECK(temp.empty()); |
| 242 } |
| 243 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), |
| 244 false, max_size, max_platform_handles); |
| 245 } |
| 246 |
| 247 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( |
| 248 void* destination, |
| 249 size_t* actual_size, |
| 250 embedder::PlatformHandleVector* platform_handles) { |
| 251 DCHECK(HasOneRef()); // Only one ref => no need to take the lock. |
| 252 |
| 253 DataPipe::EndSerialize( |
| 254 options_, |
| 255 serialized_platform_handle_.Pass(), |
| 256 embedder::ScopedPlatformHandle(), 0, |
| 257 destination, actual_size, platform_handles); |
| 258 CloseImplNoLock(); |
| 259 return true; |
| 260 } |
| 261 |
| 262 void DataPipeProducerDispatcher::TransportStarted() { |
| 263 started_transport_.Acquire(); |
| 264 } |
| 265 |
| 266 void DataPipeProducerDispatcher::TransportEnded() { |
| 267 started_transport_.Release(); |
| 268 } |
| 269 |
| 270 bool DataPipeProducerDispatcher::IsBusyNoLock() const { |
| 271 lock().AssertAcquired(); |
| 272 return InTwoPhaseWrite(); |
| 273 } |
| 274 |
| 275 void DataPipeProducerDispatcher::OnReadMessage( |
| 276 const MessageInTransit::View& message_view, |
| 277 embedder::ScopedPlatformHandleVectorPtr platform_handles) { |
| 278 NOTREACHED(); |
| 279 } |
| 280 |
| 281 void DataPipeProducerDispatcher::OnError(Error error) { |
| 282 switch (error) { |
| 283 case ERROR_READ_SHUTDOWN: |
| 284 case ERROR_READ_BROKEN: |
| 285 case ERROR_READ_BAD_MESSAGE: |
| 286 case ERROR_READ_UNKNOWN: |
| 287 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't read messages"; |
| 288 break; |
| 289 case ERROR_WRITE: |
| 290 // Write errors are slightly notable: they probably shouldn't happen under |
| 291 // normal operation (but maybe the other side crashed). |
| 292 LOG(WARNING) << "DataPipeProducerDispatcher write error"; |
| 293 break; |
| 294 } |
| 295 |
| 296 error_ = true; |
| 297 if (started_transport_.Try()) { |
| 298 base::AutoLock locker(lock()); |
| 299 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 300 |
| 301 base::MessageLoop::current()->PostTask( |
| 302 FROM_HERE, |
| 303 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_))); |
| 304 channel_ = nullptr; |
| 305 started_transport_.Release(); |
| 306 } else { |
| 307 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
| 308 } |
| 309 } |
| 310 |
| 311 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const { |
| 312 return !two_phase_data_.empty(); |
| 313 } |
| 314 |
| 315 bool DataPipeProducerDispatcher::WriteDataIntoMessages( |
| 316 UserPointer<const void> elements, |
| 317 UserPointer<uint32_t> num_bytes) { |
| 318 // The maximum amount of data to send per message (make it a multiple of the |
| 319 // element size. |
| 320 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; |
| 321 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes; |
| 322 DCHECK_GT(max_message_num_bytes, 0u); |
| 323 |
| 324 size_t offset = 0; |
| 325 while (offset < num_bytes.Get()) { |
| 326 size_t message_num_bytes = |
| 327 std::min(max_message_num_bytes, num_bytes.Get() - offset); |
| 328 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| 329 MessageInTransit::Type::MESSAGE, message_num_bytes, |
| 330 elements.At(offset))); |
| 331 if (!channel_->WriteMessage(message.Pass())) { |
| 332 error_ = true; |
| 333 return false; |
| 334 } |
| 335 |
| 336 offset += message_num_bytes; |
| 337 } |
| 338 |
| 339 return true; |
| 340 } |
| 341 |
| 342 } // namespace system |
| 343 } // namespace mojo |
| OLD | NEW |