Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 150 | 150 |
| 151 void MessagePipeDispatcher::InitOnIO() { | 151 void MessagePipeDispatcher::InitOnIO() { |
| 152 base::AutoLock locker(lock()); | 152 base::AutoLock locker(lock()); |
| 153 CHECK(transferable_); | 153 CHECK(transferable_); |
| 154 calling_init_ = true; | 154 calling_init_ = true; |
| 155 if (channel_) | 155 if (channel_) |
| 156 channel_->Init(this); | 156 channel_->Init(this); |
| 157 calling_init_ = false; | 157 calling_init_ = false; |
| 158 } | 158 } |
| 159 | 159 |
| 160 void MessagePipeDispatcher::CloseOnIO() { | 160 void MessagePipeDispatcher::ReleaseAndCloseOnIO() { |
| 161 base::AutoLock locker(lock()); | 161 base::AutoLock locker(lock()); |
| 162 Release(); // To match CloseImplNoLock. | 162 Release(); // To match CloseImplNoLock. |
| 163 if (transferable_) { | 163 CloseOnIO(); |
| 164 if (channel_) { | 164 } |
| 165 channel_->Shutdown(); | |
| 166 channel_ = nullptr; | |
| 167 } | |
| 168 } else { | |
| 169 if (non_transferable_state_ == CONNECT_CALLED || | |
| 170 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { | |
| 171 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
| 172 RequestNontransferableChannel(); | |
| 173 | 165 |
| 174 // We can't cancel the pending request yet, since the other side of the | 166 void MessagePipeDispatcher::CloseOnIO() { |
| 175 // message pipe would want to get pending outgoing messages (if any) or | 167 lock().AssertAcquired(); |
| 176 // at least know that this end was closed. So keep this object alive until | 168 |
| 177 // then. | 169 if (channel_) { |
| 178 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; | 170 DCHECK(!close_requested_); |
| 179 AddRef(); | 171 close_requested_ = true; |
| 180 } else if (non_transferable_state_ == CONNECTED) { | 172 AddRef(); |
| 181 internal::g_broker->CloseMessagePipe(pipe_id_, this); | 173 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| 182 non_transferable_state_ = CLOSED; | 174 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); |
| 183 channel_ = nullptr; | 175 if (!transferable_) |
| 184 } | 176 message->set_route_id(pipe_id_); |
| 177 channel_->WriteMessage(std::move(message)); | |
| 178 return; | |
| 179 } | |
| 180 | |
| 181 if (!transferable_ && | |
| 182 (non_transferable_state_ == CONNECT_CALLED || | |
| 183 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) { | |
| 184 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
| 185 RequestNontransferableChannel(); | |
| 186 | |
| 187 // We can't cancel the pending request yet, since the other side of the | |
| 188 // message pipe would want to get pending outgoing messages (if any) or at | |
| 189 // least know that this end was closed. So keep this object alive until | |
| 190 // then. | |
| 191 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; | |
| 192 AddRef(); | |
| 185 } | 193 } |
| 186 } | 194 } |
| 187 | 195 |
| 188 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 196 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
| 189 return Type::MESSAGE_PIPE; | 197 return Type::MESSAGE_PIPE; |
| 190 } | 198 } |
| 191 | 199 |
| 192 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { | 200 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
| 201 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
| 193 base::AutoLock locker(lock()); | 202 base::AutoLock locker(lock()); |
| 194 channel_ = channel; | 203 channel_ = channel; |
| 195 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { | 204 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
| 196 channel_->WriteMessage( | 205 channel_->WriteMessage( |
| 197 non_transferable_outgoing_message_queue_.GetMessage()); | 206 non_transferable_outgoing_message_queue_.GetMessage()); |
| 198 } | 207 } |
| 199 | 208 |
| 200 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { | 209 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { |
| 201 // We kept this object alive until it's connected, we can release it now. | 210 non_transferable_state_ = CONNECTED; |
| 202 internal::g_broker->CloseMessagePipe(pipe_id_, this); | 211 // We kept this object alive until it's connected, we can close it now. |
| 203 non_transferable_state_ = CLOSED; | 212 CloseOnIO(); |
| 204 channel_ = nullptr; | 213 // Balance the AddRef in CloseOnIO. |
| 205 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); | 214 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); |
| 206 } else { | 215 return; |
| 207 non_transferable_state_ = CONNECTED; | |
| 208 } | 216 } |
| 217 | |
| 218 non_transferable_state_ = CONNECTED; | |
| 209 } | 219 } |
| 210 | 220 |
| 211 #if defined(OS_WIN) | 221 #if defined(OS_WIN) |
| 212 // TODO(jam): this is copied from RawChannelWin till I figure out what's the | 222 // TODO(jam): this is copied from RawChannelWin till I figure out what's the |
| 213 // best way we want to share this. | 223 // best way we want to share this. |
| 214 // Since this is used for serialization of messages read/written to a MP that | 224 // Since this is used for serialization of messages read/written to a MP that |
| 215 // aren't consumed by Mojo primitives yet, there could be an unbounded number of | 225 // aren't consumed by Mojo primitives yet, there could be an unbounded number of |
| 216 // them when a MP is being sent. As a result, even for POSIX we will probably | 226 // them when a MP is being sent. As a result, even for POSIX we will probably |
| 217 // want to send the handles to the shell process and exchange them for tokens | 227 // want to send the handles to the shell process and exchange them for tokens |
| 218 // (since we can be sure that the shell will respond to our IPCs, compared to | 228 // (since we can be sure that the shell will respond to our IPCs, compared to |
| (...skipping 182 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 401 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) | 411 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
| 402 : channel_(nullptr), | 412 : channel_(nullptr), |
| 403 serialized_read_fds_length_(0u), | 413 serialized_read_fds_length_(0u), |
| 404 serialized_write_fds_length_(0u), | 414 serialized_write_fds_length_(0u), |
| 405 serialized_message_fds_length_(0u), | 415 serialized_message_fds_length_(0u), |
| 406 pipe_id_(0), | 416 pipe_id_(0), |
| 407 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), | 417 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), |
| 408 serialized_(false), | 418 serialized_(false), |
| 409 calling_init_(false), | 419 calling_init_(false), |
| 410 write_error_(false), | 420 write_error_(false), |
| 411 transferable_(transferable) { | 421 transferable_(transferable), |
| 422 close_requested_(false) { | |
| 412 } | 423 } |
| 413 | 424 |
| 414 MessagePipeDispatcher::~MessagePipeDispatcher() { | 425 MessagePipeDispatcher::~MessagePipeDispatcher() { |
| 415 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The | 426 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The |
| 416 // exception is if they posted a task to run CloseOnIO but the IO thread shut | 427 // exception is if they posted a task to run CloseOnIO but the IO thread shut |
| 417 // down and so when it was deleting pending tasks it caused the last reference | 428 // down and so when it was deleting pending tasks it caused the last reference |
| 418 // to destruct this object. In that case, safe to destroy the channel. | 429 // to destruct this object. In that case, safe to destroy the channel. |
| 419 if (channel_ && | 430 if (channel_ && |
| 420 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { | 431 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { |
| 421 if (transferable_) { | 432 if (transferable_) { |
| (...skipping 21 matching lines...) Expand all Loading... | |
| 443 if (!transferable_ && non_transferable_state_ == CLOSED) | 454 if (!transferable_ && non_transferable_state_ == CLOSED) |
| 444 return; | 455 return; |
| 445 | 456 |
| 446 // We take a manual refcount because at shutdown, the task below might not get | 457 // We take a manual refcount because at shutdown, the task below might not get |
| 447 // a chance to execute. If that happens, the RawChannel will still call our | 458 // a chance to execute. If that happens, the RawChannel will still call our |
| 448 // OnError method because it always runs (since it watches thread | 459 // OnError method because it always runs (since it watches thread |
| 449 // destruction). So to avoid UAF, manually add a reference and only release it | 460 // destruction). So to avoid UAF, manually add a reference and only release it |
| 450 // if the task runs. | 461 // if the task runs. |
| 451 AddRef(); | 462 AddRef(); |
| 452 internal::g_io_thread_task_runner->PostTask( | 463 internal::g_io_thread_task_runner->PostTask( |
| 453 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); | 464 FROM_HERE, base::Bind(&MessagePipeDispatcher::ReleaseAndCloseOnIO, this)); |
| 454 } | 465 } |
| 455 | 466 |
| 456 void MessagePipeDispatcher::SerializeInternal() { | 467 void MessagePipeDispatcher::SerializeInternal() { |
| 457 serialized_ = true; | 468 serialized_ = true; |
| 458 if (!transferable_) { | 469 if (!transferable_) { |
| 459 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | 470 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| 460 << "Non transferable message pipe being sent after read/write/waited. " | 471 << "Non transferable message pipe being sent after read/write/waited. " |
| 461 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " | 472 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " |
| 462 << "the pipe can be sent after it's read or written. This message pipe " | 473 << "the pipe can be sent after it's read or written. This message pipe " |
| 463 << "was previously bound at:\n" | 474 << "was previously bound at:\n" |
| (...skipping 397 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 861 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | 872 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
| 862 if (message_view.transport_data_buffer_size() > 0) { | 873 if (message_view.transport_data_buffer_size() > 0) { |
| 863 DCHECK(message_view.transport_data_buffer()); | 874 DCHECK(message_view.transport_data_buffer()); |
| 864 message->SetDispatchers(TransportData::DeserializeDispatchers( | 875 message->SetDispatchers(TransportData::DeserializeDispatchers( |
| 865 message_view.transport_data_buffer(), | 876 message_view.transport_data_buffer(), |
| 866 message_view.transport_data_buffer_size(), | 877 message_view.transport_data_buffer_size(), |
| 867 std::move(platform_handles))); | 878 std::move(platform_handles))); |
| 868 } | 879 } |
| 869 | 880 |
| 870 if (started_transport_.Try()) { | 881 if (started_transport_.Try()) { |
| 871 // we're not in the middle of being sent | 882 // We're not in the middle of being sent. |
| 872 | 883 |
| 873 // Can get synchronously called back in Init if there was initial data. | 884 // Can get synchronously called back in Init if there was initial data. |
| 874 scoped_ptr<base::AutoLock> locker; | 885 scoped_ptr<base::AutoLock> locker; |
| 875 if (!calling_init_) { | 886 if (!calling_init_) |
| 876 locker.reset(new base::AutoLock(lock())); | 887 locker.reset(new base::AutoLock(lock())); |
| 888 | |
| 889 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { | |
| 890 if (transferable_) { | |
| 891 channel_->Shutdown(); | |
| 892 } else { | |
| 893 internal::g_broker->CloseMessagePipe(pipe_id_, this); | |
| 894 non_transferable_state_ = CLOSED; | |
| 895 } | |
| 896 channel_ = nullptr; | |
| 897 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
| 898 } else { | |
| 899 bool was_empty = message_queue_.IsEmpty(); | |
| 900 message_queue_.AddMessage(std::move(message)); | |
| 901 if (was_empty) | |
| 902 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
| 877 } | 903 } |
| 878 | 904 |
| 879 bool was_empty = message_queue_.IsEmpty(); | |
| 880 message_queue_.AddMessage(std::move(message)); | |
| 881 if (was_empty) | |
| 882 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
| 883 | |
| 884 started_transport_.Release(); | 905 started_transport_.Release(); |
| 885 } else { | 906 } else { |
| 886 // If RawChannel is calling OnRead, that means it has its read_lock_ | 907 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { |
| 887 // acquired. That means StartSerialize can't be accessing message queue as | 908 // We got a request to shutdown the channel but this object is already |
| 888 // it waits on ReleaseHandle first which acquires readlock_. | 909 // calling into channel to serialize it. Since all the other side cares |
| 889 message_queue_.AddMessage(std::move(message)); | 910 // about is flushing pending messages, we bounce the quit back to it. |
| 911 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
| 912 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); | |
| 913 channel_->WriteMessage(std::move(message)); | |
| 914 } else { | |
| 915 // If RawChannel is calling OnRead, that means it has its read_lock_ | |
| 916 // acquired. That means StartSerialize can't be accessing message queue as | |
| 917 // it waits on ReleaseHandle first which acquires read_lock_. | |
| 918 message_queue_.AddMessage(std::move(message)); | |
| 919 } | |
| 890 } | 920 } |
| 891 } | 921 } |
| 892 | 922 |
| 893 void MessagePipeDispatcher::OnError(Error error) { | 923 void MessagePipeDispatcher::OnError(Error error) { |
| 894 // If there's a read error, then the other side of the pipe is closed. By | 924 // If there's a read error, then the other side of the pipe is closed. By |
| 895 // definition, we can't write since there's no one to read it. And we can't | 925 // definition, we can't write since there's no one to read it. And we can't |
| 896 // read anymore, since we just got a read erorr. So we close the pipe. | 926 // read anymore, since we just got a read erorr. So we close the pipe. |
| 897 // If there's a write error, then we stop writing. But we keep the pipe open | 927 // If there's a write error, then we stop writing. But we keep the pipe open |
| 898 // until we finish reading everything in it. This is because it's valid for | 928 // until we finish reading everything in it. This is because it's valid for |
| 899 // one endpoint to write some data and close their pipe immediately. Even | 929 // one endpoint to write some data and close their pipe immediately. Even |
| (...skipping 18 matching lines...) Expand all Loading... | |
| 918 // Write errors are slightly notable: they probably shouldn't happen under | 948 // Write errors are slightly notable: they probably shouldn't happen under |
| 919 // normal operation (but maybe the other side crashed). | 949 // normal operation (but maybe the other side crashed). |
| 920 LOG(WARNING) << "MessagePipeDispatcher write error"; | 950 LOG(WARNING) << "MessagePipeDispatcher write error"; |
| 921 DCHECK_EQ(write_error_, false) << "Should only get one write error."; | 951 DCHECK_EQ(write_error_, false) << "Should only get one write error."; |
| 922 write_error_ = true; | 952 write_error_ = true; |
| 923 break; | 953 break; |
| 924 } | 954 } |
| 925 | 955 |
| 926 if (started_transport_.Try()) { | 956 if (started_transport_.Try()) { |
| 927 base::AutoLock locker(lock()); | 957 base::AutoLock locker(lock()); |
| 928 // We can get two OnError callbacks before the post task below completes. | |
| 929 // Although RawChannel still has a pointer to this object until Shutdown is | |
| 930 // called, that is safe since this class always does a PostTask to the IO | |
| 931 // thread to self destruct. | |
| 932 if (channel_ && error != ERROR_WRITE) { | 958 if (channel_ && error != ERROR_WRITE) { |
| 933 if (transferable_) { | 959 if (transferable_) { |
| 934 channel_->Shutdown(); | 960 channel_->Shutdown(); |
| 935 } else { | 961 } else { |
| 936 CHECK_NE(non_transferable_state_, CLOSED); | 962 CHECK_NE(non_transferable_state_, CLOSED); |
| 937 internal::g_broker->CloseMessagePipe(pipe_id_, this); | 963 internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| 938 non_transferable_state_ = CLOSED; | 964 non_transferable_state_ = CLOSED; |
| 939 } | 965 } |
| 940 channel_ = nullptr; | 966 channel_ = nullptr; |
| 967 if (close_requested_) { | |
| 968 // Balance AddRef in CloseOnIO. | |
| 969 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); | |
|
sky
2015/12/30 16:33:32
Is there a reason you can't Release() rather than
jam
2015/12/30 17:03:40
This might be the last reference, so I was just si
| |
| 970 } | |
| 941 } | 971 } |
| 942 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 972 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 943 started_transport_.Release(); | 973 started_transport_.Release(); |
| 944 } else { | 974 } else { |
| 945 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 975 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
| 946 } | 976 } |
| 947 } | 977 } |
| 948 | 978 |
| 949 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( | 979 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
| 950 MessageInTransit* message, | 980 MessageInTransit* message, |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1005 // PostTask since the broker can call us back synchronously. | 1035 // PostTask since the broker can call us back synchronously. |
| 1006 internal::g_io_thread_task_runner->PostTask( | 1036 internal::g_io_thread_task_runner->PostTask( |
| 1007 FROM_HERE, | 1037 FROM_HERE, |
| 1008 base::Bind(&Broker::ConnectMessagePipe, | 1038 base::Bind(&Broker::ConnectMessagePipe, |
| 1009 base::Unretained(internal::g_broker), pipe_id_, | 1039 base::Unretained(internal::g_broker), pipe_id_, |
| 1010 base::Unretained(this))); | 1040 base::Unretained(this))); |
| 1011 } | 1041 } |
| 1012 | 1042 |
| 1013 } // namespace edk | 1043 } // namespace edk |
| 1014 } // namespace mojo | 1044 } // namespace mojo |
| OLD | NEW |