| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" |
| 6 |
| 7 #include <algorithm> |
| 8 |
| 9 #include "base/bind.h" |
| 10 #include "base/logging.h" |
| 11 #include "base/message_loop/message_loop.h" |
| 12 #include "mojo/edk/embedder/embedder_internal.h" |
| 13 #include "mojo/edk/embedder/platform_shared_buffer.h" |
| 14 #include "mojo/edk/embedder/platform_support.h" |
| 15 #include "mojo/edk/system/data_pipe.h" |
| 16 #include "mojo/edk/system/memory.h" |
| 17 |
| 18 namespace mojo { |
| 19 namespace system { |
| 20 |
| 21 struct SharedMemoryHeader { |
| 22 uint32_t data_size; |
| 23 uint32_t read_buffer_size; |
| 24 }; |
| 25 |
| 26 void DataPipeConsumerDispatcher::Init( |
| 27 embedder::ScopedPlatformHandle message_pipe) { |
| 28 if (message_pipe.is_valid()) { |
| 29 channel_ = RawChannel::Create(message_pipe.Pass()); |
| 30 if (!serialized_read_buffer_.empty()) |
| 31 channel_->SetInitialReadBufferData( |
| 32 &serialized_read_buffer_[0], serialized_read_buffer_.size()); |
| 33 serialized_read_buffer_.clear(); |
| 34 mojo::embedder::internal::g_io_thread_task_runner->PostTask( |
| 35 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this)); |
| 36 } |
| 37 } |
| 38 |
| 39 void DataPipeConsumerDispatcher::InitOnIO() { |
| 40 base::AutoLock locker(lock()); |
| 41 calling_init_ = true; |
| 42 if (channel_) |
| 43 channel_->Init(this); |
| 44 calling_init_ = false; |
| 45 } |
| 46 |
| 47 void DataPipeConsumerDispatcher::CloseOnIO() { |
| 48 base::AutoLock locker(lock()); |
| 49 if (channel_) { |
| 50 channel_->Shutdown(); |
| 51 channel_ = nullptr; |
| 52 } |
| 53 } |
| 54 |
| 55 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { |
| 56 return Type::DATA_PIPE_CONSUMER; |
| 57 } |
| 58 |
| 59 scoped_refptr<DataPipeConsumerDispatcher> |
| 60 DataPipeConsumerDispatcher::Deserialize( |
| 61 const void* source, |
| 62 size_t size, |
| 63 embedder::PlatformHandleVector* platform_handles) { |
| 64 MojoCreateDataPipeOptions options; |
| 65 embedder::ScopedPlatformHandle shared_memory_handle; |
| 66 size_t shared_memory_size = 0; |
| 67 |
| 68 embedder::ScopedPlatformHandle platform_handle = |
| 69 DataPipe::Deserialize(source, size, platform_handles, &options, |
| 70 &shared_memory_handle, &shared_memory_size); |
| 71 |
| 72 scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options)); |
| 73 |
| 74 if (shared_memory_size) { |
| 75 scoped_refptr<embedder::PlatformSharedBuffer> shared_buffer( |
| 76 embedder::internal::g_platform_support->CreateSharedBufferFromHandle( |
| 77 shared_memory_size, shared_memory_handle.Pass()));; |
| 78 scoped_ptr<embedder::PlatformSharedBufferMapping> mapping( |
| 79 shared_buffer->Map(0, shared_memory_size)); |
| 80 char* buffer = static_cast<char*>(mapping->GetBase()); |
| 81 SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer); |
| 82 buffer+= sizeof(SharedMemoryHeader); |
| 83 if (header->data_size) { |
| 84 rv->data_.resize(header->data_size); |
| 85 memcpy(&rv->data_[0], buffer, header->data_size); |
| 86 buffer += header->data_size; |
| 87 } |
| 88 if (header->read_buffer_size) { |
| 89 rv->serialized_read_buffer_.resize(header->read_buffer_size); |
| 90 memcpy(&rv->serialized_read_buffer_[0], buffer, header->read_buffer_size); |
| 91 buffer += header->read_buffer_size; |
| 92 } |
| 93 |
| 94 } |
| 95 |
| 96 if (platform_handle.is_valid()) |
| 97 rv->Init(platform_handle.Pass()); |
| 98 return rv; |
| 99 } |
| 100 |
| 101 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( |
| 102 const MojoCreateDataPipeOptions& options) |
| 103 : options_(options), |
| 104 channel_(nullptr), |
| 105 calling_init_(false), |
| 106 in_two_phase_read_(false), |
| 107 two_phase_max_bytes_read_(0), |
| 108 error_(false), |
| 109 serialized_(false) { |
| 110 } |
| 111 |
| 112 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { |
| 113 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. |
| 114 DCHECK(!channel_); |
| 115 } |
| 116 |
| 117 void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() { |
| 118 lock().AssertAcquired(); |
| 119 awakable_list_.CancelAll(); |
| 120 } |
| 121 |
| 122 void DataPipeConsumerDispatcher::CloseImplNoLock() { |
| 123 lock().AssertAcquired(); |
| 124 mojo::embedder::internal::g_io_thread_task_runner->PostTask( |
| 125 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this)); |
| 126 } |
| 127 |
| 128 scoped_refptr<Dispatcher> |
| 129 DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
| 130 lock().AssertAcquired(); |
| 131 |
| 132 SerializeInternal(); |
| 133 |
| 134 scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_); |
| 135 rv->channel_ = channel_; |
| 136 channel_ = nullptr; |
| 137 rv->options_ = options_; |
| 138 data_.swap(rv->data_); |
| 139 serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
| 140 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); |
| 141 rv->serialized_ = true; |
| 142 |
| 143 return scoped_refptr<Dispatcher>(rv.get()); |
| 144 } |
| 145 |
| 146 MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock( |
| 147 UserPointer<void> elements, |
| 148 UserPointer<uint32_t> num_bytes, |
| 149 MojoReadDataFlags flags) { |
| 150 lock().AssertAcquired(); |
| 151 if (in_two_phase_read_) |
| 152 return MOJO_RESULT_BUSY; |
| 153 |
| 154 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { |
| 155 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || |
| 156 (flags & MOJO_READ_DATA_FLAG_DISCARD)) |
| 157 return MOJO_RESULT_INVALID_ARGUMENT; |
| 158 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. |
| 159 DVLOG_IF(2, !elements.IsNull()) |
| 160 << "Query mode: ignoring non-null |elements|"; |
| 161 num_bytes.Put(data_.size()); |
| 162 return MOJO_RESULT_OK; |
| 163 } |
| 164 |
| 165 bool discard = false; |
| 166 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { |
| 167 // These flags are mutally exclusive. |
| 168 if (flags & MOJO_READ_DATA_FLAG_PEEK) |
| 169 return MOJO_RESULT_INVALID_ARGUMENT; |
| 170 DVLOG_IF(2, !elements.IsNull()) |
| 171 << "Discard mode: ignoring non-null |elements|"; |
| 172 discard = true; |
| 173 } |
| 174 |
| 175 uint32_t max_num_bytes_to_read = num_bytes.Get(); |
| 176 if (max_num_bytes_to_read % options_.element_num_bytes != 0) |
| 177 return MOJO_RESULT_INVALID_ARGUMENT; |
| 178 |
| 179 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; |
| 180 uint32_t min_num_bytes_to_read = |
| 181 all_or_none ? max_num_bytes_to_read : 0; |
| 182 |
| 183 if (min_num_bytes_to_read > data_.size()) |
| 184 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE; |
| 185 |
| 186 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, |
| 187 static_cast<uint32_t>(data_.size())); |
| 188 if (bytes_to_read == 0) |
| 189 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; |
| 190 |
| 191 if (!discard) |
| 192 elements.PutArray(&data_[0], bytes_to_read); |
| 193 num_bytes.Put(bytes_to_read); |
| 194 |
| 195 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); |
| 196 if (discard || !peek) |
| 197 data_.erase(data_.begin(), data_.begin() + bytes_to_read); |
| 198 |
| 199 return MOJO_RESULT_OK; |
| 200 } |
| 201 |
| 202 MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock( |
| 203 UserPointer<const void*> buffer, |
| 204 UserPointer<uint32_t> buffer_num_bytes, |
| 205 MojoReadDataFlags flags) { |
| 206 lock().AssertAcquired(); |
| 207 if (in_two_phase_read_) |
| 208 return MOJO_RESULT_BUSY; |
| 209 |
| 210 // These flags may not be used in two-phase mode. |
| 211 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || |
| 212 (flags & MOJO_READ_DATA_FLAG_QUERY) || |
| 213 (flags & MOJO_READ_DATA_FLAG_PEEK)) |
| 214 return MOJO_RESULT_INVALID_ARGUMENT; |
| 215 |
| 216 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; |
| 217 uint32_t min_num_bytes_to_read = 0; |
| 218 if (all_or_none) { |
| 219 min_num_bytes_to_read = buffer_num_bytes.Get(); |
| 220 if (min_num_bytes_to_read % options_.element_num_bytes != 0) |
| 221 return MOJO_RESULT_INVALID_ARGUMENT; |
| 222 } |
| 223 |
| 224 uint32_t max_num_bytes_to_read = data_.size(); |
| 225 if (min_num_bytes_to_read > max_num_bytes_to_read) |
| 226 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE; |
| 227 if (max_num_bytes_to_read == 0) |
| 228 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; |
| 229 |
| 230 in_two_phase_read_ = true; |
| 231 buffer.Put(&data_[0]); |
| 232 buffer_num_bytes.Put(max_num_bytes_to_read); |
| 233 two_phase_max_bytes_read_ = max_num_bytes_to_read; |
| 234 |
| 235 return MOJO_RESULT_OK; |
| 236 } |
| 237 |
| 238 MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock( |
| 239 uint32_t num_bytes_read) { |
| 240 lock().AssertAcquired(); |
| 241 if (!in_two_phase_read_) |
| 242 return MOJO_RESULT_FAILED_PRECONDITION; |
| 243 |
| 244 MojoResult rv; |
| 245 if (num_bytes_read > two_phase_max_bytes_read_ || |
| 246 num_bytes_read % options_.element_num_bytes != 0) { |
| 247 rv = MOJO_RESULT_INVALID_ARGUMENT; |
| 248 } else { |
| 249 rv = MOJO_RESULT_OK; |
| 250 data_.erase(data_.begin(), data_.begin() + num_bytes_read); |
| 251 } |
| 252 |
| 253 in_two_phase_read_ = false; |
| 254 two_phase_max_bytes_read_ = 0; |
| 255 |
| 256 // If we're now readable, we *became* readable (since we weren't readable |
| 257 // during the two-phase read), so awake consumer awakables. |
| 258 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
| 259 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE)) |
| 260 awakable_list_.AwakeForStateChange(new_state); |
| 261 |
| 262 return rv; |
| 263 } |
| 264 |
| 265 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock() |
| 266 const { |
| 267 lock().AssertAcquired(); |
| 268 |
| 269 HandleSignalsState rv; |
| 270 if (!data_.empty()) { |
| 271 if (!in_two_phase_read_) |
| 272 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 273 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 274 } else if (!error_) { |
| 275 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 276 } |
| 277 |
| 278 if (error_) |
| 279 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 280 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 281 return rv; |
| 282 } |
| 283 |
| 284 MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock( |
| 285 Awakable* awakable, |
| 286 MojoHandleSignals signals, |
| 287 uint32_t context, |
| 288 HandleSignalsState* signals_state) { |
| 289 lock().AssertAcquired(); |
| 290 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
| 291 if (state.satisfies(signals)) { |
| 292 if (signals_state) |
| 293 *signals_state = state; |
| 294 return MOJO_RESULT_ALREADY_EXISTS; |
| 295 } |
| 296 if (!state.can_satisfy(signals)) { |
| 297 if (signals_state) |
| 298 *signals_state = state; |
| 299 return MOJO_RESULT_FAILED_PRECONDITION; |
| 300 } |
| 301 |
| 302 awakable_list_.Add(awakable, signals, context); |
| 303 return MOJO_RESULT_OK; |
| 304 } |
| 305 |
| 306 void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock( |
| 307 Awakable* awakable, |
| 308 HandleSignalsState* signals_state) { |
| 309 lock().AssertAcquired(); |
| 310 awakable_list_.Remove(awakable); |
| 311 if (signals_state) |
| 312 *signals_state = GetHandleSignalsStateImplNoLock(); |
| 313 } |
| 314 |
| 315 void DataPipeConsumerDispatcher::StartSerializeImplNoLock( |
| 316 size_t* max_size, |
| 317 size_t* max_platform_handles) { |
| 318 //DCHECK(HasOneRef()); // Only one ref => no need to take the lock. |
| 319 |
| 320 if (!serialized_) { |
| 321 // handles the case where we have messages read off rawchannel but not |
| 322 // ready by MojoReadMessage. |
| 323 SerializeInternal(); |
| 324 } |
| 325 |
| 326 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), |
| 327 !data_.empty(), |
| 328 max_size, max_platform_handles); |
| 329 } |
| 330 |
| 331 bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock( |
| 332 void* destination, |
| 333 size_t* actual_size, |
| 334 embedder::PlatformHandleVector* platform_handles) { |
| 335 //DCHECK(HasOneRef()); // Only one ref => no need to take the lock. |
| 336 |
| 337 embedder::ScopedPlatformHandle shared_memory_handle; |
| 338 size_t shared_memory_size = data_.size() + serialized_read_buffer_.size(); |
| 339 if (shared_memory_size) { |
| 340 shared_memory_size += sizeof(SharedMemoryHeader); |
| 341 SharedMemoryHeader header; |
| 342 header.data_size = data_.size(); |
| 343 header.read_buffer_size = serialized_read_buffer_.size(); |
| 344 |
| 345 scoped_refptr<embedder::PlatformSharedBuffer> shared_buffer( |
| 346 embedder::internal::g_platform_support->CreateSharedBuffer( |
| 347 shared_memory_size)); |
| 348 scoped_ptr<embedder::PlatformSharedBufferMapping> mapping( |
| 349 shared_buffer->Map(0, shared_memory_size)); |
| 350 |
| 351 char* start = static_cast<char*>(mapping->GetBase()); |
| 352 memcpy(start, &header, sizeof(SharedMemoryHeader)); |
| 353 start += sizeof(SharedMemoryHeader); |
| 354 |
| 355 |
| 356 if (!data_.empty()) { |
| 357 memcpy(start, &data_[0], data_.size()); |
| 358 start += data_.size(); |
| 359 } |
| 360 |
| 361 if (!serialized_read_buffer_.empty()) { |
| 362 memcpy(start, &serialized_read_buffer_[0], |
| 363 serialized_read_buffer_.size()); |
| 364 start += serialized_read_buffer_.size(); |
| 365 } |
| 366 |
| 367 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release()); |
| 368 } |
| 369 |
| 370 DataPipe::EndSerialize( |
| 371 options_, |
| 372 serialized_platform_handle_.Pass(), |
| 373 shared_memory_handle.Pass(), |
| 374 shared_memory_size, destination, actual_size, |
| 375 platform_handles); |
| 376 CloseImplNoLock(); |
| 377 return true; |
| 378 } |
| 379 |
| 380 void DataPipeConsumerDispatcher::TransportStarted() { |
| 381 started_transport_.Acquire(); |
| 382 } |
| 383 |
| 384 void DataPipeConsumerDispatcher::TransportEnded() { |
| 385 started_transport_.Release(); |
| 386 |
| 387 base::AutoLock locker(lock()); |
| 388 |
| 389 // If transporting of DP failed, we might have got more data and didn't awake |
| 390 // for. |
| 391 // TODO(jam): should we care about only alerting if it was empty before |
| 392 // TransportStarted? |
| 393 if (!data_.empty()) |
| 394 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 395 } |
| 396 |
| 397 bool DataPipeConsumerDispatcher::IsBusyNoLock() const { |
| 398 lock().AssertAcquired(); |
| 399 return in_two_phase_read_; |
| 400 } |
| 401 |
| 402 void DataPipeConsumerDispatcher::OnReadMessage( |
| 403 const MessageInTransit::View& message_view, |
| 404 embedder::ScopedPlatformHandleVectorPtr platform_handles) { |
| 405 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
| 406 |
| 407 if (started_transport_.Try()) { |
| 408 // we're not in the middle of being sent |
| 409 |
| 410 // Can get synchronously called back in Init if there was initial data. |
| 411 scoped_ptr<base::AutoLock> locker; |
| 412 if (!calling_init_) { |
| 413 locker.reset(new base::AutoLock(lock())); |
| 414 } |
| 415 |
| 416 size_t old_size = data_.size(); |
| 417 data_.resize(old_size + message->num_bytes()); |
| 418 memcpy(&data_[old_size], message->bytes(), message->num_bytes()); |
| 419 if (!old_size) |
| 420 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 421 started_transport_.Release(); |
| 422 } else { |
| 423 size_t old_size = data_.size(); |
| 424 data_.resize(old_size + message->num_bytes()); |
| 425 memcpy(&data_[old_size], message->bytes(), message->num_bytes()); |
| 426 } |
| 427 } |
| 428 |
| 429 void DataPipeConsumerDispatcher::OnError(Error error) { |
| 430 switch (error) { |
| 431 case ERROR_READ_SHUTDOWN: |
| 432 // The other side was cleanly closed, so this isn't actually an error. |
| 433 DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)"; |
| 434 break; |
| 435 case ERROR_READ_BROKEN: |
| 436 LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)"; |
| 437 break; |
| 438 case ERROR_READ_BAD_MESSAGE: |
| 439 // Receiving a bad message means either a bug, data corruption, or |
| 440 // malicious attack (probably due to some other bug). |
| 441 LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad " |
| 442 << "message)"; |
| 443 break; |
| 444 case ERROR_READ_UNKNOWN: |
| 445 LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)"; |
| 446 break; |
| 447 case ERROR_WRITE: |
| 448 LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages"; |
| 449 break; |
| 450 } |
| 451 |
| 452 error_ = true; |
| 453 if (started_transport_.Try()) { |
| 454 base::AutoLock locker(lock()); |
| 455 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 456 started_transport_.Release(); |
| 457 |
| 458 base::MessageLoop::current()->PostTask( |
| 459 FROM_HERE, |
| 460 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_))); |
| 461 channel_ = nullptr; |
| 462 } else { |
| 463 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
| 464 } |
| 465 } |
| 466 |
| 467 void DataPipeConsumerDispatcher::SerializeInternal() { |
| 468 // need to stop watching handle immediately, even tho not on IO thread, so |
| 469 // that other messages aren't read after this. |
| 470 if (channel_) { |
| 471 serialized_platform_handle_ = |
| 472 channel_->ReleaseHandle(&serialized_read_buffer_); |
| 473 |
| 474 channel_ = nullptr; |
| 475 serialized_ = true; |
| 476 } |
| 477 } |
| 478 |
| 479 } // namespace system |
| 480 } // namespace mojo |
| OLD | NEW |