OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
150 | 150 |
151 void MessagePipeDispatcher::InitOnIO() { | 151 void MessagePipeDispatcher::InitOnIO() { |
152 base::AutoLock locker(lock()); | 152 base::AutoLock locker(lock()); |
153 CHECK(transferable_); | 153 CHECK(transferable_); |
154 calling_init_ = true; | 154 calling_init_ = true; |
155 if (channel_) | 155 if (channel_) |
156 channel_->Init(this); | 156 channel_->Init(this); |
157 calling_init_ = false; | 157 calling_init_ = false; |
158 } | 158 } |
159 | 159 |
| 160 void MessagePipeDispatcher::CloseOnIOAndRelease() { |
| 161 { |
| 162 base::AutoLock locker(lock()); |
| 163 CloseOnIO(); |
| 164 } |
| 165 Release(); // To match CloseImplNoLock. |
| 166 } |
| 167 |
160 void MessagePipeDispatcher::CloseOnIO() { | 168 void MessagePipeDispatcher::CloseOnIO() { |
161 base::AutoLock locker(lock()); | 169 lock().AssertAcquired(); |
162 Release(); // To match CloseImplNoLock. | |
163 if (transferable_) { | |
164 if (channel_) { | |
165 channel_->Shutdown(); | |
166 channel_ = nullptr; | |
167 } | |
168 } else { | |
169 if (non_transferable_state_ == CONNECT_CALLED || | |
170 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { | |
171 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
172 RequestNontransferableChannel(); | |
173 | 170 |
174 // We can't cancel the pending request yet, since the other side of the | 171 if (channel_) { |
175 // message pipe would want to get pending outgoing messages (if any) or | 172 // If we closed the channel now then in-flight message pipes wouldn't get |
176 // at least know that this end was closed. So keep this object alive until | 173 // closed, and their other side wouldn't get a connection error notification |
177 // then. | 174 // which could lead to hangs or leaks. So we ask the other side of this |
178 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; | 175 // message pipe to close, which ensures that we have dispatched all |
179 AddRef(); | 176 // in-flight message pipes. |
180 } else if (non_transferable_state_ == CONNECTED) { | 177 DCHECK(!close_requested_); |
181 internal::g_broker->CloseMessagePipe(pipe_id_, this); | 178 close_requested_ = true; |
182 non_transferable_state_ = CLOSED; | 179 AddRef(); |
183 channel_ = nullptr; | 180 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
184 } | 181 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); |
| 182 if (!transferable_) |
| 183 message->set_route_id(pipe_id_); |
| 184 channel_->WriteMessage(std::move(message)); |
| 185 return; |
| 186 } |
| 187 |
| 188 if (!transferable_ && |
| 189 (non_transferable_state_ == CONNECT_CALLED || |
| 190 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) { |
| 191 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| 192 RequestNontransferableChannel(); |
| 193 |
| 194 // We can't cancel the pending request yet, since the other side of the |
| 195 // message pipe would want to get pending outgoing messages (if any) or at |
| 196 // least know that this end was closed. So keep this object alive until |
| 197 // then. |
| 198 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; |
| 199 AddRef(); |
185 } | 200 } |
186 } | 201 } |
187 | 202 |
188 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 203 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
189 return Type::MESSAGE_PIPE; | 204 return Type::MESSAGE_PIPE; |
190 } | 205 } |
191 | 206 |
192 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { | 207 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
| 208 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
193 base::AutoLock locker(lock()); | 209 base::AutoLock locker(lock()); |
194 channel_ = channel; | 210 channel_ = channel; |
195 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { | 211 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
196 channel_->WriteMessage( | 212 channel_->WriteMessage( |
197 non_transferable_outgoing_message_queue_.GetMessage()); | 213 non_transferable_outgoing_message_queue_.GetMessage()); |
198 } | 214 } |
199 | 215 |
200 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { | 216 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { |
201 // We kept this object alive until it's connected, we can release it now. | |
202 internal::g_broker->CloseMessagePipe(pipe_id_, this); | |
203 non_transferable_state_ = CLOSED; | |
204 channel_ = nullptr; | |
205 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); | |
206 } else { | |
207 non_transferable_state_ = CONNECTED; | 217 non_transferable_state_ = CONNECTED; |
| 218 // We kept this object alive until it's connected, we can close it now. |
| 219 CloseOnIO(); |
| 220 // Balance the AddRef in CloseOnIO. |
| 221 Release(); |
| 222 return; |
208 } | 223 } |
| 224 |
| 225 non_transferable_state_ = CONNECTED; |
209 } | 226 } |
210 | 227 |
211 #if defined(OS_WIN) | 228 #if defined(OS_WIN) |
212 // TODO(jam): this is copied from RawChannelWin till I figure out what's the | 229 // TODO(jam): this is copied from RawChannelWin till I figure out what's the |
213 // best way we want to share this. | 230 // best way we want to share this. |
214 // Since this is used for serialization of messages read/written to a MP that | 231 // Since this is used for serialization of messages read/written to a MP that |
215 // aren't consumed by Mojo primitives yet, there could be an unbounded number of | 232 // aren't consumed by Mojo primitives yet, there could be an unbounded number of |
216 // them when a MP is being sent. As a result, even for POSIX we will probably | 233 // them when a MP is being sent. As a result, even for POSIX we will probably |
217 // want to send the handles to the shell process and exchange them for tokens | 234 // want to send the handles to the shell process and exchange them for tokens |
218 // (since we can be sure that the shell will respond to our IPCs, compared to | 235 // (since we can be sure that the shell will respond to our IPCs, compared to |
(...skipping 182 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
401 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) | 418 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
402 : channel_(nullptr), | 419 : channel_(nullptr), |
403 serialized_read_fds_length_(0u), | 420 serialized_read_fds_length_(0u), |
404 serialized_write_fds_length_(0u), | 421 serialized_write_fds_length_(0u), |
405 serialized_message_fds_length_(0u), | 422 serialized_message_fds_length_(0u), |
406 pipe_id_(0), | 423 pipe_id_(0), |
407 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), | 424 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), |
408 serialized_(false), | 425 serialized_(false), |
409 calling_init_(false), | 426 calling_init_(false), |
410 write_error_(false), | 427 write_error_(false), |
411 transferable_(transferable) { | 428 transferable_(transferable), |
| 429 close_requested_(false) { |
412 } | 430 } |
413 | 431 |
414 MessagePipeDispatcher::~MessagePipeDispatcher() { | 432 MessagePipeDispatcher::~MessagePipeDispatcher() { |
415 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The | 433 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The |
416 // exception is if they posted a task to run CloseOnIO but the IO thread shut | 434 // exception is if they posted a task to run CloseOnIO but the IO thread shut |
417 // down and so when it was deleting pending tasks it caused the last reference | 435 // down and so when it was deleting pending tasks it caused the last reference |
418 // to destruct this object. In that case, safe to destroy the channel. | 436 // to destruct this object. In that case, safe to destroy the channel. |
419 if (channel_ && | 437 if (channel_ && |
420 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { | 438 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { |
421 if (transferable_) { | 439 if (transferable_) { |
(...skipping 21 matching lines...) Expand all Loading... |
443 if (!transferable_ && non_transferable_state_ == CLOSED) | 461 if (!transferable_ && non_transferable_state_ == CLOSED) |
444 return; | 462 return; |
445 | 463 |
446 // We take a manual refcount because at shutdown, the task below might not get | 464 // We take a manual refcount because at shutdown, the task below might not get |
447 // a chance to execute. If that happens, the RawChannel will still call our | 465 // a chance to execute. If that happens, the RawChannel will still call our |
448 // OnError method because it always runs (since it watches thread | 466 // OnError method because it always runs (since it watches thread |
449 // destruction). So to avoid UAF, manually add a reference and only release it | 467 // destruction). So to avoid UAF, manually add a reference and only release it |
450 // if the task runs. | 468 // if the task runs. |
451 AddRef(); | 469 AddRef(); |
452 internal::g_io_thread_task_runner->PostTask( | 470 internal::g_io_thread_task_runner->PostTask( |
453 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); | 471 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIOAndRelease, this)); |
454 } | 472 } |
455 | 473 |
456 void MessagePipeDispatcher::SerializeInternal() { | 474 void MessagePipeDispatcher::SerializeInternal() { |
457 serialized_ = true; | 475 serialized_ = true; |
458 if (!transferable_) { | 476 if (!transferable_) { |
459 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | 477 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
460 << "Non transferable message pipe being sent after read/write/waited. " | 478 << "Non transferable message pipe being sent after read/write/waited. " |
461 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " | 479 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " |
462 << "the pipe can be sent after it's read or written. This message pipe " | 480 << "the pipe can be sent after it's read or written. This message pipe " |
463 << "was previously bound at:\n" | 481 << "was previously bound at:\n" |
(...skipping 396 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
860 ScopedPlatformHandleVectorPtr platform_handles) { | 878 ScopedPlatformHandleVectorPtr platform_handles) { |
861 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | 879 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
862 if (message_view.transport_data_buffer_size() > 0) { | 880 if (message_view.transport_data_buffer_size() > 0) { |
863 DCHECK(message_view.transport_data_buffer()); | 881 DCHECK(message_view.transport_data_buffer()); |
864 message->SetDispatchers(TransportData::DeserializeDispatchers( | 882 message->SetDispatchers(TransportData::DeserializeDispatchers( |
865 message_view.transport_data_buffer(), | 883 message_view.transport_data_buffer(), |
866 message_view.transport_data_buffer_size(), | 884 message_view.transport_data_buffer_size(), |
867 std::move(platform_handles))); | 885 std::move(platform_handles))); |
868 } | 886 } |
869 | 887 |
| 888 bool call_release = false; |
870 if (started_transport_.Try()) { | 889 if (started_transport_.Try()) { |
871 // we're not in the middle of being sent | 890 // We're not in the middle of being sent. |
872 | 891 |
873 // Can get synchronously called back in Init if there was initial data. | 892 // Can get synchronously called back in Init if there was initial data. |
874 scoped_ptr<base::AutoLock> locker; | 893 scoped_ptr<base::AutoLock> locker; |
875 if (!calling_init_) { | 894 if (!calling_init_) |
876 locker.reset(new base::AutoLock(lock())); | 895 locker.reset(new base::AutoLock(lock())); |
| 896 |
| 897 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { |
| 898 if (transferable_) { |
| 899 channel_->Shutdown(); |
| 900 } else { |
| 901 internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| 902 non_transferable_state_ = CLOSED; |
| 903 } |
| 904 channel_ = nullptr; |
| 905 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 906 if (close_requested_) { |
| 907 // We requested the other side to close the connection while they also |
| 908 // did the same. We must balance out the AddRef in CloseOnIO to ensure |
| 909 // this object isn't leaked. |
| 910 call_release = true; |
| 911 } |
| 912 } else { |
| 913 bool was_empty = message_queue_.IsEmpty(); |
| 914 message_queue_.AddMessage(std::move(message)); |
| 915 if (was_empty) |
| 916 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
877 } | 917 } |
878 | 918 |
879 bool was_empty = message_queue_.IsEmpty(); | |
880 message_queue_.AddMessage(std::move(message)); | |
881 if (was_empty) | |
882 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
883 | |
884 started_transport_.Release(); | 919 started_transport_.Release(); |
885 } else { | 920 } else { |
886 // If RawChannel is calling OnRead, that means it has its read_lock_ | 921 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { |
887 // acquired. That means StartSerialize can't be accessing message queue as | 922 // We got a request to shutdown the channel but this object is already |
888 // it waits on ReleaseHandle first which acquires readlock_. | 923 // calling into channel to serialize it. Since all the other side cares |
889 message_queue_.AddMessage(std::move(message)); | 924 // about is flushing pending messages, we bounce the quit back to it. |
| 925 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| 926 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); |
| 927 channel_->WriteMessage(std::move(message)); |
| 928 } else { |
| 929 // If RawChannel is calling OnRead, that means it has its read_lock_ |
| 930 // acquired. That means StartSerialize can't be accessing message queue as |
| 931 // it waits on ReleaseHandle first which acquires read_lock_. |
| 932 message_queue_.AddMessage(std::move(message)); |
| 933 } |
890 } | 934 } |
| 935 |
| 936 if (call_release) |
| 937 Release(); |
891 } | 938 } |
892 | 939 |
893 void MessagePipeDispatcher::OnError(Error error) { | 940 void MessagePipeDispatcher::OnError(Error error) { |
894 // If there's a read error, then the other side of the pipe is closed. By | 941 // If there's a read error, then the other side of the pipe is closed. By |
895 // definition, we can't write since there's no one to read it. And we can't | 942 // definition, we can't write since there's no one to read it. And we can't |
896 // read anymore, since we just got a read erorr. So we close the pipe. | 943 // read anymore, since we just got a read erorr. So we close the pipe. |
897 // If there's a write error, then we stop writing. But we keep the pipe open | 944 // If there's a write error, then we stop writing. But we keep the pipe open |
898 // until we finish reading everything in it. This is because it's valid for | 945 // until we finish reading everything in it. This is because it's valid for |
899 // one endpoint to write some data and close their pipe immediately. Even | 946 // one endpoint to write some data and close their pipe immediately. Even |
900 // though the other end can't write anymore, it should still get all the data. | 947 // though the other end can't write anymore, it should still get all the data. |
(...skipping 15 matching lines...) Expand all Loading... |
916 break; | 963 break; |
917 case ERROR_WRITE: | 964 case ERROR_WRITE: |
918 // Write errors are slightly notable: they probably shouldn't happen under | 965 // Write errors are slightly notable: they probably shouldn't happen under |
919 // normal operation (but maybe the other side crashed). | 966 // normal operation (but maybe the other side crashed). |
920 LOG(WARNING) << "MessagePipeDispatcher write error"; | 967 LOG(WARNING) << "MessagePipeDispatcher write error"; |
921 DCHECK_EQ(write_error_, false) << "Should only get one write error."; | 968 DCHECK_EQ(write_error_, false) << "Should only get one write error."; |
922 write_error_ = true; | 969 write_error_ = true; |
923 break; | 970 break; |
924 } | 971 } |
925 | 972 |
| 973 bool call_release = false; |
926 if (started_transport_.Try()) { | 974 if (started_transport_.Try()) { |
927 base::AutoLock locker(lock()); | 975 base::AutoLock locker(lock()); |
928 // We can get two OnError callbacks before the post task below completes. | |
929 // Although RawChannel still has a pointer to this object until Shutdown is | |
930 // called, that is safe since this class always does a PostTask to the IO | |
931 // thread to self destruct. | |
932 if (channel_ && error != ERROR_WRITE) { | 976 if (channel_ && error != ERROR_WRITE) { |
933 if (transferable_) { | 977 if (transferable_) { |
934 channel_->Shutdown(); | 978 channel_->Shutdown(); |
935 } else { | 979 } else { |
936 CHECK_NE(non_transferable_state_, CLOSED); | 980 CHECK_NE(non_transferable_state_, CLOSED); |
937 internal::g_broker->CloseMessagePipe(pipe_id_, this); | 981 internal::g_broker->CloseMessagePipe(pipe_id_, this); |
938 non_transferable_state_ = CLOSED; | 982 non_transferable_state_ = CLOSED; |
939 } | 983 } |
940 channel_ = nullptr; | 984 channel_ = nullptr; |
| 985 if (close_requested_) { |
| 986 // Balance AddRef in CloseOnIO. |
| 987 call_release = true; |
| 988 } |
941 } | 989 } |
942 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 990 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
943 started_transport_.Release(); | 991 started_transport_.Release(); |
944 } else { | 992 } else { |
945 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 993 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
946 } | 994 } |
| 995 |
| 996 if (call_release) |
| 997 Release(); |
947 } | 998 } |
948 | 999 |
949 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( | 1000 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
950 MessageInTransit* message, | 1001 MessageInTransit* message, |
951 std::vector<DispatcherTransport>* transports) { | 1002 std::vector<DispatcherTransport>* transports) { |
952 DCHECK(!message->has_dispatchers()); | 1003 DCHECK(!message->has_dispatchers()); |
953 | 1004 |
954 // You're not allowed to send either handle to a message pipe over the message | 1005 // You're not allowed to send either handle to a message pipe over the message |
955 // pipe, so check for this. (The case of trying to write a handle to itself is | 1006 // pipe, so check for this. (The case of trying to write a handle to itself is |
956 // taken care of by |Core|. That case kind of makes sense, but leads to | 1007 // taken care of by |Core|. That case kind of makes sense, but leads to |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1005 // PostTask since the broker can call us back synchronously. | 1056 // PostTask since the broker can call us back synchronously. |
1006 internal::g_io_thread_task_runner->PostTask( | 1057 internal::g_io_thread_task_runner->PostTask( |
1007 FROM_HERE, | 1058 FROM_HERE, |
1008 base::Bind(&Broker::ConnectMessagePipe, | 1059 base::Bind(&Broker::ConnectMessagePipe, |
1009 base::Unretained(internal::g_broker), pipe_id_, | 1060 base::Unretained(internal::g_broker), pipe_id_, |
1010 base::Unretained(this))); | 1061 base::Unretained(this))); |
1011 } | 1062 } |
1012 | 1063 |
1013 } // namespace edk | 1064 } // namespace edk |
1014 } // namespace mojo | 1065 } // namespace mojo |
OLD | NEW |