Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(959)

Side by Side Diff: third_party/APScheduler/apscheduler/scheduler.py

Issue 6673078: Initial checkin of media test matrix class, which will be used for media perf test later. (Closed) Base URL: http://git.chromium.org/git/chromium.git@trunk
Patch Set: Put APScheduler in third_party directory for Media Performance test. Created 9 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 """
2 This module is the main part of the library, and is the only module that
3 regular users should be concerned with.
4 """
5 from threading import Thread, Event, Lock
6 from datetime import datetime, timedelta
7 from logging import getLogger
8 import os
9
10 from apscheduler.util import time_difference, asbool
11 from apscheduler.triggers import DateTrigger, IntervalTrigger, CronTrigger
12
13
14 logger = getLogger(__name__)
15
16
17 class Job(object):
18 """
19 Represents a task scheduled in the scheduler.
20 """
21
22 def __init__(self, trigger, func, args, kwargs):
23 self.thread = None
24 self.trigger = trigger
25 self.func = func
26 self.args = args
27 self.kwargs = kwargs
28 if hasattr(func, '__name__'):
29 self.name = func.__name__
30 else:
31 self.name = str(func)
32
33 def run(self):
34 """
35 Starts the execution of this job in a separate thread.
36 """
37 if (self.thread and self.thread.isAlive()):
38 logger.info('Skipping run of job %s (previously triggered '
39 'instance is still running)', self)
40 else:
41 self.thread = Thread(target=self.run_in_thread)
42 self.thread.setDaemon(False)
43 self.thread.start()
44
45 def run_in_thread(self):
46 """
47 Runs the associated callable.
48 This method is executed in a dedicated thread.
49 """
50 try:
51 self.func(*self.args, **self.kwargs)
52 except:
53 logger.exception('Error executing job "%s"', self)
54 raise
55
56 def __str__(self):
57 return '%s: %s' % (self.name, repr(self.trigger))
58
59 def __repr__(self):
60 return '%s(%s, %s)' % (self.__class__.__name__, self.name,
61 repr(self.trigger))
62
63
64 class SchedulerShutdownError(Exception):
65 """
66 Thrown when attempting to use the scheduler after
67 it's been shut down.
68 """
69
70 def __init__(self):
71 Exception.__init__(self, 'Scheduler has already been shut down')
72
73
74 class SchedulerAlreadyRunningError(Exception):
75 """
76 Thrown when attempting to start the scheduler, but it's already running.
77 """
78
79 def __init__(self):
80 Exception.__init__(self, 'Scheduler is already running')
81
82
83 class Scheduler(object):
84 """
85 This class is responsible for scheduling jobs and triggering
86 their execution.
87 """
88
89 stopped = False
90 thread = None
91 misfire_grace_time = 1
92 daemonic = True
93
94 def __init__(self, **config):
95 self.jobs = []
96 self.jobs_lock = Lock()
97 self.wakeup = Event()
98 self.configure(config)
99
100 def configure(self, config):
101 """
102 Updates the configuration with the given options.
103 """
104 for key, val in config.items():
105 if key.startswith('apscheduler.'):
106 key = key[12:]
107 if key == 'misfire_grace_time':
108 self.misfire_grace_time = int(val)
109 elif key == 'daemonic':
110 self.daemonic = asbool(val)
111
112 def start(self):
113 """
114 Starts the scheduler in a new thread.
115 """
116 if self.thread and self.thread.isAlive():
117 raise SchedulerAlreadyRunningError
118
119 self.stopped = False
120 self.thread = Thread(target=self.run, name='APScheduler')
121 self.thread.setDaemon(self.daemonic)
122 self.thread.start()
123 logger.info('Scheduler started')
124
125 def shutdown(self, timeout=0):
126 """
127 Shuts down the scheduler and terminates the thread.
128 Does not terminate any currently running jobs.
129
130 :param timeout: time (in seconds) to wait for the scheduler thread to
131 terminate, 0 to wait forever, None to skip waiting
132 """
133 if self.stopped or not self.thread.isAlive():
134 return
135
136 logger.info('Scheduler shutting down')
137 self.stopped = True
138 self.wakeup.set()
139 if timeout is not None:
140 self.thread.join(timeout)
141 self.jobs = []
142
143 def cron_schedule(self, year='*', month='*', day='*', week='*',
144 day_of_week='*', hour='*', minute='*', second='*',
145 args=None, kwargs=None):
146 """
147 Decorator that causes its host function to be scheduled
148 according to the given parameters.
149 This decorator does not wrap its host function.
150 The scheduled function will be called without any arguments.
151 See :meth:`add_cron_job` for more information.
152 """
153 def inner(func):
154 self.add_cron_job(func, year, month, day, week, day_of_week, hour,
155 minute, second, args, kwargs)
156 return func
157 return inner
158
159 def interval_schedule(self, weeks=0, days=0, hours=0, minutes=0, seconds=0,
160 start_date=None, repeat=0, args=None, kwargs=None):
161 """
162 Decorator that causes its host function to be scheduled
163 for execution on specified intervals.
164 This decorator does not wrap its host function.
165 The scheduled function will be called without any arguments.
166 Note that the default repeat value is 0, which means to repeat forever.
167 See :meth:`add_delayed_job` for more information.
168 """
169 def inner(func):
170 self.add_interval_job(func, weeks, days, hours, minutes, seconds,
171 start_date, repeat, args, kwargs)
172 return func
173 return inner
174
175 def _add_job(self, trigger, func, args, kwargs):
176 """
177 Adds a Job to the job list and notifies the scheduler thread.
178
179 :param trigger: trigger for the given callable
180 :param args: list of positional arguments to call func with
181 :param kwargs: dict of keyword arguments to call func with
182 :return: the scheduled job
183 :rtype: Job
184 """
185 if self.stopped:
186 raise SchedulerShutdownError
187 if not hasattr(func, '__call__'):
188 raise TypeError('func must be callable')
189
190 if args is None:
191 args = []
192 if kwargs is None:
193 kwargs = {}
194
195 job = Job(trigger, func, args, kwargs)
196 self.jobs_lock.acquire()
197 try:
198 self.jobs.append(job)
199 finally:
200 self.jobs_lock.release()
201 logger.info('Added job "%s"', job)
202
203 # Notify the scheduler about the new job
204 self.wakeup.set()
205
206 return job
207
208 def add_date_job(self, func, date, args=None, kwargs=None):
209 """
210 Adds a job to be completed on a specific date and time.
211
212 :param func: callable to run
213 :param args: positional arguments to call func with
214 :param kwargs: keyword arguments to call func with
215 """
216 trigger = DateTrigger(date)
217 return self._add_job(trigger, func, args, kwargs)
218
219 def add_interval_job(self, func, weeks=0, days=0, hours=0, minutes=0,
220 seconds=0, start_date=None, repeat=0, args=None,
221 kwargs=None):
222 """
223 Adds a job to be completed on specified intervals.
224
225 :param func: callable to run
226 :param weeks: number of weeks to wait
227 :param days: number of days to wait
228 :param hours: number of hours to wait
229 :param minutes: number of minutes to wait
230 :param seconds: number of seconds to wait
231 :param start_date: when to first execute the job and start the
232 counter (default is after the given interval)
233 :param repeat: number of times the job will be run (0 = repeat
234 indefinitely)
235 :param args: list of positional arguments to call func with
236 :param kwargs: dict of keyword arguments to call func with
237 """
238 interval = timedelta(weeks=weeks, days=days, hours=hours,
239 minutes=minutes, seconds=seconds)
240 trigger = IntervalTrigger(interval, repeat, start_date)
241 return self._add_job(trigger, func, args, kwargs)
242
243 def add_cron_job(self, func, year='*', month='*', day='*', week='*',
244 day_of_week='*', hour='*', minute='*', second='*',
245 args=None, kwargs=None):
246 """
247 Adds a job to be completed on times that match the given expressions.
248
249 :param func: callable to run
250 :param year: year to run on
251 :param month: month to run on (0 = January)
252 :param day: day of month to run on
253 :param week: week of the year to run on
254 :param day_of_week: weekday to run on (0 = Monday)
255 :param hour: hour to run on
256 :param second: second to run on
257 :param args: list of positional arguments to call func with
258 :param kwargs: dict of keyword arguments to call func with
259 :return: the scheduled job
260 :rtype: Job
261 """
262 trigger = CronTrigger(year=year, month=month, day=day, week=week,
263 day_of_week=day_of_week, hour=hour,
264 minute=minute, second=second)
265 return self._add_job(trigger, func, args, kwargs)
266
267 def is_job_active(self, job):
268 """
269 Determines if the given job is still on the job list.
270
271 :return: True if the job is still active, False if not
272 """
273 self.jobs_lock.acquire()
274 try:
275 return job in self.jobs
276 finally:
277 self.jobs_lock.release()
278
279 def unschedule_job(self, job):
280 """
281 Removes a job, preventing it from being fired any more.
282 """
283 self.jobs_lock.acquire()
284 try:
285 self.jobs.remove(job)
286 finally:
287 self.jobs_lock.release()
288 logger.info('Removed job "%s"', job)
289 self.wakeup.set()
290
291 def unschedule_func(self, func):
292 """
293 Removes all jobs that would execute the given function.
294 """
295 self.jobs_lock.acquire()
296 try:
297 remove_list = [job for job in self.jobs if job.func == func]
298 for job in remove_list:
299 self.jobs.remove(job)
300 logger.info('Removed job "%s"', job)
301 finally:
302 self.jobs_lock.release()
303
304 # Have the scheduler calculate a new wakeup time
305 self.wakeup.set()
306
307 def dump_jobs(self):
308 """
309 Gives a textual listing of all jobs currently scheduled on this
310 scheduler.
311
312 :rtype: str
313 """
314 job_strs = []
315 now = datetime.now()
316 self.jobs_lock.acquire()
317 try:
318 for job in self.jobs:
319 next_fire_time = job.trigger.get_next_fire_time(now)
320 job_str = '%s (next fire time: %s)' % (str(job),
321 next_fire_time)
322 job_strs.append(job_str)
323 finally:
324 self.jobs_lock.release()
325
326 if job_strs:
327 return os.linesep.join(job_strs)
328 return 'No jobs currently scheduled.'
329
330 def _get_next_wakeup_time(self, now):
331 """
332 Determines the time of the next job execution, and removes finished
333 jobs.
334
335 :param now: the result of datetime.now(), generated elsewhere for
336 consistency.
337 """
338 next_wakeup = None
339 finished_jobs = []
340
341 self.jobs_lock.acquire()
342 try:
343 for job in self.jobs:
344 next_run = job.trigger.get_next_fire_time(now)
345 if next_run is None:
346 finished_jobs.append(job)
347 elif next_run and (next_wakeup is None or \
348 next_run < next_wakeup):
349 next_wakeup = next_run
350
351 # Clear out any finished jobs
352 for job in finished_jobs:
353 self.jobs.remove(job)
354 logger.info('Removed finished job "%s"', job)
355 finally:
356 self.jobs_lock.release()
357
358 return next_wakeup
359
360 def _get_current_jobs(self):
361 """
362 Determines which jobs should be executed right now.
363 """
364 current_jobs = []
365 now = datetime.now()
366 start = now - timedelta(seconds=self.misfire_grace_time)
367
368 self.jobs_lock.acquire()
369 try:
370 for job in self.jobs:
371 next_run = job.trigger.get_next_fire_time(start)
372 if next_run:
373 time_diff = time_difference(now, next_run)
374 if next_run < now and time_diff <= self.misfire_grace_time:
375 current_jobs.append(job)
376 finally:
377 self.jobs_lock.release()
378
379 return current_jobs
380
381 def run(self):
382 """
383 Runs the main loop of the scheduler.
384 """
385 self.wakeup.clear()
386 while not self.stopped:
387 # Execute any jobs scheduled to be run right now
388 for job in self._get_current_jobs():
389 logger.debug('Executing job "%s"', job)
390 job.run()
391
392 # Figure out when the next job should be run, and
393 # adjust the wait time accordingly
394 now = datetime.now()
395 next_wakeup_time = self._get_next_wakeup_time(now)
396
397 # Sleep until the next job is scheduled to be run,
398 # or a new job is added, or the scheduler is stopped
399 if next_wakeup_time is not None:
400 wait_seconds = time_difference(next_wakeup_time, now)
401 logger.debug('Next wakeup is due at %s (in %f seconds)',
402 next_wakeup_time, wait_seconds)
403 self.wakeup.wait(wait_seconds)
404 else:
405 logger.debug('No jobs; waiting until a job is added')
406 self.wakeup.wait()
407 self.wakeup.clear()
OLDNEW
« no previous file with comments | « third_party/APScheduler/apscheduler/fields.py ('k') | third_party/APScheduler/apscheduler/triggers.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698