Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(62)

Side by Side Diff: mojo/edk/system/raw_channel.cc

Issue 1403033003: Last set of fixes to make the src/mojo/edk pass the page cycler. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: nit Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/raw_channel.h ('k') | mojo/edk/system/raw_channel_posix.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/raw_channel.h" 5 #include "mojo/edk/system/raw_channel.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 10
(...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after
163 } 163 }
164 164
165 // RawChannel ------------------------------------------------------------------ 165 // RawChannel ------------------------------------------------------------------
166 166
167 RawChannel::RawChannel() 167 RawChannel::RawChannel()
168 : message_loop_for_io_(nullptr), 168 : message_loop_for_io_(nullptr),
169 delegate_(nullptr), 169 delegate_(nullptr),
170 write_ready_(false), 170 write_ready_(false),
171 write_stopped_(false), 171 write_stopped_(false),
172 error_occurred_(false), 172 error_occurred_(false),
173 pending_error_(false),
173 weak_ptr_factory_(this) { 174 weak_ptr_factory_(this) {
174 read_buffer_.reset(new ReadBuffer); 175 read_buffer_.reset(new ReadBuffer);
175 write_buffer_.reset(new WriteBuffer()); 176 write_buffer_.reset(new WriteBuffer());
176 } 177 }
177 178
178 RawChannel::~RawChannel() { 179 RawChannel::~RawChannel() {
179 DCHECK(!read_buffer_); 180 DCHECK(!read_buffer_);
180 DCHECK(!write_buffer_); 181 DCHECK(!write_buffer_);
181 182
182 // Only want to decrement counter if Init was called. 183 // Only want to decrement counter if Init was called.
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
218 bool did_dispatch_message = false; 219 bool did_dispatch_message = false;
219 bool stop_dispatching = false; 220 bool stop_dispatching = false;
220 DispatchMessages(&did_dispatch_message, &stop_dispatching); 221 DispatchMessages(&did_dispatch_message, &stop_dispatching);
221 } 222 }
222 223
223 IOResult io_result = ScheduleRead(); 224 IOResult io_result = ScheduleRead();
224 if (io_result != IO_PENDING) { 225 if (io_result != IO_PENDING) {
225 // This will notify the delegate about the read failure. Although we're on 226 // This will notify the delegate about the read failure. Although we're on
226 // the I/O thread, don't call it in the nested context. 227 // the I/O thread, don't call it in the nested context.
227 message_loop_for_io_->PostTask( 228 message_loop_for_io_->PostTask(
228 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, 229 FROM_HERE, base::Bind(&RawChannel::CallOnReadCompleted,
229 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); 230 weak_ptr_factory_.GetWeakPtr(), io_result, 0));
230 } 231 }
231 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying 232 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying
232 // the delegate), not an initialization failure. 233 // the delegate), not an initialization failure.
233 234
234 write_ready_ = true; 235 write_ready_ = true;
235 write_buffer_->serialized_platform_handle_size_ = 236 write_buffer_->serialized_platform_handle_size_ =
236 GetSerializedPlatformHandleSize(); 237 GetSerializedPlatformHandleSize();
237 if (!write_buffer_->message_queue_.IsEmpty()) 238 if (!write_buffer_->message_queue_.IsEmpty())
238 SendQueuedMessagesNoLock(); 239 SendQueuedMessagesNoLock();
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
323 324
324 bool RawChannel::SendQueuedMessagesNoLock() { 325 bool RawChannel::SendQueuedMessagesNoLock() {
325 DCHECK_EQ(write_buffer_->data_offset_, 0u); 326 DCHECK_EQ(write_buffer_->data_offset_, 0u);
326 327
327 size_t platform_handles_written = 0; 328 size_t platform_handles_written = 0;
328 size_t bytes_written = 0; 329 size_t bytes_written = 0;
329 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); 330 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
330 if (io_result == IO_PENDING) 331 if (io_result == IO_PENDING)
331 return true; 332 return true;
332 333
333 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, 334 bool result = OnWriteCompletedInternalNoLock(
334 bytes_written); 335 io_result, platform_handles_written, bytes_written);
335 if (!result) { 336 if (!result) {
336 // Even if we're on the I/O thread, don't call |OnError()| in the nested 337 // Even if we're on the I/O thread, don't call |OnError()| in the nested
337 // context. 338 // context.
339 pending_error_ = true;
338 message_loop_for_io_->PostTask( 340 message_loop_for_io_->PostTask(
339 FROM_HERE, 341 FROM_HERE,
340 base::Bind(&RawChannel::LockAndCallOnError, 342 base::Bind(&RawChannel::LockAndCallOnError,
341 weak_ptr_factory_.GetWeakPtr(), 343 weak_ptr_factory_.GetWeakPtr(),
342 Delegate::ERROR_WRITE)); 344 Delegate::ERROR_WRITE));
343 } 345 }
344 346
345 return result; 347 return result;
346 } 348 }
347 349
(...skipping 21 matching lines...) Expand all
369 offset); 371 offset);
370 scoped_ptr<MessageInTransit> message(new MessageInTransit( 372 scoped_ptr<MessageInTransit> message(new MessageInTransit(
371 MessageInTransit::Type::RAW_MESSAGE, message_num_bytes, 373 MessageInTransit::Type::RAW_MESSAGE, message_num_bytes,
372 static_cast<const char*>(serialized_write_buffer) + offset)); 374 static_cast<const char*>(serialized_write_buffer) + offset));
373 write_buffer_->message_queue_.AddMessage(message.Pass()); 375 write_buffer_->message_queue_.AddMessage(message.Pass());
374 offset += message_num_bytes; 376 offset += message_num_bytes;
375 } 377 }
376 } 378 }
377 } 379 }
378 380
379 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { 381 void RawChannel::OnReadCompletedNoLock(IOResult io_result, size_t bytes_read) {
380 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 382 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
381 383 read_lock_.AssertAcquired();
382 base::AutoLock locker(read_lock_);
383
384 // Keep reading data in a loop, and dispatch messages if enough data is 384 // Keep reading data in a loop, and dispatch messages if enough data is
385 // received. Exit the loop if any of the following happens: 385 // received. Exit the loop if any of the following happens:
386 // - one or more messages were dispatched; 386 // - one or more messages were dispatched;
387 // - the last read failed, was a partial read or would block; 387 // - the last read failed, was a partial read or would block;
388 // - |Shutdown()| was called. 388 // - |Shutdown()| was called.
389 do { 389 do {
390 switch (io_result) { 390 switch (io_result) {
391 case IO_SUCCEEDED: 391 case IO_SUCCEEDED:
392 break; 392 break;
393 case IO_FAILED_SHUTDOWN: 393 case IO_FAILED_SHUTDOWN:
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
430 // a single message. Risks: slower, more complex if we want to avoid lots of 430 // a single message. Risks: slower, more complex if we want to avoid lots of
431 // copying. ii. Keep reading until there's no more data and dispatch all the 431 // copying. ii. Keep reading until there's no more data and dispatch all the
432 // messages we can. Risks: starvation of other users of the message loop.) 432 // messages we can. Risks: starvation of other users of the message loop.)
433 // (2) If we didn't max out |kReadSize|, stop reading for now. 433 // (2) If we didn't max out |kReadSize|, stop reading for now.
434 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; 434 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
435 bytes_read = 0; 435 bytes_read = 0;
436 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); 436 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
437 } while (io_result != IO_PENDING); 437 } while (io_result != IO_PENDING);
438 } 438 }
439 439
440 void RawChannel::OnWriteCompleted(IOResult io_result, 440 void RawChannel::OnWriteCompletedNoLock(IOResult io_result,
441 size_t platform_handles_written, 441 size_t platform_handles_written,
442 size_t bytes_written) { 442 size_t bytes_written) {
443 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 443 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
444 write_lock_.AssertAcquired();
444 DCHECK_NE(io_result, IO_PENDING); 445 DCHECK_NE(io_result, IO_PENDING);
445 446
446 bool did_fail = false; 447 bool did_fail = !OnWriteCompletedInternalNoLock(
447 { 448 io_result, platform_handles_written, bytes_written);
448 base::AutoLock locker(write_lock_); 449 if (did_fail)
449 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written,
450 bytes_written);
451 }
452
453 if (did_fail) {
454 LockAndCallOnError(Delegate::ERROR_WRITE); 450 LockAndCallOnError(Delegate::ERROR_WRITE);
455 return; // |this| may have been destroyed in |CallOnError()|.
456 }
457 } 451 }
458 452
459 void RawChannel::SerializeReadBuffer(size_t additional_bytes_read, 453 void RawChannel::SerializeReadBuffer(size_t additional_bytes_read,
460 std::vector<char>* buffer) { 454 std::vector<char>* buffer) {
461 read_buffer_->num_valid_bytes_ += additional_bytes_read; 455 read_buffer_->num_valid_bytes_ += additional_bytes_read;
462 read_buffer_->buffer_.resize(read_buffer_->num_valid_bytes_); 456 read_buffer_->buffer_.resize(read_buffer_->num_valid_bytes_);
463 read_buffer_->buffer_.swap(*buffer); 457 read_buffer_->buffer_.swap(*buffer);
464 read_buffer_->num_valid_bytes_ = 0; 458 read_buffer_->num_valid_bytes_ = 0;
465 } 459 }
466 460
467 void RawChannel::SerializeWriteBuffer( 461 void RawChannel::SerializeWriteBuffer(
468 std::vector<char>* buffer,
469 size_t additional_bytes_written, 462 size_t additional_bytes_written,
470 size_t additional_platform_handles_written) { 463 size_t additional_platform_handles_written,
464 std::vector<char>* buffer) {
471 if (write_buffer_->IsEmpty()) { 465 if (write_buffer_->IsEmpty()) {
472 DCHECK_EQ(0u, additional_bytes_written); 466 DCHECK_EQ(0u, additional_bytes_written);
473 DCHECK_EQ(0u, additional_platform_handles_written); 467 DCHECK_EQ(0u, additional_platform_handles_written);
474 return; 468 return;
475 } 469 }
476 470
477 UpdateWriteBuffer( 471 UpdateWriteBuffer(
478 additional_platform_handles_written, additional_bytes_written); 472 additional_platform_handles_written, additional_bytes_written);
479 while (!write_buffer_->message_queue_.IsEmpty()) { 473 while (!write_buffer_->message_queue_.IsEmpty()) {
480 SerializePlatformHandles(); 474 SerializePlatformHandles();
481 std::vector<WriteBuffer::Buffer> buffers; 475 std::vector<WriteBuffer::Buffer> buffers;
482 write_buffer_no_lock()->GetBuffers(&buffers); 476 write_buffer_no_lock()->GetBuffers(&buffers);
483 for (size_t i = 0; i < buffers.size(); ++i) { 477 for (size_t i = 0; i < buffers.size(); ++i) {
484 buffer->insert(buffer->end(), buffers[i].addr, 478 buffer->insert(buffer->end(), buffers[i].addr,
485 buffers[i].addr + buffers[i].size); 479 buffers[i].addr + buffers[i].size);
486 } 480 }
487 write_buffer_->message_queue_.DiscardMessage(); 481 write_buffer_->message_queue_.DiscardMessage();
488 } 482 }
489 } 483 }
490 484
491 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { 485 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
492 write_lock_.AssertAcquired(); 486 write_lock_.AssertAcquired();
493 DCHECK(HandleForDebuggingNoLock().is_valid()); 487 DCHECK(HandleForDebuggingNoLock().is_valid());
494 write_buffer_->message_queue_.AddMessage(message.Pass()); 488 write_buffer_->message_queue_.AddMessage(message.Pass());
495 } 489 }
496 490
497 bool RawChannel::OnReadMessageForRawChannel( 491 bool RawChannel::OnReadMessageForRawChannel(
498 const MessageInTransit::View& message_view) { 492 const MessageInTransit::View& message_view) {
499 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) { 493 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) {
494 pending_error_ = true;
500 message_loop_for_io_->PostTask( 495 message_loop_for_io_->PostTask(
501 FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError, 496 FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError,
502 weak_ptr_factory_.GetWeakPtr(), 497 weak_ptr_factory_.GetWeakPtr(),
503 Delegate::ERROR_READ_SHUTDOWN)); 498 Delegate::ERROR_READ_SHUTDOWN));
504 return true; 499 return true;
505 } 500 }
506 501
507 // No non-implementation specific |RawChannel| control messages. 502 // No non-implementation specific |RawChannel| control messages.
508 LOG(ERROR) << "Invalid control message (type " << message_view.type() 503 LOG(ERROR) << "Invalid control message (type " << message_view.type()
509 << ")"; 504 << ")";
(...skipping 30 matching lines...) Expand all
540 FROM_HERE, 535 FROM_HERE,
541 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); 536 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
542 } 537 }
543 } 538 }
544 539
545 void RawChannel::LockAndCallOnError(Delegate::Error error) { 540 void RawChannel::LockAndCallOnError(Delegate::Error error) {
546 base::AutoLock locker(read_lock_); 541 base::AutoLock locker(read_lock_);
547 CallOnError(error); 542 CallOnError(error);
548 } 543 }
549 544
550 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, 545 bool RawChannel::OnWriteCompletedInternalNoLock(IOResult io_result,
551 size_t platform_handles_written, 546 size_t platform_handles_written,
552 size_t bytes_written) { 547 size_t bytes_written) {
553 write_lock_.AssertAcquired(); 548 write_lock_.AssertAcquired();
554 549
555 DCHECK(!write_buffer_->message_queue_.IsEmpty()); 550 DCHECK(!write_buffer_->message_queue_.IsEmpty());
556 551
557 if (io_result == IO_SUCCEEDED) { 552 if (io_result == IO_SUCCEEDED) {
558 UpdateWriteBuffer(platform_handles_written, bytes_written); 553 UpdateWriteBuffer(platform_handles_written, bytes_written);
559 if (write_buffer_->message_queue_.IsEmpty()) 554 if (write_buffer_->message_queue_.IsEmpty())
560 return true; 555 return true;
561 556
562 // Schedule the next write. 557 // Schedule the next write.
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after
673 MessageInTransit* message = write_buffer_->message_queue_.PeekMessage(); 668 MessageInTransit* message = write_buffer_->message_queue_.PeekMessage();
674 if (write_buffer_->data_offset_ >= message->total_size()) { 669 if (write_buffer_->data_offset_ >= message->total_size()) {
675 // Complete write. 670 // Complete write.
676 CHECK_EQ(write_buffer_->data_offset_, message->total_size()); 671 CHECK_EQ(write_buffer_->data_offset_, message->total_size());
677 write_buffer_->message_queue_.DiscardMessage(); 672 write_buffer_->message_queue_.DiscardMessage();
678 write_buffer_->platform_handles_offset_ = 0; 673 write_buffer_->platform_handles_offset_ = 0;
679 write_buffer_->data_offset_ = 0; 674 write_buffer_->data_offset_ = 0;
680 } 675 }
681 } 676 }
682 677
678 void RawChannel::CallOnReadCompleted(IOResult io_result, size_t bytes_read) {
679 base::AutoLock locker(read_lock());
680 OnReadCompletedNoLock(io_result, bytes_read);
681 }
682
683 } // namespace edk 683 } // namespace edk
684 } // namespace mojo 684 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/raw_channel.h ('k') | mojo/edk/system/raw_channel_posix.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698