| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "mojo/edk/system/raw_channel.h" | |
| 6 | |
| 7 #include <stddef.h> | |
| 8 #include <stdint.h> | |
| 9 #include <string.h> | |
| 10 #include <algorithm> | |
| 11 #include <utility> | |
| 12 | |
| 13 #include "base/bind.h" | |
| 14 #include "base/location.h" | |
| 15 #include "base/logging.h" | |
| 16 #include "base/message_loop/message_loop.h" | |
| 17 #include "mojo/edk/embedder/embedder_internal.h" | |
| 18 #include "mojo/edk/system/configuration.h" | |
| 19 #include "mojo/edk/system/message_in_transit.h" | |
| 20 #include "mojo/edk/system/transport_data.h" | |
| 21 | |
| 22 namespace mojo { | |
| 23 namespace edk { | |
| 24 | |
| 25 const size_t kReadSize = 4096; | |
| 26 | |
| 27 // RawChannel::ReadBuffer ------------------------------------------------------ | |
| 28 | |
| 29 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { | |
| 30 } | |
| 31 | |
| 32 RawChannel::ReadBuffer::~ReadBuffer() { | |
| 33 } | |
| 34 | |
| 35 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { | |
| 36 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); | |
| 37 *addr = &buffer_[0] + num_valid_bytes_; | |
| 38 *size = kReadSize; | |
| 39 } | |
| 40 | |
| 41 // RawChannel::WriteBuffer ----------------------------------------------------- | |
| 42 | |
| 43 RawChannel::WriteBuffer::WriteBuffer() | |
| 44 : serialized_platform_handle_size_(0), | |
| 45 platform_handles_offset_(0), | |
| 46 data_offset_(0) { | |
| 47 } | |
| 48 | |
| 49 RawChannel::WriteBuffer::~WriteBuffer() { | |
| 50 message_queue_.Clear(); | |
| 51 } | |
| 52 | |
| 53 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { | |
| 54 if (message_queue_.IsEmpty()) | |
| 55 return false; | |
| 56 | |
| 57 const TransportData* transport_data = | |
| 58 message_queue_.PeekMessage()->transport_data(); | |
| 59 if (!transport_data) | |
| 60 return false; | |
| 61 | |
| 62 const PlatformHandleVector* all_platform_handles = | |
| 63 transport_data->platform_handles(); | |
| 64 if (!all_platform_handles) { | |
| 65 DCHECK_EQ(platform_handles_offset_, 0u); | |
| 66 return false; | |
| 67 } | |
| 68 if (platform_handles_offset_ >= all_platform_handles->size()) { | |
| 69 DCHECK_EQ(platform_handles_offset_, all_platform_handles->size()); | |
| 70 return false; | |
| 71 } | |
| 72 | |
| 73 return true; | |
| 74 } | |
| 75 | |
| 76 void RawChannel::WriteBuffer::GetPlatformHandlesToSend( | |
| 77 size_t* num_platform_handles, | |
| 78 PlatformHandle** platform_handles, | |
| 79 void** serialization_data) { | |
| 80 DCHECK(HavePlatformHandlesToSend()); | |
| 81 | |
| 82 MessageInTransit* message = message_queue_.PeekMessage(); | |
| 83 TransportData* transport_data = message->transport_data(); | |
| 84 PlatformHandleVector* all_platform_handles = | |
| 85 transport_data->platform_handles(); | |
| 86 *num_platform_handles = | |
| 87 all_platform_handles->size() - platform_handles_offset_; | |
| 88 *platform_handles = &(*all_platform_handles)[platform_handles_offset_]; | |
| 89 | |
| 90 if (serialized_platform_handle_size_ > 0) { | |
| 91 size_t serialization_data_offset = | |
| 92 transport_data->platform_handle_table_offset(); | |
| 93 serialization_data_offset += | |
| 94 platform_handles_offset_ * serialized_platform_handle_size_; | |
| 95 *serialization_data = static_cast<char*>(transport_data->buffer()) + | |
| 96 serialization_data_offset; | |
| 97 } else { | |
| 98 *serialization_data = nullptr; | |
| 99 } | |
| 100 } | |
| 101 | |
| 102 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) { | |
| 103 buffers->clear(); | |
| 104 | |
| 105 if (message_queue_.IsEmpty()) | |
| 106 return; | |
| 107 | |
| 108 const MessageInTransit* message = message_queue_.PeekMessage(); | |
| 109 if (message->type() == MessageInTransit::Type::RAW_MESSAGE) { | |
| 110 // These are already-serialized messages so we don't want to write another | |
| 111 // header as they include that. | |
| 112 if (data_offset_ == 0) { | |
| 113 size_t header_size = message->total_size() - message->num_bytes(); | |
| 114 data_offset_ = header_size; | |
| 115 } | |
| 116 } | |
| 117 | |
| 118 DCHECK_LT(data_offset_, message->total_size()); | |
| 119 size_t bytes_to_write = message->total_size() - data_offset_; | |
| 120 | |
| 121 size_t transport_data_buffer_size = | |
| 122 message->transport_data() ? message->transport_data()->buffer_size() : 0; | |
| 123 | |
| 124 if (!transport_data_buffer_size) { | |
| 125 // Only write from the main buffer. | |
| 126 DCHECK_LT(data_offset_, message->main_buffer_size()); | |
| 127 DCHECK_LE(bytes_to_write, message->main_buffer_size()); | |
| 128 Buffer buffer = { | |
| 129 static_cast<const char*>(message->main_buffer()) + data_offset_, | |
| 130 bytes_to_write}; | |
| 131 | |
| 132 buffers->push_back(buffer); | |
| 133 return; | |
| 134 } | |
| 135 | |
| 136 if (data_offset_ >= message->main_buffer_size()) { | |
| 137 // Only write from the transport data buffer. | |
| 138 DCHECK_LT(data_offset_ - message->main_buffer_size(), | |
| 139 transport_data_buffer_size); | |
| 140 DCHECK_LE(bytes_to_write, transport_data_buffer_size); | |
| 141 Buffer buffer = { | |
| 142 static_cast<const char*>(message->transport_data()->buffer()) + | |
| 143 (data_offset_ - message->main_buffer_size()), | |
| 144 bytes_to_write}; | |
| 145 | |
| 146 buffers->push_back(buffer); | |
| 147 return; | |
| 148 } | |
| 149 | |
| 150 // TODO(vtl): We could actually send out buffers from multiple messages, with | |
| 151 // the "stopping" condition being reaching a message with platform handles | |
| 152 // attached. | |
| 153 | |
| 154 // Write from both buffers. | |
| 155 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + | |
| 156 transport_data_buffer_size); | |
| 157 Buffer buffer1 = { | |
| 158 static_cast<const char*>(message->main_buffer()) + data_offset_, | |
| 159 message->main_buffer_size() - data_offset_}; | |
| 160 buffers->push_back(buffer1); | |
| 161 Buffer buffer2 = { | |
| 162 static_cast<const char*>(message->transport_data()->buffer()), | |
| 163 transport_data_buffer_size}; | |
| 164 buffers->push_back(buffer2); | |
| 165 } | |
| 166 | |
| 167 // RawChannel ------------------------------------------------------------------ | |
| 168 | |
| 169 RawChannel::RawChannel() | |
| 170 : delegate_(nullptr), | |
| 171 error_occurred_(false), | |
| 172 calling_delegate_(false), | |
| 173 write_ready_(false), | |
| 174 write_stopped_(false), | |
| 175 pending_write_error_(false), | |
| 176 initialized_(false), | |
| 177 weak_ptr_factory_(this) { | |
| 178 read_buffer_.reset(new ReadBuffer); | |
| 179 write_buffer_.reset(new WriteBuffer()); | |
| 180 } | |
| 181 | |
| 182 RawChannel::~RawChannel() { | |
| 183 DCHECK(!read_buffer_); | |
| 184 DCHECK(!write_buffer_); | |
| 185 } | |
| 186 | |
| 187 void RawChannel::Init(Delegate* delegate) { | |
| 188 DCHECK(delegate); | |
| 189 | |
| 190 base::AutoLock read_locker(read_lock_); | |
| 191 // Solves race where initialiing on io thread while main thread is serializing | |
| 192 // this channel and releases handle. | |
| 193 base::AutoLock locker(write_lock_); | |
| 194 | |
| 195 DCHECK(!delegate_); | |
| 196 delegate_ = delegate; | |
| 197 | |
| 198 if (read_buffer_->num_valid_bytes_ || | |
| 199 !write_buffer_->message_queue_.IsEmpty()) { | |
| 200 LazyInitialize(); | |
| 201 } | |
| 202 } | |
| 203 | |
| 204 void RawChannel::EnsureLazyInitialized() { | |
| 205 { | |
| 206 base::AutoLock locker(write_lock_); | |
| 207 if (initialized_) | |
| 208 return; | |
| 209 } | |
| 210 | |
| 211 internal::g_io_thread_task_runner->PostTask( | |
| 212 FROM_HERE, | |
| 213 base::Bind(&RawChannel::LockAndCallLazyInitialize, | |
| 214 weak_ptr_factory_.GetWeakPtr())); | |
| 215 } | |
| 216 | |
| 217 void RawChannel::LockAndCallLazyInitialize() { | |
| 218 base::AutoLock read_locker(read_lock_); | |
| 219 base::AutoLock locker(write_lock_); | |
| 220 LazyInitialize(); | |
| 221 } | |
| 222 | |
| 223 void RawChannel::LazyInitialize() { | |
| 224 read_lock_.AssertAcquired(); | |
| 225 write_lock_.AssertAcquired(); | |
| 226 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
| 227 if (initialized_) | |
| 228 return; | |
| 229 initialized_ = true; | |
| 230 base::MessageLoop::current()->AddDestructionObserver(this); | |
| 231 | |
| 232 OnInit(); | |
| 233 | |
| 234 if (read_buffer_->num_valid_bytes_) { | |
| 235 // We had serialized read buffer data through SetSerializedData call. | |
| 236 // Make sure we read messages out of it now, otherwise the delegate won't | |
| 237 // get notified if no other data gets written to the pipe. | |
| 238 // Although this means that we can call back synchronously into the caller, | |
| 239 // that's easier than posting a task to do this. That is because if we post | |
| 240 // a task, a pending read could have started and we wouldn't be able to move | |
| 241 // the read buffer since it can be in use by the OS in an async operation. | |
| 242 bool did_dispatch_message = false; | |
| 243 bool stop_dispatching = false; | |
| 244 DispatchMessages(&did_dispatch_message, &stop_dispatching); | |
| 245 } | |
| 246 | |
| 247 IOResult io_result = ScheduleRead(); | |
| 248 if (io_result != IO_PENDING) { | |
| 249 // This will notify the delegate about the read failure. Although we're on | |
| 250 // the I/O thread, don't call it in the nested context. | |
| 251 internal::g_io_thread_task_runner->PostTask( | |
| 252 FROM_HERE, base::Bind(&RawChannel::CallOnReadCompleted, | |
| 253 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); | |
| 254 } | |
| 255 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying | |
| 256 // the delegate), not an initialization failure. | |
| 257 | |
| 258 write_ready_ = true; | |
| 259 write_buffer_->serialized_platform_handle_size_ = | |
| 260 GetSerializedPlatformHandleSize(); | |
| 261 if (!write_buffer_->message_queue_.IsEmpty()) | |
| 262 SendQueuedMessagesNoLock(); | |
| 263 } | |
| 264 | |
| 265 void RawChannel::Shutdown() { | |
| 266 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
| 267 | |
| 268 weak_ptr_factory_.InvalidateWeakPtrs(); | |
| 269 // Reset the delegate so that it won't receive further calls. | |
| 270 delegate_ = nullptr; | |
| 271 if (calling_delegate_) { | |
| 272 internal::g_io_thread_task_runner->PostTask( | |
| 273 FROM_HERE, | |
| 274 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); | |
| 275 return; | |
| 276 } | |
| 277 | |
| 278 bool empty = false; | |
| 279 { | |
| 280 base::AutoLock locker(write_lock_); | |
| 281 empty = write_buffer_->message_queue_.IsEmpty(); | |
| 282 } | |
| 283 | |
| 284 // Normally, we want to flush any pending writes before shutting down. This | |
| 285 // doesn't apply when 1) we don't have a handle (for obvious reasons), | |
| 286 // 2) we have a read or write error before (doesn't matter which), or 3) when | |
| 287 // there are no pending messages to be written. | |
| 288 if (!IsHandleValid() || error_occurred_ || empty) { | |
| 289 { | |
| 290 base::AutoLock read_locker(read_lock_); | |
| 291 base::AutoLock locker(write_lock_); | |
| 292 OnShutdownNoLock(std::move(read_buffer_), std::move(write_buffer_)); | |
| 293 if (initialized_) | |
| 294 base::MessageLoop::current()->RemoveDestructionObserver(this); | |
| 295 } | |
| 296 | |
| 297 delete this; | |
| 298 return; | |
| 299 } | |
| 300 | |
| 301 base::AutoLock read_locker(read_lock_); | |
| 302 base::AutoLock locker(write_lock_); | |
| 303 DCHECK(read_buffer_->IsEmpty()) << | |
| 304 "RawChannel::Shutdown called but there is pending data to be read"; | |
| 305 | |
| 306 write_stopped_ = true; | |
| 307 } | |
| 308 | |
| 309 ScopedPlatformHandle RawChannel::ReleaseHandle( | |
| 310 std::vector<char>* serialized_read_buffer, | |
| 311 std::vector<char>* serialized_write_buffer, | |
| 312 std::vector<int>* serialized_read_fds, | |
| 313 std::vector<int>* serialized_write_fds, | |
| 314 bool* write_error) { | |
| 315 ScopedPlatformHandle rv; | |
| 316 *write_error = false; | |
| 317 { | |
| 318 base::AutoLock read_locker(read_lock_); | |
| 319 base::AutoLock locker(write_lock_); | |
| 320 rv = ReleaseHandleNoLock(serialized_read_buffer, | |
| 321 serialized_write_buffer, | |
| 322 serialized_read_fds, | |
| 323 serialized_write_fds, | |
| 324 write_error); | |
| 325 delegate_ = nullptr; | |
| 326 internal::g_io_thread_task_runner->PostTask( | |
| 327 FROM_HERE, | |
| 328 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); | |
| 329 } | |
| 330 | |
| 331 return rv; | |
| 332 } | |
| 333 | |
| 334 // Reminder: This must be thread-safe. | |
| 335 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { | |
| 336 DCHECK(message); | |
| 337 EnsureLazyInitialized(); | |
| 338 base::AutoLock locker(write_lock_); | |
| 339 if (write_stopped_) | |
| 340 return false; | |
| 341 | |
| 342 bool queue_was_empty = write_buffer_->message_queue_.IsEmpty(); | |
| 343 EnqueueMessageNoLock(std::move(message)); | |
| 344 if (queue_was_empty && write_ready_) | |
| 345 return SendQueuedMessagesNoLock(); | |
| 346 | |
| 347 return true; | |
| 348 } | |
| 349 | |
| 350 bool RawChannel::SendQueuedMessagesNoLock() { | |
| 351 DCHECK_EQ(write_buffer_->data_offset_, 0u); | |
| 352 | |
| 353 size_t platform_handles_written = 0; | |
| 354 size_t bytes_written = 0; | |
| 355 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); | |
| 356 if (io_result == IO_PENDING) | |
| 357 return true; | |
| 358 | |
| 359 bool result = OnWriteCompletedInternalNoLock( | |
| 360 io_result, platform_handles_written, bytes_written); | |
| 361 if (!result) { | |
| 362 // Even if we're on the I/O thread, don't call |OnError()| in the nested | |
| 363 // context. | |
| 364 pending_write_error_ = true; | |
| 365 internal::g_io_thread_task_runner->PostTask( | |
| 366 FROM_HERE, | |
| 367 base::Bind(&RawChannel::LockAndCallOnError, | |
| 368 weak_ptr_factory_.GetWeakPtr(), | |
| 369 Delegate::ERROR_WRITE)); | |
| 370 } | |
| 371 | |
| 372 return result; | |
| 373 } | |
| 374 | |
| 375 void RawChannel::SetSerializedData( | |
| 376 char* serialized_read_buffer, size_t serialized_read_buffer_size, | |
| 377 char* serialized_write_buffer, size_t serialized_write_buffer_size, | |
| 378 std::vector<int>* serialized_read_fds, | |
| 379 std::vector<int>* serialized_write_fds) { | |
| 380 base::AutoLock locker(read_lock_); | |
| 381 | |
| 382 #if defined(OS_POSIX) | |
| 383 SetSerializedFDs(serialized_read_fds, serialized_write_fds); | |
| 384 #endif | |
| 385 | |
| 386 if (serialized_read_buffer_size) { | |
| 387 // TODO(jam): copy power of 2 algorithm below? or share. | |
| 388 read_buffer_->buffer_.resize(serialized_read_buffer_size + kReadSize); | |
| 389 memcpy(&read_buffer_->buffer_[0], serialized_read_buffer, | |
| 390 serialized_read_buffer_size); | |
| 391 read_buffer_->num_valid_bytes_ = serialized_read_buffer_size; | |
| 392 } | |
| 393 | |
| 394 if (serialized_write_buffer_size) { | |
| 395 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | |
| 396 | |
| 397 uint32_t offset = 0; | |
| 398 while (offset < serialized_write_buffer_size) { | |
| 399 uint32_t message_num_bytes = | |
| 400 std::min(static_cast<uint32_t>(max_message_num_bytes), | |
| 401 static_cast<uint32_t>(serialized_write_buffer_size) - | |
| 402 offset); | |
| 403 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
| 404 MessageInTransit::Type::RAW_MESSAGE, message_num_bytes, | |
| 405 static_cast<const char*>(serialized_write_buffer) + offset)); | |
| 406 write_buffer_->message_queue_.AddMessage(std::move(message)); | |
| 407 offset += message_num_bytes; | |
| 408 } | |
| 409 } | |
| 410 } | |
| 411 | |
| 412 void RawChannel::OnReadCompletedNoLock(IOResult io_result, size_t bytes_read) { | |
| 413 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
| 414 read_lock_.AssertAcquired(); | |
| 415 // Keep reading data in a loop, and dispatch messages if enough data is | |
| 416 // received. Exit the loop if any of the following happens: | |
| 417 // - one or more messages were dispatched; | |
| 418 // - the last read failed, was a partial read or would block; | |
| 419 // - |Shutdown()| was called. | |
| 420 do { | |
| 421 switch (io_result) { | |
| 422 case IO_SUCCEEDED: | |
| 423 break; | |
| 424 case IO_FAILED_SHUTDOWN: | |
| 425 case IO_FAILED_BROKEN: | |
| 426 case IO_FAILED_UNKNOWN: | |
| 427 CallOnError(ReadIOResultToError(io_result)); | |
| 428 return; // |this| may have been destroyed in |CallOnError()|. | |
| 429 case IO_PENDING: | |
| 430 NOTREACHED(); | |
| 431 return; | |
| 432 } | |
| 433 | |
| 434 read_buffer_->num_valid_bytes_ += bytes_read; | |
| 435 | |
| 436 // Dispatch all the messages that we can. | |
| 437 bool did_dispatch_message = false; | |
| 438 bool stop_dispatching = false; | |
| 439 DispatchMessages(&did_dispatch_message, &stop_dispatching); | |
| 440 if (stop_dispatching) | |
| 441 return; | |
| 442 | |
| 443 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < | |
| 444 kReadSize) { | |
| 445 // Use power-of-2 buffer sizes. | |
| 446 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the | |
| 447 // maximum message size to whatever extent necessary). | |
| 448 // TODO(vtl): We may often be able to peek at the header and get the real | |
| 449 // required extra space (which may be much bigger than |kReadSize|). | |
| 450 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); | |
| 451 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) | |
| 452 new_size *= 2; | |
| 453 | |
| 454 // TODO(vtl): It's suboptimal to zero out the fresh memory. | |
| 455 read_buffer_->buffer_.resize(new_size, 0); | |
| 456 } | |
| 457 | |
| 458 // (1) If we dispatched any messages, stop reading for now (and let the | |
| 459 // message loop do its thing for another round). | |
| 460 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only | |
| 461 // a single message. Risks: slower, more complex if we want to avoid lots of | |
| 462 // copying. ii. Keep reading until there's no more data and dispatch all the | |
| 463 // messages we can. Risks: starvation of other users of the message loop.) | |
| 464 // (2) If we didn't max out |kReadSize|, stop reading for now. | |
| 465 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; | |
| 466 bytes_read = 0; | |
| 467 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); | |
| 468 } while (io_result != IO_PENDING); | |
| 469 } | |
| 470 | |
| 471 void RawChannel::OnWriteCompletedNoLock(IOResult io_result, | |
| 472 size_t platform_handles_written, | |
| 473 size_t bytes_written) { | |
| 474 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
| 475 write_lock_.AssertAcquired(); | |
| 476 DCHECK_NE(io_result, IO_PENDING); | |
| 477 | |
| 478 bool did_fail = !OnWriteCompletedInternalNoLock( | |
| 479 io_result, platform_handles_written, bytes_written); | |
| 480 if (did_fail) { | |
| 481 // Don't want to call the delegate with the current callstack for two | |
| 482 // reasons: | |
| 483 // 1) We already have write_lock_ acquired, and calling the delegate means | |
| 484 // also acquiring the read lock. We need to acquire read and then write to | |
| 485 // avoid deadlocks. | |
| 486 // 2) We shouldn't call the delegate with write_lock acquired, since the | |
| 487 // delegate could be calling WriteMessage and that can cause deadlocks. | |
| 488 pending_write_error_ = true; | |
| 489 internal::g_io_thread_task_runner->PostTask( | |
| 490 FROM_HERE, | |
| 491 base::Bind(&RawChannel::LockAndCallOnError, | |
| 492 weak_ptr_factory_.GetWeakPtr(), | |
| 493 Delegate::ERROR_WRITE)); | |
| 494 } | |
| 495 } | |
| 496 | |
| 497 void RawChannel::SerializeReadBuffer(size_t additional_bytes_read, | |
| 498 std::vector<char>* buffer) { | |
| 499 read_lock_.AssertAcquired(); | |
| 500 read_buffer_->num_valid_bytes_ += additional_bytes_read; | |
| 501 read_buffer_->buffer_.resize(read_buffer_->num_valid_bytes_); | |
| 502 read_buffer_->buffer_.swap(*buffer); | |
| 503 read_buffer_->num_valid_bytes_ = 0; | |
| 504 } | |
| 505 | |
| 506 void RawChannel::SerializeWriteBuffer( | |
| 507 size_t additional_bytes_written, | |
| 508 size_t additional_platform_handles_written, | |
| 509 std::vector<char>* buffer, | |
| 510 std::vector<int>* fds) { | |
| 511 write_lock_.AssertAcquired(); | |
| 512 if (write_buffer_->IsEmpty()) { | |
| 513 DCHECK_EQ(0u, additional_bytes_written); | |
| 514 DCHECK_EQ(0u, additional_platform_handles_written); | |
| 515 return; | |
| 516 } | |
| 517 | |
| 518 UpdateWriteBuffer( | |
| 519 additional_platform_handles_written, additional_bytes_written); | |
| 520 while (!write_buffer_->message_queue_.IsEmpty()) { | |
| 521 SerializePlatformHandles(fds); | |
| 522 std::vector<WriteBuffer::Buffer> buffers; | |
| 523 write_buffer_no_lock()->GetBuffers(&buffers); | |
| 524 for (size_t i = 0; i < buffers.size(); ++i) { | |
| 525 buffer->insert(buffer->end(), buffers[i].addr, | |
| 526 buffers[i].addr + buffers[i].size); | |
| 527 } | |
| 528 write_buffer_->message_queue_.DiscardMessage(); | |
| 529 } | |
| 530 } | |
| 531 | |
| 532 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { | |
| 533 write_lock_.AssertAcquired(); | |
| 534 write_buffer_->message_queue_.AddMessage(std::move(message)); | |
| 535 } | |
| 536 | |
| 537 bool RawChannel::OnReadMessageForRawChannel( | |
| 538 const MessageInTransit::View& message_view) { | |
| 539 LOG(ERROR) << "Invalid control message (type " << message_view.type() | |
| 540 << ")"; | |
| 541 return false; | |
| 542 } | |
| 543 | |
| 544 RawChannel::Delegate::Error RawChannel::ReadIOResultToError( | |
| 545 IOResult io_result) { | |
| 546 switch (io_result) { | |
| 547 case IO_FAILED_SHUTDOWN: | |
| 548 return Delegate::ERROR_READ_SHUTDOWN; | |
| 549 case IO_FAILED_BROKEN: | |
| 550 return Delegate::ERROR_READ_BROKEN; | |
| 551 case IO_FAILED_UNKNOWN: | |
| 552 return Delegate::ERROR_READ_UNKNOWN; | |
| 553 case IO_SUCCEEDED: | |
| 554 case IO_PENDING: | |
| 555 NOTREACHED(); | |
| 556 break; | |
| 557 } | |
| 558 return Delegate::ERROR_READ_UNKNOWN; | |
| 559 } | |
| 560 | |
| 561 void RawChannel::CallOnError(Delegate::Error error) { | |
| 562 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
| 563 read_lock_.AssertAcquired(); | |
| 564 error_occurred_ = true; | |
| 565 if (delegate_) { | |
| 566 DCHECK(!calling_delegate_); | |
| 567 calling_delegate_ = true; | |
| 568 delegate_->OnError(error); | |
| 569 calling_delegate_ = false; | |
| 570 } else { | |
| 571 // We depend on delegate to delete since it could be waiting to call | |
| 572 // ReleaseHandle. | |
| 573 internal::g_io_thread_task_runner->PostTask( | |
| 574 FROM_HERE, | |
| 575 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); | |
| 576 } | |
| 577 } | |
| 578 | |
| 579 void RawChannel::LockAndCallOnError(Delegate::Error error) { | |
| 580 base::AutoLock locker(read_lock_); | |
| 581 CallOnError(error); | |
| 582 } | |
| 583 | |
| 584 bool RawChannel::OnWriteCompletedInternalNoLock(IOResult io_result, | |
| 585 size_t platform_handles_written, | |
| 586 size_t bytes_written) { | |
| 587 write_lock_.AssertAcquired(); | |
| 588 | |
| 589 DCHECK(!write_buffer_->message_queue_.IsEmpty()); | |
| 590 | |
| 591 if (io_result == IO_SUCCEEDED) { | |
| 592 UpdateWriteBuffer(platform_handles_written, bytes_written); | |
| 593 if (write_buffer_->message_queue_.IsEmpty()) { | |
| 594 if (!delegate_) { | |
| 595 // Shutdown must have been called and we were waiting to flush all | |
| 596 // pending writes. Now we're done. | |
| 597 internal::g_io_thread_task_runner->PostTask( | |
| 598 FROM_HERE, | |
| 599 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); | |
| 600 } | |
| 601 return true; | |
| 602 } | |
| 603 | |
| 604 // Schedule the next write. | |
| 605 io_result = ScheduleWriteNoLock(); | |
| 606 if (io_result == IO_PENDING) | |
| 607 return true; | |
| 608 DCHECK_NE(io_result, IO_SUCCEEDED); | |
| 609 } | |
| 610 | |
| 611 write_stopped_ = true; | |
| 612 write_buffer_->message_queue_.Clear(); | |
| 613 write_buffer_->platform_handles_offset_ = 0; | |
| 614 write_buffer_->data_offset_ = 0; | |
| 615 return false; | |
| 616 } | |
| 617 | |
| 618 void RawChannel::DispatchMessages(bool* did_dispatch_message, | |
| 619 bool* stop_dispatching) { | |
| 620 *did_dispatch_message = false; | |
| 621 *stop_dispatching = false; | |
| 622 // Tracks the offset of the first undispatched message in |read_buffer_|. | |
| 623 // Currently, we copy data to ensure that this is zero at the beginning. | |
| 624 size_t read_buffer_start = 0; | |
| 625 size_t remaining_bytes = read_buffer_->num_valid_bytes_; | |
| 626 size_t message_size; | |
| 627 // Note that we rely on short-circuit evaluation here: | |
| 628 // - |read_buffer_start| may be an invalid index into | |
| 629 // |read_buffer_->buffer_| if |remaining_bytes| is zero. | |
| 630 // - |message_size| is only valid if |GetNextMessageSize()| returns true. | |
| 631 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | |
| 632 // next read). | |
| 633 // TODO(vtl): Validate that |message_size| is sane. | |
| 634 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( | |
| 635 &read_buffer_->buffer_[read_buffer_start], | |
| 636 remaining_bytes, &message_size) && | |
| 637 remaining_bytes >= message_size) { | |
| 638 MessageInTransit::View message_view( | |
| 639 message_size, &read_buffer_->buffer_[read_buffer_start]); | |
| 640 DCHECK_EQ(message_view.total_size(), message_size); | |
| 641 | |
| 642 const char* error_message = nullptr; | |
| 643 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), | |
| 644 &error_message)) { | |
| 645 DCHECK(error_message); | |
| 646 LOG(ERROR) << "Received invalid message: " << error_message; | |
| 647 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
| 648 *stop_dispatching = true; | |
| 649 return; // |this| may have been destroyed in |CallOnError()|. | |
| 650 } | |
| 651 | |
| 652 if (message_view.type() != MessageInTransit::Type::MESSAGE && | |
| 653 message_view.type() != MessageInTransit::Type::QUIT_MESSAGE) { | |
| 654 if (!OnReadMessageForRawChannel(message_view)) { | |
| 655 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
| 656 *stop_dispatching = true; | |
| 657 return; // |this| may have been destroyed in |CallOnError()|. | |
| 658 } | |
| 659 } else { | |
| 660 ScopedPlatformHandleVectorPtr platform_handles; | |
| 661 if (message_view.transport_data_buffer()) { | |
| 662 size_t num_platform_handles; | |
| 663 const void* platform_handle_table; | |
| 664 TransportData::GetPlatformHandleTable( | |
| 665 message_view.transport_data_buffer(), &num_platform_handles, | |
| 666 &platform_handle_table); | |
| 667 | |
| 668 if (num_platform_handles > 0) { | |
| 669 platform_handles = GetReadPlatformHandles(num_platform_handles, | |
| 670 platform_handle_table); | |
| 671 if (!platform_handles) { | |
| 672 LOG(ERROR) << "Invalid number of platform handles received"; | |
| 673 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
| 674 *stop_dispatching = true; | |
| 675 return; // |this| may have been destroyed in |CallOnError()|. | |
| 676 } | |
| 677 } | |
| 678 } | |
| 679 | |
| 680 // TODO(vtl): In the case that we aren't expecting any platform handles, | |
| 681 // for the POSIX implementation, we should confirm that none are stored. | |
| 682 if (delegate_) { | |
| 683 DCHECK(!calling_delegate_); | |
| 684 calling_delegate_ = true; | |
| 685 delegate_->OnReadMessage(message_view, std::move(platform_handles)); | |
| 686 calling_delegate_ = false; | |
| 687 } | |
| 688 } | |
| 689 | |
| 690 *did_dispatch_message = true; | |
| 691 | |
| 692 // Update our state. | |
| 693 read_buffer_start += message_size; | |
| 694 remaining_bytes -= message_size; | |
| 695 } | |
| 696 | |
| 697 if (read_buffer_start > 0) { | |
| 698 // Move data back to start. | |
| 699 read_buffer_->num_valid_bytes_ = remaining_bytes; | |
| 700 if (read_buffer_->num_valid_bytes_ > 0) { | |
| 701 memmove(&read_buffer_->buffer_[0], | |
| 702 &read_buffer_->buffer_[read_buffer_start], remaining_bytes); | |
| 703 } | |
| 704 read_buffer_start = 0; | |
| 705 } | |
| 706 } | |
| 707 | |
| 708 void RawChannel::UpdateWriteBuffer(size_t platform_handles_written, | |
| 709 size_t bytes_written) { | |
| 710 write_buffer_->platform_handles_offset_ += platform_handles_written; | |
| 711 write_buffer_->data_offset_ += bytes_written; | |
| 712 | |
| 713 MessageInTransit* message = write_buffer_->message_queue_.PeekMessage(); | |
| 714 if (write_buffer_->data_offset_ >= message->total_size()) { | |
| 715 // Complete write. | |
| 716 CHECK_EQ(write_buffer_->data_offset_, message->total_size()); | |
| 717 write_buffer_->message_queue_.DiscardMessage(); | |
| 718 write_buffer_->platform_handles_offset_ = 0; | |
| 719 write_buffer_->data_offset_ = 0; | |
| 720 } | |
| 721 } | |
| 722 | |
| 723 void RawChannel::CallOnReadCompleted(IOResult io_result, size_t bytes_read) { | |
| 724 base::AutoLock locker(read_lock_); | |
| 725 OnReadCompletedNoLock(io_result, bytes_read); | |
| 726 } | |
| 727 | |
| 728 void RawChannel::WillDestroyCurrentMessageLoop() { | |
| 729 { | |
| 730 base::AutoLock locker(read_lock_); | |
| 731 OnReadCompletedNoLock(IO_FAILED_SHUTDOWN, 0); | |
| 732 } | |
| 733 // The PostTask inside Shutdown() will never be called, so manually call it | |
| 734 // here to avoid leaks in LSAN builds. | |
| 735 Shutdown(); | |
| 736 } | |
| 737 | |
| 738 } // namespace edk | |
| 739 } // namespace mojo | |
| OLD | NEW |