Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(195)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1554623005: Ensure that in-flight message pipes are always closed and the other end is notified. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/raw_channel.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after
150 150
151 void MessagePipeDispatcher::InitOnIO() { 151 void MessagePipeDispatcher::InitOnIO() {
152 base::AutoLock locker(lock()); 152 base::AutoLock locker(lock());
153 CHECK(transferable_); 153 CHECK(transferable_);
154 calling_init_ = true; 154 calling_init_ = true;
155 if (channel_) 155 if (channel_)
156 channel_->Init(this); 156 channel_->Init(this);
157 calling_init_ = false; 157 calling_init_ = false;
158 } 158 }
159 159
160 void MessagePipeDispatcher::CloseOnIO() { 160 void MessagePipeDispatcher::ReleaseAndCloseOnIO() {
161 base::AutoLock locker(lock()); 161 base::AutoLock locker(lock());
162 Release(); // To match CloseImplNoLock. 162 Release(); // To match CloseImplNoLock.
163 if (transferable_) { 163 CloseOnIO();
164 if (channel_) { 164 }
165 channel_->Shutdown();
166 channel_ = nullptr;
167 }
168 } else {
169 if (non_transferable_state_ == CONNECT_CALLED ||
170 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
171 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
172 RequestNontransferableChannel();
173 165
174 // We can't cancel the pending request yet, since the other side of the 166 void MessagePipeDispatcher::CloseOnIO() {
175 // message pipe would want to get pending outgoing messages (if any) or 167 lock().AssertAcquired();
176 // at least know that this end was closed. So keep this object alive until 168
177 // then. 169 if (channel_) {
178 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; 170 DCHECK(!close_requested_);
179 AddRef(); 171 close_requested_ = true;
180 } else if (non_transferable_state_ == CONNECTED) { 172 AddRef();
181 internal::g_broker->CloseMessagePipe(pipe_id_, this); 173 scoped_ptr<MessageInTransit> message(new MessageInTransit(
182 non_transferable_state_ = CLOSED; 174 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
183 channel_ = nullptr; 175 if (!transferable_)
184 } 176 message->set_route_id(pipe_id_);
177 channel_->WriteMessage(std::move(message));
178 return;
179 }
180
181 if (!transferable_ &&
182 (non_transferable_state_ == CONNECT_CALLED ||
183 non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) {
184 if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
185 RequestNontransferableChannel();
186
187 // We can't cancel the pending request yet, since the other side of the
188 // message pipe would want to get pending outgoing messages (if any) or at
189 // least know that this end was closed. So keep this object alive until
190 // then.
191 non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE;
192 AddRef();
185 } 193 }
186 } 194 }
187 195
188 Dispatcher::Type MessagePipeDispatcher::GetType() const { 196 Dispatcher::Type MessagePipeDispatcher::GetType() const {
189 return Type::MESSAGE_PIPE; 197 return Type::MESSAGE_PIPE;
190 } 198 }
191 199
192 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { 200 void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
201 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
193 base::AutoLock locker(lock()); 202 base::AutoLock locker(lock());
194 channel_ = channel; 203 channel_ = channel;
195 while (!non_transferable_outgoing_message_queue_.IsEmpty()) { 204 while (!non_transferable_outgoing_message_queue_.IsEmpty()) {
196 channel_->WriteMessage( 205 channel_->WriteMessage(
197 non_transferable_outgoing_message_queue_.GetMessage()); 206 non_transferable_outgoing_message_queue_.GetMessage());
198 } 207 }
199 208
200 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { 209 if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) {
201 // We kept this object alive until it's connected, we can release it now. 210 non_transferable_state_ = CONNECTED;
202 internal::g_broker->CloseMessagePipe(pipe_id_, this); 211 // We kept this object alive until it's connected, we can close it now.
203 non_transferable_state_ = CLOSED; 212 CloseOnIO();
204 channel_ = nullptr; 213 // Balance the AddRef in CloseOnIO.
205 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); 214 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this);
206 } else { 215 return;
207 non_transferable_state_ = CONNECTED;
208 } 216 }
217
218 non_transferable_state_ = CONNECTED;
209 } 219 }
210 220
211 #if defined(OS_WIN) 221 #if defined(OS_WIN)
212 // TODO(jam): this is copied from RawChannelWin till I figure out what's the 222 // TODO(jam): this is copied from RawChannelWin till I figure out what's the
213 // best way we want to share this. 223 // best way we want to share this.
214 // Since this is used for serialization of messages read/written to a MP that 224 // Since this is used for serialization of messages read/written to a MP that
215 // aren't consumed by Mojo primitives yet, there could be an unbounded number of 225 // aren't consumed by Mojo primitives yet, there could be an unbounded number of
216 // them when a MP is being sent. As a result, even for POSIX we will probably 226 // them when a MP is being sent. As a result, even for POSIX we will probably
217 // want to send the handles to the shell process and exchange them for tokens 227 // want to send the handles to the shell process and exchange them for tokens
218 // (since we can be sure that the shell will respond to our IPCs, compared to 228 // (since we can be sure that the shell will respond to our IPCs, compared to
(...skipping 182 matching lines...) Expand 10 before | Expand all | Expand 10 after
401 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) 411 MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
402 : channel_(nullptr), 412 : channel_(nullptr),
403 serialized_read_fds_length_(0u), 413 serialized_read_fds_length_(0u),
404 serialized_write_fds_length_(0u), 414 serialized_write_fds_length_(0u),
405 serialized_message_fds_length_(0u), 415 serialized_message_fds_length_(0u),
406 pipe_id_(0), 416 pipe_id_(0),
407 non_transferable_state_(WAITING_FOR_READ_OR_WRITE), 417 non_transferable_state_(WAITING_FOR_READ_OR_WRITE),
408 serialized_(false), 418 serialized_(false),
409 calling_init_(false), 419 calling_init_(false),
410 write_error_(false), 420 write_error_(false),
411 transferable_(transferable) { 421 transferable_(transferable),
422 close_requested_(false) {
412 } 423 }
413 424
414 MessagePipeDispatcher::~MessagePipeDispatcher() { 425 MessagePipeDispatcher::~MessagePipeDispatcher() {
415 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The 426 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The
416 // exception is if they posted a task to run CloseOnIO but the IO thread shut 427 // exception is if they posted a task to run CloseOnIO but the IO thread shut
417 // down and so when it was deleting pending tasks it caused the last reference 428 // down and so when it was deleting pending tasks it caused the last reference
418 // to destruct this object. In that case, safe to destroy the channel. 429 // to destruct this object. In that case, safe to destroy the channel.
419 if (channel_ && 430 if (channel_ &&
420 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { 431 internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) {
421 if (transferable_) { 432 if (transferable_) {
(...skipping 21 matching lines...) Expand all
443 if (!transferable_ && non_transferable_state_ == CLOSED) 454 if (!transferable_ && non_transferable_state_ == CLOSED)
444 return; 455 return;
445 456
446 // We take a manual refcount because at shutdown, the task below might not get 457 // We take a manual refcount because at shutdown, the task below might not get
447 // a chance to execute. If that happens, the RawChannel will still call our 458 // a chance to execute. If that happens, the RawChannel will still call our
448 // OnError method because it always runs (since it watches thread 459 // OnError method because it always runs (since it watches thread
449 // destruction). So to avoid UAF, manually add a reference and only release it 460 // destruction). So to avoid UAF, manually add a reference and only release it
450 // if the task runs. 461 // if the task runs.
451 AddRef(); 462 AddRef();
452 internal::g_io_thread_task_runner->PostTask( 463 internal::g_io_thread_task_runner->PostTask(
453 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); 464 FROM_HERE, base::Bind(&MessagePipeDispatcher::ReleaseAndCloseOnIO, this));
454 } 465 }
455 466
456 void MessagePipeDispatcher::SerializeInternal() { 467 void MessagePipeDispatcher::SerializeInternal() {
457 serialized_ = true; 468 serialized_ = true;
458 if (!transferable_) { 469 if (!transferable_) {
459 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) 470 CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
460 << "Non transferable message pipe being sent after read/write/waited. " 471 << "Non transferable message pipe being sent after read/write/waited. "
461 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " 472 << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if "
462 << "the pipe can be sent after it's read or written. This message pipe " 473 << "the pipe can be sent after it's read or written. This message pipe "
463 << "was previously bound at:\n" 474 << "was previously bound at:\n"
(...skipping 397 matching lines...) Expand 10 before | Expand all | Expand 10 after
861 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); 872 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
862 if (message_view.transport_data_buffer_size() > 0) { 873 if (message_view.transport_data_buffer_size() > 0) {
863 DCHECK(message_view.transport_data_buffer()); 874 DCHECK(message_view.transport_data_buffer());
864 message->SetDispatchers(TransportData::DeserializeDispatchers( 875 message->SetDispatchers(TransportData::DeserializeDispatchers(
865 message_view.transport_data_buffer(), 876 message_view.transport_data_buffer(),
866 message_view.transport_data_buffer_size(), 877 message_view.transport_data_buffer_size(),
867 std::move(platform_handles))); 878 std::move(platform_handles)));
868 } 879 }
869 880
870 if (started_transport_.Try()) { 881 if (started_transport_.Try()) {
871 // we're not in the middle of being sent 882 // We're not in the middle of being sent.
872 883
873 // Can get synchronously called back in Init if there was initial data. 884 // Can get synchronously called back in Init if there was initial data.
874 scoped_ptr<base::AutoLock> locker; 885 scoped_ptr<base::AutoLock> locker;
875 if (!calling_init_) { 886 if (!calling_init_)
876 locker.reset(new base::AutoLock(lock())); 887 locker.reset(new base::AutoLock(lock()));
888
889 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
890 if (transferable_) {
891 channel_->Shutdown();
892 } else {
893 internal::g_broker->CloseMessagePipe(pipe_id_, this);
894 non_transferable_state_ = CLOSED;
895 }
896 channel_ = nullptr;
897 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
898 } else {
899 bool was_empty = message_queue_.IsEmpty();
900 message_queue_.AddMessage(std::move(message));
901 if (was_empty)
902 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
877 } 903 }
878 904
879 bool was_empty = message_queue_.IsEmpty();
880 message_queue_.AddMessage(std::move(message));
881 if (was_empty)
882 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
883
884 started_transport_.Release(); 905 started_transport_.Release();
885 } else { 906 } else {
886 // If RawChannel is calling OnRead, that means it has its read_lock_ 907 if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
887 // acquired. That means StartSerialize can't be accessing message queue as 908 // We got a request to shutdown the channel but this object is already
888 // it waits on ReleaseHandle first which acquires readlock_. 909 // calling into channel to serialize it. Since all the other side cares
889 message_queue_.AddMessage(std::move(message)); 910 // about is flushing pending messages, we bounce the quit back to it.
911 scoped_ptr<MessageInTransit> message(new MessageInTransit(
912 MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
913 channel_->WriteMessage(std::move(message));
914 } else {
915 // If RawChannel is calling OnRead, that means it has its read_lock_
916 // acquired. That means StartSerialize can't be accessing message queue as
917 // it waits on ReleaseHandle first which acquires read_lock_.
918 message_queue_.AddMessage(std::move(message));
919 }
890 } 920 }
891 } 921 }
892 922
893 void MessagePipeDispatcher::OnError(Error error) { 923 void MessagePipeDispatcher::OnError(Error error) {
894 // If there's a read error, then the other side of the pipe is closed. By 924 // If there's a read error, then the other side of the pipe is closed. By
895 // definition, we can't write since there's no one to read it. And we can't 925 // definition, we can't write since there's no one to read it. And we can't
896 // read anymore, since we just got a read erorr. So we close the pipe. 926 // read anymore, since we just got a read erorr. So we close the pipe.
897 // If there's a write error, then we stop writing. But we keep the pipe open 927 // If there's a write error, then we stop writing. But we keep the pipe open
898 // until we finish reading everything in it. This is because it's valid for 928 // until we finish reading everything in it. This is because it's valid for
899 // one endpoint to write some data and close their pipe immediately. Even 929 // one endpoint to write some data and close their pipe immediately. Even
(...skipping 18 matching lines...) Expand all
918 // Write errors are slightly notable: they probably shouldn't happen under 948 // Write errors are slightly notable: they probably shouldn't happen under
919 // normal operation (but maybe the other side crashed). 949 // normal operation (but maybe the other side crashed).
920 LOG(WARNING) << "MessagePipeDispatcher write error"; 950 LOG(WARNING) << "MessagePipeDispatcher write error";
921 DCHECK_EQ(write_error_, false) << "Should only get one write error."; 951 DCHECK_EQ(write_error_, false) << "Should only get one write error.";
922 write_error_ = true; 952 write_error_ = true;
923 break; 953 break;
924 } 954 }
925 955
926 if (started_transport_.Try()) { 956 if (started_transport_.Try()) {
927 base::AutoLock locker(lock()); 957 base::AutoLock locker(lock());
928 // We can get two OnError callbacks before the post task below completes.
929 // Although RawChannel still has a pointer to this object until Shutdown is
930 // called, that is safe since this class always does a PostTask to the IO
931 // thread to self destruct.
932 if (channel_ && error != ERROR_WRITE) { 958 if (channel_ && error != ERROR_WRITE) {
933 if (transferable_) { 959 if (transferable_) {
934 channel_->Shutdown(); 960 channel_->Shutdown();
935 } else { 961 } else {
936 CHECK_NE(non_transferable_state_, CLOSED); 962 CHECK_NE(non_transferable_state_, CLOSED);
937 internal::g_broker->CloseMessagePipe(pipe_id_, this); 963 internal::g_broker->CloseMessagePipe(pipe_id_, this);
938 non_transferable_state_ = CLOSED; 964 non_transferable_state_ = CLOSED;
939 } 965 }
940 channel_ = nullptr; 966 channel_ = nullptr;
967 if (close_requested_) {
968 // Balance AddRef in CloseOnIO.
969 base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this);
sky 2015/12/30 16:33:32 Is there a reason you can't Release() rather than
jam 2015/12/30 17:03:40 This might be the last reference, so I was just si
970 }
941 } 971 }
942 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 972 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
943 started_transport_.Release(); 973 started_transport_.Release();
944 } else { 974 } else {
945 // We must be waiting to call ReleaseHandle. It will call Shutdown. 975 // We must be waiting to call ReleaseHandle. It will call Shutdown.
946 } 976 }
947 } 977 }
948 978
949 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( 979 MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
950 MessageInTransit* message, 980 MessageInTransit* message,
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
1005 // PostTask since the broker can call us back synchronously. 1035 // PostTask since the broker can call us back synchronously.
1006 internal::g_io_thread_task_runner->PostTask( 1036 internal::g_io_thread_task_runner->PostTask(
1007 FROM_HERE, 1037 FROM_HERE,
1008 base::Bind(&Broker::ConnectMessagePipe, 1038 base::Bind(&Broker::ConnectMessagePipe,
1009 base::Unretained(internal::g_broker), pipe_id_, 1039 base::Unretained(internal::g_broker), pipe_id_,
1010 base::Unretained(this))); 1040 base::Unretained(this)));
1011 } 1041 }
1012 1042
1013 } // namespace edk 1043 } // namespace edk
1014 } // namespace mojo 1044 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/raw_channel.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698