Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Side by Side Diff: chrome/test/pyautolib/timer_queue.py

Issue 222873002: Remove pyauto tests. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: sync Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import copy
6 import threading
7 import time
8
9 class TimerQueue(threading.Thread):
10 """Executes timers at a given interval.
11
12 This class provides the ability to run methods at a given interval. All
13 methods are fired synchronously. Only one method is running at a time.
14
15 Example of using TimerQueue:
16 def _fooPrinter(word):
17 print('foo : %s' % word)
18
19 timers = TimerQueue()
20 timers.addTimer(self._fooPrinter, 15, args=('hello',))
21 timers.start()
22
23 >> hello will be printed after 15 seconds
24
25 Note: TimerQueue is a subclass of threading.Thread, call start() to activate;
26 do not call run() directly.
27 """
28
29 def __init__(self):
30 """Initializes a TimerQueue object."""
31 threading.Thread.__init__(self, name='timer_thread')
32 self.timer_queue_lock = threading.Lock()
33 self.terminate = False
34 self.wait_time = 1
35 self.timers = []
36
37 def AddTimer(self, method, interval, args=()):
38 """Adds a timer to the queue.
39
40 Args:
41 method: the method to be called at the given interval
42 interval: delay between method runs, in seconds
43 args: arguments to be passed to the method
44 """
45 self.timer_queue_lock.acquire()
46 next_time = time.time() + interval
47 self.timers.append({'method': method, 'interval': interval,
48 'next time': next_time, 'args': copy.copy(args)})
49 self.timer_queue_lock.release()
50
51 def SetResolution(self, resolution):
52 """Sets the timer check frequency, in seconds."""
53 self.wait_time = resolution
54
55 def RemoveTimer(self, method):
56 """Removes a timer from the queue.
57
58 Args:
59 method: the timer containing the given method to be removed
60 """
61 self.timer_queue_lock.acquire()
62 for timer in self.timers:
63 if timer['method'] == method:
64 self.timers.remove(timer)
65 break
66 self.timer_queue_lock.release()
67
68 def Stop(self):
69 """Stops the timer."""
70 self.terminate = True
71
72 def run(self):
73 """Primary run loop for the timer."""
74 while True:
75 now = time.time()
76 self.timer_queue_lock.acquire()
77 for timer in self.timers:
78 if timer['next time'] <= now:
79 # Use * to break the list into separate arguments
80 timer['method'](*timer['args'])
81 timer['next time'] += timer['interval']
82 self.timer_queue_lock.release()
83 if self.terminate:
84 return
85 time.sleep(self.wait_time)
OLDNEW
« no previous file with comments | « chrome/test/pyautolib/remote_inspector_client.py ('k') | chrome/test/pyautolib/webdriver.DEPS/DEPS » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698