| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/connector.h" | 5 #include "mojo/public/cpp/bindings/lib/connector.h" |
| 6 | 6 |
| 7 #include "mojo/public/cpp/environment/logging.h" | 7 #include "base/logging.h" |
| 8 #include "base/macros.h" |
| 9 #include "base/synchronization/lock.h" |
| 8 | 10 |
| 9 namespace mojo { | 11 namespace mojo { |
| 10 namespace internal { | 12 namespace internal { |
| 11 | 13 |
| 14 namespace { |
| 15 |
| 16 // Similar to base::AutoLock, except that it does nothing if |lock| passed into |
| 17 // the constructor is null. |
| 18 class MayAutoLock { |
| 19 public: |
| 20 explicit MayAutoLock(base::Lock* lock) : lock_(lock) { |
| 21 if (lock_) |
| 22 lock_->Acquire(); |
| 23 } |
| 24 |
| 25 ~MayAutoLock() { |
| 26 if (lock_) { |
| 27 lock_->AssertAcquired(); |
| 28 lock_->Release(); |
| 29 } |
| 30 } |
| 31 |
| 32 private: |
| 33 base::Lock* lock_; |
| 34 DISALLOW_COPY_AND_ASSIGN(MayAutoLock); |
| 35 }; |
| 36 |
| 37 } // namespace |
| 38 |
| 12 // ---------------------------------------------------------------------------- | 39 // ---------------------------------------------------------------------------- |
| 13 | 40 |
| 14 Connector::Connector(ScopedMessagePipeHandle message_pipe, | 41 Connector::Connector(ScopedMessagePipeHandle message_pipe, |
| 42 SendingThreadSafetyType sending_thread_safety_type, |
| 15 const MojoAsyncWaiter* waiter) | 43 const MojoAsyncWaiter* waiter) |
| 16 : waiter_(waiter), | 44 : waiter_(waiter), |
| 17 message_pipe_(message_pipe.Pass()), | 45 message_pipe_(message_pipe.Pass()), |
| 18 incoming_receiver_(nullptr), | 46 incoming_receiver_(nullptr), |
| 19 async_wait_id_(0), | 47 async_wait_id_(0), |
| 20 error_(false), | 48 error_(false), |
| 21 drop_writes_(false), | 49 drop_writes_(false), |
| 22 enforce_errors_from_incoming_receiver_(true), | 50 enforce_errors_from_incoming_receiver_(true), |
| 23 paused_(false), | 51 paused_(false), |
| 24 destroyed_flag_(nullptr) { | 52 destroyed_flag_(nullptr), |
| 53 lock_(sending_thread_safety_type == MULTI_THREADED_SEND ? new base::Lock |
| 54 : nullptr) { |
| 25 // Even though we don't have an incoming receiver, we still want to monitor | 55 // Even though we don't have an incoming receiver, we still want to monitor |
| 26 // the message pipe to know if is closed or encounters an error. | 56 // the message pipe to know if is closed or encounters an error. |
| 27 WaitToReadMore(); | 57 WaitToReadMore(); |
| 28 } | 58 } |
| 29 | 59 |
| 30 Connector::~Connector() { | 60 Connector::~Connector() { |
| 61 DCHECK(thread_checker_.CalledOnValidThread()); |
| 62 |
| 31 if (destroyed_flag_) | 63 if (destroyed_flag_) |
| 32 *destroyed_flag_ = true; | 64 *destroyed_flag_ = true; |
| 33 | 65 |
| 34 CancelWait(); | 66 CancelWait(); |
| 35 } | 67 } |
| 36 | 68 |
| 37 void Connector::CloseMessagePipe() { | 69 void Connector::CloseMessagePipe() { |
| 70 DCHECK(thread_checker_.CalledOnValidThread()); |
| 71 |
| 38 CancelWait(); | 72 CancelWait(); |
| 73 MayAutoLock locker(lock_.get()); |
| 39 Close(message_pipe_.Pass()); | 74 Close(message_pipe_.Pass()); |
| 40 } | 75 } |
| 41 | 76 |
| 42 ScopedMessagePipeHandle Connector::PassMessagePipe() { | 77 ScopedMessagePipeHandle Connector::PassMessagePipe() { |
| 78 DCHECK(thread_checker_.CalledOnValidThread()); |
| 79 |
| 43 CancelWait(); | 80 CancelWait(); |
| 81 MayAutoLock locker(lock_.get()); |
| 44 return message_pipe_.Pass(); | 82 return message_pipe_.Pass(); |
| 45 } | 83 } |
| 46 | 84 |
| 47 void Connector::RaiseError() { | 85 void Connector::RaiseError() { |
| 86 DCHECK(thread_checker_.CalledOnValidThread()); |
| 87 |
| 48 HandleError(true, true); | 88 HandleError(true, true); |
| 49 } | 89 } |
| 50 | 90 |
| 51 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) { | 91 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) { |
| 92 DCHECK(thread_checker_.CalledOnValidThread()); |
| 93 |
| 52 if (error_) | 94 if (error_) |
| 53 return false; | 95 return false; |
| 54 | 96 |
| 55 ResumeIncomingMethodCallProcessing(); | 97 ResumeIncomingMethodCallProcessing(); |
| 56 | 98 |
| 57 MojoResult rv = | 99 MojoResult rv = |
| 58 Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr); | 100 Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr); |
| 59 if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED) | 101 if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED) |
| 60 return false; | 102 return false; |
| 61 if (rv != MOJO_RESULT_OK) { | 103 if (rv != MOJO_RESULT_OK) { |
| 62 // Users that call WaitForIncomingMessage() should expect their code to be | 104 // Users that call WaitForIncomingMessage() should expect their code to be |
| 63 // re-entered, so we call the error handler synchronously. | 105 // re-entered, so we call the error handler synchronously. |
| 64 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); | 106 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); |
| 65 return false; | 107 return false; |
| 66 } | 108 } |
| 67 mojo_ignore_result(ReadSingleMessage(&rv)); | 109 mojo_ignore_result(ReadSingleMessage(&rv)); |
| 68 return (rv == MOJO_RESULT_OK); | 110 return (rv == MOJO_RESULT_OK); |
| 69 } | 111 } |
| 70 | 112 |
| 71 void Connector::PauseIncomingMethodCallProcessing() { | 113 void Connector::PauseIncomingMethodCallProcessing() { |
| 114 DCHECK(thread_checker_.CalledOnValidThread()); |
| 115 |
| 72 if (paused_) | 116 if (paused_) |
| 73 return; | 117 return; |
| 74 | 118 |
| 75 paused_ = true; | 119 paused_ = true; |
| 76 CancelWait(); | 120 CancelWait(); |
| 77 } | 121 } |
| 78 | 122 |
| 79 void Connector::ResumeIncomingMethodCallProcessing() { | 123 void Connector::ResumeIncomingMethodCallProcessing() { |
| 124 DCHECK(thread_checker_.CalledOnValidThread()); |
| 125 |
| 80 if (!paused_) | 126 if (!paused_) |
| 81 return; | 127 return; |
| 82 | 128 |
| 83 paused_ = false; | 129 paused_ = false; |
| 84 WaitToReadMore(); | 130 WaitToReadMore(); |
| 85 } | 131 } |
| 86 | 132 |
| 87 bool Connector::Accept(Message* message) { | 133 bool Connector::Accept(Message* message) { |
| 134 DCHECK(lock_ || thread_checker_.CalledOnValidThread()); |
| 135 |
| 136 // It shouldn't hurt even if |error_| may be changed by a different thread at |
| 137 // the same time. The outcome is that we may write into |message_pipe_| after |
| 138 // encountering an error, which should be fine. |
| 88 if (error_) | 139 if (error_) |
| 89 return false; | 140 return false; |
| 90 | 141 |
| 91 MOJO_CHECK(message_pipe_.is_valid()); | 142 MayAutoLock locker(lock_.get()); |
| 92 if (drop_writes_) | 143 |
| 144 if (!message_pipe_.is_valid() || drop_writes_) |
| 93 return true; | 145 return true; |
| 94 | 146 |
| 95 MojoResult rv = | 147 MojoResult rv = |
| 96 WriteMessageRaw(message_pipe_.get(), | 148 WriteMessageRaw(message_pipe_.get(), |
| 97 message->data(), | 149 message->data(), |
| 98 message->data_num_bytes(), | 150 message->data_num_bytes(), |
| 99 message->mutable_handles()->empty() | 151 message->mutable_handles()->empty() |
| 100 ? nullptr | 152 ? nullptr |
| 101 : reinterpret_cast<const MojoHandle*>( | 153 : reinterpret_cast<const MojoHandle*>( |
| 102 &message->mutable_handles()->front()), | 154 &message->mutable_handles()->front()), |
| (...skipping 14 matching lines...) Expand all Loading... |
| 117 drop_writes_ = true; | 169 drop_writes_ = true; |
| 118 break; | 170 break; |
| 119 case MOJO_RESULT_BUSY: | 171 case MOJO_RESULT_BUSY: |
| 120 // We'd get a "busy" result if one of the message's handles is: | 172 // We'd get a "busy" result if one of the message's handles is: |
| 121 // - |message_pipe_|'s own handle; | 173 // - |message_pipe_|'s own handle; |
| 122 // - simultaneously being used on another thread; or | 174 // - simultaneously being used on another thread; or |
| 123 // - in a "busy" state that prohibits it from being transferred (e.g., | 175 // - in a "busy" state that prohibits it from being transferred (e.g., |
| 124 // a data pipe handle in the middle of a two-phase read/write, | 176 // a data pipe handle in the middle of a two-phase read/write, |
| 125 // regardless of which thread that two-phase read/write is happening | 177 // regardless of which thread that two-phase read/write is happening |
| 126 // on). | 178 // on). |
| 127 // TODO(vtl): I wonder if this should be a |MOJO_DCHECK()|. (But, until | 179 // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until |
| 128 // crbug.com/389666, etc. are resolved, this will make tests fail quickly | 180 // crbug.com/389666, etc. are resolved, this will make tests fail quickly |
| 129 // rather than hanging.) | 181 // rather than hanging.) |
| 130 MOJO_CHECK(false) << "Race condition or other bug detected"; | 182 CHECK(false) << "Race condition or other bug detected"; |
| 131 return false; | 183 return false; |
| 132 default: | 184 default: |
| 133 // This particular write was rejected, presumably because of bad input. | 185 // This particular write was rejected, presumably because of bad input. |
| 134 // The pipe is not necessarily in a bad state. | 186 // The pipe is not necessarily in a bad state. |
| 135 return false; | 187 return false; |
| 136 } | 188 } |
| 137 return true; | 189 return true; |
| 138 } | 190 } |
| 139 | 191 |
| 140 // static | 192 // static |
| 141 void Connector::CallOnHandleReady(void* closure, MojoResult result) { | 193 void Connector::CallOnHandleReady(void* closure, MojoResult result) { |
| 142 Connector* self = static_cast<Connector*>(closure); | 194 Connector* self = static_cast<Connector*>(closure); |
| 143 self->OnHandleReady(result); | 195 self->OnHandleReady(result); |
| 144 } | 196 } |
| 145 | 197 |
| 146 void Connector::OnHandleReady(MojoResult result) { | 198 void Connector::OnHandleReady(MojoResult result) { |
| 147 MOJO_CHECK(async_wait_id_ != 0); | 199 DCHECK(thread_checker_.CalledOnValidThread()); |
| 200 |
| 201 CHECK(async_wait_id_ != 0); |
| 148 async_wait_id_ = 0; | 202 async_wait_id_ = 0; |
| 149 if (result != MOJO_RESULT_OK) { | 203 if (result != MOJO_RESULT_OK) { |
| 150 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); | 204 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
| 151 return; | 205 return; |
| 152 } | 206 } |
| 153 ReadAllAvailableMessages(); | 207 ReadAllAvailableMessages(); |
| 154 // At this point, this object might have been deleted. Return. | 208 // At this point, this object might have been deleted. Return. |
| 155 } | 209 } |
| 156 | 210 |
| 157 void Connector::WaitToReadMore() { | 211 void Connector::WaitToReadMore() { |
| 158 MOJO_CHECK(!async_wait_id_); | 212 CHECK(!async_wait_id_); |
| 159 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(), | 213 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(), |
| 160 MOJO_HANDLE_SIGNAL_READABLE, | 214 MOJO_HANDLE_SIGNAL_READABLE, |
| 161 MOJO_DEADLINE_INDEFINITE, | 215 MOJO_DEADLINE_INDEFINITE, |
| 162 &Connector::CallOnHandleReady, | 216 &Connector::CallOnHandleReady, |
| 163 this); | 217 this); |
| 164 } | 218 } |
| 165 | 219 |
| 166 bool Connector::ReadSingleMessage(MojoResult* read_result) { | 220 bool Connector::ReadSingleMessage(MojoResult* read_result) { |
| 167 bool receiver_result = false; | 221 bool receiver_result = false; |
| 168 | 222 |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 230 force_pipe_reset = true; | 284 force_pipe_reset = true; |
| 231 | 285 |
| 232 if (paused_) { | 286 if (paused_) { |
| 233 // If the user has paused receiving messages, we shouldn't call the error | 287 // If the user has paused receiving messages, we shouldn't call the error |
| 234 // handler right away. We need to wait until the user starts receiving | 288 // handler right away. We need to wait until the user starts receiving |
| 235 // messages again. | 289 // messages again. |
| 236 force_async_handler = true; | 290 force_async_handler = true; |
| 237 } | 291 } |
| 238 | 292 |
| 239 if (force_pipe_reset) { | 293 if (force_pipe_reset) { |
| 240 CloseMessagePipe(); | 294 CancelWait(); |
| 295 MayAutoLock locker(lock_.get()); |
| 296 Close(message_pipe_.Pass()); |
| 241 MessagePipe dummy_pipe; | 297 MessagePipe dummy_pipe; |
| 242 message_pipe_ = dummy_pipe.handle0.Pass(); | 298 message_pipe_ = dummy_pipe.handle0.Pass(); |
| 243 } else { | 299 } else { |
| 244 CancelWait(); | 300 CancelWait(); |
| 245 } | 301 } |
| 246 | 302 |
| 247 if (force_async_handler) { | 303 if (force_async_handler) { |
| 248 // |dummy_pipe.handle1| has been destructed. Reading the pipe will | 304 // |dummy_pipe.handle1| has been destructed. Reading the pipe will |
| 249 // eventually cause a read error on |message_pipe_| and set error state. | 305 // eventually cause a read error on |message_pipe_| and set error state. |
| 250 if (!paused_) | 306 if (!paused_) |
| 251 WaitToReadMore(); | 307 WaitToReadMore(); |
| 252 } else { | 308 } else { |
| 253 error_ = true; | 309 error_ = true; |
| 254 connection_error_handler_.Run(); | 310 connection_error_handler_.Run(); |
| 255 } | 311 } |
| 256 } | 312 } |
| 257 | 313 |
| 258 } // namespace internal | 314 } // namespace internal |
| 259 } // namespace mojo | 315 } // namespace mojo |
| OLD | NEW |