OLD | NEW |
(Empty) | |
| 1 // Copyright 2017 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "mojo/public/cpp/system/wait_set.h" |
| 6 |
| 7 #include <algorithm> |
| 8 #include <limits> |
| 9 #include <map> |
| 10 #include <vector> |
| 11 |
| 12 #include "base/containers/stack_container.h" |
| 13 #include "base/logging.h" |
| 14 #include "base/macros.h" |
| 15 #include "base/memory/ptr_util.h" |
| 16 #include "base/synchronization/lock.h" |
| 17 #include "base/synchronization/waitable_event.h" |
| 18 #include "mojo/public/cpp/system/watcher.h" |
| 19 |
| 20 namespace mojo { |
| 21 |
| 22 class WaitSet::State : public base::RefCountedThreadSafe<State> { |
| 23 public: |
| 24 State() |
| 25 : handle_event_(base::WaitableEvent::ResetPolicy::MANUAL, |
| 26 base::WaitableEvent::InitialState::NOT_SIGNALED) { |
| 27 MojoResult rv = CreateWatcher(&Context::OnNotification, &watcher_handle_); |
| 28 DCHECK_EQ(MOJO_RESULT_OK, rv); |
| 29 } |
| 30 |
| 31 void ShutDown() { |
| 32 // NOTE: This may immediately invoke Notify for every context. |
| 33 watcher_handle_.reset(); |
| 34 } |
| 35 |
| 36 MojoResult AddHandle(Handle handle, MojoHandleSignals signals) { |
| 37 DCHECK(watcher_handle_.is_valid()); |
| 38 |
| 39 scoped_refptr<Context> context = new Context(this, handle); |
| 40 |
| 41 { |
| 42 base::AutoLock lock(lock_); |
| 43 |
| 44 if (handle_to_context_.count(handle)) |
| 45 return MOJO_RESULT_ALREADY_EXISTS; |
| 46 DCHECK(!contexts_.count(context->context_value())); |
| 47 |
| 48 handle_to_context_[handle] = context; |
| 49 contexts_[context->context_value()] = context; |
| 50 } |
| 51 |
| 52 // Balanced in State::Notify() with MOJO_RESULT_CANCELLED if |
| 53 // MojoWatch() succeeds. Otherwise balanced immediately below. |
| 54 context->AddRef(); |
| 55 |
| 56 // This can notify immediately if the watcher is already armed. Don't hold |
| 57 // |lock_| while calling it. |
| 58 MojoResult rv = MojoWatch(watcher_handle_.get().value(), handle.value(), |
| 59 signals, context->context_value()); |
| 60 if (rv == MOJO_RESULT_INVALID_ARGUMENT) { |
| 61 base::AutoLock lock(lock_); |
| 62 handle_to_context_.erase(handle); |
| 63 contexts_.erase(context->context_value()); |
| 64 |
| 65 // Balanced above. |
| 66 context->Release(); |
| 67 return rv; |
| 68 } |
| 69 DCHECK_EQ(MOJO_RESULT_OK, rv); |
| 70 |
| 71 return rv; |
| 72 } |
| 73 |
| 74 MojoResult RemoveHandle(Handle handle) { |
| 75 DCHECK(watcher_handle_.is_valid()); |
| 76 |
| 77 scoped_refptr<Context> context; |
| 78 { |
| 79 base::AutoLock lock(lock_); |
| 80 auto it = handle_to_context_.find(handle); |
| 81 if (it == handle_to_context_.end()) |
| 82 return MOJO_RESULT_NOT_FOUND; |
| 83 |
| 84 context = std::move(it->second); |
| 85 handle_to_context_.erase(it); |
| 86 |
| 87 // Ensure that we never return this handle as a ready result again. Note |
| 88 // that it's removal from |handle_to_context_| above ensures it will never |
| 89 // be added back to this map. |
| 90 ready_handles_.erase(handle); |
| 91 } |
| 92 |
| 93 // NOTE: This may enter the notification callback immediately, so don't hold |
| 94 // |lock_| while calling it. |
| 95 MojoResult rv = MojoCancelWatch(watcher_handle_.get().value(), |
| 96 context->context_value()); |
| 97 |
| 98 // We don't really care whether or not this succeeds. In either case, the |
| 99 // context was or will imminently be cancelled and moved from |contexts_| |
| 100 // to |cancelled_contexts_|. |
| 101 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_NOT_FOUND); |
| 102 |
| 103 { |
| 104 // Always clear |cancelled_contexts_| in case it's accumulated any more |
| 105 // entries since the last time we ran. |
| 106 base::AutoLock lock(lock_); |
| 107 cancelled_contexts_.clear(); |
| 108 } |
| 109 |
| 110 return rv; |
| 111 } |
| 112 |
| 113 void Wait(size_t* num_ready_handles, |
| 114 Handle* ready_handles, |
| 115 MojoResult* ready_results, |
| 116 MojoHandleSignalsState* signals_states) { |
| 117 DCHECK(watcher_handle_.is_valid()); |
| 118 DCHECK(num_ready_handles); |
| 119 DCHECK(ready_handles); |
| 120 DCHECK(ready_results); |
| 121 bool should_wait = false; |
| 122 { |
| 123 base::AutoLock lock(lock_); |
| 124 if (ready_handles_.empty()) { |
| 125 // No handles are currently in the ready set. Make sure the event is |
| 126 // reset and try to arm the watcher. |
| 127 handle_event_.Reset(); |
| 128 |
| 129 DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max()); |
| 130 uint32_t num_ready_contexts = static_cast<uint32_t>(*num_ready_handles); |
| 131 base::StackVector<uintptr_t, 4> ready_contexts; |
| 132 ready_contexts.container().resize(num_ready_contexts); |
| 133 base::StackVector<MojoHandleSignalsState, 4> ready_states; |
| 134 MojoHandleSignalsState* out_states = signals_states; |
| 135 if (!out_states) { |
| 136 // If the caller didn't provide a buffer for signal states, we provide |
| 137 // our own locally. MojoArmWatcher() requires one if we want to handle |
| 138 // arming failure properly. |
| 139 ready_states.container().resize(num_ready_contexts); |
| 140 out_states = ready_states.container().data(); |
| 141 } |
| 142 MojoResult rv = MojoArmWatcher( |
| 143 watcher_handle_.get().value(), &num_ready_contexts, |
| 144 ready_contexts.container().data(), ready_results, out_states); |
| 145 |
| 146 if (rv == MOJO_RESULT_FAILED_PRECONDITION) { |
| 147 // Can't arm because one or more handles is already ready. |
| 148 *num_ready_handles = num_ready_contexts; |
| 149 for (size_t i = 0; i < num_ready_contexts; ++i) { |
| 150 auto it = contexts_.find(ready_contexts.container()[i]); |
| 151 DCHECK(it != contexts_.end()); |
| 152 ready_handles[i] = it->second->handle(); |
| 153 } |
| 154 return; |
| 155 } |
| 156 |
| 157 if (rv == MOJO_RESULT_NOT_FOUND) { |
| 158 // There are no handles in the set. Nothing to watch. |
| 159 *num_ready_handles = 0; |
| 160 return; |
| 161 } |
| 162 |
| 163 // Watcher is armed. We can go on waiting for an event to signal. |
| 164 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_ALREADY_EXISTS); |
| 165 should_wait = true; |
| 166 } |
| 167 } |
| 168 |
| 169 if (should_wait) |
| 170 handle_event_.Wait(); |
| 171 |
| 172 base::AutoLock lock(lock_); |
| 173 |
| 174 DCHECK(!ready_handles_.empty()); |
| 175 |
| 176 // Pop as many handles as we can out of the ready set and return them. |
| 177 *num_ready_handles = std::min(*num_ready_handles, ready_handles_.size()); |
| 178 for (size_t i = 0; i < *num_ready_handles; ++i) { |
| 179 auto it = ready_handles_.begin(); |
| 180 ready_handles[i] = it->first; |
| 181 ready_results[i] = it->second.result; |
| 182 if (signals_states) |
| 183 signals_states[i] = it->second.signals_state; |
| 184 ready_handles_.erase(it); |
| 185 } |
| 186 } |
| 187 |
| 188 private: |
| 189 friend class base::RefCountedThreadSafe<State>; |
| 190 |
| 191 class Context : public base::RefCountedThreadSafe<Context> { |
| 192 public: |
| 193 Context(scoped_refptr<State> state, Handle handle) |
| 194 : state_(state), handle_(handle) {} |
| 195 |
| 196 Handle handle() const { return handle_; } |
| 197 |
| 198 uintptr_t context_value() const { |
| 199 return reinterpret_cast<uintptr_t>(this); |
| 200 } |
| 201 |
| 202 static void OnNotification(uintptr_t context, |
| 203 MojoResult result, |
| 204 MojoHandleSignalsState signals_state, |
| 205 MojoWatcherNotificationFlags flags) { |
| 206 reinterpret_cast<Context*>(context)->Notify(result, signals_state); |
| 207 } |
| 208 |
| 209 private: |
| 210 friend class base::RefCountedThreadSafe<Context>; |
| 211 |
| 212 ~Context() {} |
| 213 |
| 214 void Notify(MojoResult result, MojoHandleSignalsState signals_state) { |
| 215 state_->Notify(handle_, result, signals_state, this); |
| 216 } |
| 217 |
| 218 const scoped_refptr<State> state_; |
| 219 const Handle handle_; |
| 220 |
| 221 DISALLOW_COPY_AND_ASSIGN(Context); |
| 222 }; |
| 223 |
| 224 ~State() {} |
| 225 |
| 226 void Notify(Handle handle, |
| 227 MojoResult result, |
| 228 MojoHandleSignalsState signals_state, |
| 229 Context* context) { |
| 230 base::AutoLock lock(lock_); |
| 231 |
| 232 // This could be a cancellation notification following an explicit |
| 233 // RemoveHandle(), in which case we really don't care and don't want to |
| 234 // add it to the ready set. Only update and signal if that's not the case. |
| 235 if (!handle_to_context_.count(handle)) { |
| 236 DCHECK_EQ(MOJO_RESULT_CANCELLED, result); |
| 237 } else { |
| 238 ready_handles_[handle] = {result, signals_state}; |
| 239 handle_event_.Signal(); |
| 240 } |
| 241 |
| 242 // Whether it's an implicit or explicit cancellation, erase from |contexts_| |
| 243 // and append to |cancelled_contexts_|. |
| 244 if (result == MOJO_RESULT_CANCELLED) { |
| 245 contexts_.erase(context->context_value()); |
| 246 handle_to_context_.erase(handle); |
| 247 |
| 248 // NOTE: We retain a context ref in |cancelled_contexts_| to ensure that |
| 249 // this Context's heap address is not reused too soon. For example, it |
| 250 // would otherwise be possible for the user to call AddHandle() from the |
| 251 // WaitSet's thread immediately after this notification has fired on |
| 252 // another thread, potentially reusing the same heap address for the newly |
| 253 // added Context; and then they may call RemoveHandle() for this handle |
| 254 // (not knowing its context has just been implicitly cancelled) and |
| 255 // cause the new Context to be incorrectly removed from |contexts_|. |
| 256 // |
| 257 // This vector is cleared on the WaitSet's own thread every time |
| 258 // RemoveHandle is called. |
| 259 cancelled_contexts_.emplace_back(make_scoped_refptr(context)); |
| 260 |
| 261 // Balanced in State::AddHandle(). |
| 262 context->Release(); |
| 263 } |
| 264 } |
| 265 |
| 266 struct ReadyState { |
| 267 ReadyState() = default; |
| 268 ReadyState(MojoResult result, MojoHandleSignalsState signals_state) |
| 269 : result(result), signals_state(signals_state) {} |
| 270 ~ReadyState() = default; |
| 271 |
| 272 MojoResult result = MOJO_RESULT_UNKNOWN; |
| 273 MojoHandleSignalsState signals_state = {0, 0}; |
| 274 }; |
| 275 |
| 276 // Not guarded by lock. Must only be accessed from the WaitSet's owning |
| 277 // thread. |
| 278 ScopedWatcherHandle watcher_handle_; |
| 279 |
| 280 base::Lock lock_; |
| 281 std::map<uintptr_t, scoped_refptr<Context>> contexts_; |
| 282 std::map<Handle, scoped_refptr<Context>> handle_to_context_; |
| 283 std::map<Handle, ReadyState> ready_handles_; |
| 284 std::vector<scoped_refptr<Context>> cancelled_contexts_; |
| 285 |
| 286 // Event signaled any time a handle notification is received. |
| 287 base::WaitableEvent handle_event_; |
| 288 |
| 289 DISALLOW_COPY_AND_ASSIGN(State); |
| 290 }; |
| 291 |
| 292 WaitSet::WaitSet() : state_(new State) {} |
| 293 |
| 294 WaitSet::~WaitSet() { |
| 295 state_->ShutDown(); |
| 296 } |
| 297 |
| 298 MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) { |
| 299 return state_->AddHandle(handle, signals); |
| 300 } |
| 301 |
| 302 MojoResult WaitSet::RemoveHandle(Handle handle) { |
| 303 return state_->RemoveHandle(handle); |
| 304 } |
| 305 |
| 306 void WaitSet::Wait(size_t* num_ready_handles, |
| 307 Handle* ready_handles, |
| 308 MojoResult* ready_results, |
| 309 MojoHandleSignalsState* signals_states) { |
| 310 state_->Wait(num_ready_handles, ready_handles, ready_results, signals_states); |
| 311 } |
| 312 |
| 313 } // namespace mojo |
OLD | NEW |