OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
| 7 #include <utility> |
| 8 |
7 #include "base/bind.h" | 9 #include "base/bind.h" |
8 #include "base/debug/stack_trace.h" | 10 #include "base/debug/stack_trace.h" |
9 #include "base/logging.h" | 11 #include "base/logging.h" |
10 #include "base/message_loop/message_loop.h" | 12 #include "base/message_loop/message_loop.h" |
11 #include "mojo/edk/embedder/embedder_internal.h" | 13 #include "mojo/edk/embedder/embedder_internal.h" |
12 #include "mojo/edk/embedder/platform_handle_utils.h" | 14 #include "mojo/edk/embedder/platform_handle_utils.h" |
13 #include "mojo/edk/embedder/platform_shared_buffer.h" | 15 #include "mojo/edk/embedder/platform_shared_buffer.h" |
14 #include "mojo/edk/embedder/platform_support.h" | 16 #include "mojo/edk/embedder/platform_support.h" |
15 #include "mojo/edk/system/broker.h" | 17 #include "mojo/edk/system/broker.h" |
16 #include "mojo/edk/system/configuration.h" | 18 #include "mojo/edk/system/configuration.h" |
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
119 } | 121 } |
120 | 122 |
121 void MessagePipeDispatcher::Init( | 123 void MessagePipeDispatcher::Init( |
122 ScopedPlatformHandle message_pipe, | 124 ScopedPlatformHandle message_pipe, |
123 char* serialized_read_buffer, size_t serialized_read_buffer_size, | 125 char* serialized_read_buffer, size_t serialized_read_buffer_size, |
124 char* serialized_write_buffer, size_t serialized_write_buffer_size, | 126 char* serialized_write_buffer, size_t serialized_write_buffer_size, |
125 std::vector<int>* serialized_read_fds, | 127 std::vector<int>* serialized_read_fds, |
126 std::vector<int>* serialized_write_fds) { | 128 std::vector<int>* serialized_write_fds) { |
127 CHECK(transferable_); | 129 CHECK(transferable_); |
128 if (message_pipe.get().is_valid()) { | 130 if (message_pipe.get().is_valid()) { |
129 channel_ = RawChannel::Create(message_pipe.Pass()); | 131 channel_ = RawChannel::Create(std::move(message_pipe)); |
130 | 132 |
131 // TODO(jam): It's probably cleaner to pass this in Init call. | 133 // TODO(jam): It's probably cleaner to pass this in Init call. |
132 channel_->SetSerializedData( | 134 channel_->SetSerializedData( |
133 serialized_read_buffer, serialized_read_buffer_size, | 135 serialized_read_buffer, serialized_read_buffer_size, |
134 serialized_write_buffer, serialized_write_buffer_size, | 136 serialized_write_buffer, serialized_write_buffer_size, |
135 serialized_read_fds, serialized_write_fds); | 137 serialized_read_fds, serialized_write_fds); |
136 internal::g_io_thread_task_runner->PostTask( | 138 internal::g_io_thread_task_runner->PostTask( |
137 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); | 139 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); |
138 } | 140 } |
139 } | 141 } |
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
263 char* serialized_read_buffer = nullptr; | 265 char* serialized_read_buffer = nullptr; |
264 size_t serialized_read_buffer_size = 0; | 266 size_t serialized_read_buffer_size = 0; |
265 char* serialized_write_buffer = nullptr; | 267 char* serialized_write_buffer = nullptr; |
266 size_t serialized_write_buffer_size = 0; | 268 size_t serialized_write_buffer_size = 0; |
267 char* message_queue_data = nullptr; | 269 char* message_queue_data = nullptr; |
268 size_t message_queue_size = 0; | 270 size_t message_queue_size = 0; |
269 scoped_refptr<PlatformSharedBuffer> shared_buffer; | 271 scoped_refptr<PlatformSharedBuffer> shared_buffer; |
270 scoped_ptr<PlatformSharedBufferMapping> mapping; | 272 scoped_ptr<PlatformSharedBufferMapping> mapping; |
271 if (shared_memory_handle.is_valid()) { | 273 if (shared_memory_handle.is_valid()) { |
272 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( | 274 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( |
273 serialization->shared_memory_size, shared_memory_handle.Pass()); | 275 serialization->shared_memory_size, std::move(shared_memory_handle)); |
274 mapping = shared_buffer->Map(0, serialization->shared_memory_size); | 276 mapping = shared_buffer->Map(0, serialization->shared_memory_size); |
275 char* buffer = static_cast<char*>(mapping->GetBase()); | 277 char* buffer = static_cast<char*>(mapping->GetBase()); |
276 if (serialization->serialized_read_buffer_size) { | 278 if (serialization->serialized_read_buffer_size) { |
277 serialized_read_buffer = buffer; | 279 serialized_read_buffer = buffer; |
278 serialized_read_buffer_size = serialization->serialized_read_buffer_size; | 280 serialized_read_buffer_size = serialization->serialized_read_buffer_size; |
279 buffer += serialized_read_buffer_size; | 281 buffer += serialized_read_buffer_size; |
280 } | 282 } |
281 if (serialization->serialized_write_buffer_size) { | 283 if (serialization->serialized_write_buffer_size) { |
282 serialized_write_buffer = buffer; | 284 serialized_write_buffer = buffer; |
283 serialized_write_buffer_size = | 285 serialized_write_buffer_size = |
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
367 } | 369 } |
368 | 370 |
369 // TODO(jam): Copied below from RawChannelWin. See commment above | 371 // TODO(jam): Copied below from RawChannelWin. See commment above |
370 // GetReadPlatformHandles. | 372 // GetReadPlatformHandles. |
371 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | 373 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
372 if (message_view.transport_data_buffer_size() > 0) { | 374 if (message_view.transport_data_buffer_size() > 0) { |
373 DCHECK(message_view.transport_data_buffer()); | 375 DCHECK(message_view.transport_data_buffer()); |
374 message->SetDispatchers(TransportData::DeserializeDispatchers( | 376 message->SetDispatchers(TransportData::DeserializeDispatchers( |
375 message_view.transport_data_buffer(), | 377 message_view.transport_data_buffer(), |
376 message_view.transport_data_buffer_size(), | 378 message_view.transport_data_buffer_size(), |
377 temp_platform_handles.Pass())); | 379 std::move(temp_platform_handles))); |
378 } | 380 } |
379 | 381 |
380 rv->message_queue_.AddMessage(message.Pass()); | 382 rv->message_queue_.AddMessage(std::move(message)); |
381 } | 383 } |
382 | 384 |
383 rv->Init(platform_handle.Pass(), | 385 rv->Init(std::move(platform_handle), serialized_read_buffer, |
384 serialized_read_buffer, | 386 serialized_read_buffer_size, serialized_write_buffer, |
385 serialized_read_buffer_size, | 387 serialized_write_buffer_size, &serialized_read_fds, |
386 serialized_write_buffer, | |
387 serialized_write_buffer_size, | |
388 &serialized_read_fds, | |
389 &serialized_write_fds); | 388 &serialized_write_fds); |
390 | 389 |
391 if (message_queue_size) { // Should be empty by now. | 390 if (message_queue_size) { // Should be empty by now. |
392 LOG(ERROR) << "Invalid queued messages"; | 391 LOG(ERROR) << "Invalid queued messages"; |
393 return nullptr; | 392 return nullptr; |
394 } | 393 } |
395 | 394 |
396 return rv; | 395 return rv; |
397 } | 396 } |
398 | 397 |
(...skipping 148 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
547 scoped_refptr<Dispatcher> | 546 scoped_refptr<Dispatcher> |
548 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | 547 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
549 lock().AssertAcquired(); | 548 lock().AssertAcquired(); |
550 | 549 |
551 SerializeInternal(); | 550 SerializeInternal(); |
552 | 551 |
553 scoped_refptr<MessagePipeDispatcher> rv( | 552 scoped_refptr<MessagePipeDispatcher> rv( |
554 new MessagePipeDispatcher(transferable_)); | 553 new MessagePipeDispatcher(transferable_)); |
555 rv->serialized_ = true; | 554 rv->serialized_ = true; |
556 if (transferable_) { | 555 if (transferable_) { |
557 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); | 556 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); |
558 serialized_message_queue_.swap(rv->serialized_message_queue_); | 557 serialized_message_queue_.swap(rv->serialized_message_queue_); |
559 serialized_read_buffer_.swap(rv->serialized_read_buffer_); | 558 serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
560 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | 559 serialized_write_buffer_.swap(rv->serialized_write_buffer_); |
561 serialized_fds_.swap(rv->serialized_fds_); | 560 serialized_fds_.swap(rv->serialized_fds_); |
562 rv->serialized_read_fds_length_ = serialized_read_fds_length_; | 561 rv->serialized_read_fds_length_ = serialized_read_fds_length_; |
563 rv->serialized_write_fds_length_ = serialized_write_fds_length_; | 562 rv->serialized_write_fds_length_ = serialized_write_fds_length_; |
564 rv->serialized_message_fds_length_ = serialized_message_fds_length_; | 563 rv->serialized_message_fds_length_ = serialized_message_fds_length_; |
565 rv->write_error_ = write_error_; | 564 rv->write_error_ = write_error_; |
566 } else { | 565 } else { |
567 rv->pipe_id_ = pipe_id_; | 566 rv->pipe_id_ = pipe_id_; |
(...skipping 30 matching lines...) Expand all Loading... |
598 } | 597 } |
599 | 598 |
600 message->SerializeAndCloseDispatchers(); | 599 message->SerializeAndCloseDispatchers(); |
601 if (!transferable_) | 600 if (!transferable_) |
602 message->set_route_id(pipe_id_); | 601 message->set_route_id(pipe_id_); |
603 if (!transferable_ && | 602 if (!transferable_ && |
604 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || | 603 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || |
605 non_transferable_state_ == CONNECT_CALLED)) { | 604 non_transferable_state_ == CONNECT_CALLED)) { |
606 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | 605 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
607 RequestNontransferableChannel(); | 606 RequestNontransferableChannel(); |
608 non_transferable_outgoing_message_queue_.AddMessage(message.Pass()); | 607 non_transferable_outgoing_message_queue_.AddMessage(std::move(message)); |
609 } else { | 608 } else { |
610 channel_->WriteMessage(message.Pass()); | 609 channel_->WriteMessage(std::move(message)); |
611 } | 610 } |
612 | 611 |
613 return MOJO_RESULT_OK; | 612 return MOJO_RESULT_OK; |
614 } | 613 } |
615 | 614 |
616 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( | 615 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( |
617 void* bytes, | 616 void* bytes, |
618 uint32_t* num_bytes, | 617 uint32_t* num_bytes, |
619 DispatcherVector* dispatchers, | 618 DispatcherVector* dispatchers, |
620 uint32_t* num_dispatchers, | 619 uint32_t* num_dispatchers, |
(...skipping 228 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
849 } | 848 } |
850 | 849 |
851 void MessagePipeDispatcher::OnReadMessage( | 850 void MessagePipeDispatcher::OnReadMessage( |
852 const MessageInTransit::View& message_view, | 851 const MessageInTransit::View& message_view, |
853 ScopedPlatformHandleVectorPtr platform_handles) { | 852 ScopedPlatformHandleVectorPtr platform_handles) { |
854 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | 853 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
855 if (message_view.transport_data_buffer_size() > 0) { | 854 if (message_view.transport_data_buffer_size() > 0) { |
856 DCHECK(message_view.transport_data_buffer()); | 855 DCHECK(message_view.transport_data_buffer()); |
857 message->SetDispatchers(TransportData::DeserializeDispatchers( | 856 message->SetDispatchers(TransportData::DeserializeDispatchers( |
858 message_view.transport_data_buffer(), | 857 message_view.transport_data_buffer(), |
859 message_view.transport_data_buffer_size(), platform_handles.Pass())); | 858 message_view.transport_data_buffer_size(), |
| 859 std::move(platform_handles))); |
860 } | 860 } |
861 | 861 |
862 if (started_transport_.Try()) { | 862 if (started_transport_.Try()) { |
863 // we're not in the middle of being sent | 863 // we're not in the middle of being sent |
864 | 864 |
865 // Can get synchronously called back in Init if there was initial data. | 865 // Can get synchronously called back in Init if there was initial data. |
866 scoped_ptr<base::AutoLock> locker; | 866 scoped_ptr<base::AutoLock> locker; |
867 if (!calling_init_) { | 867 if (!calling_init_) { |
868 locker.reset(new base::AutoLock(lock())); | 868 locker.reset(new base::AutoLock(lock())); |
869 } | 869 } |
870 | 870 |
871 bool was_empty = message_queue_.IsEmpty(); | 871 bool was_empty = message_queue_.IsEmpty(); |
872 message_queue_.AddMessage(message.Pass()); | 872 message_queue_.AddMessage(std::move(message)); |
873 if (was_empty) | 873 if (was_empty) |
874 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 874 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
875 | 875 |
876 started_transport_.Release(); | 876 started_transport_.Release(); |
877 } else { | 877 } else { |
878 // If RawChannel is calling OnRead, that means it has its read_lock_ | 878 // If RawChannel is calling OnRead, that means it has its read_lock_ |
879 // acquired. That means StartSerialize can't be accessing message queue as | 879 // acquired. That means StartSerialize can't be accessing message queue as |
880 // it waits on ReleaseHandle first which acquires readlock_. | 880 // it waits on ReleaseHandle first which acquires readlock_. |
881 message_queue_.AddMessage(message.Pass()); | 881 message_queue_.AddMessage(std::move(message)); |
882 } | 882 } |
883 } | 883 } |
884 | 884 |
885 void MessagePipeDispatcher::OnError(Error error) { | 885 void MessagePipeDispatcher::OnError(Error error) { |
886 // If there's a read error, then the other side of the pipe is closed. By | 886 // If there's a read error, then the other side of the pipe is closed. By |
887 // definition, we can't write since there's no one to read it. And we can't | 887 // definition, we can't write since there's no one to read it. And we can't |
888 // read anymore, since we just got a read erorr. So we close the pipe. | 888 // read anymore, since we just got a read erorr. So we close the pipe. |
889 // If there's a write error, then we stop writing. But we keep the pipe open | 889 // If there's a write error, then we stop writing. But we keep the pipe open |
890 // until we finish reading everything in it. This is because it's valid for | 890 // until we finish reading everything in it. This is because it's valid for |
891 // one endpoint to write some data and close their pipe immediately. Even | 891 // one endpoint to write some data and close their pipe immediately. Even |
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
974 dispatchers->reserve(transports->size()); | 974 dispatchers->reserve(transports->size()); |
975 for (size_t i = 0; i < transports->size(); i++) { | 975 for (size_t i = 0; i < transports->size(); i++) { |
976 if ((*transports)[i].is_valid()) { | 976 if ((*transports)[i].is_valid()) { |
977 dispatchers->push_back( | 977 dispatchers->push_back( |
978 (*transports)[i].CreateEquivalentDispatcherAndClose()); | 978 (*transports)[i].CreateEquivalentDispatcherAndClose()); |
979 } else { | 979 } else { |
980 LOG(WARNING) << "Enqueueing null dispatcher"; | 980 LOG(WARNING) << "Enqueueing null dispatcher"; |
981 dispatchers->push_back(nullptr); | 981 dispatchers->push_back(nullptr); |
982 } | 982 } |
983 } | 983 } |
984 message->SetDispatchers(dispatchers.Pass()); | 984 message->SetDispatchers(std::move(dispatchers)); |
985 return MOJO_RESULT_OK; | 985 return MOJO_RESULT_OK; |
986 } | 986 } |
987 | 987 |
988 void MessagePipeDispatcher::RequestNontransferableChannel() { | 988 void MessagePipeDispatcher::RequestNontransferableChannel() { |
989 lock().AssertAcquired(); | 989 lock().AssertAcquired(); |
990 CHECK(!transferable_); | 990 CHECK(!transferable_); |
991 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); | 991 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); |
992 non_transferable_state_ = CONNECT_CALLED; | 992 non_transferable_state_ = CONNECT_CALLED; |
993 #if !defined(OFFICIAL_BUILD) | 993 #if !defined(OFFICIAL_BUILD) |
994 non_transferable_bound_stack_.reset(new base::debug::StackTrace); | 994 non_transferable_bound_stack_.reset(new base::debug::StackTrace); |
995 #endif | 995 #endif |
996 | 996 |
997 // PostTask since the broker can call us back synchronously. | 997 // PostTask since the broker can call us back synchronously. |
998 internal::g_io_thread_task_runner->PostTask( | 998 internal::g_io_thread_task_runner->PostTask( |
999 FROM_HERE, | 999 FROM_HERE, |
1000 base::Bind(&Broker::ConnectMessagePipe, | 1000 base::Bind(&Broker::ConnectMessagePipe, |
1001 base::Unretained(internal::g_broker), pipe_id_, | 1001 base::Unretained(internal::g_broker), pipe_id_, |
1002 base::Unretained(this))); | 1002 base::Unretained(this))); |
1003 } | 1003 } |
1004 | 1004 |
1005 } // namespace edk | 1005 } // namespace edk |
1006 } // namespace mojo | 1006 } // namespace mojo |
OLD | NEW |