OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
150 | 150 |
151 void MessagePipeDispatcher::InitOnIO() { | 151 void MessagePipeDispatcher::InitOnIO() { |
152 base::AutoLock locker(lock()); | 152 base::AutoLock locker(lock()); |
153 CHECK(transferable_); | 153 CHECK(transferable_); |
154 calling_init_ = true; | 154 calling_init_ = true; |
155 if (channel_) | 155 if (channel_) |
156 channel_->Init(this); | 156 channel_->Init(this); |
157 calling_init_ = false; | 157 calling_init_ = false; |
158 } | 158 } |
159 | 159 |
160 void MessagePipeDispatcher::CloseOnIO() { | 160 void MessagePipeDispatcher::ReleaseAndCloseOnIO() { |
161 base::AutoLock locker(lock()); | 161 base::AutoLock locker(lock()); |
162 Release(); // To match CloseImplNoLock. | 162 Release(); // To match CloseImplNoLock. |
163 if (transferable_) { | 163 CloseOnIO(); |
164 if (channel_) { | 164 } |
165 channel_->Shutdown(); | |
166 channel_ = nullptr; | |
167 } | |
168 } else { | |
169 if (non_transferable_state_ == CONNECT_CALLED || | |
170 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { | |
171 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
172 RequestNontransferableChannel(); | |
173 | 165 |
174 // We can't cancel the pending request yet, since the other side of the | 166 void MessagePipeDispatcher::CloseOnIO() { |
175 // message pipe would want to get pending outgoing messages (if any) or | 167 lock().AssertAcquired(); |
176 // at least know that this end was closed. So keep this object alive until | 168 |
177 // then. | 169 if (channel_) { |
178 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; | 170 // If we closed the channel now then in-flight message pipes wouldn't get |
179 AddRef(); | 171 // closed, and their other side wouldn't get a connection error notification |
180 } else if (non_transferable_state_ == CONNECTED) { | 172 // which could lead to hangs or leaks. So we ask the other side of this |
181 internal::g_broker->CloseMessagePipe(pipe_id_, this); | 173 // message pipe to close, which ensures that we have dispatched all |
182 non_transferable_state_ = CLOSED; | 174 // in-flight message pipes. |
183 channel_ = nullptr; | 175 DCHECK(!close_requested_); |
184 } | 176 close_requested_ = true; |
177 AddRef(); | |
178 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
179 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); | |
180 if (!transferable_) | |
181 message->set_route_id(pipe_id_); | |
182 channel_->WriteMessage(std::move(message)); | |
183 return; | |
184 } | |
185 | |
186 if (!transferable_ && | |
187 (non_transferable_state_ == CONNECT_CALLED || | |
188 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) { | |
189 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | |
190 RequestNontransferableChannel(); | |
191 | |
192 // We can't cancel the pending request yet, since the other side of the | |
193 // message pipe would want to get pending outgoing messages (if any) or at | |
194 // least know that this end was closed. So keep this object alive until | |
195 // then. | |
196 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; | |
197 AddRef(); | |
185 } | 198 } |
186 } | 199 } |
187 | 200 |
188 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 201 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
189 return Type::MESSAGE_PIPE; | 202 return Type::MESSAGE_PIPE; |
190 } | 203 } |
191 | 204 |
192 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { | 205 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
206 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
193 base::AutoLock locker(lock()); | 207 base::AutoLock locker(lock()); |
194 channel_ = channel; | 208 channel_ = channel; |
195 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { | 209 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
196 channel_->WriteMessage( | 210 channel_->WriteMessage( |
197 non_transferable_outgoing_message_queue_.GetMessage()); | 211 non_transferable_outgoing_message_queue_.GetMessage()); |
198 } | 212 } |
199 | 213 |
200 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { | 214 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { |
201 // We kept this object alive until it's connected, we can release it now. | |
202 internal::g_broker->CloseMessagePipe(pipe_id_, this); | |
203 non_transferable_state_ = CLOSED; | |
204 channel_ = nullptr; | |
205 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); | |
206 } else { | |
207 non_transferable_state_ = CONNECTED; | 215 non_transferable_state_ = CONNECTED; |
216 // We kept this object alive until it's connected, we can close it now. | |
217 CloseOnIO(); | |
218 // Balance the AddRef in CloseOnIO. | |
219 Release(); | |
220 return; | |
208 } | 221 } |
222 | |
223 non_transferable_state_ = CONNECTED; | |
209 } | 224 } |
210 | 225 |
211 #if defined(OS_WIN) | 226 #if defined(OS_WIN) |
212 // TODO(jam): this is copied from RawChannelWin till I figure out what's the | 227 // TODO(jam): this is copied from RawChannelWin till I figure out what's the |
213 // best way we want to share this. | 228 // best way we want to share this. |
214 // Since this is used for serialization of messages read/written to a MP that | 229 // Since this is used for serialization of messages read/written to a MP that |
215 // aren't consumed by Mojo primitives yet, there could be an unbounded number of | 230 // aren't consumed by Mojo primitives yet, there could be an unbounded number of |
216 // them when a MP is being sent. As a result, even for POSIX we will probably | 231 // them when a MP is being sent. As a result, even for POSIX we will probably |
217 // want to send the handles to the shell process and exchange them for tokens | 232 // want to send the handles to the shell process and exchange them for tokens |
218 // (since we can be sure that the shell will respond to our IPCs, compared to | 233 // (since we can be sure that the shell will respond to our IPCs, compared to |
(...skipping 182 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
401 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) | 416 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
402 : channel_(nullptr), | 417 : channel_(nullptr), |
403 serialized_read_fds_length_(0u), | 418 serialized_read_fds_length_(0u), |
404 serialized_write_fds_length_(0u), | 419 serialized_write_fds_length_(0u), |
405 serialized_message_fds_length_(0u), | 420 serialized_message_fds_length_(0u), |
406 pipe_id_(0), | 421 pipe_id_(0), |
407 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), | 422 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), |
408 serialized_(false), | 423 serialized_(false), |
409 calling_init_(false), | 424 calling_init_(false), |
410 write_error_(false), | 425 write_error_(false), |
411 transferable_(transferable) { | 426 transferable_(transferable), |
427 close_requested_(false) { | |
412 } | 428 } |
413 | 429 |
414 MessagePipeDispatcher::~MessagePipeDispatcher() { | 430 MessagePipeDispatcher::~MessagePipeDispatcher() { |
415 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The | 431 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The |
416 // exception is if they posted a task to run CloseOnIO but the IO thread shut | 432 // exception is if they posted a task to run CloseOnIO but the IO thread shut |
417 // down and so when it was deleting pending tasks it caused the last reference | 433 // down and so when it was deleting pending tasks it caused the last reference |
418 // to destruct this object. In that case, safe to destroy the channel. | 434 // to destruct this object. In that case, safe to destroy the channel. |
419 if (channel_ && | 435 if (channel_ && |
420 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { | 436 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { |
421 if (transferable_) { | 437 if (transferable_) { |
(...skipping 21 matching lines...) Expand all Loading... | |
443 if (!transferable_ && non_transferable_state_ == CLOSED) | 459 if (!transferable_ && non_transferable_state_ == CLOSED) |
444 return; | 460 return; |
445 | 461 |
446 // We take a manual refcount because at shutdown, the task below might not get | 462 // We take a manual refcount because at shutdown, the task below might not get |
447 // a chance to execute. If that happens, the RawChannel will still call our | 463 // a chance to execute. If that happens, the RawChannel will still call our |
448 // OnError method because it always runs (since it watches thread | 464 // OnError method because it always runs (since it watches thread |
449 // destruction). So to avoid UAF, manually add a reference and only release it | 465 // destruction). So to avoid UAF, manually add a reference and only release it |
450 // if the task runs. | 466 // if the task runs. |
451 AddRef(); | 467 AddRef(); |
452 internal::g_io_thread_task_runner->PostTask( | 468 internal::g_io_thread_task_runner->PostTask( |
453 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); | 469 FROM_HERE, base::Bind(&MessagePipeDispatcher::ReleaseAndCloseOnIO, this)); |
454 } | 470 } |
455 | 471 |
456 void MessagePipeDispatcher::SerializeInternal() { | 472 void MessagePipeDispatcher::SerializeInternal() { |
457 serialized_ = true; | 473 serialized_ = true; |
458 if (!transferable_) { | 474 if (!transferable_) { |
459 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) | 475 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
460 << "Non transferable message pipe being sent after read/write/waited. " | 476 << "Non transferable message pipe being sent after read/write/waited. " |
461 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " | 477 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " |
462 << "the pipe can be sent after it's read or written. This message pipe " | 478 << "the pipe can be sent after it's read or written. This message pipe " |
463 << "was previously bound at:\n" | 479 << "was previously bound at:\n" |
(...skipping 397 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
861 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | 877 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
862 if (message_view.transport_data_buffer_size() > 0) { | 878 if (message_view.transport_data_buffer_size() > 0) { |
863 DCHECK(message_view.transport_data_buffer()); | 879 DCHECK(message_view.transport_data_buffer()); |
864 message->SetDispatchers(TransportData::DeserializeDispatchers( | 880 message->SetDispatchers(TransportData::DeserializeDispatchers( |
865 message_view.transport_data_buffer(), | 881 message_view.transport_data_buffer(), |
866 message_view.transport_data_buffer_size(), | 882 message_view.transport_data_buffer_size(), |
867 std::move(platform_handles))); | 883 std::move(platform_handles))); |
868 } | 884 } |
869 | 885 |
870 if (started_transport_.Try()) { | 886 if (started_transport_.Try()) { |
871 // we're not in the middle of being sent | 887 // We're not in the middle of being sent. |
872 | 888 |
873 // Can get synchronously called back in Init if there was initial data. | 889 // Can get synchronously called back in Init if there was initial data. |
874 scoped_ptr<base::AutoLock> locker; | 890 scoped_ptr<base::AutoLock> locker; |
875 if (!calling_init_) { | 891 if (!calling_init_) |
876 locker.reset(new base::AutoLock(lock())); | 892 locker.reset(new base::AutoLock(lock())); |
893 | |
894 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { | |
895 if (transferable_) { | |
896 channel_->Shutdown(); | |
897 } else { | |
898 internal::g_broker->CloseMessagePipe(pipe_id_, this); | |
899 non_transferable_state_ = CLOSED; | |
900 } | |
901 channel_ = nullptr; | |
902 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
903 if (close_requested_) { | |
904 // We requested the other side to close the connection while they also | |
905 // did the same. We must balance out the AddRef in CloseOnIO to ensure | |
906 // this object isn't leaked. | |
907 Release(); | |
jam
2015/12/30 17:03:40
(unlike the other two changes from patchset 1, whi
sky
2015/12/30 17:32:29
If the Release() here deletes this, won't 916 resu
jam
2015/12/30 17:41:27
yep. fixed this along with dchecks that i was hitt
| |
908 } | |
909 } else { | |
910 bool was_empty = message_queue_.IsEmpty(); | |
911 message_queue_.AddMessage(std::move(message)); | |
912 if (was_empty) | |
913 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
877 } | 914 } |
878 | 915 |
879 bool was_empty = message_queue_.IsEmpty(); | |
880 message_queue_.AddMessage(std::move(message)); | |
881 if (was_empty) | |
882 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
883 | |
884 started_transport_.Release(); | 916 started_transport_.Release(); |
885 } else { | 917 } else { |
886 // If RawChannel is calling OnRead, that means it has its read_lock_ | 918 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { |
887 // acquired. That means StartSerialize can't be accessing message queue as | 919 // We got a request to shutdown the channel but this object is already |
888 // it waits on ReleaseHandle first which acquires readlock_. | 920 // calling into channel to serialize it. Since all the other side cares |
889 message_queue_.AddMessage(std::move(message)); | 921 // about is flushing pending messages, we bounce the quit back to it. |
922 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
923 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); | |
924 channel_->WriteMessage(std::move(message)); | |
925 } else { | |
926 // If RawChannel is calling OnRead, that means it has its read_lock_ | |
927 // acquired. That means StartSerialize can't be accessing message queue as | |
928 // it waits on ReleaseHandle first which acquires read_lock_. | |
929 message_queue_.AddMessage(std::move(message)); | |
930 } | |
890 } | 931 } |
891 } | 932 } |
892 | 933 |
893 void MessagePipeDispatcher::OnError(Error error) { | 934 void MessagePipeDispatcher::OnError(Error error) { |
894 // If there's a read error, then the other side of the pipe is closed. By | 935 // If there's a read error, then the other side of the pipe is closed. By |
895 // definition, we can't write since there's no one to read it. And we can't | 936 // definition, we can't write since there's no one to read it. And we can't |
896 // read anymore, since we just got a read erorr. So we close the pipe. | 937 // read anymore, since we just got a read erorr. So we close the pipe. |
897 // If there's a write error, then we stop writing. But we keep the pipe open | 938 // If there's a write error, then we stop writing. But we keep the pipe open |
898 // until we finish reading everything in it. This is because it's valid for | 939 // until we finish reading everything in it. This is because it's valid for |
899 // one endpoint to write some data and close their pipe immediately. Even | 940 // one endpoint to write some data and close their pipe immediately. Even |
(...skipping 18 matching lines...) Expand all Loading... | |
918 // Write errors are slightly notable: they probably shouldn't happen under | 959 // Write errors are slightly notable: they probably shouldn't happen under |
919 // normal operation (but maybe the other side crashed). | 960 // normal operation (but maybe the other side crashed). |
920 LOG(WARNING) << "MessagePipeDispatcher write error"; | 961 LOG(WARNING) << "MessagePipeDispatcher write error"; |
921 DCHECK_EQ(write_error_, false) << "Should only get one write error."; | 962 DCHECK_EQ(write_error_, false) << "Should only get one write error."; |
922 write_error_ = true; | 963 write_error_ = true; |
923 break; | 964 break; |
924 } | 965 } |
925 | 966 |
926 if (started_transport_.Try()) { | 967 if (started_transport_.Try()) { |
927 base::AutoLock locker(lock()); | 968 base::AutoLock locker(lock()); |
928 // We can get two OnError callbacks before the post task below completes. | 969 bool call_release = false; |
929 // Although RawChannel still has a pointer to this object until Shutdown is | |
930 // called, that is safe since this class always does a PostTask to the IO | |
931 // thread to self destruct. | |
932 if (channel_ && error != ERROR_WRITE) { | 970 if (channel_ && error != ERROR_WRITE) { |
933 if (transferable_) { | 971 if (transferable_) { |
934 channel_->Shutdown(); | 972 channel_->Shutdown(); |
935 } else { | 973 } else { |
936 CHECK_NE(non_transferable_state_, CLOSED); | 974 CHECK_NE(non_transferable_state_, CLOSED); |
937 internal::g_broker->CloseMessagePipe(pipe_id_, this); | 975 internal::g_broker->CloseMessagePipe(pipe_id_, this); |
938 non_transferable_state_ = CLOSED; | 976 non_transferable_state_ = CLOSED; |
939 } | 977 } |
940 channel_ = nullptr; | 978 channel_ = nullptr; |
979 if (close_requested_) { | |
980 // Balance AddRef in CloseOnIO. | |
981 call_release = true; | |
982 } | |
941 } | 983 } |
942 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 984 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
943 started_transport_.Release(); | 985 started_transport_.Release(); |
986 if (call_release) | |
987 Release(); | |
944 } else { | 988 } else { |
945 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 989 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
946 } | 990 } |
947 } | 991 } |
948 | 992 |
949 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( | 993 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
950 MessageInTransit* message, | 994 MessageInTransit* message, |
951 std::vector<DispatcherTransport>* transports) { | 995 std::vector<DispatcherTransport>* transports) { |
952 DCHECK(!message->has_dispatchers()); | 996 DCHECK(!message->has_dispatchers()); |
953 | 997 |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1005 // PostTask since the broker can call us back synchronously. | 1049 // PostTask since the broker can call us back synchronously. |
1006 internal::g_io_thread_task_runner->PostTask( | 1050 internal::g_io_thread_task_runner->PostTask( |
1007 FROM_HERE, | 1051 FROM_HERE, |
1008 base::Bind(&Broker::ConnectMessagePipe, | 1052 base::Bind(&Broker::ConnectMessagePipe, |
1009 base::Unretained(internal::g_broker), pipe_id_, | 1053 base::Unretained(internal::g_broker), pipe_id_, |
1010 base::Unretained(this))); | 1054 base::Unretained(this))); |
1011 } | 1055 } |
1012 | 1056 |
1013 } // namespace edk | 1057 } // namespace edk |
1014 } // namespace mojo | 1058 } // namespace mojo |
OLD | NEW |