OLD | NEW |
| (Empty) |
1 # Copyright 2014 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """ Wrapper that allows method execution in parallel. | |
6 | |
7 This class wraps a list of objects of the same type, emulates their | |
8 interface, and executes any functions called on the objects in parallel | |
9 in ReraiserThreads. | |
10 | |
11 This means that, given a list of objects: | |
12 | |
13 class Foo: | |
14 def __init__(self): | |
15 self.baz = Baz() | |
16 | |
17 def bar(self, my_param): | |
18 // do something | |
19 | |
20 list_of_foos = [Foo(1), Foo(2), Foo(3)] | |
21 | |
22 we can take a sequential operation on that list of objects: | |
23 | |
24 for f in list_of_foos: | |
25 f.bar('Hello') | |
26 | |
27 and run it in parallel across all of the objects: | |
28 | |
29 Parallelizer(list_of_foos).bar('Hello') | |
30 | |
31 It can also handle (non-method) attributes of objects, so that this: | |
32 | |
33 for f in list_of_foos: | |
34 f.baz.myBazMethod() | |
35 | |
36 can be run in parallel with: | |
37 | |
38 Parallelizer(list_of_foos).baz.myBazMethod() | |
39 | |
40 Because it emulates the interface of the wrapped objects, a Parallelizer | |
41 can be passed to a method or function that takes objects of that type: | |
42 | |
43 def DoesSomethingWithFoo(the_foo): | |
44 the_foo.bar('Hello') | |
45 the_foo.bar('world') | |
46 the_foo.baz.myBazMethod | |
47 | |
48 DoesSomethingWithFoo(Parallelizer(list_of_foos)) | |
49 | |
50 Note that this class spins up a thread for each object. Using this class | |
51 to parallelize operations that are already fast will incur a net performance | |
52 penalty. | |
53 | |
54 """ | |
55 # pylint: disable=protected-access | |
56 | |
57 from devil.utils import reraiser_thread | |
58 from devil.utils import watchdog_timer | |
59 | |
60 _DEFAULT_TIMEOUT = 30 | |
61 _DEFAULT_RETRIES = 3 | |
62 | |
63 | |
64 class Parallelizer(object): | |
65 """Allows parallel execution of method calls across a group of objects.""" | |
66 | |
67 def __init__(self, objs): | |
68 assert (objs is not None and len(objs) > 0), ( | |
69 "Passed empty list to 'Parallelizer'") | |
70 self._orig_objs = objs | |
71 self._objs = objs | |
72 | |
73 def __getattr__(self, name): | |
74 """Emulate getting the |name| attribute of |self|. | |
75 | |
76 Args: | |
77 name: The name of the attribute to retrieve. | |
78 Returns: | |
79 A Parallelizer emulating the |name| attribute of |self|. | |
80 """ | |
81 self.pGet(None) | |
82 | |
83 r = type(self)(self._orig_objs) | |
84 r._objs = [getattr(o, name) for o in self._objs] | |
85 return r | |
86 | |
87 def __getitem__(self, index): | |
88 """Emulate getting the value of |self| at |index|. | |
89 | |
90 Returns: | |
91 A Parallelizer emulating the value of |self| at |index|. | |
92 """ | |
93 self.pGet(None) | |
94 | |
95 r = type(self)(self._orig_objs) | |
96 r._objs = [o[index] for o in self._objs] | |
97 return r | |
98 | |
99 def __call__(self, *args, **kwargs): | |
100 """Emulate calling |self| with |args| and |kwargs|. | |
101 | |
102 Note that this call is asynchronous. Call pFinish on the return value to | |
103 block until the call finishes. | |
104 | |
105 Returns: | |
106 A Parallelizer wrapping the ReraiserThreadGroup running the call in | |
107 parallel. | |
108 Raises: | |
109 AttributeError if the wrapped objects aren't callable. | |
110 """ | |
111 self.pGet(None) | |
112 | |
113 if not self._objs: | |
114 raise AttributeError('Nothing to call.') | |
115 for o in self._objs: | |
116 if not callable(o): | |
117 raise AttributeError("'%s' is not callable" % o.__name__) | |
118 | |
119 r = type(self)(self._orig_objs) | |
120 r._objs = reraiser_thread.ReraiserThreadGroup( | |
121 [reraiser_thread.ReraiserThread( | |
122 o, args=args, kwargs=kwargs, | |
123 name='%s.%s' % (str(d), o.__name__)) | |
124 for d, o in zip(self._orig_objs, self._objs)]) | |
125 r._objs.StartAll() # pylint: disable=W0212 | |
126 return r | |
127 | |
128 def pFinish(self, timeout): | |
129 """Finish any outstanding asynchronous operations. | |
130 | |
131 Args: | |
132 timeout: The maximum number of seconds to wait for an individual | |
133 result to return, or None to wait forever. | |
134 Returns: | |
135 self, now emulating the return values. | |
136 """ | |
137 self._assertNoShadow('pFinish') | |
138 if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup): | |
139 self._objs.JoinAll() | |
140 self._objs = self._objs.GetAllReturnValues( | |
141 watchdog_timer.WatchdogTimer(timeout)) | |
142 return self | |
143 | |
144 def pGet(self, timeout): | |
145 """Get the current wrapped objects. | |
146 | |
147 Args: | |
148 timeout: Same as |pFinish|. | |
149 Returns: | |
150 A list of the results, in order of the provided devices. | |
151 Raises: | |
152 Any exception raised by any of the called functions. | |
153 """ | |
154 self._assertNoShadow('pGet') | |
155 self.pFinish(timeout) | |
156 return self._objs | |
157 | |
158 def pMap(self, f, *args, **kwargs): | |
159 """Map a function across the current wrapped objects in parallel. | |
160 | |
161 This calls f(o, *args, **kwargs) for each o in the set of wrapped objects. | |
162 | |
163 Note that this call is asynchronous. Call pFinish on the return value to | |
164 block until the call finishes. | |
165 | |
166 Args: | |
167 f: The function to call. | |
168 args: The positional args to pass to f. | |
169 kwargs: The keyword args to pass to f. | |
170 Returns: | |
171 A Parallelizer wrapping the ReraiserThreadGroup running the map in | |
172 parallel. | |
173 """ | |
174 self._assertNoShadow('pMap') | |
175 r = type(self)(self._orig_objs) | |
176 r._objs = reraiser_thread.ReraiserThreadGroup( | |
177 [reraiser_thread.ReraiserThread( | |
178 f, args=tuple([o] + list(args)), kwargs=kwargs, | |
179 name='%s(%s)' % (f.__name__, d)) | |
180 for d, o in zip(self._orig_objs, self._objs)]) | |
181 r._objs.StartAll() # pylint: disable=W0212 | |
182 return r | |
183 | |
184 def _assertNoShadow(self, attr_name): | |
185 """Ensures that |attr_name| isn't shadowing part of the wrapped obejcts. | |
186 | |
187 If the wrapped objects _do_ have an |attr_name| attribute, it will be | |
188 inaccessible to clients. | |
189 | |
190 Args: | |
191 attr_name: The attribute to check. | |
192 Raises: | |
193 AssertionError if the wrapped objects have an attribute named 'attr_name' | |
194 or '_assertNoShadow'. | |
195 """ | |
196 if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup): | |
197 assert not hasattr(self._objs, '_assertNoShadow') | |
198 assert not hasattr(self._objs, attr_name) | |
199 else: | |
200 assert not any(hasattr(o, '_assertNoShadow') for o in self._objs) | |
201 assert not any(hasattr(o, attr_name) for o in self._objs) | |
202 | |
203 | |
204 class SyncParallelizer(Parallelizer): | |
205 """A Parallelizer that blocks on function calls.""" | |
206 | |
207 #override | |
208 def __call__(self, *args, **kwargs): | |
209 """Emulate calling |self| with |args| and |kwargs|. | |
210 | |
211 Note that this call is synchronous. | |
212 | |
213 Returns: | |
214 A Parallelizer emulating the value returned from calling |self| with | |
215 |args| and |kwargs|. | |
216 Raises: | |
217 AttributeError if the wrapped objects aren't callable. | |
218 """ | |
219 r = super(SyncParallelizer, self).__call__(*args, **kwargs) | |
220 r.pFinish(None) | |
221 return r | |
222 | |
223 #override | |
224 def pMap(self, f, *args, **kwargs): | |
225 """Map a function across the current wrapped objects in parallel. | |
226 | |
227 This calls f(o, *args, **kwargs) for each o in the set of wrapped objects. | |
228 | |
229 Note that this call is synchronous. | |
230 | |
231 Args: | |
232 f: The function to call. | |
233 args: The positional args to pass to f. | |
234 kwargs: The keyword args to pass to f. | |
235 Returns: | |
236 A Parallelizer wrapping the ReraiserThreadGroup running the map in | |
237 parallel. | |
238 """ | |
239 r = super(SyncParallelizer, self).pMap(f, *args, **kwargs) | |
240 r.pFinish(None) | |
241 return r | |
242 | |
OLD | NEW |