| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 6 | 6 |
| 7 #include <utility> |
| 8 |
| 7 #include "base/bind.h" | 9 #include "base/bind.h" |
| 8 #include "base/debug/stack_trace.h" | 10 #include "base/debug/stack_trace.h" |
| 9 #include "base/logging.h" | 11 #include "base/logging.h" |
| 10 #include "base/message_loop/message_loop.h" | 12 #include "base/message_loop/message_loop.h" |
| 11 #include "mojo/edk/embedder/embedder_internal.h" | 13 #include "mojo/edk/embedder/embedder_internal.h" |
| 12 #include "mojo/edk/embedder/platform_handle_utils.h" | 14 #include "mojo/edk/embedder/platform_handle_utils.h" |
| 13 #include "mojo/edk/embedder/platform_shared_buffer.h" | 15 #include "mojo/edk/embedder/platform_shared_buffer.h" |
| 14 #include "mojo/edk/embedder/platform_support.h" | 16 #include "mojo/edk/embedder/platform_support.h" |
| 15 #include "mojo/edk/system/broker.h" | 17 #include "mojo/edk/system/broker.h" |
| 16 #include "mojo/edk/system/configuration.h" | 18 #include "mojo/edk/system/configuration.h" |
| (...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 119 } | 121 } |
| 120 | 122 |
| 121 void MessagePipeDispatcher::Init( | 123 void MessagePipeDispatcher::Init( |
| 122 ScopedPlatformHandle message_pipe, | 124 ScopedPlatformHandle message_pipe, |
| 123 char* serialized_read_buffer, size_t serialized_read_buffer_size, | 125 char* serialized_read_buffer, size_t serialized_read_buffer_size, |
| 124 char* serialized_write_buffer, size_t serialized_write_buffer_size, | 126 char* serialized_write_buffer, size_t serialized_write_buffer_size, |
| 125 std::vector<int>* serialized_read_fds, | 127 std::vector<int>* serialized_read_fds, |
| 126 std::vector<int>* serialized_write_fds) { | 128 std::vector<int>* serialized_write_fds) { |
| 127 CHECK(transferable_); | 129 CHECK(transferable_); |
| 128 if (message_pipe.get().is_valid()) { | 130 if (message_pipe.get().is_valid()) { |
| 129 channel_ = RawChannel::Create(message_pipe.Pass()); | 131 channel_ = RawChannel::Create(std::move(message_pipe)); |
| 130 | 132 |
| 131 // TODO(jam): It's probably cleaner to pass this in Init call. | 133 // TODO(jam): It's probably cleaner to pass this in Init call. |
| 132 channel_->SetSerializedData( | 134 channel_->SetSerializedData( |
| 133 serialized_read_buffer, serialized_read_buffer_size, | 135 serialized_read_buffer, serialized_read_buffer_size, |
| 134 serialized_write_buffer, serialized_write_buffer_size, | 136 serialized_write_buffer, serialized_write_buffer_size, |
| 135 serialized_read_fds, serialized_write_fds); | 137 serialized_read_fds, serialized_write_fds); |
| 136 internal::g_io_thread_task_runner->PostTask( | 138 internal::g_io_thread_task_runner->PostTask( |
| 137 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); | 139 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); |
| 138 } | 140 } |
| 139 } | 141 } |
| (...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 263 char* serialized_read_buffer = nullptr; | 265 char* serialized_read_buffer = nullptr; |
| 264 size_t serialized_read_buffer_size = 0; | 266 size_t serialized_read_buffer_size = 0; |
| 265 char* serialized_write_buffer = nullptr; | 267 char* serialized_write_buffer = nullptr; |
| 266 size_t serialized_write_buffer_size = 0; | 268 size_t serialized_write_buffer_size = 0; |
| 267 char* message_queue_data = nullptr; | 269 char* message_queue_data = nullptr; |
| 268 size_t message_queue_size = 0; | 270 size_t message_queue_size = 0; |
| 269 scoped_refptr<PlatformSharedBuffer> shared_buffer; | 271 scoped_refptr<PlatformSharedBuffer> shared_buffer; |
| 270 scoped_ptr<PlatformSharedBufferMapping> mapping; | 272 scoped_ptr<PlatformSharedBufferMapping> mapping; |
| 271 if (shared_memory_handle.is_valid()) { | 273 if (shared_memory_handle.is_valid()) { |
| 272 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( | 274 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( |
| 273 serialization->shared_memory_size, shared_memory_handle.Pass()); | 275 serialization->shared_memory_size, std::move(shared_memory_handle)); |
| 274 mapping = shared_buffer->Map(0, serialization->shared_memory_size); | 276 mapping = shared_buffer->Map(0, serialization->shared_memory_size); |
| 275 char* buffer = static_cast<char*>(mapping->GetBase()); | 277 char* buffer = static_cast<char*>(mapping->GetBase()); |
| 276 if (serialization->serialized_read_buffer_size) { | 278 if (serialization->serialized_read_buffer_size) { |
| 277 serialized_read_buffer = buffer; | 279 serialized_read_buffer = buffer; |
| 278 serialized_read_buffer_size = serialization->serialized_read_buffer_size; | 280 serialized_read_buffer_size = serialization->serialized_read_buffer_size; |
| 279 buffer += serialized_read_buffer_size; | 281 buffer += serialized_read_buffer_size; |
| 280 } | 282 } |
| 281 if (serialization->serialized_write_buffer_size) { | 283 if (serialization->serialized_write_buffer_size) { |
| 282 serialized_write_buffer = buffer; | 284 serialized_write_buffer = buffer; |
| 283 serialized_write_buffer_size = | 285 serialized_write_buffer_size = |
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 367 } | 369 } |
| 368 | 370 |
| 369 // TODO(jam): Copied below from RawChannelWin. See commment above | 371 // TODO(jam): Copied below from RawChannelWin. See commment above |
| 370 // GetReadPlatformHandles. | 372 // GetReadPlatformHandles. |
| 371 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | 373 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
| 372 if (message_view.transport_data_buffer_size() > 0) { | 374 if (message_view.transport_data_buffer_size() > 0) { |
| 373 DCHECK(message_view.transport_data_buffer()); | 375 DCHECK(message_view.transport_data_buffer()); |
| 374 message->SetDispatchers(TransportData::DeserializeDispatchers( | 376 message->SetDispatchers(TransportData::DeserializeDispatchers( |
| 375 message_view.transport_data_buffer(), | 377 message_view.transport_data_buffer(), |
| 376 message_view.transport_data_buffer_size(), | 378 message_view.transport_data_buffer_size(), |
| 377 temp_platform_handles.Pass())); | 379 std::move(temp_platform_handles))); |
| 378 } | 380 } |
| 379 | 381 |
| 380 rv->message_queue_.AddMessage(message.Pass()); | 382 rv->message_queue_.AddMessage(std::move(message)); |
| 381 } | 383 } |
| 382 | 384 |
| 383 rv->Init(platform_handle.Pass(), | 385 rv->Init(std::move(platform_handle), serialized_read_buffer, |
| 384 serialized_read_buffer, | 386 serialized_read_buffer_size, serialized_write_buffer, |
| 385 serialized_read_buffer_size, | 387 serialized_write_buffer_size, &serialized_read_fds, |
| 386 serialized_write_buffer, | |
| 387 serialized_write_buffer_size, | |
| 388 &serialized_read_fds, | |
| 389 &serialized_write_fds); | 388 &serialized_write_fds); |
| 390 | 389 |
| 391 if (message_queue_size) { // Should be empty by now. | 390 if (message_queue_size) { // Should be empty by now. |
| 392 LOG(ERROR) << "Invalid queued messages"; | 391 LOG(ERROR) << "Invalid queued messages"; |
| 393 return nullptr; | 392 return nullptr; |
| 394 } | 393 } |
| 395 | 394 |
| 396 return rv; | 395 return rv; |
| 397 } | 396 } |
| 398 | 397 |
| (...skipping 148 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 547 scoped_refptr<Dispatcher> | 546 scoped_refptr<Dispatcher> |
| 548 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | 547 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
| 549 lock().AssertAcquired(); | 548 lock().AssertAcquired(); |
| 550 | 549 |
| 551 SerializeInternal(); | 550 SerializeInternal(); |
| 552 | 551 |
| 553 scoped_refptr<MessagePipeDispatcher> rv( | 552 scoped_refptr<MessagePipeDispatcher> rv( |
| 554 new MessagePipeDispatcher(transferable_)); | 553 new MessagePipeDispatcher(transferable_)); |
| 555 rv->serialized_ = true; | 554 rv->serialized_ = true; |
| 556 if (transferable_) { | 555 if (transferable_) { |
| 557 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); | 556 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); |
| 558 serialized_message_queue_.swap(rv->serialized_message_queue_); | 557 serialized_message_queue_.swap(rv->serialized_message_queue_); |
| 559 serialized_read_buffer_.swap(rv->serialized_read_buffer_); | 558 serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
| 560 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | 559 serialized_write_buffer_.swap(rv->serialized_write_buffer_); |
| 561 serialized_fds_.swap(rv->serialized_fds_); | 560 serialized_fds_.swap(rv->serialized_fds_); |
| 562 rv->serialized_read_fds_length_ = serialized_read_fds_length_; | 561 rv->serialized_read_fds_length_ = serialized_read_fds_length_; |
| 563 rv->serialized_write_fds_length_ = serialized_write_fds_length_; | 562 rv->serialized_write_fds_length_ = serialized_write_fds_length_; |
| 564 rv->serialized_message_fds_length_ = serialized_message_fds_length_; | 563 rv->serialized_message_fds_length_ = serialized_message_fds_length_; |
| 565 rv->write_error_ = write_error_; | 564 rv->write_error_ = write_error_; |
| 566 } else { | 565 } else { |
| 567 rv->pipe_id_ = pipe_id_; | 566 rv->pipe_id_ = pipe_id_; |
| (...skipping 30 matching lines...) Expand all Loading... |
| 598 } | 597 } |
| 599 | 598 |
| 600 message->SerializeAndCloseDispatchers(); | 599 message->SerializeAndCloseDispatchers(); |
| 601 if (!transferable_) | 600 if (!transferable_) |
| 602 message->set_route_id(pipe_id_); | 601 message->set_route_id(pipe_id_); |
| 603 if (!transferable_ && | 602 if (!transferable_ && |
| 604 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || | 603 (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || |
| 605 non_transferable_state_ == CONNECT_CALLED)) { | 604 non_transferable_state_ == CONNECT_CALLED)) { |
| 606 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | 605 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| 607 RequestNontransferableChannel(); | 606 RequestNontransferableChannel(); |
| 608 non_transferable_outgoing_message_queue_.AddMessage(message.Pass()); | 607 non_transferable_outgoing_message_queue_.AddMessage(std::move(message)); |
| 609 } else { | 608 } else { |
| 610 channel_->WriteMessage(message.Pass()); | 609 channel_->WriteMessage(std::move(message)); |
| 611 } | 610 } |
| 612 | 611 |
| 613 return MOJO_RESULT_OK; | 612 return MOJO_RESULT_OK; |
| 614 } | 613 } |
| 615 | 614 |
| 616 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( | 615 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( |
| 617 void* bytes, | 616 void* bytes, |
| 618 uint32_t* num_bytes, | 617 uint32_t* num_bytes, |
| 619 DispatcherVector* dispatchers, | 618 DispatcherVector* dispatchers, |
| 620 uint32_t* num_dispatchers, | 619 uint32_t* num_dispatchers, |
| (...skipping 228 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 849 } | 848 } |
| 850 | 849 |
| 851 void MessagePipeDispatcher::OnReadMessage( | 850 void MessagePipeDispatcher::OnReadMessage( |
| 852 const MessageInTransit::View& message_view, | 851 const MessageInTransit::View& message_view, |
| 853 ScopedPlatformHandleVectorPtr platform_handles) { | 852 ScopedPlatformHandleVectorPtr platform_handles) { |
| 854 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | 853 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
| 855 if (message_view.transport_data_buffer_size() > 0) { | 854 if (message_view.transport_data_buffer_size() > 0) { |
| 856 DCHECK(message_view.transport_data_buffer()); | 855 DCHECK(message_view.transport_data_buffer()); |
| 857 message->SetDispatchers(TransportData::DeserializeDispatchers( | 856 message->SetDispatchers(TransportData::DeserializeDispatchers( |
| 858 message_view.transport_data_buffer(), | 857 message_view.transport_data_buffer(), |
| 859 message_view.transport_data_buffer_size(), platform_handles.Pass())); | 858 message_view.transport_data_buffer_size(), |
| 859 std::move(platform_handles))); |
| 860 } | 860 } |
| 861 | 861 |
| 862 if (started_transport_.Try()) { | 862 if (started_transport_.Try()) { |
| 863 // we're not in the middle of being sent | 863 // we're not in the middle of being sent |
| 864 | 864 |
| 865 // Can get synchronously called back in Init if there was initial data. | 865 // Can get synchronously called back in Init if there was initial data. |
| 866 scoped_ptr<base::AutoLock> locker; | 866 scoped_ptr<base::AutoLock> locker; |
| 867 if (!calling_init_) { | 867 if (!calling_init_) { |
| 868 locker.reset(new base::AutoLock(lock())); | 868 locker.reset(new base::AutoLock(lock())); |
| 869 } | 869 } |
| 870 | 870 |
| 871 bool was_empty = message_queue_.IsEmpty(); | 871 bool was_empty = message_queue_.IsEmpty(); |
| 872 message_queue_.AddMessage(message.Pass()); | 872 message_queue_.AddMessage(std::move(message)); |
| 873 if (was_empty) | 873 if (was_empty) |
| 874 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 874 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 875 | 875 |
| 876 started_transport_.Release(); | 876 started_transport_.Release(); |
| 877 } else { | 877 } else { |
| 878 // If RawChannel is calling OnRead, that means it has its read_lock_ | 878 // If RawChannel is calling OnRead, that means it has its read_lock_ |
| 879 // acquired. That means StartSerialize can't be accessing message queue as | 879 // acquired. That means StartSerialize can't be accessing message queue as |
| 880 // it waits on ReleaseHandle first which acquires readlock_. | 880 // it waits on ReleaseHandle first which acquires readlock_. |
| 881 message_queue_.AddMessage(message.Pass()); | 881 message_queue_.AddMessage(std::move(message)); |
| 882 } | 882 } |
| 883 } | 883 } |
| 884 | 884 |
| 885 void MessagePipeDispatcher::OnError(Error error) { | 885 void MessagePipeDispatcher::OnError(Error error) { |
| 886 // If there's a read error, then the other side of the pipe is closed. By | 886 // If there's a read error, then the other side of the pipe is closed. By |
| 887 // definition, we can't write since there's no one to read it. And we can't | 887 // definition, we can't write since there's no one to read it. And we can't |
| 888 // read anymore, since we just got a read erorr. So we close the pipe. | 888 // read anymore, since we just got a read erorr. So we close the pipe. |
| 889 // If there's a write error, then we stop writing. But we keep the pipe open | 889 // If there's a write error, then we stop writing. But we keep the pipe open |
| 890 // until we finish reading everything in it. This is because it's valid for | 890 // until we finish reading everything in it. This is because it's valid for |
| 891 // one endpoint to write some data and close their pipe immediately. Even | 891 // one endpoint to write some data and close their pipe immediately. Even |
| (...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 974 dispatchers->reserve(transports->size()); | 974 dispatchers->reserve(transports->size()); |
| 975 for (size_t i = 0; i < transports->size(); i++) { | 975 for (size_t i = 0; i < transports->size(); i++) { |
| 976 if ((*transports)[i].is_valid()) { | 976 if ((*transports)[i].is_valid()) { |
| 977 dispatchers->push_back( | 977 dispatchers->push_back( |
| 978 (*transports)[i].CreateEquivalentDispatcherAndClose()); | 978 (*transports)[i].CreateEquivalentDispatcherAndClose()); |
| 979 } else { | 979 } else { |
| 980 LOG(WARNING) << "Enqueueing null dispatcher"; | 980 LOG(WARNING) << "Enqueueing null dispatcher"; |
| 981 dispatchers->push_back(nullptr); | 981 dispatchers->push_back(nullptr); |
| 982 } | 982 } |
| 983 } | 983 } |
| 984 message->SetDispatchers(dispatchers.Pass()); | 984 message->SetDispatchers(std::move(dispatchers)); |
| 985 return MOJO_RESULT_OK; | 985 return MOJO_RESULT_OK; |
| 986 } | 986 } |
| 987 | 987 |
| 988 void MessagePipeDispatcher::RequestNontransferableChannel() { | 988 void MessagePipeDispatcher::RequestNontransferableChannel() { |
| 989 lock().AssertAcquired(); | 989 lock().AssertAcquired(); |
| 990 CHECK(!transferable_); | 990 CHECK(!transferable_); |
| 991 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); | 991 CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); |
| 992 non_transferable_state_ = CONNECT_CALLED; | 992 non_transferable_state_ = CONNECT_CALLED; |
| 993 #if !defined(OFFICIAL_BUILD) | 993 #if !defined(OFFICIAL_BUILD) |
| 994 non_transferable_bound_stack_.reset(new base::debug::StackTrace); | 994 non_transferable_bound_stack_.reset(new base::debug::StackTrace); |
| 995 #endif | 995 #endif |
| 996 | 996 |
| 997 // PostTask since the broker can call us back synchronously. | 997 // PostTask since the broker can call us back synchronously. |
| 998 internal::g_io_thread_task_runner->PostTask( | 998 internal::g_io_thread_task_runner->PostTask( |
| 999 FROM_HERE, | 999 FROM_HERE, |
| 1000 base::Bind(&Broker::ConnectMessagePipe, | 1000 base::Bind(&Broker::ConnectMessagePipe, |
| 1001 base::Unretained(internal::g_broker), pipe_id_, | 1001 base::Unretained(internal::g_broker), pipe_id_, |
| 1002 base::Unretained(this))); | 1002 base::Unretained(this))); |
| 1003 } | 1003 } |
| 1004 | 1004 |
| 1005 } // namespace edk | 1005 } // namespace edk |
| 1006 } // namespace mojo | 1006 } // namespace mojo |
| OLD | NEW |