Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(40)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1529303004: Convert Pass()→std::move() in mojo/edk/ (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/message_in_transit.cc ('k') | mojo/edk/system/message_pipe_perftest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include <utility>
8
7 #include "base/bind.h" 9 #include "base/bind.h"
8 #include "base/debug/stack_trace.h" 10 #include "base/debug/stack_trace.h"
9 #include "base/logging.h" 11 #include "base/logging.h"
10 #include "base/message_loop/message_loop.h" 12 #include "base/message_loop/message_loop.h"
11 #include "mojo/edk/embedder/embedder_internal.h" 13 #include "mojo/edk/embedder/embedder_internal.h"
12 #include "mojo/edk/embedder/platform_handle_utils.h" 14 #include "mojo/edk/embedder/platform_handle_utils.h"
13 #include "mojo/edk/embedder/platform_shared_buffer.h" 15 #include "mojo/edk/embedder/platform_shared_buffer.h"
14 #include "mojo/edk/embedder/platform_support.h" 16 #include "mojo/edk/embedder/platform_support.h"
15 #include "mojo/edk/system/broker.h" 17 #include "mojo/edk/system/broker.h"
16 #include "mojo/edk/system/configuration.h" 18 #include "mojo/edk/system/configuration.h"
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after
119 } 121 }
120 122
121 void MessagePipeDispatcher::Init( 123 void MessagePipeDispatcher::Init(
122 ScopedPlatformHandle message_pipe, 124 ScopedPlatformHandle message_pipe,
123 char* serialized_read_buffer, size_t serialized_read_buffer_size, 125 char* serialized_read_buffer, size_t serialized_read_buffer_size,
124 char* serialized_write_buffer, size_t serialized_write_buffer_size, 126 char* serialized_write_buffer, size_t serialized_write_buffer_size,
125 std::vector<int>* serialized_read_fds, 127 std::vector<int>* serialized_read_fds,
126 std::vector<int>* serialized_write_fds) { 128 std::vector<int>* serialized_write_fds) {
127 CHECK(transferable_); 129 CHECK(transferable_);
128 if (message_pipe.get().is_valid()) { 130 if (message_pipe.get().is_valid()) {
129 channel_ = RawChannel::Create(message_pipe.Pass()); 131 channel_ = RawChannel::Create(std::move(message_pipe));
130 132
131 // TODO(jam): It's probably cleaner to pass this in Init call. 133 // TODO(jam): It's probably cleaner to pass this in Init call.
132 channel_->SetSerializedData( 134 channel_->SetSerializedData(
133 serialized_read_buffer, serialized_read_buffer_size, 135 serialized_read_buffer, serialized_read_buffer_size,
134 serialized_write_buffer, serialized_write_buffer_size, 136 serialized_write_buffer, serialized_write_buffer_size,
135 serialized_read_fds, serialized_write_fds); 137 serialized_read_fds, serialized_write_fds);
136 internal::g_io_thread_task_runner->PostTask( 138 internal::g_io_thread_task_runner->PostTask(
137 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); 139 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this));
138 } 140 }
139 } 141 }
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
263 char* serialized_read_buffer = nullptr; 265 char* serialized_read_buffer = nullptr;
264 size_t serialized_read_buffer_size = 0; 266 size_t serialized_read_buffer_size = 0;
265 char* serialized_write_buffer = nullptr; 267 char* serialized_write_buffer = nullptr;
266 size_t serialized_write_buffer_size = 0; 268 size_t serialized_write_buffer_size = 0;
267 char* message_queue_data = nullptr; 269 char* message_queue_data = nullptr;
268 size_t message_queue_size = 0; 270 size_t message_queue_size = 0;
269 scoped_refptr<PlatformSharedBuffer> shared_buffer; 271 scoped_refptr<PlatformSharedBuffer> shared_buffer;
270 scoped_ptr<PlatformSharedBufferMapping> mapping; 272 scoped_ptr<PlatformSharedBufferMapping> mapping;
271 if (shared_memory_handle.is_valid()) { 273 if (shared_memory_handle.is_valid()) {
272 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( 274 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
273 serialization->shared_memory_size, shared_memory_handle.Pass()); 275 serialization->shared_memory_size, std::move(shared_memory_handle));
274 mapping = shared_buffer->Map(0, serialization->shared_memory_size); 276 mapping = shared_buffer->Map(0, serialization->shared_memory_size);
275 char* buffer = static_cast<char*>(mapping->GetBase()); 277 char* buffer = static_cast<char*>(mapping->GetBase());
276 if (serialization->serialized_read_buffer_size) { 278 if (serialization->serialized_read_buffer_size) {
277 serialized_read_buffer = buffer; 279 serialized_read_buffer = buffer;
278 serialized_read_buffer_size = serialization->serialized_read_buffer_size; 280 serialized_read_buffer_size = serialization->serialized_read_buffer_size;
279 buffer += serialized_read_buffer_size; 281 buffer += serialized_read_buffer_size;
280 } 282 }
281 if (serialization->serialized_write_buffer_size) { 283 if (serialization->serialized_write_buffer_size) {
282 serialized_write_buffer = buffer; 284 serialized_write_buffer = buffer;
283 serialized_write_buffer_size = 285 serialized_write_buffer_size =
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
367 } 369 }
368 370
369 // TODO(jam): Copied below from RawChannelWin. See commment above 371 // TODO(jam): Copied below from RawChannelWin. See commment above
370 // GetReadPlatformHandles. 372 // GetReadPlatformHandles.
371 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); 373 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
372 if (message_view.transport_data_buffer_size() > 0) { 374 if (message_view.transport_data_buffer_size() > 0) {
373 DCHECK(message_view.transport_data_buffer()); 375 DCHECK(message_view.transport_data_buffer());
374 message->SetDispatchers(TransportData::DeserializeDispatchers( 376 message->SetDispatchers(TransportData::DeserializeDispatchers(
375 message_view.transport_data_buffer(), 377 message_view.transport_data_buffer(),
376 message_view.transport_data_buffer_size(), 378 message_view.transport_data_buffer_size(),
377 temp_platform_handles.Pass())); 379 std::move(temp_platform_handles)));
378 } 380 }
379 381
380 rv->message_queue_.AddMessage(message.Pass()); 382 rv->message_queue_.AddMessage(std::move(message));
381 } 383 }
382 384
383 rv->Init(platform_handle.Pass(), 385 rv->Init(std::move(platform_handle), serialized_read_buffer,
384 serialized_read_buffer, 386 serialized_read_buffer_size, serialized_write_buffer,
385 serialized_read_buffer_size, 387 serialized_write_buffer_size, &serialized_read_fds,
386 serialized_write_buffer,
387 serialized_write_buffer_size,
388 &serialized_read_fds,
389 &serialized_write_fds); 388 &serialized_write_fds);
390 389
391 if (message_queue_size) { // Should be empty by now. 390 if (message_queue_size) { // Should be empty by now.
392 LOG(ERROR) << "Invalid queued messages"; 391 LOG(ERROR) << "Invalid queued messages";
393 return nullptr; 392 return nullptr;
394 } 393 }
395 394
396 return rv; 395 return rv;
397 } 396 }
398 397
(...skipping 148 matching lines...) Expand 10 before | Expand all | Expand 10 after
547 scoped_refptr<Dispatcher> 546 scoped_refptr<Dispatcher>
548 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { 547 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
549 lock().AssertAcquired(); 548 lock().AssertAcquired();
550 549
551 SerializeInternal(); 550 SerializeInternal();
552 551
553 scoped_refptr<MessagePipeDispatcher> rv( 552 scoped_refptr<MessagePipeDispatcher> rv(
554 new MessagePipeDispatcher(transferable_)); 553 new MessagePipeDispatcher(transferable_));
555 rv->serialized_ = true; 554 rv->serialized_ = true;
556 if (transferable_) { 555 if (transferable_) {
557 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); 556 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
558 serialized_message_queue_.swap(rv->serialized_message_queue_); 557 serialized_message_queue_.swap(rv->serialized_message_queue_);
559 serialized_read_buffer_.swap(rv->serialized_read_buffer_); 558 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
560 serialized_write_buffer_.swap(rv->serialized_write_buffer_); 559 serialized_write_buffer_.swap(rv->serialized_write_buffer_);
561 serialized_fds_.swap(rv->serialized_fds_); 560 serialized_fds_.swap(rv->serialized_fds_);
562 rv->serialized_read_fds_length_ = serialized_read_fds_length_; 561 rv->serialized_read_fds_length_ = serialized_read_fds_length_;
563 rv->serialized_write_fds_length_ = serialized_write_fds_length_; 562 rv->serialized_write_fds_length_ = serialized_write_fds_length_;
564 rv->serialized_message_fds_length_ = serialized_message_fds_length_; 563 rv->serialized_message_fds_length_ = serialized_message_fds_length_;
565 rv->write_error_ = write_error_; 564 rv->write_error_ = write_error_;
566 } else { 565 } else {
567 rv->pipe_id_ = pipe_id_; 566 rv->pipe_id_ = pipe_id_;
(...skipping 30 matching lines...) Expand all
598 } 597 }
599 598
600 message->SerializeAndCloseDispatchers(); 599 message->SerializeAndCloseDispatchers();
601 if (!transferable_) 600 if (!transferable_)
602 message->set_route_id(pipe_id_); 601 message->set_route_id(pipe_id_);
603 if (!transferable_ && 602 if (!transferable_ &&
604 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || 603 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE ||
605 non_transferable_state_ == CONNECT_CALLED)) { 604 non_transferable_state_ == CONNECT_CALLED)) {
606 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) 605 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
607 RequestNontransferableChannel(); 606 RequestNontransferableChannel();
608 non_transferable_outgoing_message_queue_.AddMessage(message.Pass()); 607 non_transferable_outgoing_message_queue_.AddMessage(std::move(message));
609 } else { 608 } else {
610 channel_->WriteMessage(message.Pass()); 609 channel_->WriteMessage(std::move(message));
611 } 610 }
612 611
613 return MOJO_RESULT_OK; 612 return MOJO_RESULT_OK;
614 } 613 }
615 614
616 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( 615 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
617 void* bytes, 616 void* bytes,
618 uint32_t* num_bytes, 617 uint32_t* num_bytes,
619 DispatcherVector* dispatchers, 618 DispatcherVector* dispatchers,
620 uint32_t* num_dispatchers, 619 uint32_t* num_dispatchers,
(...skipping 228 matching lines...) Expand 10 before | Expand all | Expand 10 after
849 } 848 }
850 849
851 void MessagePipeDispatcher::OnReadMessage( 850 void MessagePipeDispatcher::OnReadMessage(
852 const MessageInTransit::View& message_view, 851 const MessageInTransit::View& message_view,
853 ScopedPlatformHandleVectorPtr platform_handles) { 852 ScopedPlatformHandleVectorPtr platform_handles) {
854 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); 853 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
855 if (message_view.transport_data_buffer_size() > 0) { 854 if (message_view.transport_data_buffer_size() > 0) {
856 DCHECK(message_view.transport_data_buffer()); 855 DCHECK(message_view.transport_data_buffer());
857 message->SetDispatchers(TransportData::DeserializeDispatchers( 856 message->SetDispatchers(TransportData::DeserializeDispatchers(
858 message_view.transport_data_buffer(), 857 message_view.transport_data_buffer(),
859 message_view.transport_data_buffer_size(), platform_handles.Pass())); 858 message_view.transport_data_buffer_size(),
859 std::move(platform_handles)));
860 } 860 }
861 861
862 if (started_transport_.Try()) { 862 if (started_transport_.Try()) {
863 // we're not in the middle of being sent 863 // we're not in the middle of being sent
864 864
865 // Can get synchronously called back in Init if there was initial data. 865 // Can get synchronously called back in Init if there was initial data.
866 scoped_ptr<base::AutoLock> locker; 866 scoped_ptr<base::AutoLock> locker;
867 if (!calling_init_) { 867 if (!calling_init_) {
868 locker.reset(new base::AutoLock(lock())); 868 locker.reset(new base::AutoLock(lock()));
869 } 869 }
870 870
871 bool was_empty = message_queue_.IsEmpty(); 871 bool was_empty = message_queue_.IsEmpty();
872 message_queue_.AddMessage(message.Pass()); 872 message_queue_.AddMessage(std::move(message));
873 if (was_empty) 873 if (was_empty)
874 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 874 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
875 875
876 started_transport_.Release(); 876 started_transport_.Release();
877 } else { 877 } else {
878 // If RawChannel is calling OnRead, that means it has its read_lock_ 878 // If RawChannel is calling OnRead, that means it has its read_lock_
879 // acquired. That means StartSerialize can't be accessing message queue as 879 // acquired. That means StartSerialize can't be accessing message queue as
880 // it waits on ReleaseHandle first which acquires readlock_. 880 // it waits on ReleaseHandle first which acquires readlock_.
881 message_queue_.AddMessage(message.Pass()); 881 message_queue_.AddMessage(std::move(message));
882 } 882 }
883 } 883 }
884 884
885 void MessagePipeDispatcher::OnError(Error error) { 885 void MessagePipeDispatcher::OnError(Error error) {
886 // If there's a read error, then the other side of the pipe is closed. By 886 // If there's a read error, then the other side of the pipe is closed. By
887 // definition, we can't write since there's no one to read it. And we can't 887 // definition, we can't write since there's no one to read it. And we can't
888 // read anymore, since we just got a read erorr. So we close the pipe. 888 // read anymore, since we just got a read erorr. So we close the pipe.
889 // If there's a write error, then we stop writing. But we keep the pipe open 889 // If there's a write error, then we stop writing. But we keep the pipe open
890 // until we finish reading everything in it. This is because it's valid for 890 // until we finish reading everything in it. This is because it's valid for
891 // one endpoint to write some data and close their pipe immediately. Even 891 // one endpoint to write some data and close their pipe immediately. Even
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
974 dispatchers->reserve(transports->size()); 974 dispatchers->reserve(transports->size());
975 for (size_t i = 0; i < transports->size(); i++) { 975 for (size_t i = 0; i < transports->size(); i++) {
976 if ((*transports)[i].is_valid()) { 976 if ((*transports)[i].is_valid()) {
977 dispatchers->push_back( 977 dispatchers->push_back(
978 (*transports)[i].CreateEquivalentDispatcherAndClose()); 978 (*transports)[i].CreateEquivalentDispatcherAndClose());
979 } else { 979 } else {
980 LOG(WARNING) << "Enqueueing null dispatcher"; 980 LOG(WARNING) << "Enqueueing null dispatcher";
981 dispatchers->push_back(nullptr); 981 dispatchers->push_back(nullptr);
982 } 982 }
983 } 983 }
984 message->SetDispatchers(dispatchers.Pass()); 984 message->SetDispatchers(std::move(dispatchers));
985 return MOJO_RESULT_OK; 985 return MOJO_RESULT_OK;
986 } 986 }
987 987
988 void MessagePipeDispatcher::RequestNontransferableChannel() { 988 void MessagePipeDispatcher::RequestNontransferableChannel() {
989 lock().AssertAcquired(); 989 lock().AssertAcquired();
990 CHECK(!transferable_); 990 CHECK(!transferable_);
991 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); 991 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE);
992 non_transferable_state_ = CONNECT_CALLED; 992 non_transferable_state_ = CONNECT_CALLED;
993 #if !defined(OFFICIAL_BUILD) 993 #if !defined(OFFICIAL_BUILD)
994 non_transferable_bound_stack_.reset(new base::debug::StackTrace); 994 non_transferable_bound_stack_.reset(new base::debug::StackTrace);
995 #endif 995 #endif
996 996
997 // PostTask since the broker can call us back synchronously. 997 // PostTask since the broker can call us back synchronously.
998 internal::g_io_thread_task_runner->PostTask( 998 internal::g_io_thread_task_runner->PostTask(
999 FROM_HERE, 999 FROM_HERE,
1000 base::Bind(&Broker::ConnectMessagePipe, 1000 base::Bind(&Broker::ConnectMessagePipe,
1001 base::Unretained(internal::g_broker), pipe_id_, 1001 base::Unretained(internal::g_broker), pipe_id_,
1002 base::Unretained(this))); 1002 base::Unretained(this)));
1003 } 1003 }
1004 1004
1005 } // namespace edk 1005 } // namespace edk
1006 } // namespace mojo 1006 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/message_in_transit.cc ('k') | mojo/edk/system/message_pipe_perftest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698