| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 150 | 150 |
| 151 void MessagePipeDispatcher::InitOnIO() { | 151 void MessagePipeDispatcher::InitOnIO() { |
| 152 base::AutoLock locker(lock()); | 152 base::AutoLock locker(lock()); |
| 153 CHECK(transferable_); | 153 CHECK(transferable_); |
| 154 calling_init_ = true; | 154 calling_init_ = true; |
| 155 if (channel_) | 155 if (channel_) |
| 156 channel_->Init(this); | 156 channel_->Init(this); |
| 157 calling_init_ = false; | 157 calling_init_ = false; |
| 158 } | 158 } |
| 159 | 159 |
| 160 void MessagePipeDispatcher::CloseOnIOAndRelease() { |
| 161 { |
| 162 base::AutoLock locker(lock()); |
| 163 CloseOnIO(); |
| 164 } |
| 165 Release(); // To match CloseImplNoLock. |
| 166 } |
| 167 |
| 160 void MessagePipeDispatcher::CloseOnIO() { | 168 void MessagePipeDispatcher::CloseOnIO() { |
| 161 base::AutoLock locker(lock()); | 169 lock().AssertAcquired(); |
| 162 Release(); // To match CloseImplNoLock. | |
| 163 if (transferable_) { | |
| 164 if (channel_) { | |
| 165 channel_->Shutdown(); | |
| 166 channel_ = nullptr; | |
| 167 } | |
| 168 } else { | |
| 169 if (non_transferable_state_ == CONNECT_CALLED || | |
| 170 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { | |
| 171 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
| 172 RequestNontransferableChannel(); | |
| 173 | 170 |
| 174 // We can't cancel the pending request yet, since the other side of the | 171 if (channel_) { |
| 175 // message pipe would want to get pending outgoing messages (if any) or | 172 // If we closed the channel now then in-flight message pipes wouldn't get |
| 176 // at least know that this end was closed. So keep this object alive until | 173 // closed, and their other side wouldn't get a connection error notification |
| 177 // then. | 174 // which could lead to hangs or leaks. So we ask the other side of this |
| 178 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; | 175 // message pipe to close, which ensures that we have dispatched all |
| 179 AddRef(); | 176 // in-flight message pipes. |
| 180 } else if (non_transferable_state_ == CONNECTED) { | 177 DCHECK(!close_requested_); |
| 181 internal::g_broker->CloseMessagePipe(pipe_id_, this); | 178 close_requested_ = true; |
| 182 non_transferable_state_ = CLOSED; | 179 AddRef(); |
| 183 channel_ = nullptr; | 180 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| 184 } | 181 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); |
| 182 if (!transferable_) |
| 183 message->set_route_id(pipe_id_); |
| 184 channel_->WriteMessage(std::move(message)); |
| 185 return; |
| 186 } |
| 187 |
| 188 if (!transferable_ && |
| 189 (non_transferable_state_ == CONNECT_CALLED || |
| 190 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) { |
| 191 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| 192 RequestNontransferableChannel(); |
| 193 |
| 194 // We can't cancel the pending request yet, since the other side of the |
| 195 // message pipe would want to get pending outgoing messages (if any) or at |
| 196 // least know that this end was closed. So keep this object alive until |
| 197 // then. |
| 198 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; |
| 199 AddRef(); |
| 185 } | 200 } |
| 186 } | 201 } |
| 187 | 202 |
| 188 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 203 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
| 189 return Type::MESSAGE_PIPE; | 204 return Type::MESSAGE_PIPE; |
| 190 } | 205 } |
| 191 | 206 |
| 192 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { | 207 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
| 208 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| 193 base::AutoLock locker(lock()); | 209 base::AutoLock locker(lock()); |
| 194 channel_ = channel; | 210 channel_ = channel; |
| 195 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { | 211 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
| 196 channel_->WriteMessage( | 212 channel_->WriteMessage( |
| 197 non_transferable_outgoing_message_queue_.GetMessage()); | 213 non_transferable_outgoing_message_queue_.GetMessage()); |
| 198 } | 214 } |
| 199 | 215 |
| 200 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { | 216 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { |
| 201 // We kept this object alive until it's connected, we can release it now. | |
| 202 internal::g_broker->CloseMessagePipe(pipe_id_, this); | |
| 203 non_transferable_state_ = CLOSED; | |
| 204 channel_ = nullptr; | |
| 205 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); | |
| 206 } else { | |
| 207 non_transferable_state_ = CONNECTED; | 217 non_transferable_state_ = CONNECTED; |
| 218 // We kept this object alive until it's connected, we can close it now. |
| 219 CloseOnIO(); |
| 220 // Balance the AddRef in CloseOnIO. |
| 221 Release(); |
| 222 return; |
| 208 } | 223 } |
| 224 |
| 225 non_transferable_state_ = CONNECTED; |
| 209 } | 226 } |
| 210 | 227 |
| 211 #if defined(OS_WIN) | 228 #if defined(OS_WIN) |
| 212 // TODO(jam): this is copied from RawChannelWin till I figure out what's the | 229 // TODO(jam): this is copied from RawChannelWin till I figure out what's the |
| 213 // best way we want to share this. | 230 // best way we want to share this. |
| 214 // Since this is used for serialization of messages read/written to a MP that | 231 // Since this is used for serialization of messages read/written to a MP that |
| 215 // aren't consumed by Mojo primitives yet, there could be an unbounded number of | 232 // aren't consumed by Mojo primitives yet, there could be an unbounded number of |
| 216 // them when a MP is being sent. As a result, even for POSIX we will probably | 233 // them when a MP is being sent. As a result, even for POSIX we will probably |
| 217 // want to send the handles to the shell process and exchange them for tokens | 234 // want to send the handles to the shell process and exchange them for tokens |
| 218 // (since we can be sure that the shell will respond to our IPCs, compared to | 235 // (since we can be sure that the shell will respond to our IPCs, compared to |
| (...skipping 182 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 401 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) | 418 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
| 402 : channel_(nullptr), | 419 : channel_(nullptr), |
| 403 serialized_read_fds_length_(0u), | 420 serialized_read_fds_length_(0u), |
| 404 serialized_write_fds_length_(0u), | 421 serialized_write_fds_length_(0u), |
| 405 serialized_message_fds_length_(0u), | 422 serialized_message_fds_length_(0u), |
| 406 pipe_id_(0), | 423 pipe_id_(0), |
| 407 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), | 424 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), |
| 408 serialized_(false), | 425 serialized_(false), |
| 409 calling_init_(false), | 426 calling_init_(false), |
| 410 write_error_(false), | 427 write_error_(false), |
| 411 transferable_(transferable) { | 428 transferable_(transferable), |
| 429 close_requested_(false) { |
| 412 } | 430 } |
| 413 | 431 |
| 414 MessagePipeDispatcher::~MessagePipeDispatcher() { | 432 MessagePipeDispatcher::~MessagePipeDispatcher() { |
| 415 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The | 433 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The |
| 416 // exception is if they posted a task to run CloseOnIO but the IO thread shut | 434 // exception is if they posted a task to run CloseOnIO but the IO thread shut |
| 417 // down and so when it was deleting pending tasks it caused the last reference | 435 // down and so when it was deleting pending tasks it caused the last reference |
| 418 // to destruct this object. In that case, safe to destroy the channel. | 436 // to destruct this object. In that case, safe to destroy the channel. |
| 419 if (channel_ && | 437 if (channel_ && |
| 420 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { | 438 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { |
| 421 if (transferable_) { | 439 if (transferable_) { |
| (...skipping 21 matching lines...) Expand all Loading... |
| 443 if (!transferable_ && non_transferable_state_ == CLOSED) | 461 if (!transferable_ && non_transferable_state_ == CLOSED) |
| 444 return; | 462 return; |
| 445 | 463 |
| 446 // We take a manual refcount because at shutdown, the task below might not get | 464 // We take a manual refcount because at shutdown, the task below might not get |
| 447 // a chance to execute. If that happens, the RawChannel will still call our | 465 // a chance to execute. If that happens, the RawChannel will still call our |
| 448 // OnError method because it always runs (since it watches thread | 466 // OnError method because it always runs (since it watches thread |
| 449 // destruction). So to avoid UAF, manually add a reference and only release it | 467 // destruction). So to avoid UAF, manually add a reference and only release it |
| 450 // if the task runs. | 468 // if the task runs. |
| 451 AddRef(); | 469 AddRef(); |
| 452 internal::g_io_thread_task_runner->PostTask( | 470 internal::g_io_thread_task_runner->PostTask( |
| 453 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); | 471 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIOAndRelease, this)); |
| 454 } | 472 } |
| 455 | 473 |
| 456 void MessagePipeDispatcher::SerializeInternal() { | 474 void MessagePipeDispatcher::SerializeInternal() { |
| 457 serialized_ = true; | 475 serialized_ = true; |
| 458 if (!transferable_) { | 476 if (!transferable_) { |
| 459 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | 477 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| 460 << "Non transferable message pipe being sent after read/write/waited. " | 478 << "Non transferable message pipe being sent after read/write/waited. " |
| 461 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " | 479 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " |
| 462 << "the pipe can be sent after it's read or written. This message pipe " | 480 << "the pipe can be sent after it's read or written. This message pipe " |
| 463 << "was previously bound at:\n" | 481 << "was previously bound at:\n" |
| (...skipping 396 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 860 ScopedPlatformHandleVectorPtr platform_handles) { | 878 ScopedPlatformHandleVectorPtr platform_handles) { |
| 861 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | 879 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
| 862 if (message_view.transport_data_buffer_size() > 0) { | 880 if (message_view.transport_data_buffer_size() > 0) { |
| 863 DCHECK(message_view.transport_data_buffer()); | 881 DCHECK(message_view.transport_data_buffer()); |
| 864 message->SetDispatchers(TransportData::DeserializeDispatchers( | 882 message->SetDispatchers(TransportData::DeserializeDispatchers( |
| 865 message_view.transport_data_buffer(), | 883 message_view.transport_data_buffer(), |
| 866 message_view.transport_data_buffer_size(), | 884 message_view.transport_data_buffer_size(), |
| 867 std::move(platform_handles))); | 885 std::move(platform_handles))); |
| 868 } | 886 } |
| 869 | 887 |
| 888 bool call_release = false; |
| 870 if (started_transport_.Try()) { | 889 if (started_transport_.Try()) { |
| 871 // we're not in the middle of being sent | 890 // We're not in the middle of being sent. |
| 872 | 891 |
| 873 // Can get synchronously called back in Init if there was initial data. | 892 // Can get synchronously called back in Init if there was initial data. |
| 874 scoped_ptr<base::AutoLock> locker; | 893 scoped_ptr<base::AutoLock> locker; |
| 875 if (!calling_init_) { | 894 if (!calling_init_) |
| 876 locker.reset(new base::AutoLock(lock())); | 895 locker.reset(new base::AutoLock(lock())); |
| 896 |
| 897 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { |
| 898 if (transferable_) { |
| 899 channel_->Shutdown(); |
| 900 } else { |
| 901 internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| 902 non_transferable_state_ = CLOSED; |
| 903 } |
| 904 channel_ = nullptr; |
| 905 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 906 if (close_requested_) { |
| 907 // We requested the other side to close the connection while they also |
| 908 // did the same. We must balance out the AddRef in CloseOnIO to ensure |
| 909 // this object isn't leaked. |
| 910 call_release = true; |
| 911 } |
| 912 } else { |
| 913 bool was_empty = message_queue_.IsEmpty(); |
| 914 message_queue_.AddMessage(std::move(message)); |
| 915 if (was_empty) |
| 916 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 877 } | 917 } |
| 878 | 918 |
| 879 bool was_empty = message_queue_.IsEmpty(); | |
| 880 message_queue_.AddMessage(std::move(message)); | |
| 881 if (was_empty) | |
| 882 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
| 883 | |
| 884 started_transport_.Release(); | 919 started_transport_.Release(); |
| 885 } else { | 920 } else { |
| 886 // If RawChannel is calling OnRead, that means it has its read_lock_ | 921 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { |
| 887 // acquired. That means StartSerialize can't be accessing message queue as | 922 // We got a request to shutdown the channel but this object is already |
| 888 // it waits on ReleaseHandle first which acquires readlock_. | 923 // calling into channel to serialize it. Since all the other side cares |
| 889 message_queue_.AddMessage(std::move(message)); | 924 // about is flushing pending messages, we bounce the quit back to it. |
| 925 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| 926 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); |
| 927 channel_->WriteMessage(std::move(message)); |
| 928 } else { |
| 929 // If RawChannel is calling OnRead, that means it has its read_lock_ |
| 930 // acquired. That means StartSerialize can't be accessing message queue as |
| 931 // it waits on ReleaseHandle first which acquires read_lock_. |
| 932 message_queue_.AddMessage(std::move(message)); |
| 933 } |
| 890 } | 934 } |
| 935 |
| 936 if (call_release) |
| 937 Release(); |
| 891 } | 938 } |
| 892 | 939 |
| 893 void MessagePipeDispatcher::OnError(Error error) { | 940 void MessagePipeDispatcher::OnError(Error error) { |
| 894 // If there's a read error, then the other side of the pipe is closed. By | 941 // If there's a read error, then the other side of the pipe is closed. By |
| 895 // definition, we can't write since there's no one to read it. And we can't | 942 // definition, we can't write since there's no one to read it. And we can't |
| 896 // read anymore, since we just got a read erorr. So we close the pipe. | 943 // read anymore, since we just got a read erorr. So we close the pipe. |
| 897 // If there's a write error, then we stop writing. But we keep the pipe open | 944 // If there's a write error, then we stop writing. But we keep the pipe open |
| 898 // until we finish reading everything in it. This is because it's valid for | 945 // until we finish reading everything in it. This is because it's valid for |
| 899 // one endpoint to write some data and close their pipe immediately. Even | 946 // one endpoint to write some data and close their pipe immediately. Even |
| 900 // though the other end can't write anymore, it should still get all the data. | 947 // though the other end can't write anymore, it should still get all the data. |
| (...skipping 15 matching lines...) Expand all Loading... |
| 916 break; | 963 break; |
| 917 case ERROR_WRITE: | 964 case ERROR_WRITE: |
| 918 // Write errors are slightly notable: they probably shouldn't happen under | 965 // Write errors are slightly notable: they probably shouldn't happen under |
| 919 // normal operation (but maybe the other side crashed). | 966 // normal operation (but maybe the other side crashed). |
| 920 LOG(WARNING) << "MessagePipeDispatcher write error"; | 967 LOG(WARNING) << "MessagePipeDispatcher write error"; |
| 921 DCHECK_EQ(write_error_, false) << "Should only get one write error."; | 968 DCHECK_EQ(write_error_, false) << "Should only get one write error."; |
| 922 write_error_ = true; | 969 write_error_ = true; |
| 923 break; | 970 break; |
| 924 } | 971 } |
| 925 | 972 |
| 973 bool call_release = false; |
| 926 if (started_transport_.Try()) { | 974 if (started_transport_.Try()) { |
| 927 base::AutoLock locker(lock()); | 975 base::AutoLock locker(lock()); |
| 928 // We can get two OnError callbacks before the post task below completes. | |
| 929 // Although RawChannel still has a pointer to this object until Shutdown is | |
| 930 // called, that is safe since this class always does a PostTask to the IO | |
| 931 // thread to self destruct. | |
| 932 if (channel_ && error != ERROR_WRITE) { | 976 if (channel_ && error != ERROR_WRITE) { |
| 933 if (transferable_) { | 977 if (transferable_) { |
| 934 channel_->Shutdown(); | 978 channel_->Shutdown(); |
| 935 } else { | 979 } else { |
| 936 CHECK_NE(non_transferable_state_, CLOSED); | 980 CHECK_NE(non_transferable_state_, CLOSED); |
| 937 internal::g_broker->CloseMessagePipe(pipe_id_, this); | 981 internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| 938 non_transferable_state_ = CLOSED; | 982 non_transferable_state_ = CLOSED; |
| 939 } | 983 } |
| 940 channel_ = nullptr; | 984 channel_ = nullptr; |
| 985 if (close_requested_) { |
| 986 // Balance AddRef in CloseOnIO. |
| 987 call_release = true; |
| 988 } |
| 941 } | 989 } |
| 942 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 990 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 943 started_transport_.Release(); | 991 started_transport_.Release(); |
| 944 } else { | 992 } else { |
| 945 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 993 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
| 946 } | 994 } |
| 995 |
| 996 if (call_release) |
| 997 Release(); |
| 947 } | 998 } |
| 948 | 999 |
| 949 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( | 1000 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
| 950 MessageInTransit* message, | 1001 MessageInTransit* message, |
| 951 std::vector<DispatcherTransport>* transports) { | 1002 std::vector<DispatcherTransport>* transports) { |
| 952 DCHECK(!message->has_dispatchers()); | 1003 DCHECK(!message->has_dispatchers()); |
| 953 | 1004 |
| 954 // You're not allowed to send either handle to a message pipe over the message | 1005 // You're not allowed to send either handle to a message pipe over the message |
| 955 // pipe, so check for this. (The case of trying to write a handle to itself is | 1006 // pipe, so check for this. (The case of trying to write a handle to itself is |
| 956 // taken care of by |Core|. That case kind of makes sense, but leads to | 1007 // taken care of by |Core|. That case kind of makes sense, but leads to |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1005 // PostTask since the broker can call us back synchronously. | 1056 // PostTask since the broker can call us back synchronously. |
| 1006 internal::g_io_thread_task_runner->PostTask( | 1057 internal::g_io_thread_task_runner->PostTask( |
| 1007 FROM_HERE, | 1058 FROM_HERE, |
| 1008 base::Bind(&Broker::ConnectMessagePipe, | 1059 base::Bind(&Broker::ConnectMessagePipe, |
| 1009 base::Unretained(internal::g_broker), pipe_id_, | 1060 base::Unretained(internal::g_broker), pipe_id_, |
| 1010 base::Unretained(this))); | 1061 base::Unretained(this))); |
| 1011 } | 1062 } |
| 1012 | 1063 |
| 1013 } // namespace edk | 1064 } // namespace edk |
| 1014 } // namespace mojo | 1065 } // namespace mojo |
| OLD | NEW |