Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(892)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1554623005: Ensure that in-flight message pipes are always closed and the other end is notified. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix dcheck of lock being acquired at destruction Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/raw_channel.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after
150 150
151 void MessagePipeDispatcher::InitOnIO() { 151 void MessagePipeDispatcher::InitOnIO() {
152 base::AutoLock locker(lock()); 152 base::AutoLock locker(lock());
153 CHECK(transferable_); 153 CHECK(transferable_);
154 calling_init_ = true; 154 calling_init_ = true;
155 if (channel_) 155 if (channel_)
156 channel_->Init(this); 156 channel_->Init(this);
157 calling_init_ = false; 157 calling_init_ = false;
158 } 158 }
159 159
160 void MessagePipeDispatcher::CloseOnIOAndRelease() {
161 {
162 base::AutoLock locker(lock());
163 CloseOnIO();
164 }
165 Release(); // To match CloseImplNoLock.
166 }
167
160 void MessagePipeDispatcher::CloseOnIO() { 168 void MessagePipeDispatcher::CloseOnIO() {
161 base::AutoLock locker(lock()); 169 lock().AssertAcquired();
162 Release(); // To match CloseImplNoLock.
163 if (transferable_) {
164 if (channel_) {
165 channel_->Shutdown();
166 channel_ = nullptr;
167 }
168 } else {
169 if (non_transferable_state_ == CONNECT_CALLED ||
170 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
171 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
172 RequestNontransferableChannel();
173 170
174 // We can't cancel the pending request yet, since the other side of the 171 if (channel_) {
175 // message pipe would want to get pending outgoing messages (if any) or 172 // If we closed the channel now then in-flight message pipes wouldn't get
176 // at least know that this end was closed. So keep this object alive until 173 // closed, and their other side wouldn't get a connection error notification
177 // then. 174 // which could lead to hangs or leaks. So we ask the other side of this
178 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; 175 // message pipe to close, which ensures that we have dispatched all
179 AddRef(); 176 // in-flight message pipes.
180 } else if (non_transferable_state_ == CONNECTED) { 177 DCHECK(!close_requested_);
181 internal::g_broker->CloseMessagePipe(pipe_id_, this); 178 close_requested_ = true;
182 non_transferable_state_ = CLOSED; 179 AddRef();
183 channel_ = nullptr; 180 scoped_ptr<MessageInTransit> message(new MessageInTransit(
184 } 181 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
182 if (!transferable_)
183 message->set_route_id(pipe_id_);
184 channel_->WriteMessage(std::move(message));
185 return;
186 }
187
188 if (!transferable_ &&
189 (non_transferable_state_ == CONNECT_CALLED ||
190 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) {
191 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
192 RequestNontransferableChannel();
193
194 // We can't cancel the pending request yet, since the other side of the
195 // message pipe would want to get pending outgoing messages (if any) or at
196 // least know that this end was closed. So keep this object alive until
197 // then.
198 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE;
199 AddRef();
185 } 200 }
186 } 201 }
187 202
188 Dispatcher::Type MessagePipeDispatcher::GetType() const { 203 Dispatcher::Type MessagePipeDispatcher::GetType() const {
189 return Type::MESSAGE_PIPE; 204 return Type::MESSAGE_PIPE;
190 } 205 }
191 206
192 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { 207 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
208 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
193 base::AutoLock locker(lock()); 209 base::AutoLock locker(lock());
194 channel_ = channel; 210 channel_ = channel;
195 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { 211 while (!non_transferable_outgoing_message_queue_.IsEmpty()) {
196 channel_->WriteMessage( 212 channel_->WriteMessage(
197 non_transferable_outgoing_message_queue_.GetMessage()); 213 non_transferable_outgoing_message_queue_.GetMessage());
198 } 214 }
199 215
200 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { 216 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) {
201 // We kept this object alive until it's connected, we can release it now.
202 internal::g_broker->CloseMessagePipe(pipe_id_, this);
203 non_transferable_state_ = CLOSED;
204 channel_ = nullptr;
205 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this);
206 } else {
207 non_transferable_state_ = CONNECTED; 217 non_transferable_state_ = CONNECTED;
218 // We kept this object alive until it's connected, we can close it now.
219 CloseOnIO();
220 // Balance the AddRef in CloseOnIO.
221 Release();
222 return;
208 } 223 }
224
225 non_transferable_state_ = CONNECTED;
209 } 226 }
210 227
211 #if defined(OS_WIN) 228 #if defined(OS_WIN)
212 // TODO(jam): this is copied from RawChannelWin till I figure out what's the 229 // TODO(jam): this is copied from RawChannelWin till I figure out what's the
213 // best way we want to share this. 230 // best way we want to share this.
214 // Since this is used for serialization of messages read/written to a MP that 231 // Since this is used for serialization of messages read/written to a MP that
215 // aren't consumed by Mojo primitives yet, there could be an unbounded number of 232 // aren't consumed by Mojo primitives yet, there could be an unbounded number of
216 // them when a MP is being sent. As a result, even for POSIX we will probably 233 // them when a MP is being sent. As a result, even for POSIX we will probably
217 // want to send the handles to the shell process and exchange them for tokens 234 // want to send the handles to the shell process and exchange them for tokens
218 // (since we can be sure that the shell will respond to our IPCs, compared to 235 // (since we can be sure that the shell will respond to our IPCs, compared to
(...skipping 182 matching lines...) Expand 10 before | Expand all | Expand 10 after
401 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) 418 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
402 : channel_(nullptr), 419 : channel_(nullptr),
403 serialized_read_fds_length_(0u), 420 serialized_read_fds_length_(0u),
404 serialized_write_fds_length_(0u), 421 serialized_write_fds_length_(0u),
405 serialized_message_fds_length_(0u), 422 serialized_message_fds_length_(0u),
406 pipe_id_(0), 423 pipe_id_(0),
407 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), 424 non_transferable_state_(WAITING_FOR_READ_OR_WRITE),
408 serialized_(false), 425 serialized_(false),
409 calling_init_(false), 426 calling_init_(false),
410 write_error_(false), 427 write_error_(false),
411 transferable_(transferable) { 428 transferable_(transferable),
429 close_requested_(false) {
412 } 430 }
413 431
414 MessagePipeDispatcher::~MessagePipeDispatcher() { 432 MessagePipeDispatcher::~MessagePipeDispatcher() {
415 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The 433 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The
416 // exception is if they posted a task to run CloseOnIO but the IO thread shut 434 // exception is if they posted a task to run CloseOnIO but the IO thread shut
417 // down and so when it was deleting pending tasks it caused the last reference 435 // down and so when it was deleting pending tasks it caused the last reference
418 // to destruct this object. In that case, safe to destroy the channel. 436 // to destruct this object. In that case, safe to destroy the channel.
419 if (channel_ && 437 if (channel_ &&
420 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { 438 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) {
421 if (transferable_) { 439 if (transferable_) {
(...skipping 21 matching lines...) Expand all
443 if (!transferable_ && non_transferable_state_ == CLOSED) 461 if (!transferable_ && non_transferable_state_ == CLOSED)
444 return; 462 return;
445 463
446 // We take a manual refcount because at shutdown, the task below might not get 464 // We take a manual refcount because at shutdown, the task below might not get
447 // a chance to execute. If that happens, the RawChannel will still call our 465 // a chance to execute. If that happens, the RawChannel will still call our
448 // OnError method because it always runs (since it watches thread 466 // OnError method because it always runs (since it watches thread
449 // destruction). So to avoid UAF, manually add a reference and only release it 467 // destruction). So to avoid UAF, manually add a reference and only release it
450 // if the task runs. 468 // if the task runs.
451 AddRef(); 469 AddRef();
452 internal::g_io_thread_task_runner->PostTask( 470 internal::g_io_thread_task_runner->PostTask(
453 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); 471 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIOAndRelease, this));
454 } 472 }
455 473
456 void MessagePipeDispatcher::SerializeInternal() { 474 void MessagePipeDispatcher::SerializeInternal() {
457 serialized_ = true; 475 serialized_ = true;
458 if (!transferable_) { 476 if (!transferable_) {
459 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) 477 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
460 << "Non transferable message pipe being sent after read/write/waited. " 478 << "Non transferable message pipe being sent after read/write/waited. "
461 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " 479 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if "
462 << "the pipe can be sent after it's read or written. This message pipe " 480 << "the pipe can be sent after it's read or written. This message pipe "
463 << "was previously bound at:\n" 481 << "was previously bound at:\n"
(...skipping 396 matching lines...) Expand 10 before | Expand all | Expand 10 after
860 ScopedPlatformHandleVectorPtr platform_handles) { 878 ScopedPlatformHandleVectorPtr platform_handles) {
861 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); 879 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
862 if (message_view.transport_data_buffer_size() > 0) { 880 if (message_view.transport_data_buffer_size() > 0) {
863 DCHECK(message_view.transport_data_buffer()); 881 DCHECK(message_view.transport_data_buffer());
864 message->SetDispatchers(TransportData::DeserializeDispatchers( 882 message->SetDispatchers(TransportData::DeserializeDispatchers(
865 message_view.transport_data_buffer(), 883 message_view.transport_data_buffer(),
866 message_view.transport_data_buffer_size(), 884 message_view.transport_data_buffer_size(),
867 std::move(platform_handles))); 885 std::move(platform_handles)));
868 } 886 }
869 887
888 bool call_release = false;
870 if (started_transport_.Try()) { 889 if (started_transport_.Try()) {
871 // we're not in the middle of being sent 890 // We're not in the middle of being sent.
872 891
873 // Can get synchronously called back in Init if there was initial data. 892 // Can get synchronously called back in Init if there was initial data.
874 scoped_ptr<base::AutoLock> locker; 893 scoped_ptr<base::AutoLock> locker;
875 if (!calling_init_) { 894 if (!calling_init_)
876 locker.reset(new base::AutoLock(lock())); 895 locker.reset(new base::AutoLock(lock()));
896
897 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
898 if (transferable_) {
899 channel_->Shutdown();
900 } else {
901 internal::g_broker->CloseMessagePipe(pipe_id_, this);
902 non_transferable_state_ = CLOSED;
903 }
904 channel_ = nullptr;
905 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
906 if (close_requested_) {
907 // We requested the other side to close the connection while they also
908 // did the same. We must balance out the AddRef in CloseOnIO to ensure
909 // this object isn't leaked.
910 call_release = true;
911 }
912 } else {
913 bool was_empty = message_queue_.IsEmpty();
914 message_queue_.AddMessage(std::move(message));
915 if (was_empty)
916 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
877 } 917 }
878 918
879 bool was_empty = message_queue_.IsEmpty();
880 message_queue_.AddMessage(std::move(message));
881 if (was_empty)
882 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
883
884 started_transport_.Release(); 919 started_transport_.Release();
885 } else { 920 } else {
886 // If RawChannel is calling OnRead, that means it has its read_lock_ 921 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
887 // acquired. That means StartSerialize can't be accessing message queue as 922 // We got a request to shutdown the channel but this object is already
888 // it waits on ReleaseHandle first which acquires readlock_. 923 // calling into channel to serialize it. Since all the other side cares
889 message_queue_.AddMessage(std::move(message)); 924 // about is flushing pending messages, we bounce the quit back to it.
925 scoped_ptr<MessageInTransit> message(new MessageInTransit(
926 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
927 channel_->WriteMessage(std::move(message));
928 } else {
929 // If RawChannel is calling OnRead, that means it has its read_lock_
930 // acquired. That means StartSerialize can't be accessing message queue as
931 // it waits on ReleaseHandle first which acquires read_lock_.
932 message_queue_.AddMessage(std::move(message));
933 }
890 } 934 }
935
936 if (call_release)
937 Release();
891 } 938 }
892 939
893 void MessagePipeDispatcher::OnError(Error error) { 940 void MessagePipeDispatcher::OnError(Error error) {
894 // If there's a read error, then the other side of the pipe is closed. By 941 // If there's a read error, then the other side of the pipe is closed. By
895 // definition, we can't write since there's no one to read it. And we can't 942 // definition, we can't write since there's no one to read it. And we can't
896 // read anymore, since we just got a read erorr. So we close the pipe. 943 // read anymore, since we just got a read erorr. So we close the pipe.
897 // If there's a write error, then we stop writing. But we keep the pipe open 944 // If there's a write error, then we stop writing. But we keep the pipe open
898 // until we finish reading everything in it. This is because it's valid for 945 // until we finish reading everything in it. This is because it's valid for
899 // one endpoint to write some data and close their pipe immediately. Even 946 // one endpoint to write some data and close their pipe immediately. Even
900 // though the other end can't write anymore, it should still get all the data. 947 // though the other end can't write anymore, it should still get all the data.
(...skipping 15 matching lines...) Expand all
916 break; 963 break;
917 case ERROR_WRITE: 964 case ERROR_WRITE:
918 // Write errors are slightly notable: they probably shouldn't happen under 965 // Write errors are slightly notable: they probably shouldn't happen under
919 // normal operation (but maybe the other side crashed). 966 // normal operation (but maybe the other side crashed).
920 LOG(WARNING) << "MessagePipeDispatcher write error"; 967 LOG(WARNING) << "MessagePipeDispatcher write error";
921 DCHECK_EQ(write_error_, false) << "Should only get one write error."; 968 DCHECK_EQ(write_error_, false) << "Should only get one write error.";
922 write_error_ = true; 969 write_error_ = true;
923 break; 970 break;
924 } 971 }
925 972
973 bool call_release = false;
926 if (started_transport_.Try()) { 974 if (started_transport_.Try()) {
927 base::AutoLock locker(lock()); 975 base::AutoLock locker(lock());
928 // We can get two OnError callbacks before the post task below completes.
929 // Although RawChannel still has a pointer to this object until Shutdown is
930 // called, that is safe since this class always does a PostTask to the IO
931 // thread to self destruct.
932 if (channel_ && error != ERROR_WRITE) { 976 if (channel_ && error != ERROR_WRITE) {
933 if (transferable_) { 977 if (transferable_) {
934 channel_->Shutdown(); 978 channel_->Shutdown();
935 } else { 979 } else {
936 CHECK_NE(non_transferable_state_, CLOSED); 980 CHECK_NE(non_transferable_state_, CLOSED);
937 internal::g_broker->CloseMessagePipe(pipe_id_, this); 981 internal::g_broker->CloseMessagePipe(pipe_id_, this);
938 non_transferable_state_ = CLOSED; 982 non_transferable_state_ = CLOSED;
939 } 983 }
940 channel_ = nullptr; 984 channel_ = nullptr;
985 if (close_requested_) {
986 // Balance AddRef in CloseOnIO.
987 call_release = true;
988 }
941 } 989 }
942 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 990 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
943 started_transport_.Release(); 991 started_transport_.Release();
944 } else { 992 } else {
945 // We must be waiting to call ReleaseHandle. It will call Shutdown. 993 // We must be waiting to call ReleaseHandle. It will call Shutdown.
946 } 994 }
995
996 if (call_release)
997 Release();
947 } 998 }
948 999
949 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( 1000 MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
950 MessageInTransit* message, 1001 MessageInTransit* message,
951 std::vector<DispatcherTransport>* transports) { 1002 std::vector<DispatcherTransport>* transports) {
952 DCHECK(!message->has_dispatchers()); 1003 DCHECK(!message->has_dispatchers());
953 1004
954 // You're not allowed to send either handle to a message pipe over the message 1005 // You're not allowed to send either handle to a message pipe over the message
955 // pipe, so check for this. (The case of trying to write a handle to itself is 1006 // pipe, so check for this. (The case of trying to write a handle to itself is
956 // taken care of by |Core|. That case kind of makes sense, but leads to 1007 // taken care of by |Core|. That case kind of makes sense, but leads to
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
1005 // PostTask since the broker can call us back synchronously. 1056 // PostTask since the broker can call us back synchronously.
1006 internal::g_io_thread_task_runner->PostTask( 1057 internal::g_io_thread_task_runner->PostTask(
1007 FROM_HERE, 1058 FROM_HERE,
1008 base::Bind(&Broker::ConnectMessagePipe, 1059 base::Bind(&Broker::ConnectMessagePipe,
1009 base::Unretained(internal::g_broker), pipe_id_, 1060 base::Unretained(internal::g_broker), pipe_id_,
1010 base::Unretained(this))); 1061 base::Unretained(this)));
1011 } 1062 }
1012 1063
1013 } // namespace edk 1064 } // namespace edk
1014 } // namespace mojo 1065 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/raw_channel.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698