Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(48)

Side by Side Diff: mojo/public/cpp/system/simple_watcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/system/simple_watcher.h"
6
7 #include "base/bind.h"
8 #include "base/macros.h"
9 #include "base/memory/ptr_util.h"
10 #include "base/single_thread_task_runner.h"
11 #include "base/synchronization/lock.h"
12 #include "base/trace_event/heap_profiler.h"
13 #include "mojo/public/c/system/watcher.h"
14
15 namespace mojo {
16
17 // Thread-safe Context object used to dispatch watch notifications from a
18 // arbitrary threads.
19 class SimpleWatcher::Context : public base::RefCountedThreadSafe<Context> {
20 public:
21 // Creates a |Context| instance for a new watch on |watcher|, to watch
22 // |handle| for |signals|.
23 static scoped_refptr<Context> Create(
24 base::WeakPtr<SimpleWatcher> watcher,
25 scoped_refptr<base::SingleThreadTaskRunner> task_runner,
26 WatcherHandle watcher_handle,
27 Handle handle,
28 MojoHandleSignals signals,
29 MojoResult* watch_result) {
30 scoped_refptr<Context> context = new Context(watcher, task_runner);
31
32 // If MojoWatch succeeds, it assumes ownership of a reference to |context|.
33 // In that case, this reference is balanced in CallNotify() when |result| is
34 // |MOJO_RESULT_CANCELLED|.
35 context->AddRef();
36
37 *watch_result = MojoWatch(watcher_handle.value(), handle.value(), signals,
38 context->value());
39 if (*watch_result != MOJO_RESULT_OK) {
40 // Balanced by the AddRef() above since watching failed.
41 context->Release();
42 return nullptr;
43 }
44
45 return context;
46 }
47
48 static void CallNotify(uintptr_t context_value,
49 MojoResult result,
50 MojoHandleSignalsState signals_state,
51 MojoWatcherNotificationFlags flags) {
52 auto* context = reinterpret_cast<Context*>(context_value);
53 context->Notify(result, signals_state, flags);
54
55 // That was the last notification for the context. We can release the ref
56 // owned by the watch, which may in turn delete the Context.
57 if (result == MOJO_RESULT_CANCELLED)
58 context->Release();
59 }
60
61 uintptr_t value() const { return reinterpret_cast<uintptr_t>(this); }
62
63 void DisableCancellationNotifications() {
64 base::AutoLock lock(lock_);
65 enable_cancellation_notifications_ = false;
66 }
67
68 private:
69 friend class base::RefCountedThreadSafe<Context>;
70
71 Context(base::WeakPtr<SimpleWatcher> weak_watcher,
72 scoped_refptr<base::SingleThreadTaskRunner> task_runner)
73 : weak_watcher_(weak_watcher), task_runner_(task_runner) {}
74 ~Context() {}
75
76 void Notify(MojoResult result,
77 MojoHandleSignalsState signals_state,
78 MojoWatcherNotificationFlags flags) {
79 if (result == MOJO_RESULT_CANCELLED) {
80 // The SimpleWatcher may have explicitly cancelled this watch, so we don't
81 // bother dispatching the notification - it would be ignored anyway.
82 //
83 // TODO(rockot): This shouldn't really be necessary, but there are already
84 // instances today where bindings object may be bound and subsequently
85 // closed due to pipe error, all before the thread's TaskRunner has been
86 // properly initialized.
87 base::AutoLock lock(lock_);
88 if (!enable_cancellation_notifications_)
89 return;
90 }
91
92 if ((flags & MOJO_WATCHER_NOTIFICATION_FLAG_FROM_SYSTEM) &&
93 task_runner_->RunsTasksOnCurrentThread() && weak_watcher_ &&
94 weak_watcher_->is_default_task_runner_) {
95 // System notifications will trigger from the task runner passed to
96 // mojo::edk::InitIPCSupport(). In Chrome this happens to always be the
97 // default task runner for the IO thread.
98 weak_watcher_->OnHandleReady(make_scoped_refptr(this), result);
99 } else {
100 task_runner_->PostTask(
101 FROM_HERE, base::Bind(&SimpleWatcher::OnHandleReady, weak_watcher_,
102 make_scoped_refptr(this), result));
103 }
104 }
105
106 const base::WeakPtr<SimpleWatcher> weak_watcher_;
107 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
108
109 base::Lock lock_;
110 bool enable_cancellation_notifications_ = true;
111
112 DISALLOW_COPY_AND_ASSIGN(Context);
113 };
114
115 SimpleWatcher::SimpleWatcher(const tracked_objects::Location& from_here,
116 ArmingPolicy arming_policy,
117 scoped_refptr<base::SingleThreadTaskRunner> runner)
118 : arming_policy_(arming_policy),
119 task_runner_(std::move(runner)),
120 is_default_task_runner_(task_runner_ ==
121 base::ThreadTaskRunnerHandle::Get()),
122 heap_profiler_tag_(from_here.file_name()),
123 weak_factory_(this) {
124 MojoResult rv = CreateWatcher(&Context::CallNotify, &watcher_handle_);
125 DCHECK_EQ(MOJO_RESULT_OK, rv);
126 DCHECK(task_runner_->BelongsToCurrentThread());
127 }
128
129 SimpleWatcher::~SimpleWatcher() {
130 if (IsWatching())
131 Cancel();
132 }
133
134 bool SimpleWatcher::IsWatching() const {
135 DCHECK(thread_checker_.CalledOnValidThread());
136 return context_ != nullptr;
137 }
138
139 MojoResult SimpleWatcher::Watch(Handle handle,
140 MojoHandleSignals signals,
141 const ReadyCallback& callback) {
142 DCHECK(thread_checker_.CalledOnValidThread());
143 DCHECK(!IsWatching());
144 DCHECK(!callback.is_null());
145
146 callback_ = callback;
147 handle_ = handle;
148
149 MojoResult watch_result = MOJO_RESULT_UNKNOWN;
150 context_ =
151 Context::Create(weak_factory_.GetWeakPtr(), task_runner_,
152 watcher_handle_.get(), handle_, signals, &watch_result);
153 if (!context_) {
154 handle_.set_value(kInvalidHandleValue);
155 callback_.Reset();
156 DCHECK_EQ(MOJO_RESULT_INVALID_ARGUMENT, watch_result);
157 return watch_result;
158 }
159
160 if (arming_policy_ == ArmingPolicy::AUTOMATIC)
161 ArmOrNotify();
162
163 return MOJO_RESULT_OK;
164 }
165
166 void SimpleWatcher::Cancel() {
167 DCHECK(thread_checker_.CalledOnValidThread());
168
169 // The watcher may have already been cancelled if the handle was closed.
170 if (!context_)
171 return;
172
173 // Prevent the cancellation notification from being dispatched to
174 // OnHandleReady() when cancellation is explicit. See the note in the
175 // implementation of DisableCancellationNotifications() above.
176 context_->DisableCancellationNotifications();
177
178 handle_.set_value(kInvalidHandleValue);
179 callback_.Reset();
180
181 // Ensure |context_| is unset by the time we call MojoCancelWatch, as may
182 // re-enter the notification callback and we want to ensure |context_| is
183 // unset by then.
184 scoped_refptr<Context> context;
185 std::swap(context, context_);
186 MojoResult rv =
187 MojoCancelWatch(watcher_handle_.get().value(), context->value());
188
189 // It's possible this cancellation could race with a handle closure
190 // notification, in which case the watch may have already been implicitly
191 // cancelled.
192 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_NOT_FOUND);
193 }
194
195 MojoResult SimpleWatcher::Arm(MojoResult* ready_result) {
196 DCHECK(thread_checker_.CalledOnValidThread());
197 uint32_t num_ready_contexts = 1;
198 uintptr_t ready_context;
199 MojoResult local_ready_result;
200 MojoHandleSignalsState ready_state;
201 MojoResult rv =
202 MojoArmWatcher(watcher_handle_.get().value(), &num_ready_contexts,
203 &ready_context, &local_ready_result, &ready_state);
204 if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
205 DCHECK(context_);
206 DCHECK_EQ(1u, num_ready_contexts);
207 DCHECK_EQ(context_->value(), ready_context);
208 if (ready_result)
209 *ready_result = local_ready_result;
210 }
211
212 return rv;
213 }
214
215 void SimpleWatcher::ArmOrNotify() {
216 DCHECK(thread_checker_.CalledOnValidThread());
217
218 // Already cancelled, nothing to do.
219 if (!IsWatching())
220 return;
221
222 MojoResult ready_result;
223 MojoResult rv = Arm(&ready_result);
224 if (rv == MOJO_RESULT_OK)
225 return;
226
227 DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, rv);
228 task_runner_->PostTask(FROM_HERE, base::Bind(&SimpleWatcher::OnHandleReady,
229 weak_factory_.GetWeakPtr(),
230 context_, ready_result));
231 }
232
233 void SimpleWatcher::OnHandleReady(scoped_refptr<const Context> context,
234 MojoResult result) {
235 DCHECK(thread_checker_.CalledOnValidThread());
236
237 // This notification may be for a previously watched context, in which case
238 // we just ignore it.
239 if (context != context_)
240 return;
241
242 ReadyCallback callback = callback_;
243 if (result == MOJO_RESULT_CANCELLED) {
244 // Implicit cancellation due to someone closing the watched handle. We clear
245 // the SimppleWatcher's state before dispatching this.
246 context_ = nullptr;
247 handle_.set_value(kInvalidHandleValue);
248 callback_.Reset();
249 }
250
251 // NOTE: It's legal for |callback| to delete |this|.
252 if (!callback.is_null()) {
253 TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION event(heap_profiler_tag_);
254
255 base::WeakPtr<SimpleWatcher> weak_self = weak_factory_.GetWeakPtr();
256 callback.Run(result);
257 if (!weak_self)
258 return;
259
260 if (unsatisfiable_)
261 return;
262
263 // Prevent |MOJO_RESULT_FAILED_PRECONDITION| task spam by only notifying
264 // at most once in AUTOMATIC arming mode.
265 if (result == MOJO_RESULT_FAILED_PRECONDITION)
266 unsatisfiable_ = true;
267
268 if (arming_policy_ == ArmingPolicy::AUTOMATIC && IsWatching())
269 ArmOrNotify();
270 }
271 }
272
273 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698