OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/connector.h" | 5 #include "mojo/public/cpp/bindings/lib/connector.h" |
6 | 6 |
7 #include "mojo/public/cpp/environment/logging.h" | 7 #include "base/logging.h" |
| 8 #include "base/macros.h" |
| 9 #include "base/synchronization/lock.h" |
8 | 10 |
9 namespace mojo { | 11 namespace mojo { |
10 namespace internal { | 12 namespace internal { |
11 | 13 |
| 14 namespace { |
| 15 |
| 16 // Similar to base::AutoLock, except that it does nothing if |lock| passed into |
| 17 // the constructor is null. |
| 18 class MayAutoLock { |
| 19 public: |
| 20 explicit MayAutoLock(base::Lock* lock) : lock_(lock) { |
| 21 if (lock_) |
| 22 lock_->Acquire(); |
| 23 } |
| 24 |
| 25 ~MayAutoLock() { |
| 26 if (lock_) { |
| 27 lock_->AssertAcquired(); |
| 28 lock_->Release(); |
| 29 } |
| 30 } |
| 31 |
| 32 private: |
| 33 base::Lock* lock_; |
| 34 DISALLOW_COPY_AND_ASSIGN(MayAutoLock); |
| 35 }; |
| 36 |
| 37 } // namespace |
| 38 |
12 // ---------------------------------------------------------------------------- | 39 // ---------------------------------------------------------------------------- |
13 | 40 |
14 Connector::Connector(ScopedMessagePipeHandle message_pipe, | 41 Connector::Connector(ScopedMessagePipeHandle message_pipe, |
| 42 ConnectorConfig config, |
15 const MojoAsyncWaiter* waiter) | 43 const MojoAsyncWaiter* waiter) |
16 : waiter_(waiter), | 44 : waiter_(waiter), |
17 message_pipe_(message_pipe.Pass()), | 45 message_pipe_(message_pipe.Pass()), |
18 incoming_receiver_(nullptr), | 46 incoming_receiver_(nullptr), |
19 async_wait_id_(0), | 47 async_wait_id_(0), |
20 error_(false), | 48 error_(false), |
21 drop_writes_(false), | 49 drop_writes_(false), |
22 enforce_errors_from_incoming_receiver_(true), | 50 enforce_errors_from_incoming_receiver_(true), |
23 paused_(false), | 51 paused_(false), |
24 destroyed_flag_(nullptr) { | 52 destroyed_flag_(nullptr), |
| 53 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr) { |
25 // Even though we don't have an incoming receiver, we still want to monitor | 54 // Even though we don't have an incoming receiver, we still want to monitor |
26 // the message pipe to know if is closed or encounters an error. | 55 // the message pipe to know if is closed or encounters an error. |
27 WaitToReadMore(); | 56 WaitToReadMore(); |
28 } | 57 } |
29 | 58 |
30 Connector::~Connector() { | 59 Connector::~Connector() { |
| 60 DCHECK(thread_checker_.CalledOnValidThread()); |
| 61 |
31 if (destroyed_flag_) | 62 if (destroyed_flag_) |
32 *destroyed_flag_ = true; | 63 *destroyed_flag_ = true; |
33 | 64 |
34 CancelWait(); | 65 CancelWait(); |
35 } | 66 } |
36 | 67 |
37 void Connector::CloseMessagePipe() { | 68 void Connector::CloseMessagePipe() { |
| 69 DCHECK(thread_checker_.CalledOnValidThread()); |
| 70 |
38 CancelWait(); | 71 CancelWait(); |
| 72 MayAutoLock locker(lock_.get()); |
39 Close(message_pipe_.Pass()); | 73 Close(message_pipe_.Pass()); |
40 } | 74 } |
41 | 75 |
42 ScopedMessagePipeHandle Connector::PassMessagePipe() { | 76 ScopedMessagePipeHandle Connector::PassMessagePipe() { |
| 77 DCHECK(thread_checker_.CalledOnValidThread()); |
| 78 |
43 CancelWait(); | 79 CancelWait(); |
| 80 MayAutoLock locker(lock_.get()); |
44 return message_pipe_.Pass(); | 81 return message_pipe_.Pass(); |
45 } | 82 } |
46 | 83 |
47 void Connector::RaiseError() { | 84 void Connector::RaiseError() { |
| 85 DCHECK(thread_checker_.CalledOnValidThread()); |
| 86 |
48 HandleError(true, true); | 87 HandleError(true, true); |
49 } | 88 } |
50 | 89 |
51 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) { | 90 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) { |
| 91 DCHECK(thread_checker_.CalledOnValidThread()); |
| 92 |
52 if (error_) | 93 if (error_) |
53 return false; | 94 return false; |
54 | 95 |
55 ResumeIncomingMethodCallProcessing(); | 96 ResumeIncomingMethodCallProcessing(); |
56 | 97 |
57 MojoResult rv = | 98 MojoResult rv = |
58 Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr); | 99 Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr); |
59 if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED) | 100 if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED) |
60 return false; | 101 return false; |
61 if (rv != MOJO_RESULT_OK) { | 102 if (rv != MOJO_RESULT_OK) { |
62 // Users that call WaitForIncomingMessage() should expect their code to be | 103 // Users that call WaitForIncomingMessage() should expect their code to be |
63 // re-entered, so we call the error handler synchronously. | 104 // re-entered, so we call the error handler synchronously. |
64 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); | 105 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); |
65 return false; | 106 return false; |
66 } | 107 } |
67 mojo_ignore_result(ReadSingleMessage(&rv)); | 108 mojo_ignore_result(ReadSingleMessage(&rv)); |
68 return (rv == MOJO_RESULT_OK); | 109 return (rv == MOJO_RESULT_OK); |
69 } | 110 } |
70 | 111 |
71 void Connector::PauseIncomingMethodCallProcessing() { | 112 void Connector::PauseIncomingMethodCallProcessing() { |
| 113 DCHECK(thread_checker_.CalledOnValidThread()); |
| 114 |
72 if (paused_) | 115 if (paused_) |
73 return; | 116 return; |
74 | 117 |
75 paused_ = true; | 118 paused_ = true; |
76 CancelWait(); | 119 CancelWait(); |
77 } | 120 } |
78 | 121 |
79 void Connector::ResumeIncomingMethodCallProcessing() { | 122 void Connector::ResumeIncomingMethodCallProcessing() { |
| 123 DCHECK(thread_checker_.CalledOnValidThread()); |
| 124 |
80 if (!paused_) | 125 if (!paused_) |
81 return; | 126 return; |
82 | 127 |
83 paused_ = false; | 128 paused_ = false; |
84 WaitToReadMore(); | 129 WaitToReadMore(); |
85 } | 130 } |
86 | 131 |
87 bool Connector::Accept(Message* message) { | 132 bool Connector::Accept(Message* message) { |
| 133 DCHECK(lock_ || thread_checker_.CalledOnValidThread()); |
| 134 |
| 135 // It shouldn't hurt even if |error_| may be changed by a different thread at |
| 136 // the same time. The outcome is that we may write into |message_pipe_| after |
| 137 // encountering an error, which should be fine. |
88 if (error_) | 138 if (error_) |
89 return false; | 139 return false; |
90 | 140 |
91 MOJO_CHECK(message_pipe_.is_valid()); | 141 MayAutoLock locker(lock_.get()); |
92 if (drop_writes_) | 142 |
| 143 if (!message_pipe_.is_valid() || drop_writes_) |
93 return true; | 144 return true; |
94 | 145 |
95 MojoResult rv = | 146 MojoResult rv = |
96 WriteMessageRaw(message_pipe_.get(), | 147 WriteMessageRaw(message_pipe_.get(), |
97 message->data(), | 148 message->data(), |
98 message->data_num_bytes(), | 149 message->data_num_bytes(), |
99 message->mutable_handles()->empty() | 150 message->mutable_handles()->empty() |
100 ? nullptr | 151 ? nullptr |
101 : reinterpret_cast<const MojoHandle*>( | 152 : reinterpret_cast<const MojoHandle*>( |
102 &message->mutable_handles()->front()), | 153 &message->mutable_handles()->front()), |
(...skipping 14 matching lines...) Expand all Loading... |
117 drop_writes_ = true; | 168 drop_writes_ = true; |
118 break; | 169 break; |
119 case MOJO_RESULT_BUSY: | 170 case MOJO_RESULT_BUSY: |
120 // We'd get a "busy" result if one of the message's handles is: | 171 // We'd get a "busy" result if one of the message's handles is: |
121 // - |message_pipe_|'s own handle; | 172 // - |message_pipe_|'s own handle; |
122 // - simultaneously being used on another thread; or | 173 // - simultaneously being used on another thread; or |
123 // - in a "busy" state that prohibits it from being transferred (e.g., | 174 // - in a "busy" state that prohibits it from being transferred (e.g., |
124 // a data pipe handle in the middle of a two-phase read/write, | 175 // a data pipe handle in the middle of a two-phase read/write, |
125 // regardless of which thread that two-phase read/write is happening | 176 // regardless of which thread that two-phase read/write is happening |
126 // on). | 177 // on). |
127 // TODO(vtl): I wonder if this should be a |MOJO_DCHECK()|. (But, until | 178 // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until |
128 // crbug.com/389666, etc. are resolved, this will make tests fail quickly | 179 // crbug.com/389666, etc. are resolved, this will make tests fail quickly |
129 // rather than hanging.) | 180 // rather than hanging.) |
130 MOJO_CHECK(false) << "Race condition or other bug detected"; | 181 CHECK(false) << "Race condition or other bug detected"; |
131 return false; | 182 return false; |
132 default: | 183 default: |
133 // This particular write was rejected, presumably because of bad input. | 184 // This particular write was rejected, presumably because of bad input. |
134 // The pipe is not necessarily in a bad state. | 185 // The pipe is not necessarily in a bad state. |
135 return false; | 186 return false; |
136 } | 187 } |
137 return true; | 188 return true; |
138 } | 189 } |
139 | 190 |
140 // static | 191 // static |
141 void Connector::CallOnHandleReady(void* closure, MojoResult result) { | 192 void Connector::CallOnHandleReady(void* closure, MojoResult result) { |
142 Connector* self = static_cast<Connector*>(closure); | 193 Connector* self = static_cast<Connector*>(closure); |
143 self->OnHandleReady(result); | 194 self->OnHandleReady(result); |
144 } | 195 } |
145 | 196 |
146 void Connector::OnHandleReady(MojoResult result) { | 197 void Connector::OnHandleReady(MojoResult result) { |
147 MOJO_CHECK(async_wait_id_ != 0); | 198 DCHECK(thread_checker_.CalledOnValidThread()); |
| 199 |
| 200 CHECK(async_wait_id_ != 0); |
148 async_wait_id_ = 0; | 201 async_wait_id_ = 0; |
149 if (result != MOJO_RESULT_OK) { | 202 if (result != MOJO_RESULT_OK) { |
150 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); | 203 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
151 return; | 204 return; |
152 } | 205 } |
153 ReadAllAvailableMessages(); | 206 ReadAllAvailableMessages(); |
154 // At this point, this object might have been deleted. Return. | 207 // At this point, this object might have been deleted. Return. |
155 } | 208 } |
156 | 209 |
157 void Connector::WaitToReadMore() { | 210 void Connector::WaitToReadMore() { |
158 MOJO_CHECK(!async_wait_id_); | 211 CHECK(!async_wait_id_); |
159 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(), | 212 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(), |
160 MOJO_HANDLE_SIGNAL_READABLE, | 213 MOJO_HANDLE_SIGNAL_READABLE, |
161 MOJO_DEADLINE_INDEFINITE, | 214 MOJO_DEADLINE_INDEFINITE, |
162 &Connector::CallOnHandleReady, | 215 &Connector::CallOnHandleReady, |
163 this); | 216 this); |
164 } | 217 } |
165 | 218 |
166 bool Connector::ReadSingleMessage(MojoResult* read_result) { | 219 bool Connector::ReadSingleMessage(MojoResult* read_result) { |
167 bool receiver_result = false; | 220 bool receiver_result = false; |
168 | 221 |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
230 force_pipe_reset = true; | 283 force_pipe_reset = true; |
231 | 284 |
232 if (paused_) { | 285 if (paused_) { |
233 // If the user has paused receiving messages, we shouldn't call the error | 286 // If the user has paused receiving messages, we shouldn't call the error |
234 // handler right away. We need to wait until the user starts receiving | 287 // handler right away. We need to wait until the user starts receiving |
235 // messages again. | 288 // messages again. |
236 force_async_handler = true; | 289 force_async_handler = true; |
237 } | 290 } |
238 | 291 |
239 if (force_pipe_reset) { | 292 if (force_pipe_reset) { |
240 CloseMessagePipe(); | 293 CancelWait(); |
| 294 MayAutoLock locker(lock_.get()); |
| 295 Close(message_pipe_.Pass()); |
241 MessagePipe dummy_pipe; | 296 MessagePipe dummy_pipe; |
242 message_pipe_ = dummy_pipe.handle0.Pass(); | 297 message_pipe_ = dummy_pipe.handle0.Pass(); |
243 } else { | 298 } else { |
244 CancelWait(); | 299 CancelWait(); |
245 } | 300 } |
246 | 301 |
247 if (force_async_handler) { | 302 if (force_async_handler) { |
248 // |dummy_pipe.handle1| has been destructed. Reading the pipe will | 303 // |dummy_pipe.handle1| has been destructed. Reading the pipe will |
249 // eventually cause a read error on |message_pipe_| and set error state. | 304 // eventually cause a read error on |message_pipe_| and set error state. |
250 if (!paused_) | 305 if (!paused_) |
251 WaitToReadMore(); | 306 WaitToReadMore(); |
252 } else { | 307 } else { |
253 error_ = true; | 308 error_ = true; |
254 connection_error_handler_.Run(); | 309 connection_error_handler_.Run(); |
255 } | 310 } |
256 } | 311 } |
257 | 312 |
258 } // namespace internal | 313 } // namespace internal |
259 } // namespace mojo | 314 } // namespace mojo |
OLD | NEW |