Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(160)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1554623005: Ensure that in-flight message pipes are always closed and the other end is notified. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix leaks Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/raw_channel.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after
150 150
151 void MessagePipeDispatcher::InitOnIO() { 151 void MessagePipeDispatcher::InitOnIO() {
152 base::AutoLock locker(lock()); 152 base::AutoLock locker(lock());
153 CHECK(transferable_); 153 CHECK(transferable_);
154 calling_init_ = true; 154 calling_init_ = true;
155 if (channel_) 155 if (channel_)
156 channel_->Init(this); 156 channel_->Init(this);
157 calling_init_ = false; 157 calling_init_ = false;
158 } 158 }
159 159
160 void MessagePipeDispatcher::CloseOnIO() { 160 void MessagePipeDispatcher::ReleaseAndCloseOnIO() {
161 base::AutoLock locker(lock()); 161 base::AutoLock locker(lock());
162 Release(); // To match CloseImplNoLock. 162 Release(); // To match CloseImplNoLock.
163 if (transferable_) { 163 CloseOnIO();
164 if (channel_) { 164 }
165 channel_->Shutdown();
166 channel_ = nullptr;
167 }
168 } else {
169 if (non_transferable_state_ == CONNECT_CALLED ||
170 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
171 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
172 RequestNontransferableChannel();
173 165
174 // We can't cancel the pending request yet, since the other side of the 166 void MessagePipeDispatcher::CloseOnIO() {
175 // message pipe would want to get pending outgoing messages (if any) or 167 lock().AssertAcquired();
176 // at least know that this end was closed. So keep this object alive until 168
177 // then. 169 if (channel_) {
178 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; 170 // If we closed the channel now then in-flight message pipes wouldn't get
179 AddRef(); 171 // closed, and their other side wouldn't get a connection error notification
180 } else if (non_transferable_state_ == CONNECTED) { 172 // which could lead to hangs or leaks. So we ask the other side of this
181 internal::g_broker->CloseMessagePipe(pipe_id_, this); 173 // message pipe to close, which ensures that we have dispatched all
182 non_transferable_state_ = CLOSED; 174 // in-flight message pipes.
183 channel_ = nullptr; 175 DCHECK(!close_requested_);
184 } 176 close_requested_ = true;
177 AddRef();
178 scoped_ptr<MessageInTransit> message(new MessageInTransit(
179 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
180 if (!transferable_)
181 message->set_route_id(pipe_id_);
182 channel_->WriteMessage(std::move(message));
183 return;
184 }
185
186 if (!transferable_ &&
187 (non_transferable_state_ == CONNECT_CALLED ||
188 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) {
189 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
190 RequestNontransferableChannel();
191
192 // We can't cancel the pending request yet, since the other side of the
193 // message pipe would want to get pending outgoing messages (if any) or at
194 // least know that this end was closed. So keep this object alive until
195 // then.
196 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE;
197 AddRef();
185 } 198 }
186 } 199 }
187 200
188 Dispatcher::Type MessagePipeDispatcher::GetType() const { 201 Dispatcher::Type MessagePipeDispatcher::GetType() const {
189 return Type::MESSAGE_PIPE; 202 return Type::MESSAGE_PIPE;
190 } 203 }
191 204
192 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { 205 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
206 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
193 base::AutoLock locker(lock()); 207 base::AutoLock locker(lock());
194 channel_ = channel; 208 channel_ = channel;
195 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { 209 while (!non_transferable_outgoing_message_queue_.IsEmpty()) {
196 channel_->WriteMessage( 210 channel_->WriteMessage(
197 non_transferable_outgoing_message_queue_.GetMessage()); 211 non_transferable_outgoing_message_queue_.GetMessage());
198 } 212 }
199 213
200 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { 214 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) {
201 // We kept this object alive until it's connected, we can release it now.
202 internal::g_broker->CloseMessagePipe(pipe_id_, this);
203 non_transferable_state_ = CLOSED;
204 channel_ = nullptr;
205 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this);
206 } else {
207 non_transferable_state_ = CONNECTED; 215 non_transferable_state_ = CONNECTED;
216 // We kept this object alive until it's connected, we can close it now.
217 CloseOnIO();
218 // Balance the AddRef in CloseOnIO.
219 Release();
220 return;
208 } 221 }
222
223 non_transferable_state_ = CONNECTED;
209 } 224 }
210 225
211 #if defined(OS_WIN) 226 #if defined(OS_WIN)
212 // TODO(jam): this is copied from RawChannelWin till I figure out what's the 227 // TODO(jam): this is copied from RawChannelWin till I figure out what's the
213 // best way we want to share this. 228 // best way we want to share this.
214 // Since this is used for serialization of messages read/written to a MP that 229 // Since this is used for serialization of messages read/written to a MP that
215 // aren't consumed by Mojo primitives yet, there could be an unbounded number of 230 // aren't consumed by Mojo primitives yet, there could be an unbounded number of
216 // them when a MP is being sent. As a result, even for POSIX we will probably 231 // them when a MP is being sent. As a result, even for POSIX we will probably
217 // want to send the handles to the shell process and exchange them for tokens 232 // want to send the handles to the shell process and exchange them for tokens
218 // (since we can be sure that the shell will respond to our IPCs, compared to 233 // (since we can be sure that the shell will respond to our IPCs, compared to
(...skipping 182 matching lines...) Expand 10 before | Expand all | Expand 10 after
401 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) 416 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
402 : channel_(nullptr), 417 : channel_(nullptr),
403 serialized_read_fds_length_(0u), 418 serialized_read_fds_length_(0u),
404 serialized_write_fds_length_(0u), 419 serialized_write_fds_length_(0u),
405 serialized_message_fds_length_(0u), 420 serialized_message_fds_length_(0u),
406 pipe_id_(0), 421 pipe_id_(0),
407 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), 422 non_transferable_state_(WAITING_FOR_READ_OR_WRITE),
408 serialized_(false), 423 serialized_(false),
409 calling_init_(false), 424 calling_init_(false),
410 write_error_(false), 425 write_error_(false),
411 transferable_(transferable) { 426 transferable_(transferable),
427 close_requested_(false) {
412 } 428 }
413 429
414 MessagePipeDispatcher::~MessagePipeDispatcher() { 430 MessagePipeDispatcher::~MessagePipeDispatcher() {
415 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The 431 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The
416 // exception is if they posted a task to run CloseOnIO but the IO thread shut 432 // exception is if they posted a task to run CloseOnIO but the IO thread shut
417 // down and so when it was deleting pending tasks it caused the last reference 433 // down and so when it was deleting pending tasks it caused the last reference
418 // to destruct this object. In that case, safe to destroy the channel. 434 // to destruct this object. In that case, safe to destroy the channel.
419 if (channel_ && 435 if (channel_ &&
420 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { 436 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) {
421 if (transferable_) { 437 if (transferable_) {
(...skipping 21 matching lines...) Expand all
443 if (!transferable_ && non_transferable_state_ == CLOSED) 459 if (!transferable_ && non_transferable_state_ == CLOSED)
444 return; 460 return;
445 461
446 // We take a manual refcount because at shutdown, the task below might not get 462 // We take a manual refcount because at shutdown, the task below might not get
447 // a chance to execute. If that happens, the RawChannel will still call our 463 // a chance to execute. If that happens, the RawChannel will still call our
448 // OnError method because it always runs (since it watches thread 464 // OnError method because it always runs (since it watches thread
449 // destruction). So to avoid UAF, manually add a reference and only release it 465 // destruction). So to avoid UAF, manually add a reference and only release it
450 // if the task runs. 466 // if the task runs.
451 AddRef(); 467 AddRef();
452 internal::g_io_thread_task_runner->PostTask( 468 internal::g_io_thread_task_runner->PostTask(
453 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); 469 FROM_HERE, base::Bind(&MessagePipeDispatcher::ReleaseAndCloseOnIO, this));
454 } 470 }
455 471
456 void MessagePipeDispatcher::SerializeInternal() { 472 void MessagePipeDispatcher::SerializeInternal() {
457 serialized_ = true; 473 serialized_ = true;
458 if (!transferable_) { 474 if (!transferable_) {
459 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) 475 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
460 << "Non transferable message pipe being sent after read/write/waited. " 476 << "Non transferable message pipe being sent after read/write/waited. "
461 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " 477 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if "
462 << "the pipe can be sent after it's read or written. This message pipe " 478 << "the pipe can be sent after it's read or written. This message pipe "
463 << "was previously bound at:\n" 479 << "was previously bound at:\n"
(...skipping 397 matching lines...) Expand 10 before | Expand all | Expand 10 after
861 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); 877 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
862 if (message_view.transport_data_buffer_size() > 0) { 878 if (message_view.transport_data_buffer_size() > 0) {
863 DCHECK(message_view.transport_data_buffer()); 879 DCHECK(message_view.transport_data_buffer());
864 message->SetDispatchers(TransportData::DeserializeDispatchers( 880 message->SetDispatchers(TransportData::DeserializeDispatchers(
865 message_view.transport_data_buffer(), 881 message_view.transport_data_buffer(),
866 message_view.transport_data_buffer_size(), 882 message_view.transport_data_buffer_size(),
867 std::move(platform_handles))); 883 std::move(platform_handles)));
868 } 884 }
869 885
870 if (started_transport_.Try()) { 886 if (started_transport_.Try()) {
871 // we're not in the middle of being sent 887 // We're not in the middle of being sent.
872 888
873 // Can get synchronously called back in Init if there was initial data. 889 // Can get synchronously called back in Init if there was initial data.
874 scoped_ptr<base::AutoLock> locker; 890 scoped_ptr<base::AutoLock> locker;
875 if (!calling_init_) { 891 if (!calling_init_)
876 locker.reset(new base::AutoLock(lock())); 892 locker.reset(new base::AutoLock(lock()));
893
894 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
895 if (transferable_) {
896 channel_->Shutdown();
897 } else {
898 internal::g_broker->CloseMessagePipe(pipe_id_, this);
899 non_transferable_state_ = CLOSED;
900 }
901 channel_ = nullptr;
902 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
903 if (close_requested_) {
904 // We requested the other side to close the connection while they also
905 // did the same. We must balance out the AddRef in CloseOnIO to ensure
906 // this object isn't leaked.
907 Release();
jam 2015/12/30 17:03:40 (unlike the other two changes from patchset 1, whi
sky 2015/12/30 17:32:29 If the Release() here deletes this, won't 916 resu
jam 2015/12/30 17:41:27 yep. fixed this along with dchecks that i was hitt
908 }
909 } else {
910 bool was_empty = message_queue_.IsEmpty();
911 message_queue_.AddMessage(std::move(message));
912 if (was_empty)
913 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
877 } 914 }
878 915
879 bool was_empty = message_queue_.IsEmpty();
880 message_queue_.AddMessage(std::move(message));
881 if (was_empty)
882 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
883
884 started_transport_.Release(); 916 started_transport_.Release();
885 } else { 917 } else {
886 // If RawChannel is calling OnRead, that means it has its read_lock_ 918 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
887 // acquired. That means StartSerialize can't be accessing message queue as 919 // We got a request to shutdown the channel but this object is already
888 // it waits on ReleaseHandle first which acquires readlock_. 920 // calling into channel to serialize it. Since all the other side cares
889 message_queue_.AddMessage(std::move(message)); 921 // about is flushing pending messages, we bounce the quit back to it.
922 scoped_ptr<MessageInTransit> message(new MessageInTransit(
923 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
924 channel_->WriteMessage(std::move(message));
925 } else {
926 // If RawChannel is calling OnRead, that means it has its read_lock_
927 // acquired. That means StartSerialize can't be accessing message queue as
928 // it waits on ReleaseHandle first which acquires read_lock_.
929 message_queue_.AddMessage(std::move(message));
930 }
890 } 931 }
891 } 932 }
892 933
893 void MessagePipeDispatcher::OnError(Error error) { 934 void MessagePipeDispatcher::OnError(Error error) {
894 // If there's a read error, then the other side of the pipe is closed. By 935 // If there's a read error, then the other side of the pipe is closed. By
895 // definition, we can't write since there's no one to read it. And we can't 936 // definition, we can't write since there's no one to read it. And we can't
896 // read anymore, since we just got a read erorr. So we close the pipe. 937 // read anymore, since we just got a read erorr. So we close the pipe.
897 // If there's a write error, then we stop writing. But we keep the pipe open 938 // If there's a write error, then we stop writing. But we keep the pipe open
898 // until we finish reading everything in it. This is because it's valid for 939 // until we finish reading everything in it. This is because it's valid for
899 // one endpoint to write some data and close their pipe immediately. Even 940 // one endpoint to write some data and close their pipe immediately. Even
(...skipping 18 matching lines...) Expand all
918 // Write errors are slightly notable: they probably shouldn't happen under 959 // Write errors are slightly notable: they probably shouldn't happen under
919 // normal operation (but maybe the other side crashed). 960 // normal operation (but maybe the other side crashed).
920 LOG(WARNING) << "MessagePipeDispatcher write error"; 961 LOG(WARNING) << "MessagePipeDispatcher write error";
921 DCHECK_EQ(write_error_, false) << "Should only get one write error."; 962 DCHECK_EQ(write_error_, false) << "Should only get one write error.";
922 write_error_ = true; 963 write_error_ = true;
923 break; 964 break;
924 } 965 }
925 966
926 if (started_transport_.Try()) { 967 if (started_transport_.Try()) {
927 base::AutoLock locker(lock()); 968 base::AutoLock locker(lock());
928 // We can get two OnError callbacks before the post task below completes. 969 bool call_release = false;
929 // Although RawChannel still has a pointer to this object until Shutdown is
930 // called, that is safe since this class always does a PostTask to the IO
931 // thread to self destruct.
932 if (channel_ && error != ERROR_WRITE) { 970 if (channel_ && error != ERROR_WRITE) {
933 if (transferable_) { 971 if (transferable_) {
934 channel_->Shutdown(); 972 channel_->Shutdown();
935 } else { 973 } else {
936 CHECK_NE(non_transferable_state_, CLOSED); 974 CHECK_NE(non_transferable_state_, CLOSED);
937 internal::g_broker->CloseMessagePipe(pipe_id_, this); 975 internal::g_broker->CloseMessagePipe(pipe_id_, this);
938 non_transferable_state_ = CLOSED; 976 non_transferable_state_ = CLOSED;
939 } 977 }
940 channel_ = nullptr; 978 channel_ = nullptr;
979 if (close_requested_) {
980 // Balance AddRef in CloseOnIO.
981 call_release = true;
982 }
941 } 983 }
942 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 984 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
943 started_transport_.Release(); 985 started_transport_.Release();
986 if (call_release)
987 Release();
944 } else { 988 } else {
945 // We must be waiting to call ReleaseHandle. It will call Shutdown. 989 // We must be waiting to call ReleaseHandle. It will call Shutdown.
946 } 990 }
947 } 991 }
948 992
949 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( 993 MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
950 MessageInTransit* message, 994 MessageInTransit* message,
951 std::vector<DispatcherTransport>* transports) { 995 std::vector<DispatcherTransport>* transports) {
952 DCHECK(!message->has_dispatchers()); 996 DCHECK(!message->has_dispatchers());
953 997
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
1005 // PostTask since the broker can call us back synchronously. 1049 // PostTask since the broker can call us back synchronously.
1006 internal::g_io_thread_task_runner->PostTask( 1050 internal::g_io_thread_task_runner->PostTask(
1007 FROM_HERE, 1051 FROM_HERE,
1008 base::Bind(&Broker::ConnectMessagePipe, 1052 base::Bind(&Broker::ConnectMessagePipe,
1009 base::Unretained(internal::g_broker), pipe_id_, 1053 base::Unretained(internal::g_broker), pipe_id_,
1010 base::Unretained(this))); 1054 base::Unretained(this)));
1011 } 1055 }
1012 1056
1013 } // namespace edk 1057 } // namespace edk
1014 } // namespace mojo 1058 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/raw_channel.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698