OLD | NEW |
(Empty) | |
| 1 """ |
| 2 This module is the main part of the library, and is the only module that |
| 3 regular users should be concerned with. |
| 4 """ |
| 5 from threading import Thread, Event, Lock |
| 6 from datetime import datetime, timedelta |
| 7 from logging import getLogger |
| 8 import os |
| 9 |
| 10 from apscheduler.util import time_difference, asbool |
| 11 from apscheduler.triggers import DateTrigger, IntervalTrigger, CronTrigger |
| 12 |
| 13 |
| 14 logger = getLogger(__name__) |
| 15 |
| 16 |
| 17 class Job(object): |
| 18 """ |
| 19 Represents a task scheduled in the scheduler. |
| 20 """ |
| 21 |
| 22 def __init__(self, trigger, func, args, kwargs): |
| 23 self.thread = None |
| 24 self.trigger = trigger |
| 25 self.func = func |
| 26 self.args = args |
| 27 self.kwargs = kwargs |
| 28 if hasattr(func, '__name__'): |
| 29 self.name = func.__name__ |
| 30 else: |
| 31 self.name = str(func) |
| 32 |
| 33 def run(self): |
| 34 """ |
| 35 Starts the execution of this job in a separate thread. |
| 36 """ |
| 37 if (self.thread and self.thread.isAlive()): |
| 38 logger.info('Skipping run of job %s (previously triggered ' |
| 39 'instance is still running)', self) |
| 40 else: |
| 41 self.thread = Thread(target=self.run_in_thread) |
| 42 self.thread.setDaemon(False) |
| 43 self.thread.start() |
| 44 |
| 45 def run_in_thread(self): |
| 46 """ |
| 47 Runs the associated callable. |
| 48 This method is executed in a dedicated thread. |
| 49 """ |
| 50 try: |
| 51 self.func(*self.args, **self.kwargs) |
| 52 except: |
| 53 logger.exception('Error executing job "%s"', self) |
| 54 raise |
| 55 |
| 56 def __str__(self): |
| 57 return '%s: %s' % (self.name, repr(self.trigger)) |
| 58 |
| 59 def __repr__(self): |
| 60 return '%s(%s, %s)' % (self.__class__.__name__, self.name, |
| 61 repr(self.trigger)) |
| 62 |
| 63 |
| 64 class SchedulerShutdownError(Exception): |
| 65 """ |
| 66 Thrown when attempting to use the scheduler after |
| 67 it's been shut down. |
| 68 """ |
| 69 |
| 70 def __init__(self): |
| 71 Exception.__init__(self, 'Scheduler has already been shut down') |
| 72 |
| 73 |
| 74 class SchedulerAlreadyRunningError(Exception): |
| 75 """ |
| 76 Thrown when attempting to start the scheduler, but it's already running. |
| 77 """ |
| 78 |
| 79 def __init__(self): |
| 80 Exception.__init__(self, 'Scheduler is already running') |
| 81 |
| 82 |
| 83 class Scheduler(object): |
| 84 """ |
| 85 This class is responsible for scheduling jobs and triggering |
| 86 their execution. |
| 87 """ |
| 88 |
| 89 stopped = False |
| 90 thread = None |
| 91 misfire_grace_time = 1 |
| 92 daemonic = True |
| 93 |
| 94 def __init__(self, **config): |
| 95 self.jobs = [] |
| 96 self.jobs_lock = Lock() |
| 97 self.wakeup = Event() |
| 98 self.configure(config) |
| 99 |
| 100 def configure(self, config): |
| 101 """ |
| 102 Updates the configuration with the given options. |
| 103 """ |
| 104 for key, val in config.items(): |
| 105 if key.startswith('apscheduler.'): |
| 106 key = key[12:] |
| 107 if key == 'misfire_grace_time': |
| 108 self.misfire_grace_time = int(val) |
| 109 elif key == 'daemonic': |
| 110 self.daemonic = asbool(val) |
| 111 |
| 112 def start(self): |
| 113 """ |
| 114 Starts the scheduler in a new thread. |
| 115 """ |
| 116 if self.thread and self.thread.isAlive(): |
| 117 raise SchedulerAlreadyRunningError |
| 118 |
| 119 self.stopped = False |
| 120 self.thread = Thread(target=self.run, name='APScheduler') |
| 121 self.thread.setDaemon(self.daemonic) |
| 122 self.thread.start() |
| 123 logger.info('Scheduler started') |
| 124 |
| 125 def shutdown(self, timeout=0): |
| 126 """ |
| 127 Shuts down the scheduler and terminates the thread. |
| 128 Does not terminate any currently running jobs. |
| 129 |
| 130 :param timeout: time (in seconds) to wait for the scheduler thread to |
| 131 terminate, 0 to wait forever, None to skip waiting |
| 132 """ |
| 133 if self.stopped or not self.thread.isAlive(): |
| 134 return |
| 135 |
| 136 logger.info('Scheduler shutting down') |
| 137 self.stopped = True |
| 138 self.wakeup.set() |
| 139 if timeout is not None: |
| 140 self.thread.join(timeout) |
| 141 self.jobs = [] |
| 142 |
| 143 def cron_schedule(self, year='*', month='*', day='*', week='*', |
| 144 day_of_week='*', hour='*', minute='*', second='*', |
| 145 args=None, kwargs=None): |
| 146 """ |
| 147 Decorator that causes its host function to be scheduled |
| 148 according to the given parameters. |
| 149 This decorator does not wrap its host function. |
| 150 The scheduled function will be called without any arguments. |
| 151 See :meth:`add_cron_job` for more information. |
| 152 """ |
| 153 def inner(func): |
| 154 self.add_cron_job(func, year, month, day, week, day_of_week, hour, |
| 155 minute, second, args, kwargs) |
| 156 return func |
| 157 return inner |
| 158 |
| 159 def interval_schedule(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, |
| 160 start_date=None, repeat=0, args=None, kwargs=None): |
| 161 """ |
| 162 Decorator that causes its host function to be scheduled |
| 163 for execution on specified intervals. |
| 164 This decorator does not wrap its host function. |
| 165 The scheduled function will be called without any arguments. |
| 166 Note that the default repeat value is 0, which means to repeat forever. |
| 167 See :meth:`add_delayed_job` for more information. |
| 168 """ |
| 169 def inner(func): |
| 170 self.add_interval_job(func, weeks, days, hours, minutes, seconds, |
| 171 start_date, repeat, args, kwargs) |
| 172 return func |
| 173 return inner |
| 174 |
| 175 def _add_job(self, trigger, func, args, kwargs): |
| 176 """ |
| 177 Adds a Job to the job list and notifies the scheduler thread. |
| 178 |
| 179 :param trigger: trigger for the given callable |
| 180 :param args: list of positional arguments to call func with |
| 181 :param kwargs: dict of keyword arguments to call func with |
| 182 :return: the scheduled job |
| 183 :rtype: Job |
| 184 """ |
| 185 if self.stopped: |
| 186 raise SchedulerShutdownError |
| 187 if not hasattr(func, '__call__'): |
| 188 raise TypeError('func must be callable') |
| 189 |
| 190 if args is None: |
| 191 args = [] |
| 192 if kwargs is None: |
| 193 kwargs = {} |
| 194 |
| 195 job = Job(trigger, func, args, kwargs) |
| 196 self.jobs_lock.acquire() |
| 197 try: |
| 198 self.jobs.append(job) |
| 199 finally: |
| 200 self.jobs_lock.release() |
| 201 logger.info('Added job "%s"', job) |
| 202 |
| 203 # Notify the scheduler about the new job |
| 204 self.wakeup.set() |
| 205 |
| 206 return job |
| 207 |
| 208 def add_date_job(self, func, date, args=None, kwargs=None): |
| 209 """ |
| 210 Adds a job to be completed on a specific date and time. |
| 211 |
| 212 :param func: callable to run |
| 213 :param args: positional arguments to call func with |
| 214 :param kwargs: keyword arguments to call func with |
| 215 """ |
| 216 trigger = DateTrigger(date) |
| 217 return self._add_job(trigger, func, args, kwargs) |
| 218 |
| 219 def add_interval_job(self, func, weeks=0, days=0, hours=0, minutes=0, |
| 220 seconds=0, start_date=None, repeat=0, args=None, |
| 221 kwargs=None): |
| 222 """ |
| 223 Adds a job to be completed on specified intervals. |
| 224 |
| 225 :param func: callable to run |
| 226 :param weeks: number of weeks to wait |
| 227 :param days: number of days to wait |
| 228 :param hours: number of hours to wait |
| 229 :param minutes: number of minutes to wait |
| 230 :param seconds: number of seconds to wait |
| 231 :param start_date: when to first execute the job and start the |
| 232 counter (default is after the given interval) |
| 233 :param repeat: number of times the job will be run (0 = repeat |
| 234 indefinitely) |
| 235 :param args: list of positional arguments to call func with |
| 236 :param kwargs: dict of keyword arguments to call func with |
| 237 """ |
| 238 interval = timedelta(weeks=weeks, days=days, hours=hours, |
| 239 minutes=minutes, seconds=seconds) |
| 240 trigger = IntervalTrigger(interval, repeat, start_date) |
| 241 return self._add_job(trigger, func, args, kwargs) |
| 242 |
| 243 def add_cron_job(self, func, year='*', month='*', day='*', week='*', |
| 244 day_of_week='*', hour='*', minute='*', second='*', |
| 245 args=None, kwargs=None): |
| 246 """ |
| 247 Adds a job to be completed on times that match the given expressions. |
| 248 |
| 249 :param func: callable to run |
| 250 :param year: year to run on |
| 251 :param month: month to run on (0 = January) |
| 252 :param day: day of month to run on |
| 253 :param week: week of the year to run on |
| 254 :param day_of_week: weekday to run on (0 = Monday) |
| 255 :param hour: hour to run on |
| 256 :param second: second to run on |
| 257 :param args: list of positional arguments to call func with |
| 258 :param kwargs: dict of keyword arguments to call func with |
| 259 :return: the scheduled job |
| 260 :rtype: Job |
| 261 """ |
| 262 trigger = CronTrigger(year=year, month=month, day=day, week=week, |
| 263 day_of_week=day_of_week, hour=hour, |
| 264 minute=minute, second=second) |
| 265 return self._add_job(trigger, func, args, kwargs) |
| 266 |
| 267 def is_job_active(self, job): |
| 268 """ |
| 269 Determines if the given job is still on the job list. |
| 270 |
| 271 :return: True if the job is still active, False if not |
| 272 """ |
| 273 self.jobs_lock.acquire() |
| 274 try: |
| 275 return job in self.jobs |
| 276 finally: |
| 277 self.jobs_lock.release() |
| 278 |
| 279 def unschedule_job(self, job): |
| 280 """ |
| 281 Removes a job, preventing it from being fired any more. |
| 282 """ |
| 283 self.jobs_lock.acquire() |
| 284 try: |
| 285 self.jobs.remove(job) |
| 286 finally: |
| 287 self.jobs_lock.release() |
| 288 logger.info('Removed job "%s"', job) |
| 289 self.wakeup.set() |
| 290 |
| 291 def unschedule_func(self, func): |
| 292 """ |
| 293 Removes all jobs that would execute the given function. |
| 294 """ |
| 295 self.jobs_lock.acquire() |
| 296 try: |
| 297 remove_list = [job for job in self.jobs if job.func == func] |
| 298 for job in remove_list: |
| 299 self.jobs.remove(job) |
| 300 logger.info('Removed job "%s"', job) |
| 301 finally: |
| 302 self.jobs_lock.release() |
| 303 |
| 304 # Have the scheduler calculate a new wakeup time |
| 305 self.wakeup.set() |
| 306 |
| 307 def dump_jobs(self): |
| 308 """ |
| 309 Gives a textual listing of all jobs currently scheduled on this |
| 310 scheduler. |
| 311 |
| 312 :rtype: str |
| 313 """ |
| 314 job_strs = [] |
| 315 now = datetime.now() |
| 316 self.jobs_lock.acquire() |
| 317 try: |
| 318 for job in self.jobs: |
| 319 next_fire_time = job.trigger.get_next_fire_time(now) |
| 320 job_str = '%s (next fire time: %s)' % (str(job), |
| 321 next_fire_time) |
| 322 job_strs.append(job_str) |
| 323 finally: |
| 324 self.jobs_lock.release() |
| 325 |
| 326 if job_strs: |
| 327 return os.linesep.join(job_strs) |
| 328 return 'No jobs currently scheduled.' |
| 329 |
| 330 def _get_next_wakeup_time(self, now): |
| 331 """ |
| 332 Determines the time of the next job execution, and removes finished |
| 333 jobs. |
| 334 |
| 335 :param now: the result of datetime.now(), generated elsewhere for |
| 336 consistency. |
| 337 """ |
| 338 next_wakeup = None |
| 339 finished_jobs = [] |
| 340 |
| 341 self.jobs_lock.acquire() |
| 342 try: |
| 343 for job in self.jobs: |
| 344 next_run = job.trigger.get_next_fire_time(now) |
| 345 if next_run is None: |
| 346 finished_jobs.append(job) |
| 347 elif next_run and (next_wakeup is None or \ |
| 348 next_run < next_wakeup): |
| 349 next_wakeup = next_run |
| 350 |
| 351 # Clear out any finished jobs |
| 352 for job in finished_jobs: |
| 353 self.jobs.remove(job) |
| 354 logger.info('Removed finished job "%s"', job) |
| 355 finally: |
| 356 self.jobs_lock.release() |
| 357 |
| 358 return next_wakeup |
| 359 |
| 360 def _get_current_jobs(self): |
| 361 """ |
| 362 Determines which jobs should be executed right now. |
| 363 """ |
| 364 current_jobs = [] |
| 365 now = datetime.now() |
| 366 start = now - timedelta(seconds=self.misfire_grace_time) |
| 367 |
| 368 self.jobs_lock.acquire() |
| 369 try: |
| 370 for job in self.jobs: |
| 371 next_run = job.trigger.get_next_fire_time(start) |
| 372 if next_run: |
| 373 time_diff = time_difference(now, next_run) |
| 374 if next_run < now and time_diff <= self.misfire_grace_time: |
| 375 current_jobs.append(job) |
| 376 finally: |
| 377 self.jobs_lock.release() |
| 378 |
| 379 return current_jobs |
| 380 |
| 381 def run(self): |
| 382 """ |
| 383 Runs the main loop of the scheduler. |
| 384 """ |
| 385 self.wakeup.clear() |
| 386 while not self.stopped: |
| 387 # Execute any jobs scheduled to be run right now |
| 388 for job in self._get_current_jobs(): |
| 389 logger.debug('Executing job "%s"', job) |
| 390 job.run() |
| 391 |
| 392 # Figure out when the next job should be run, and |
| 393 # adjust the wait time accordingly |
| 394 now = datetime.now() |
| 395 next_wakeup_time = self._get_next_wakeup_time(now) |
| 396 |
| 397 # Sleep until the next job is scheduled to be run, |
| 398 # or a new job is added, or the scheduler is stopped |
| 399 if next_wakeup_time is not None: |
| 400 wait_seconds = time_difference(next_wakeup_time, now) |
| 401 logger.debug('Next wakeup is due at %s (in %f seconds)', |
| 402 next_wakeup_time, wait_seconds) |
| 403 self.wakeup.wait(wait_seconds) |
| 404 else: |
| 405 logger.debug('No jobs; waiting until a job is added') |
| 406 self.wakeup.wait() |
| 407 self.wakeup.clear() |
OLD | NEW |