Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(353)

Side by Side Diff: mojo/edk/system/raw_channel.cc

Issue 1403033003: Last set of fixes to make the src/mojo/edk pass the page cycler. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: another small fix Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/raw_channel.h" 5 #include "mojo/edk/system/raw_channel.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 10
(...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after
163 } 163 }
164 164
165 // RawChannel ------------------------------------------------------------------ 165 // RawChannel ------------------------------------------------------------------
166 166
167 RawChannel::RawChannel() 167 RawChannel::RawChannel()
168 : message_loop_for_io_(nullptr), 168 : message_loop_for_io_(nullptr),
169 delegate_(nullptr), 169 delegate_(nullptr),
170 write_ready_(false), 170 write_ready_(false),
171 write_stopped_(false), 171 write_stopped_(false),
172 error_occurred_(false), 172 error_occurred_(false),
173 pending_error_(false),
173 weak_ptr_factory_(this) { 174 weak_ptr_factory_(this) {
174 read_buffer_.reset(new ReadBuffer); 175 read_buffer_.reset(new ReadBuffer);
175 write_buffer_.reset(new WriteBuffer()); 176 write_buffer_.reset(new WriteBuffer());
176 } 177 }
177 178
178 RawChannel::~RawChannel() { 179 RawChannel::~RawChannel() {
179 DCHECK(!read_buffer_); 180 DCHECK(!read_buffer_);
180 DCHECK(!write_buffer_); 181 DCHECK(!write_buffer_);
181 182
182 // Only want to decrement counter if Init was called. 183 // Only want to decrement counter if Init was called.
(...skipping 140 matching lines...) Expand 10 before | Expand all | Expand 10 after
323 324
324 bool RawChannel::SendQueuedMessagesNoLock() { 325 bool RawChannel::SendQueuedMessagesNoLock() {
325 DCHECK_EQ(write_buffer_->data_offset_, 0u); 326 DCHECK_EQ(write_buffer_->data_offset_, 0u);
326 327
327 size_t platform_handles_written = 0; 328 size_t platform_handles_written = 0;
328 size_t bytes_written = 0; 329 size_t bytes_written = 0;
329 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); 330 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
330 if (io_result == IO_PENDING) 331 if (io_result == IO_PENDING)
331 return true; 332 return true;
332 333
333 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, 334 bool result = OnWriteCompletedInternalNoLock(
334 bytes_written); 335 io_result, platform_handles_written, bytes_written);
335 if (!result) { 336 if (!result) {
336 // Even if we're on the I/O thread, don't call |OnError()| in the nested 337 // Even if we're on the I/O thread, don't call |OnError()| in the nested
337 // context. 338 // context.
339 pending_error_ = true;
yzshen1 2015/10/14 18:23:45 pending_error_ is never reset to false, is that in
jam 2015/10/14 20:01:44 yep
338 message_loop_for_io_->PostTask( 340 message_loop_for_io_->PostTask(
339 FROM_HERE, 341 FROM_HERE,
340 base::Bind(&RawChannel::LockAndCallOnError, 342 base::Bind(&RawChannel::LockAndCallOnError,
341 weak_ptr_factory_.GetWeakPtr(), 343 weak_ptr_factory_.GetWeakPtr(),
342 Delegate::ERROR_WRITE)); 344 Delegate::ERROR_WRITE));
343 } 345 }
344 346
345 return result; 347 return result;
346 } 348 }
347 349
(...skipping 22 matching lines...) Expand all
370 scoped_ptr<MessageInTransit> message(new MessageInTransit( 372 scoped_ptr<MessageInTransit> message(new MessageInTransit(
371 MessageInTransit::Type::RAW_MESSAGE, message_num_bytes, 373 MessageInTransit::Type::RAW_MESSAGE, message_num_bytes,
372 static_cast<const char*>(serialized_write_buffer) + offset)); 374 static_cast<const char*>(serialized_write_buffer) + offset));
373 write_buffer_->message_queue_.AddMessage(message.Pass()); 375 write_buffer_->message_queue_.AddMessage(message.Pass());
374 offset += message_num_bytes; 376 offset += message_num_bytes;
375 } 377 }
376 } 378 }
377 } 379 }
378 380
379 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { 381 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
382 base::AutoLock locker(read_lock_);
383 OnReadCompletedNoLock(io_result, bytes_read);
384 }
385
386 void RawChannel::OnReadCompletedNoLock(IOResult io_result, size_t bytes_read) {
380 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 387 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
381 388 read_lock_.AssertAcquired();
382 base::AutoLock locker(read_lock_);
383
384 // Keep reading data in a loop, and dispatch messages if enough data is 389 // Keep reading data in a loop, and dispatch messages if enough data is
385 // received. Exit the loop if any of the following happens: 390 // received. Exit the loop if any of the following happens:
386 // - one or more messages were dispatched; 391 // - one or more messages were dispatched;
387 // - the last read failed, was a partial read or would block; 392 // - the last read failed, was a partial read or would block;
388 // - |Shutdown()| was called. 393 // - |Shutdown()| was called.
389 do { 394 do {
390 switch (io_result) { 395 switch (io_result) {
391 case IO_SUCCEEDED: 396 case IO_SUCCEEDED:
392 break; 397 break;
393 case IO_FAILED_SHUTDOWN: 398 case IO_FAILED_SHUTDOWN:
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
433 // (2) If we didn't max out |kReadSize|, stop reading for now. 438 // (2) If we didn't max out |kReadSize|, stop reading for now.
434 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; 439 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
435 bytes_read = 0; 440 bytes_read = 0;
436 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); 441 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
437 } while (io_result != IO_PENDING); 442 } while (io_result != IO_PENDING);
438 } 443 }
439 444
440 void RawChannel::OnWriteCompleted(IOResult io_result, 445 void RawChannel::OnWriteCompleted(IOResult io_result,
441 size_t platform_handles_written, 446 size_t platform_handles_written,
442 size_t bytes_written) { 447 size_t bytes_written) {
448 base::AutoLock locker(write_lock_);
449 OnWriteCompletedNoLock(io_result, platform_handles_written, bytes_written);
450 }
451
452 void RawChannel::OnWriteCompletedNoLock(IOResult io_result,
453 size_t platform_handles_written,
454 size_t bytes_written) {
443 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 455 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
456 write_lock_.AssertAcquired();
444 DCHECK_NE(io_result, IO_PENDING); 457 DCHECK_NE(io_result, IO_PENDING);
445 458
446 bool did_fail = false; 459 bool did_fail = !OnWriteCompletedInternalNoLock(
447 { 460 io_result, platform_handles_written, bytes_written);
448 base::AutoLock locker(write_lock_); 461 if (did_fail)
449 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written,
450 bytes_written);
451 }
452
453 if (did_fail) {
454 LockAndCallOnError(Delegate::ERROR_WRITE); 462 LockAndCallOnError(Delegate::ERROR_WRITE);
455 return; // |this| may have been destroyed in |CallOnError()|.
456 }
457 } 463 }
458 464
459 void RawChannel::SerializeReadBuffer(size_t additional_bytes_read, 465 void RawChannel::SerializeReadBuffer(size_t additional_bytes_read,
460 std::vector<char>* buffer) { 466 std::vector<char>* buffer) {
461 read_buffer_->num_valid_bytes_ += additional_bytes_read; 467 read_buffer_->num_valid_bytes_ += additional_bytes_read;
462 read_buffer_->buffer_.resize(read_buffer_->num_valid_bytes_); 468 read_buffer_->buffer_.resize(read_buffer_->num_valid_bytes_);
463 read_buffer_->buffer_.swap(*buffer); 469 read_buffer_->buffer_.swap(*buffer);
464 read_buffer_->num_valid_bytes_ = 0; 470 read_buffer_->num_valid_bytes_ = 0;
465 } 471 }
466 472
467 void RawChannel::SerializeWriteBuffer( 473 void RawChannel::SerializeWriteBuffer(
468 std::vector<char>* buffer,
469 size_t additional_bytes_written, 474 size_t additional_bytes_written,
470 size_t additional_platform_handles_written) { 475 size_t additional_platform_handles_written,
476 std::vector<char>* buffer) {
471 if (write_buffer_->IsEmpty()) { 477 if (write_buffer_->IsEmpty()) {
472 DCHECK_EQ(0u, additional_bytes_written); 478 DCHECK_EQ(0u, additional_bytes_written);
473 DCHECK_EQ(0u, additional_platform_handles_written); 479 DCHECK_EQ(0u, additional_platform_handles_written);
474 return; 480 return;
475 } 481 }
476 482
477 UpdateWriteBuffer( 483 UpdateWriteBuffer(
478 additional_platform_handles_written, additional_bytes_written); 484 additional_platform_handles_written, additional_bytes_written);
479 while (!write_buffer_->message_queue_.IsEmpty()) { 485 while (!write_buffer_->message_queue_.IsEmpty()) {
480 SerializePlatformHandles(); 486 SerializePlatformHandles();
481 std::vector<WriteBuffer::Buffer> buffers; 487 std::vector<WriteBuffer::Buffer> buffers;
482 write_buffer_no_lock()->GetBuffers(&buffers); 488 write_buffer_no_lock()->GetBuffers(&buffers);
483 for (size_t i = 0; i < buffers.size(); ++i) { 489 for (size_t i = 0; i < buffers.size(); ++i) {
484 buffer->insert(buffer->end(), buffers[i].addr, 490 buffer->insert(buffer->end(), buffers[i].addr,
485 buffers[i].addr + buffers[i].size); 491 buffers[i].addr + buffers[i].size);
486 } 492 }
487 write_buffer_->message_queue_.DiscardMessage(); 493 write_buffer_->message_queue_.DiscardMessage();
488 } 494 }
489 } 495 }
490 496
491 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { 497 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
492 write_lock_.AssertAcquired(); 498 write_lock_.AssertAcquired();
493 DCHECK(HandleForDebuggingNoLock().is_valid()); 499 DCHECK(HandleForDebuggingNoLock().is_valid());
494 write_buffer_->message_queue_.AddMessage(message.Pass()); 500 write_buffer_->message_queue_.AddMessage(message.Pass());
495 } 501 }
496 502
497 bool RawChannel::OnReadMessageForRawChannel( 503 bool RawChannel::OnReadMessageForRawChannel(
498 const MessageInTransit::View& message_view) { 504 const MessageInTransit::View& message_view) {
499 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) { 505 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) {
506 pending_error_ = true;
500 message_loop_for_io_->PostTask( 507 message_loop_for_io_->PostTask(
501 FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError, 508 FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError,
502 weak_ptr_factory_.GetWeakPtr(), 509 weak_ptr_factory_.GetWeakPtr(),
503 Delegate::ERROR_READ_SHUTDOWN)); 510 Delegate::ERROR_READ_SHUTDOWN));
504 return true; 511 return true;
505 } 512 }
506 513
507 // No non-implementation specific |RawChannel| control messages. 514 // No non-implementation specific |RawChannel| control messages.
508 LOG(ERROR) << "Invalid control message (type " << message_view.type() 515 LOG(ERROR) << "Invalid control message (type " << message_view.type()
509 << ")"; 516 << ")";
(...skipping 13 matching lines...) Expand all
523 case IO_PENDING: 530 case IO_PENDING:
524 NOTREACHED(); 531 NOTREACHED();
525 break; 532 break;
526 } 533 }
527 return Delegate::ERROR_READ_UNKNOWN; 534 return Delegate::ERROR_READ_UNKNOWN;
528 } 535 }
529 536
530 void RawChannel::CallOnError(Delegate::Error error) { 537 void RawChannel::CallOnError(Delegate::Error error) {
531 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 538 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
532 read_lock_.AssertAcquired(); 539 read_lock_.AssertAcquired();
540 bool called_on_error_before = error_occurred_;
533 error_occurred_ = true; 541 error_occurred_ = true;
534 if (delegate_) { 542 if (delegate_) {
535 delegate_->OnError(error); 543 // We only want to call OnError once. The delegate may have posted a task
544 // to call Shutdown though, so we don't want to null delegate_ the first
545 // time and have two shutdown calls.
546 if (!called_on_error_before)
547 delegate_->OnError(error);
536 } else { 548 } else {
537 // We depend on delegate to delete since it could be waiting to call 549 // We depend on delegate to delete since it could be waiting to call
538 // ReleaseHandle. 550 // ReleaseHandle.
539 base::MessageLoop::current()->PostTask( 551 base::MessageLoop::current()->PostTask(
540 FROM_HERE, 552 FROM_HERE,
541 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); 553 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
542 } 554 }
543 } 555 }
544 556
545 void RawChannel::LockAndCallOnError(Delegate::Error error) { 557 void RawChannel::LockAndCallOnError(Delegate::Error error) {
546 base::AutoLock locker(read_lock_); 558 base::AutoLock locker(read_lock_);
547 CallOnError(error); 559 CallOnError(error);
548 } 560 }
549 561
550 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, 562 bool RawChannel::OnWriteCompletedInternalNoLock(IOResult io_result,
551 size_t platform_handles_written, 563 size_t platform_handles_written,
552 size_t bytes_written) { 564 size_t bytes_written) {
553 write_lock_.AssertAcquired(); 565 write_lock_.AssertAcquired();
554 566
555 DCHECK(!write_buffer_->message_queue_.IsEmpty()); 567 DCHECK(!write_buffer_->message_queue_.IsEmpty());
556 568
557 if (io_result == IO_SUCCEEDED) { 569 if (io_result == IO_SUCCEEDED) {
558 UpdateWriteBuffer(platform_handles_written, bytes_written); 570 UpdateWriteBuffer(platform_handles_written, bytes_written);
559 if (write_buffer_->message_queue_.IsEmpty()) 571 if (write_buffer_->message_queue_.IsEmpty())
560 return true; 572 return true;
561 573
562 // Schedule the next write. 574 // Schedule the next write.
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
675 // Complete write. 687 // Complete write.
676 CHECK_EQ(write_buffer_->data_offset_, message->total_size()); 688 CHECK_EQ(write_buffer_->data_offset_, message->total_size());
677 write_buffer_->message_queue_.DiscardMessage(); 689 write_buffer_->message_queue_.DiscardMessage();
678 write_buffer_->platform_handles_offset_ = 0; 690 write_buffer_->platform_handles_offset_ = 0;
679 write_buffer_->data_offset_ = 0; 691 write_buffer_->data_offset_ = 0;
680 } 692 }
681 } 693 }
682 694
683 } // namespace edk 695 } // namespace edk
684 } // namespace mojo 696 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698