OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "remoting/base/plugin_thread_task_runner.h" | |
6 | |
7 #include "base/bind.h" | |
8 | |
9 namespace { | |
10 | |
11 base::TimeDelta CalcTimeDelta(base::TimeTicks when) { | |
12 return std::max(when - base::TimeTicks::Now(), base::TimeDelta()); | |
13 } | |
14 | |
15 } // namespace | |
16 | |
17 namespace remoting { | |
18 | |
19 PluginThreadTaskRunner::Delegate::~Delegate() { | |
20 } | |
21 | |
22 PluginThreadTaskRunner::PluginThreadTaskRunner(Delegate* delegate) | |
23 : plugin_thread_id_(base::PlatformThread::CurrentId()), | |
24 event_(false, false), | |
25 delegate_(delegate), | |
26 next_sequence_num_(0), | |
27 quit_received_(false), | |
28 stopped_(false) { | |
29 } | |
30 | |
31 PluginThreadTaskRunner::~PluginThreadTaskRunner() { | |
32 DCHECK(delegate_ == NULL); | |
33 DCHECK(stopped_); | |
34 } | |
35 | |
36 void PluginThreadTaskRunner::DetachAndRunShutdownLoop() { | |
37 DCHECK(BelongsToCurrentThread()); | |
38 | |
39 // Detach from the plugin thread and redirect all tasks posted after this | |
40 // point to the shutdown task loop. | |
41 { | |
42 base::AutoLock auto_lock(lock_); | |
43 | |
44 DCHECK(delegate_ != NULL); | |
45 DCHECK(!stopped_); | |
46 | |
47 delegate_ = NULL; | |
48 stopped_ = quit_received_; | |
49 } | |
50 | |
51 // When DetachAndRunShutdownLoop() is called from NPP_Destroy() all scheduled | |
52 // timers are cancelled. It is OK to clear |scheduled_timers_| even if | |
53 // the timers weren't actually cancelled (i.e. DetachAndRunShutdownLoop() is | |
54 // called before NPP_Destroy()). | |
55 scheduled_timers_.clear(); | |
56 | |
57 // Run all tasks that are due. | |
58 ProcessIncomingTasks(); | |
59 RunDueTasks(base::TimeTicks::Now()); | |
60 | |
61 while (!stopped_) { | |
62 if (delayed_queue_.empty()) { | |
63 event_.Wait(); | |
64 } else { | |
65 event_.TimedWait(CalcTimeDelta(delayed_queue_.top().delayed_run_time)); | |
66 } | |
67 | |
68 // Run all tasks that are due. | |
69 ProcessIncomingTasks(); | |
70 RunDueTasks(base::TimeTicks::Now()); | |
71 | |
72 base::AutoLock auto_lock(lock_); | |
73 stopped_ = quit_received_; | |
74 } | |
75 } | |
76 | |
77 void PluginThreadTaskRunner::Quit() { | |
78 base::AutoLock auto_lock(lock_); | |
79 | |
80 if (!quit_received_) { | |
81 quit_received_ = true; | |
82 event_.Signal(); | |
83 } | |
84 } | |
85 | |
86 bool PluginThreadTaskRunner::PostDelayedTask( | |
87 const tracked_objects::Location& from_here, | |
88 const base::Closure& task, | |
89 base::TimeDelta delay) { | |
90 | |
91 // Wrap the task into |base::PendingTask|. | |
92 base::TimeTicks delayed_run_time; | |
93 if (delay > base::TimeDelta()) { | |
94 delayed_run_time = base::TimeTicks::Now() + delay; | |
95 } else { | |
96 DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative"; | |
97 } | |
98 | |
99 base::PendingTask pending_task(from_here, task, delayed_run_time, false); | |
100 | |
101 // Push the task to the incoming queue. | |
102 base::AutoLock locked(lock_); | |
103 | |
104 // Initialize the sequence number. The sequence number provides FIFO ordering | |
105 // for tasks with the same |delayed_run_time|. | |
106 pending_task.sequence_num = next_sequence_num_++; | |
107 | |
108 // Post an asynchronous call on the plugin thread to process the task. | |
109 if (incoming_queue_.empty()) { | |
110 PostRunTasks(); | |
111 } | |
112 | |
113 incoming_queue_.push(pending_task); | |
114 pending_task.task.Reset(); | |
115 | |
116 // No tasks should be posted after Quit() has been called. | |
117 DCHECK(!quit_received_); | |
118 return true; | |
119 } | |
120 | |
121 bool PluginThreadTaskRunner::PostNonNestableDelayedTask( | |
122 const tracked_objects::Location& from_here, | |
123 const base::Closure& task, | |
124 base::TimeDelta delay) { | |
125 // All tasks running on this task loop are non-nestable. | |
126 return PostDelayedTask(from_here, task, delay); | |
127 } | |
128 | |
129 bool PluginThreadTaskRunner::RunsTasksOnCurrentThread() const { | |
130 // In pepper plugins ideally we should use pp::Core::IsMainThread, | |
131 // but it is problematic because we would need to keep reference to | |
132 // Core somewhere, e.g. make the delegate ref-counted. | |
133 return base::PlatformThread::CurrentId() == plugin_thread_id_; | |
134 } | |
135 | |
136 void PluginThreadTaskRunner::PostRunTasks() { | |
137 // Post tasks to the plugin thread when it is availabe or spin the shutdown | |
138 // task loop. | |
139 if (delegate_ != NULL) { | |
140 base::Closure closure = base::Bind(&PluginThreadTaskRunner::RunTasks, this); | |
141 delegate_->RunOnPluginThread( | |
142 base::TimeDelta(), | |
143 &PluginThreadTaskRunner::TaskSpringboard, | |
144 new base::Closure(closure)); | |
145 } else { | |
146 event_.Signal(); | |
147 } | |
148 } | |
149 | |
150 void PluginThreadTaskRunner::PostDelayedRunTasks(base::TimeTicks when) { | |
151 DCHECK(BelongsToCurrentThread()); | |
152 | |
153 // |delegate_| is updated from the plugin thread only, so it is safe to access | |
154 // it here without taking the lock. | |
155 if (delegate_ != NULL) { | |
156 // Schedule RunDelayedTasks() to be called at |when| if it hasn't been | |
157 // scheduled already. | |
158 if (scheduled_timers_.insert(when).second) { | |
159 base::TimeDelta delay = CalcTimeDelta(when); | |
160 base::Closure closure = | |
161 base::Bind(&PluginThreadTaskRunner::RunDelayedTasks, this, when); | |
162 delegate_->RunOnPluginThread( | |
163 delay, | |
164 &PluginThreadTaskRunner::TaskSpringboard, | |
165 new base::Closure(closure)); | |
166 } | |
167 } else { | |
168 // Spin the shutdown loop if the task runner has already been detached. | |
169 // The shutdown loop will pick the tasks to run itself. | |
170 event_.Signal(); | |
171 } | |
172 } | |
173 | |
174 void PluginThreadTaskRunner::ProcessIncomingTasks() { | |
175 DCHECK(BelongsToCurrentThread()); | |
176 | |
177 // Grab all unsorted tasks accomulated so far. | |
178 base::TaskQueue work_queue; | |
179 { | |
180 base::AutoLock locked(lock_); | |
181 incoming_queue_.Swap(&work_queue); | |
182 } | |
183 | |
184 while (!work_queue.empty()) { | |
185 base::PendingTask pending_task = work_queue.front(); | |
186 work_queue.pop(); | |
187 | |
188 if (pending_task.delayed_run_time.is_null()) { | |
189 pending_task.task.Run(); | |
190 } else { | |
191 delayed_queue_.push(pending_task); | |
192 } | |
193 } | |
194 } | |
195 | |
196 void PluginThreadTaskRunner::RunDelayedTasks(base::TimeTicks when) { | |
197 DCHECK(BelongsToCurrentThread()); | |
198 | |
199 scheduled_timers_.erase(when); | |
200 | |
201 // |stopped_| is updated by the plugin thread only, so it is safe to access | |
202 // it here without taking the lock. | |
203 if (!stopped_) { | |
204 ProcessIncomingTasks(); | |
205 RunDueTasks(base::TimeTicks::Now()); | |
206 } | |
207 } | |
208 | |
209 void PluginThreadTaskRunner::RunDueTasks(base::TimeTicks now) { | |
210 DCHECK(BelongsToCurrentThread()); | |
211 | |
212 // Run all due tasks. | |
213 while (!delayed_queue_.empty() && | |
214 delayed_queue_.top().delayed_run_time <= now) { | |
215 delayed_queue_.top().task.Run(); | |
216 delayed_queue_.pop(); | |
217 } | |
218 | |
219 // Post a delayed asynchronous call to the plugin thread to process tasks from | |
220 // the delayed queue. | |
221 if (!delayed_queue_.empty()) { | |
222 base::TimeTicks when = delayed_queue_.top().delayed_run_time; | |
223 if (scheduled_timers_.empty() || when < *scheduled_timers_.begin()) { | |
224 PostDelayedRunTasks(when); | |
225 } | |
226 } | |
227 } | |
228 | |
229 void PluginThreadTaskRunner::RunTasks() { | |
230 DCHECK(BelongsToCurrentThread()); | |
231 | |
232 // |stopped_| is updated by the plugin thread only, so it is safe to access | |
233 // it here without taking the lock. | |
234 if (!stopped_) { | |
235 ProcessIncomingTasks(); | |
236 RunDueTasks(base::TimeTicks::Now()); | |
237 } | |
238 } | |
239 | |
240 // static | |
241 void PluginThreadTaskRunner::TaskSpringboard(void* data) { | |
242 base::Closure* task = reinterpret_cast<base::Closure*>(data); | |
243 task->Run(); | |
244 delete task; | |
245 } | |
246 | |
247 } // namespace remoting | |
OLD | NEW |