| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/raw_channel.h" | 5 #include "mojo/edk/system/raw_channel.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include <algorithm> | 9 #include <algorithm> |
| 10 | 10 |
| (...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 163 } | 163 } |
| 164 | 164 |
| 165 // RawChannel ------------------------------------------------------------------ | 165 // RawChannel ------------------------------------------------------------------ |
| 166 | 166 |
| 167 RawChannel::RawChannel() | 167 RawChannel::RawChannel() |
| 168 : message_loop_for_io_(nullptr), | 168 : message_loop_for_io_(nullptr), |
| 169 delegate_(nullptr), | 169 delegate_(nullptr), |
| 170 write_ready_(false), | 170 write_ready_(false), |
| 171 write_stopped_(false), | 171 write_stopped_(false), |
| 172 error_occurred_(false), | 172 error_occurred_(false), |
| 173 pending_error_(false), |
| 173 weak_ptr_factory_(this) { | 174 weak_ptr_factory_(this) { |
| 174 read_buffer_.reset(new ReadBuffer); | 175 read_buffer_.reset(new ReadBuffer); |
| 175 write_buffer_.reset(new WriteBuffer()); | 176 write_buffer_.reset(new WriteBuffer()); |
| 176 } | 177 } |
| 177 | 178 |
| 178 RawChannel::~RawChannel() { | 179 RawChannel::~RawChannel() { |
| 179 DCHECK(!read_buffer_); | 180 DCHECK(!read_buffer_); |
| 180 DCHECK(!write_buffer_); | 181 DCHECK(!write_buffer_); |
| 181 | 182 |
| 182 // Only want to decrement counter if Init was called. | 183 // Only want to decrement counter if Init was called. |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 218 bool did_dispatch_message = false; | 219 bool did_dispatch_message = false; |
| 219 bool stop_dispatching = false; | 220 bool stop_dispatching = false; |
| 220 DispatchMessages(&did_dispatch_message, &stop_dispatching); | 221 DispatchMessages(&did_dispatch_message, &stop_dispatching); |
| 221 } | 222 } |
| 222 | 223 |
| 223 IOResult io_result = ScheduleRead(); | 224 IOResult io_result = ScheduleRead(); |
| 224 if (io_result != IO_PENDING) { | 225 if (io_result != IO_PENDING) { |
| 225 // This will notify the delegate about the read failure. Although we're on | 226 // This will notify the delegate about the read failure. Although we're on |
| 226 // the I/O thread, don't call it in the nested context. | 227 // the I/O thread, don't call it in the nested context. |
| 227 message_loop_for_io_->PostTask( | 228 message_loop_for_io_->PostTask( |
| 228 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, | 229 FROM_HERE, base::Bind(&RawChannel::CallOnReadCompleted, |
| 229 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); | 230 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); |
| 230 } | 231 } |
| 231 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying | 232 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying |
| 232 // the delegate), not an initialization failure. | 233 // the delegate), not an initialization failure. |
| 233 | 234 |
| 234 write_ready_ = true; | 235 write_ready_ = true; |
| 235 write_buffer_->serialized_platform_handle_size_ = | 236 write_buffer_->serialized_platform_handle_size_ = |
| 236 GetSerializedPlatformHandleSize(); | 237 GetSerializedPlatformHandleSize(); |
| 237 if (!write_buffer_->message_queue_.IsEmpty()) | 238 if (!write_buffer_->message_queue_.IsEmpty()) |
| 238 SendQueuedMessagesNoLock(); | 239 SendQueuedMessagesNoLock(); |
| (...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 323 | 324 |
| 324 bool RawChannel::SendQueuedMessagesNoLock() { | 325 bool RawChannel::SendQueuedMessagesNoLock() { |
| 325 DCHECK_EQ(write_buffer_->data_offset_, 0u); | 326 DCHECK_EQ(write_buffer_->data_offset_, 0u); |
| 326 | 327 |
| 327 size_t platform_handles_written = 0; | 328 size_t platform_handles_written = 0; |
| 328 size_t bytes_written = 0; | 329 size_t bytes_written = 0; |
| 329 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); | 330 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); |
| 330 if (io_result == IO_PENDING) | 331 if (io_result == IO_PENDING) |
| 331 return true; | 332 return true; |
| 332 | 333 |
| 333 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, | 334 bool result = OnWriteCompletedInternalNoLock( |
| 334 bytes_written); | 335 io_result, platform_handles_written, bytes_written); |
| 335 if (!result) { | 336 if (!result) { |
| 336 // Even if we're on the I/O thread, don't call |OnError()| in the nested | 337 // Even if we're on the I/O thread, don't call |OnError()| in the nested |
| 337 // context. | 338 // context. |
| 339 pending_error_ = true; |
| 338 message_loop_for_io_->PostTask( | 340 message_loop_for_io_->PostTask( |
| 339 FROM_HERE, | 341 FROM_HERE, |
| 340 base::Bind(&RawChannel::LockAndCallOnError, | 342 base::Bind(&RawChannel::LockAndCallOnError, |
| 341 weak_ptr_factory_.GetWeakPtr(), | 343 weak_ptr_factory_.GetWeakPtr(), |
| 342 Delegate::ERROR_WRITE)); | 344 Delegate::ERROR_WRITE)); |
| 343 } | 345 } |
| 344 | 346 |
| 345 return result; | 347 return result; |
| 346 } | 348 } |
| 347 | 349 |
| (...skipping 21 matching lines...) Expand all Loading... |
| 369 offset); | 371 offset); |
| 370 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 372 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| 371 MessageInTransit::Type::RAW_MESSAGE, message_num_bytes, | 373 MessageInTransit::Type::RAW_MESSAGE, message_num_bytes, |
| 372 static_cast<const char*>(serialized_write_buffer) + offset)); | 374 static_cast<const char*>(serialized_write_buffer) + offset)); |
| 373 write_buffer_->message_queue_.AddMessage(message.Pass()); | 375 write_buffer_->message_queue_.AddMessage(message.Pass()); |
| 374 offset += message_num_bytes; | 376 offset += message_num_bytes; |
| 375 } | 377 } |
| 376 } | 378 } |
| 377 } | 379 } |
| 378 | 380 |
| 379 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { | 381 void RawChannel::OnReadCompletedNoLock(IOResult io_result, size_t bytes_read) { |
| 380 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 382 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 381 | 383 read_lock_.AssertAcquired(); |
| 382 base::AutoLock locker(read_lock_); | |
| 383 | |
| 384 // Keep reading data in a loop, and dispatch messages if enough data is | 384 // Keep reading data in a loop, and dispatch messages if enough data is |
| 385 // received. Exit the loop if any of the following happens: | 385 // received. Exit the loop if any of the following happens: |
| 386 // - one or more messages were dispatched; | 386 // - one or more messages were dispatched; |
| 387 // - the last read failed, was a partial read or would block; | 387 // - the last read failed, was a partial read or would block; |
| 388 // - |Shutdown()| was called. | 388 // - |Shutdown()| was called. |
| 389 do { | 389 do { |
| 390 switch (io_result) { | 390 switch (io_result) { |
| 391 case IO_SUCCEEDED: | 391 case IO_SUCCEEDED: |
| 392 break; | 392 break; |
| 393 case IO_FAILED_SHUTDOWN: | 393 case IO_FAILED_SHUTDOWN: |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 430 // a single message. Risks: slower, more complex if we want to avoid lots of | 430 // a single message. Risks: slower, more complex if we want to avoid lots of |
| 431 // copying. ii. Keep reading until there's no more data and dispatch all the | 431 // copying. ii. Keep reading until there's no more data and dispatch all the |
| 432 // messages we can. Risks: starvation of other users of the message loop.) | 432 // messages we can. Risks: starvation of other users of the message loop.) |
| 433 // (2) If we didn't max out |kReadSize|, stop reading for now. | 433 // (2) If we didn't max out |kReadSize|, stop reading for now. |
| 434 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; | 434 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; |
| 435 bytes_read = 0; | 435 bytes_read = 0; |
| 436 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); | 436 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); |
| 437 } while (io_result != IO_PENDING); | 437 } while (io_result != IO_PENDING); |
| 438 } | 438 } |
| 439 | 439 |
| 440 void RawChannel::OnWriteCompleted(IOResult io_result, | 440 void RawChannel::OnWriteCompletedNoLock(IOResult io_result, |
| 441 size_t platform_handles_written, | 441 size_t platform_handles_written, |
| 442 size_t bytes_written) { | 442 size_t bytes_written) { |
| 443 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 443 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 444 write_lock_.AssertAcquired(); |
| 444 DCHECK_NE(io_result, IO_PENDING); | 445 DCHECK_NE(io_result, IO_PENDING); |
| 445 | 446 |
| 446 bool did_fail = false; | 447 bool did_fail = !OnWriteCompletedInternalNoLock( |
| 447 { | 448 io_result, platform_handles_written, bytes_written); |
| 448 base::AutoLock locker(write_lock_); | 449 if (did_fail) |
| 449 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, | |
| 450 bytes_written); | |
| 451 } | |
| 452 | |
| 453 if (did_fail) { | |
| 454 LockAndCallOnError(Delegate::ERROR_WRITE); | 450 LockAndCallOnError(Delegate::ERROR_WRITE); |
| 455 return; // |this| may have been destroyed in |CallOnError()|. | |
| 456 } | |
| 457 } | 451 } |
| 458 | 452 |
| 459 void RawChannel::SerializeReadBuffer(size_t additional_bytes_read, | 453 void RawChannel::SerializeReadBuffer(size_t additional_bytes_read, |
| 460 std::vector<char>* buffer) { | 454 std::vector<char>* buffer) { |
| 461 read_buffer_->num_valid_bytes_ += additional_bytes_read; | 455 read_buffer_->num_valid_bytes_ += additional_bytes_read; |
| 462 read_buffer_->buffer_.resize(read_buffer_->num_valid_bytes_); | 456 read_buffer_->buffer_.resize(read_buffer_->num_valid_bytes_); |
| 463 read_buffer_->buffer_.swap(*buffer); | 457 read_buffer_->buffer_.swap(*buffer); |
| 464 read_buffer_->num_valid_bytes_ = 0; | 458 read_buffer_->num_valid_bytes_ = 0; |
| 465 } | 459 } |
| 466 | 460 |
| 467 void RawChannel::SerializeWriteBuffer( | 461 void RawChannel::SerializeWriteBuffer( |
| 468 std::vector<char>* buffer, | |
| 469 size_t additional_bytes_written, | 462 size_t additional_bytes_written, |
| 470 size_t additional_platform_handles_written) { | 463 size_t additional_platform_handles_written, |
| 464 std::vector<char>* buffer) { |
| 471 if (write_buffer_->IsEmpty()) { | 465 if (write_buffer_->IsEmpty()) { |
| 472 DCHECK_EQ(0u, additional_bytes_written); | 466 DCHECK_EQ(0u, additional_bytes_written); |
| 473 DCHECK_EQ(0u, additional_platform_handles_written); | 467 DCHECK_EQ(0u, additional_platform_handles_written); |
| 474 return; | 468 return; |
| 475 } | 469 } |
| 476 | 470 |
| 477 UpdateWriteBuffer( | 471 UpdateWriteBuffer( |
| 478 additional_platform_handles_written, additional_bytes_written); | 472 additional_platform_handles_written, additional_bytes_written); |
| 479 while (!write_buffer_->message_queue_.IsEmpty()) { | 473 while (!write_buffer_->message_queue_.IsEmpty()) { |
| 480 SerializePlatformHandles(); | 474 SerializePlatformHandles(); |
| 481 std::vector<WriteBuffer::Buffer> buffers; | 475 std::vector<WriteBuffer::Buffer> buffers; |
| 482 write_buffer_no_lock()->GetBuffers(&buffers); | 476 write_buffer_no_lock()->GetBuffers(&buffers); |
| 483 for (size_t i = 0; i < buffers.size(); ++i) { | 477 for (size_t i = 0; i < buffers.size(); ++i) { |
| 484 buffer->insert(buffer->end(), buffers[i].addr, | 478 buffer->insert(buffer->end(), buffers[i].addr, |
| 485 buffers[i].addr + buffers[i].size); | 479 buffers[i].addr + buffers[i].size); |
| 486 } | 480 } |
| 487 write_buffer_->message_queue_.DiscardMessage(); | 481 write_buffer_->message_queue_.DiscardMessage(); |
| 488 } | 482 } |
| 489 } | 483 } |
| 490 | 484 |
| 491 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { | 485 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { |
| 492 write_lock_.AssertAcquired(); | 486 write_lock_.AssertAcquired(); |
| 493 DCHECK(HandleForDebuggingNoLock().is_valid()); | 487 DCHECK(HandleForDebuggingNoLock().is_valid()); |
| 494 write_buffer_->message_queue_.AddMessage(message.Pass()); | 488 write_buffer_->message_queue_.AddMessage(message.Pass()); |
| 495 } | 489 } |
| 496 | 490 |
| 497 bool RawChannel::OnReadMessageForRawChannel( | 491 bool RawChannel::OnReadMessageForRawChannel( |
| 498 const MessageInTransit::View& message_view) { | 492 const MessageInTransit::View& message_view) { |
| 499 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) { | 493 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) { |
| 494 pending_error_ = true; |
| 500 message_loop_for_io_->PostTask( | 495 message_loop_for_io_->PostTask( |
| 501 FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError, | 496 FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError, |
| 502 weak_ptr_factory_.GetWeakPtr(), | 497 weak_ptr_factory_.GetWeakPtr(), |
| 503 Delegate::ERROR_READ_SHUTDOWN)); | 498 Delegate::ERROR_READ_SHUTDOWN)); |
| 504 return true; | 499 return true; |
| 505 } | 500 } |
| 506 | 501 |
| 507 // No non-implementation specific |RawChannel| control messages. | 502 // No non-implementation specific |RawChannel| control messages. |
| 508 LOG(ERROR) << "Invalid control message (type " << message_view.type() | 503 LOG(ERROR) << "Invalid control message (type " << message_view.type() |
| 509 << ")"; | 504 << ")"; |
| (...skipping 30 matching lines...) Expand all Loading... |
| 540 FROM_HERE, | 535 FROM_HERE, |
| 541 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); | 536 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); |
| 542 } | 537 } |
| 543 } | 538 } |
| 544 | 539 |
| 545 void RawChannel::LockAndCallOnError(Delegate::Error error) { | 540 void RawChannel::LockAndCallOnError(Delegate::Error error) { |
| 546 base::AutoLock locker(read_lock_); | 541 base::AutoLock locker(read_lock_); |
| 547 CallOnError(error); | 542 CallOnError(error); |
| 548 } | 543 } |
| 549 | 544 |
| 550 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, | 545 bool RawChannel::OnWriteCompletedInternalNoLock(IOResult io_result, |
| 551 size_t platform_handles_written, | 546 size_t platform_handles_written, |
| 552 size_t bytes_written) { | 547 size_t bytes_written) { |
| 553 write_lock_.AssertAcquired(); | 548 write_lock_.AssertAcquired(); |
| 554 | 549 |
| 555 DCHECK(!write_buffer_->message_queue_.IsEmpty()); | 550 DCHECK(!write_buffer_->message_queue_.IsEmpty()); |
| 556 | 551 |
| 557 if (io_result == IO_SUCCEEDED) { | 552 if (io_result == IO_SUCCEEDED) { |
| 558 UpdateWriteBuffer(platform_handles_written, bytes_written); | 553 UpdateWriteBuffer(platform_handles_written, bytes_written); |
| 559 if (write_buffer_->message_queue_.IsEmpty()) | 554 if (write_buffer_->message_queue_.IsEmpty()) |
| 560 return true; | 555 return true; |
| 561 | 556 |
| 562 // Schedule the next write. | 557 // Schedule the next write. |
| (...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 673 MessageInTransit* message = write_buffer_->message_queue_.PeekMessage(); | 668 MessageInTransit* message = write_buffer_->message_queue_.PeekMessage(); |
| 674 if (write_buffer_->data_offset_ >= message->total_size()) { | 669 if (write_buffer_->data_offset_ >= message->total_size()) { |
| 675 // Complete write. | 670 // Complete write. |
| 676 CHECK_EQ(write_buffer_->data_offset_, message->total_size()); | 671 CHECK_EQ(write_buffer_->data_offset_, message->total_size()); |
| 677 write_buffer_->message_queue_.DiscardMessage(); | 672 write_buffer_->message_queue_.DiscardMessage(); |
| 678 write_buffer_->platform_handles_offset_ = 0; | 673 write_buffer_->platform_handles_offset_ = 0; |
| 679 write_buffer_->data_offset_ = 0; | 674 write_buffer_->data_offset_ = 0; |
| 680 } | 675 } |
| 681 } | 676 } |
| 682 | 677 |
| 678 void RawChannel::CallOnReadCompleted(IOResult io_result, size_t bytes_read) { |
| 679 base::AutoLock locker(read_lock()); |
| 680 OnReadCompletedNoLock(io_result, bytes_read); |
| 681 } |
| 682 |
| 683 } // namespace edk | 683 } // namespace edk |
| 684 } // namespace mojo | 684 } // namespace mojo |
| OLD | NEW |