OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
150 | 150 |
151 void MessagePipeDispatcher::InitOnIO() { | 151 void MessagePipeDispatcher::InitOnIO() { |
152 base::AutoLock locker(lock()); | 152 base::AutoLock locker(lock()); |
153 CHECK(transferable_); | 153 CHECK(transferable_); |
154 calling_init_ = true; | 154 calling_init_ = true; |
155 if (channel_) | 155 if (channel_) |
156 channel_->Init(this); | 156 channel_->Init(this); |
157 calling_init_ = false; | 157 calling_init_ = false; |
158 } | 158 } |
159 | 159 |
160 void MessagePipeDispatcher::CloseOnIO() { | 160 void MessagePipeDispatcher::ReleaseAndCloseOnIO() { |
161 base::AutoLock locker(lock()); | 161 base::AutoLock locker(lock()); |
162 Release(); // To match CloseImplNoLock. | 162 Release(); // To match CloseImplNoLock. |
163 if (transferable_) { | 163 CloseOnIO(); |
164 if (channel_) { | 164 } |
165 channel_->Shutdown(); | |
166 channel_ = nullptr; | |
167 } | |
168 } else { | |
169 if (non_transferable_state_ == CONNECT_CALLED || | |
170 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { | |
171 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
172 RequestNontransferableChannel(); | |
173 | 165 |
174 // We can't cancel the pending request yet, since the other side of the | 166 void MessagePipeDispatcher::CloseOnIO() { |
175 // message pipe would want to get pending outgoing messages (if any) or | 167 lock().AssertAcquired(); |
176 // at least know that this end was closed. So keep this object alive until | 168 |
177 // then. | 169 if (channel_) { |
178 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; | 170 DCHECK(!close_requested_); |
179 AddRef(); | 171 close_requested_ = true; |
180 } else if (non_transferable_state_ == CONNECTED) { | 172 AddRef(); |
181 internal::g_broker->CloseMessagePipe(pipe_id_, this); | 173 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
182 non_transferable_state_ = CLOSED; | 174 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); |
183 channel_ = nullptr; | 175 if (!transferable_) |
184 } | 176 message->set_route_id(pipe_id_); |
177 channel_->WriteMessage(std::move(message)); | |
178 return; | |
179 } | |
180 | |
181 if (!transferable_ && | |
182 (non_transferable_state_ == CONNECT_CALLED || | |
183 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) { | |
184 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
185 RequestNontransferableChannel(); | |
186 | |
187 // We can't cancel the pending request yet, since the other side of the | |
188 // message pipe would want to get pending outgoing messages (if any) or at | |
189 // least know that this end was closed. So keep this object alive until | |
190 // then. | |
191 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; | |
192 AddRef(); | |
185 } | 193 } |
186 } | 194 } |
187 | 195 |
188 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 196 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
189 return Type::MESSAGE_PIPE; | 197 return Type::MESSAGE_PIPE; |
190 } | 198 } |
191 | 199 |
192 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { | 200 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
201 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
193 base::AutoLock locker(lock()); | 202 base::AutoLock locker(lock()); |
194 channel_ = channel; | 203 channel_ = channel; |
195 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { | 204 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
196 channel_->WriteMessage( | 205 channel_->WriteMessage( |
197 non_transferable_outgoing_message_queue_.GetMessage()); | 206 non_transferable_outgoing_message_queue_.GetMessage()); |
198 } | 207 } |
199 | 208 |
200 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { | 209 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { |
201 // We kept this object alive until it's connected, we can release it now. | 210 non_transferable_state_ = CONNECTED; |
202 internal::g_broker->CloseMessagePipe(pipe_id_, this); | 211 // We kept this object alive until it's connected, we can close it now. |
203 non_transferable_state_ = CLOSED; | 212 CloseOnIO(); |
204 channel_ = nullptr; | 213 // Balance the AddRef in CloseOnIO. |
205 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); | 214 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); |
206 } else { | 215 return; |
207 non_transferable_state_ = CONNECTED; | |
208 } | 216 } |
217 | |
218 non_transferable_state_ = CONNECTED; | |
209 } | 219 } |
210 | 220 |
211 #if defined(OS_WIN) | 221 #if defined(OS_WIN) |
212 // TODO(jam): this is copied from RawChannelWin till I figure out what's the | 222 // TODO(jam): this is copied from RawChannelWin till I figure out what's the |
213 // best way we want to share this. | 223 // best way we want to share this. |
214 // Since this is used for serialization of messages read/written to a MP that | 224 // Since this is used for serialization of messages read/written to a MP that |
215 // aren't consumed by Mojo primitives yet, there could be an unbounded number of | 225 // aren't consumed by Mojo primitives yet, there could be an unbounded number of |
216 // them when a MP is being sent. As a result, even for POSIX we will probably | 226 // them when a MP is being sent. As a result, even for POSIX we will probably |
217 // want to send the handles to the shell process and exchange them for tokens | 227 // want to send the handles to the shell process and exchange them for tokens |
218 // (since we can be sure that the shell will respond to our IPCs, compared to | 228 // (since we can be sure that the shell will respond to our IPCs, compared to |
(...skipping 182 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
401 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) | 411 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
402 : channel_(nullptr), | 412 : channel_(nullptr), |
403 serialized_read_fds_length_(0u), | 413 serialized_read_fds_length_(0u), |
404 serialized_write_fds_length_(0u), | 414 serialized_write_fds_length_(0u), |
405 serialized_message_fds_length_(0u), | 415 serialized_message_fds_length_(0u), |
406 pipe_id_(0), | 416 pipe_id_(0), |
407 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), | 417 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), |
408 serialized_(false), | 418 serialized_(false), |
409 calling_init_(false), | 419 calling_init_(false), |
410 write_error_(false), | 420 write_error_(false), |
411 transferable_(transferable) { | 421 transferable_(transferable), |
422 close_requested_(false) { | |
412 } | 423 } |
413 | 424 |
414 MessagePipeDispatcher::~MessagePipeDispatcher() { | 425 MessagePipeDispatcher::~MessagePipeDispatcher() { |
415 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The | 426 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The |
416 // exception is if they posted a task to run CloseOnIO but the IO thread shut | 427 // exception is if they posted a task to run CloseOnIO but the IO thread shut |
417 // down and so when it was deleting pending tasks it caused the last reference | 428 // down and so when it was deleting pending tasks it caused the last reference |
418 // to destruct this object. In that case, safe to destroy the channel. | 429 // to destruct this object. In that case, safe to destroy the channel. |
419 if (channel_ && | 430 if (channel_ && |
420 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { | 431 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { |
421 if (transferable_) { | 432 if (transferable_) { |
(...skipping 21 matching lines...) Expand all Loading... | |
443 if (!transferable_ && non_transferable_state_ == CLOSED) | 454 if (!transferable_ && non_transferable_state_ == CLOSED) |
444 return; | 455 return; |
445 | 456 |
446 // We take a manual refcount because at shutdown, the task below might not get | 457 // We take a manual refcount because at shutdown, the task below might not get |
447 // a chance to execute. If that happens, the RawChannel will still call our | 458 // a chance to execute. If that happens, the RawChannel will still call our |
448 // OnError method because it always runs (since it watches thread | 459 // OnError method because it always runs (since it watches thread |
449 // destruction). So to avoid UAF, manually add a reference and only release it | 460 // destruction). So to avoid UAF, manually add a reference and only release it |
450 // if the task runs. | 461 // if the task runs. |
451 AddRef(); | 462 AddRef(); |
452 internal::g_io_thread_task_runner->PostTask( | 463 internal::g_io_thread_task_runner->PostTask( |
453 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); | 464 FROM_HERE, base::Bind(&MessagePipeDispatcher::ReleaseAndCloseOnIO, this)); |
454 } | 465 } |
455 | 466 |
456 void MessagePipeDispatcher::SerializeInternal() { | 467 void MessagePipeDispatcher::SerializeInternal() { |
457 serialized_ = true; | 468 serialized_ = true; |
458 if (!transferable_) { | 469 if (!transferable_) { |
459 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | 470 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
460 << "Non transferable message pipe being sent after read/write/waited. " | 471 << "Non transferable message pipe being sent after read/write/waited. " |
461 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " | 472 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " |
462 << "the pipe can be sent after it's read or written. This message pipe " | 473 << "the pipe can be sent after it's read or written. This message pipe " |
463 << "was previously bound at:\n" | 474 << "was previously bound at:\n" |
(...skipping 397 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
861 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | 872 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
862 if (message_view.transport_data_buffer_size() > 0) { | 873 if (message_view.transport_data_buffer_size() > 0) { |
863 DCHECK(message_view.transport_data_buffer()); | 874 DCHECK(message_view.transport_data_buffer()); |
864 message->SetDispatchers(TransportData::DeserializeDispatchers( | 875 message->SetDispatchers(TransportData::DeserializeDispatchers( |
865 message_view.transport_data_buffer(), | 876 message_view.transport_data_buffer(), |
866 message_view.transport_data_buffer_size(), | 877 message_view.transport_data_buffer_size(), |
867 std::move(platform_handles))); | 878 std::move(platform_handles))); |
868 } | 879 } |
869 | 880 |
870 if (started_transport_.Try()) { | 881 if (started_transport_.Try()) { |
871 // we're not in the middle of being sent | 882 // We're not in the middle of being sent. |
872 | 883 |
873 // Can get synchronously called back in Init if there was initial data. | 884 // Can get synchronously called back in Init if there was initial data. |
874 scoped_ptr<base::AutoLock> locker; | 885 scoped_ptr<base::AutoLock> locker; |
875 if (!calling_init_) { | 886 if (!calling_init_) |
876 locker.reset(new base::AutoLock(lock())); | 887 locker.reset(new base::AutoLock(lock())); |
888 | |
889 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { | |
890 if (transferable_) { | |
891 channel_->Shutdown(); | |
892 } else { | |
893 internal::g_broker->CloseMessagePipe(pipe_id_, this); | |
894 non_transferable_state_ = CLOSED; | |
895 } | |
896 channel_ = nullptr; | |
897 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
898 } else { | |
899 bool was_empty = message_queue_.IsEmpty(); | |
900 message_queue_.AddMessage(std::move(message)); | |
901 if (was_empty) | |
902 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
877 } | 903 } |
878 | 904 |
879 bool was_empty = message_queue_.IsEmpty(); | |
880 message_queue_.AddMessage(std::move(message)); | |
881 if (was_empty) | |
882 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
883 | |
884 started_transport_.Release(); | 905 started_transport_.Release(); |
885 } else { | 906 } else { |
886 // If RawChannel is calling OnRead, that means it has its read_lock_ | 907 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { |
887 // acquired. That means StartSerialize can't be accessing message queue as | 908 // We got a request to shutdown the channel but this object is already |
888 // it waits on ReleaseHandle first which acquires readlock_. | 909 // calling into channel to serialize it. Since all the other side cares |
889 message_queue_.AddMessage(std::move(message)); | 910 // about is flushing pending messages, we bounce the quit back to it. |
911 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
912 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); | |
913 channel_->WriteMessage(std::move(message)); | |
914 } else { | |
915 // If RawChannel is calling OnRead, that means it has its read_lock_ | |
916 // acquired. That means StartSerialize can't be accessing message queue as | |
917 // it waits on ReleaseHandle first which acquires read_lock_. | |
918 message_queue_.AddMessage(std::move(message)); | |
919 } | |
890 } | 920 } |
891 } | 921 } |
892 | 922 |
893 void MessagePipeDispatcher::OnError(Error error) { | 923 void MessagePipeDispatcher::OnError(Error error) { |
894 // If there's a read error, then the other side of the pipe is closed. By | 924 // If there's a read error, then the other side of the pipe is closed. By |
895 // definition, we can't write since there's no one to read it. And we can't | 925 // definition, we can't write since there's no one to read it. And we can't |
896 // read anymore, since we just got a read erorr. So we close the pipe. | 926 // read anymore, since we just got a read erorr. So we close the pipe. |
897 // If there's a write error, then we stop writing. But we keep the pipe open | 927 // If there's a write error, then we stop writing. But we keep the pipe open |
898 // until we finish reading everything in it. This is because it's valid for | 928 // until we finish reading everything in it. This is because it's valid for |
899 // one endpoint to write some data and close their pipe immediately. Even | 929 // one endpoint to write some data and close their pipe immediately. Even |
(...skipping 18 matching lines...) Expand all Loading... | |
918 // Write errors are slightly notable: they probably shouldn't happen under | 948 // Write errors are slightly notable: they probably shouldn't happen under |
919 // normal operation (but maybe the other side crashed). | 949 // normal operation (but maybe the other side crashed). |
920 LOG(WARNING) << "MessagePipeDispatcher write error"; | 950 LOG(WARNING) << "MessagePipeDispatcher write error"; |
921 DCHECK_EQ(write_error_, false) << "Should only get one write error."; | 951 DCHECK_EQ(write_error_, false) << "Should only get one write error."; |
922 write_error_ = true; | 952 write_error_ = true; |
923 break; | 953 break; |
924 } | 954 } |
925 | 955 |
926 if (started_transport_.Try()) { | 956 if (started_transport_.Try()) { |
927 base::AutoLock locker(lock()); | 957 base::AutoLock locker(lock()); |
928 // We can get two OnError callbacks before the post task below completes. | |
929 // Although RawChannel still has a pointer to this object until Shutdown is | |
930 // called, that is safe since this class always does a PostTask to the IO | |
931 // thread to self destruct. | |
932 if (channel_ && error != ERROR_WRITE) { | 958 if (channel_ && error != ERROR_WRITE) { |
933 if (transferable_) { | 959 if (transferable_) { |
934 channel_->Shutdown(); | 960 channel_->Shutdown(); |
935 } else { | 961 } else { |
936 CHECK_NE(non_transferable_state_, CLOSED); | 962 CHECK_NE(non_transferable_state_, CLOSED); |
937 internal::g_broker->CloseMessagePipe(pipe_id_, this); | 963 internal::g_broker->CloseMessagePipe(pipe_id_, this); |
938 non_transferable_state_ = CLOSED; | 964 non_transferable_state_ = CLOSED; |
939 } | 965 } |
940 channel_ = nullptr; | 966 channel_ = nullptr; |
967 if (close_requested_) { | |
968 // Balance AddRef in CloseOnIO. | |
969 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); | |
sky
2015/12/30 16:33:32
Is there a reason you can't Release() rather than
jam
2015/12/30 17:03:40
This might be the last reference, so I was just si
| |
970 } | |
941 } | 971 } |
942 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 972 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
943 started_transport_.Release(); | 973 started_transport_.Release(); |
944 } else { | 974 } else { |
945 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 975 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
946 } | 976 } |
947 } | 977 } |
948 | 978 |
949 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( | 979 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
950 MessageInTransit* message, | 980 MessageInTransit* message, |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1005 // PostTask since the broker can call us back synchronously. | 1035 // PostTask since the broker can call us back synchronously. |
1006 internal::g_io_thread_task_runner->PostTask( | 1036 internal::g_io_thread_task_runner->PostTask( |
1007 FROM_HERE, | 1037 FROM_HERE, |
1008 base::Bind(&Broker::ConnectMessagePipe, | 1038 base::Bind(&Broker::ConnectMessagePipe, |
1009 base::Unretained(internal::g_broker), pipe_id_, | 1039 base::Unretained(internal::g_broker), pipe_id_, |
1010 base::Unretained(this))); | 1040 base::Unretained(this))); |
1011 } | 1041 } |
1012 | 1042 |
1013 } // namespace edk | 1043 } // namespace edk |
1014 } // namespace mojo | 1044 } // namespace mojo |
OLD | NEW |