OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/raw_channel.h" | 5 #include "mojo/edk/system/raw_channel.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 | 10 |
(...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
163 } | 163 } |
164 | 164 |
165 // RawChannel ------------------------------------------------------------------ | 165 // RawChannel ------------------------------------------------------------------ |
166 | 166 |
167 RawChannel::RawChannel() | 167 RawChannel::RawChannel() |
168 : message_loop_for_io_(nullptr), | 168 : message_loop_for_io_(nullptr), |
169 delegate_(nullptr), | 169 delegate_(nullptr), |
170 write_ready_(false), | 170 write_ready_(false), |
171 write_stopped_(false), | 171 write_stopped_(false), |
172 error_occurred_(false), | 172 error_occurred_(false), |
173 pending_error_(false), | |
173 weak_ptr_factory_(this) { | 174 weak_ptr_factory_(this) { |
174 read_buffer_.reset(new ReadBuffer); | 175 read_buffer_.reset(new ReadBuffer); |
175 write_buffer_.reset(new WriteBuffer()); | 176 write_buffer_.reset(new WriteBuffer()); |
176 } | 177 } |
177 | 178 |
178 RawChannel::~RawChannel() { | 179 RawChannel::~RawChannel() { |
179 DCHECK(!read_buffer_); | 180 DCHECK(!read_buffer_); |
180 DCHECK(!write_buffer_); | 181 DCHECK(!write_buffer_); |
181 | 182 |
182 // Only want to decrement counter if Init was called. | 183 // Only want to decrement counter if Init was called. |
(...skipping 140 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
323 | 324 |
324 bool RawChannel::SendQueuedMessagesNoLock() { | 325 bool RawChannel::SendQueuedMessagesNoLock() { |
325 DCHECK_EQ(write_buffer_->data_offset_, 0u); | 326 DCHECK_EQ(write_buffer_->data_offset_, 0u); |
326 | 327 |
327 size_t platform_handles_written = 0; | 328 size_t platform_handles_written = 0; |
328 size_t bytes_written = 0; | 329 size_t bytes_written = 0; |
329 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); | 330 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); |
330 if (io_result == IO_PENDING) | 331 if (io_result == IO_PENDING) |
331 return true; | 332 return true; |
332 | 333 |
333 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, | 334 bool result = OnWriteCompletedInternalNoLock( |
334 bytes_written); | 335 io_result, platform_handles_written, bytes_written); |
335 if (!result) { | 336 if (!result) { |
336 // Even if we're on the I/O thread, don't call |OnError()| in the nested | 337 // Even if we're on the I/O thread, don't call |OnError()| in the nested |
337 // context. | 338 // context. |
339 pending_error_ = true; | |
yzshen1
2015/10/14 18:23:45
pending_error_ is never reset to false, is that in
jam
2015/10/14 20:01:44
yep
| |
338 message_loop_for_io_->PostTask( | 340 message_loop_for_io_->PostTask( |
339 FROM_HERE, | 341 FROM_HERE, |
340 base::Bind(&RawChannel::LockAndCallOnError, | 342 base::Bind(&RawChannel::LockAndCallOnError, |
341 weak_ptr_factory_.GetWeakPtr(), | 343 weak_ptr_factory_.GetWeakPtr(), |
342 Delegate::ERROR_WRITE)); | 344 Delegate::ERROR_WRITE)); |
343 } | 345 } |
344 | 346 |
345 return result; | 347 return result; |
346 } | 348 } |
347 | 349 |
(...skipping 22 matching lines...) Expand all Loading... | |
370 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 372 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
371 MessageInTransit::Type::RAW_MESSAGE, message_num_bytes, | 373 MessageInTransit::Type::RAW_MESSAGE, message_num_bytes, |
372 static_cast<const char*>(serialized_write_buffer) + offset)); | 374 static_cast<const char*>(serialized_write_buffer) + offset)); |
373 write_buffer_->message_queue_.AddMessage(message.Pass()); | 375 write_buffer_->message_queue_.AddMessage(message.Pass()); |
374 offset += message_num_bytes; | 376 offset += message_num_bytes; |
375 } | 377 } |
376 } | 378 } |
377 } | 379 } |
378 | 380 |
379 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { | 381 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { |
382 base::AutoLock locker(read_lock_); | |
383 OnReadCompletedNoLock(io_result, bytes_read); | |
384 } | |
385 | |
386 void RawChannel::OnReadCompletedNoLock(IOResult io_result, size_t bytes_read) { | |
380 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 387 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
381 | 388 read_lock_.AssertAcquired(); |
382 base::AutoLock locker(read_lock_); | |
383 | |
384 // Keep reading data in a loop, and dispatch messages if enough data is | 389 // Keep reading data in a loop, and dispatch messages if enough data is |
385 // received. Exit the loop if any of the following happens: | 390 // received. Exit the loop if any of the following happens: |
386 // - one or more messages were dispatched; | 391 // - one or more messages were dispatched; |
387 // - the last read failed, was a partial read or would block; | 392 // - the last read failed, was a partial read or would block; |
388 // - |Shutdown()| was called. | 393 // - |Shutdown()| was called. |
389 do { | 394 do { |
390 switch (io_result) { | 395 switch (io_result) { |
391 case IO_SUCCEEDED: | 396 case IO_SUCCEEDED: |
392 break; | 397 break; |
393 case IO_FAILED_SHUTDOWN: | 398 case IO_FAILED_SHUTDOWN: |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
433 // (2) If we didn't max out |kReadSize|, stop reading for now. | 438 // (2) If we didn't max out |kReadSize|, stop reading for now. |
434 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; | 439 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; |
435 bytes_read = 0; | 440 bytes_read = 0; |
436 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); | 441 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); |
437 } while (io_result != IO_PENDING); | 442 } while (io_result != IO_PENDING); |
438 } | 443 } |
439 | 444 |
440 void RawChannel::OnWriteCompleted(IOResult io_result, | 445 void RawChannel::OnWriteCompleted(IOResult io_result, |
441 size_t platform_handles_written, | 446 size_t platform_handles_written, |
442 size_t bytes_written) { | 447 size_t bytes_written) { |
448 base::AutoLock locker(write_lock_); | |
449 OnWriteCompletedNoLock(io_result, platform_handles_written, bytes_written); | |
450 } | |
451 | |
452 void RawChannel::OnWriteCompletedNoLock(IOResult io_result, | |
453 size_t platform_handles_written, | |
454 size_t bytes_written) { | |
443 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 455 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
456 write_lock_.AssertAcquired(); | |
444 DCHECK_NE(io_result, IO_PENDING); | 457 DCHECK_NE(io_result, IO_PENDING); |
445 | 458 |
446 bool did_fail = false; | 459 bool did_fail = !OnWriteCompletedInternalNoLock( |
447 { | 460 io_result, platform_handles_written, bytes_written); |
448 base::AutoLock locker(write_lock_); | 461 if (did_fail) |
449 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, | |
450 bytes_written); | |
451 } | |
452 | |
453 if (did_fail) { | |
454 LockAndCallOnError(Delegate::ERROR_WRITE); | 462 LockAndCallOnError(Delegate::ERROR_WRITE); |
455 return; // |this| may have been destroyed in |CallOnError()|. | |
456 } | |
457 } | 463 } |
458 | 464 |
459 void RawChannel::SerializeReadBuffer(size_t additional_bytes_read, | 465 void RawChannel::SerializeReadBuffer(size_t additional_bytes_read, |
460 std::vector<char>* buffer) { | 466 std::vector<char>* buffer) { |
461 read_buffer_->num_valid_bytes_ += additional_bytes_read; | 467 read_buffer_->num_valid_bytes_ += additional_bytes_read; |
462 read_buffer_->buffer_.resize(read_buffer_->num_valid_bytes_); | 468 read_buffer_->buffer_.resize(read_buffer_->num_valid_bytes_); |
463 read_buffer_->buffer_.swap(*buffer); | 469 read_buffer_->buffer_.swap(*buffer); |
464 read_buffer_->num_valid_bytes_ = 0; | 470 read_buffer_->num_valid_bytes_ = 0; |
465 } | 471 } |
466 | 472 |
467 void RawChannel::SerializeWriteBuffer( | 473 void RawChannel::SerializeWriteBuffer( |
468 std::vector<char>* buffer, | |
469 size_t additional_bytes_written, | 474 size_t additional_bytes_written, |
470 size_t additional_platform_handles_written) { | 475 size_t additional_platform_handles_written, |
476 std::vector<char>* buffer) { | |
471 if (write_buffer_->IsEmpty()) { | 477 if (write_buffer_->IsEmpty()) { |
472 DCHECK_EQ(0u, additional_bytes_written); | 478 DCHECK_EQ(0u, additional_bytes_written); |
473 DCHECK_EQ(0u, additional_platform_handles_written); | 479 DCHECK_EQ(0u, additional_platform_handles_written); |
474 return; | 480 return; |
475 } | 481 } |
476 | 482 |
477 UpdateWriteBuffer( | 483 UpdateWriteBuffer( |
478 additional_platform_handles_written, additional_bytes_written); | 484 additional_platform_handles_written, additional_bytes_written); |
479 while (!write_buffer_->message_queue_.IsEmpty()) { | 485 while (!write_buffer_->message_queue_.IsEmpty()) { |
480 SerializePlatformHandles(); | 486 SerializePlatformHandles(); |
481 std::vector<WriteBuffer::Buffer> buffers; | 487 std::vector<WriteBuffer::Buffer> buffers; |
482 write_buffer_no_lock()->GetBuffers(&buffers); | 488 write_buffer_no_lock()->GetBuffers(&buffers); |
483 for (size_t i = 0; i < buffers.size(); ++i) { | 489 for (size_t i = 0; i < buffers.size(); ++i) { |
484 buffer->insert(buffer->end(), buffers[i].addr, | 490 buffer->insert(buffer->end(), buffers[i].addr, |
485 buffers[i].addr + buffers[i].size); | 491 buffers[i].addr + buffers[i].size); |
486 } | 492 } |
487 write_buffer_->message_queue_.DiscardMessage(); | 493 write_buffer_->message_queue_.DiscardMessage(); |
488 } | 494 } |
489 } | 495 } |
490 | 496 |
491 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { | 497 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { |
492 write_lock_.AssertAcquired(); | 498 write_lock_.AssertAcquired(); |
493 DCHECK(HandleForDebuggingNoLock().is_valid()); | 499 DCHECK(HandleForDebuggingNoLock().is_valid()); |
494 write_buffer_->message_queue_.AddMessage(message.Pass()); | 500 write_buffer_->message_queue_.AddMessage(message.Pass()); |
495 } | 501 } |
496 | 502 |
497 bool RawChannel::OnReadMessageForRawChannel( | 503 bool RawChannel::OnReadMessageForRawChannel( |
498 const MessageInTransit::View& message_view) { | 504 const MessageInTransit::View& message_view) { |
499 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) { | 505 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) { |
506 pending_error_ = true; | |
500 message_loop_for_io_->PostTask( | 507 message_loop_for_io_->PostTask( |
501 FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError, | 508 FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError, |
502 weak_ptr_factory_.GetWeakPtr(), | 509 weak_ptr_factory_.GetWeakPtr(), |
503 Delegate::ERROR_READ_SHUTDOWN)); | 510 Delegate::ERROR_READ_SHUTDOWN)); |
504 return true; | 511 return true; |
505 } | 512 } |
506 | 513 |
507 // No non-implementation specific |RawChannel| control messages. | 514 // No non-implementation specific |RawChannel| control messages. |
508 LOG(ERROR) << "Invalid control message (type " << message_view.type() | 515 LOG(ERROR) << "Invalid control message (type " << message_view.type() |
509 << ")"; | 516 << ")"; |
(...skipping 13 matching lines...) Expand all Loading... | |
523 case IO_PENDING: | 530 case IO_PENDING: |
524 NOTREACHED(); | 531 NOTREACHED(); |
525 break; | 532 break; |
526 } | 533 } |
527 return Delegate::ERROR_READ_UNKNOWN; | 534 return Delegate::ERROR_READ_UNKNOWN; |
528 } | 535 } |
529 | 536 |
530 void RawChannel::CallOnError(Delegate::Error error) { | 537 void RawChannel::CallOnError(Delegate::Error error) { |
531 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 538 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
532 read_lock_.AssertAcquired(); | 539 read_lock_.AssertAcquired(); |
540 bool called_on_error_before = error_occurred_; | |
533 error_occurred_ = true; | 541 error_occurred_ = true; |
534 if (delegate_) { | 542 if (delegate_) { |
535 delegate_->OnError(error); | 543 // We only want to call OnError once. The delegate may have posted a task |
544 // to call Shutdown though, so we don't want to null delegate_ the first | |
545 // time and have two shutdown calls. | |
546 if (!called_on_error_before) | |
547 delegate_->OnError(error); | |
536 } else { | 548 } else { |
537 // We depend on delegate to delete since it could be waiting to call | 549 // We depend on delegate to delete since it could be waiting to call |
538 // ReleaseHandle. | 550 // ReleaseHandle. |
539 base::MessageLoop::current()->PostTask( | 551 base::MessageLoop::current()->PostTask( |
540 FROM_HERE, | 552 FROM_HERE, |
541 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); | 553 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); |
542 } | 554 } |
543 } | 555 } |
544 | 556 |
545 void RawChannel::LockAndCallOnError(Delegate::Error error) { | 557 void RawChannel::LockAndCallOnError(Delegate::Error error) { |
546 base::AutoLock locker(read_lock_); | 558 base::AutoLock locker(read_lock_); |
547 CallOnError(error); | 559 CallOnError(error); |
548 } | 560 } |
549 | 561 |
550 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, | 562 bool RawChannel::OnWriteCompletedInternalNoLock(IOResult io_result, |
551 size_t platform_handles_written, | 563 size_t platform_handles_written, |
552 size_t bytes_written) { | 564 size_t bytes_written) { |
553 write_lock_.AssertAcquired(); | 565 write_lock_.AssertAcquired(); |
554 | 566 |
555 DCHECK(!write_buffer_->message_queue_.IsEmpty()); | 567 DCHECK(!write_buffer_->message_queue_.IsEmpty()); |
556 | 568 |
557 if (io_result == IO_SUCCEEDED) { | 569 if (io_result == IO_SUCCEEDED) { |
558 UpdateWriteBuffer(platform_handles_written, bytes_written); | 570 UpdateWriteBuffer(platform_handles_written, bytes_written); |
559 if (write_buffer_->message_queue_.IsEmpty()) | 571 if (write_buffer_->message_queue_.IsEmpty()) |
560 return true; | 572 return true; |
561 | 573 |
562 // Schedule the next write. | 574 // Schedule the next write. |
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
675 // Complete write. | 687 // Complete write. |
676 CHECK_EQ(write_buffer_->data_offset_, message->total_size()); | 688 CHECK_EQ(write_buffer_->data_offset_, message->total_size()); |
677 write_buffer_->message_queue_.DiscardMessage(); | 689 write_buffer_->message_queue_.DiscardMessage(); |
678 write_buffer_->platform_handles_offset_ = 0; | 690 write_buffer_->platform_handles_offset_ = 0; |
679 write_buffer_->data_offset_ = 0; | 691 write_buffer_->data_offset_ = 0; |
680 } | 692 } |
681 } | 693 } |
682 | 694 |
683 } // namespace edk | 695 } // namespace edk |
684 } // namespace mojo | 696 } // namespace mojo |
OLD | NEW |