| OLD | NEW |
| (Empty) |
| 1 # Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 # Use of this source code is governed by a BSD-style license that can be | |
| 3 # found in the LICENSE file. | |
| 4 | |
| 5 """ Wrapper that allows method execution in parallel. | |
| 6 | |
| 7 This class wraps a list of objects of the same type, emulates their | |
| 8 interface, and executes any functions called on the objects in parallel | |
| 9 in ReraiserThreads. | |
| 10 | |
| 11 This means that, given a list of objects: | |
| 12 | |
| 13 class Foo: | |
| 14 def __init__(self): | |
| 15 self.baz = Baz() | |
| 16 | |
| 17 def bar(self, my_param): | |
| 18 // do something | |
| 19 | |
| 20 list_of_foos = [Foo(1), Foo(2), Foo(3)] | |
| 21 | |
| 22 we can take a sequential operation on that list of objects: | |
| 23 | |
| 24 for f in list_of_foos: | |
| 25 f.bar('Hello') | |
| 26 | |
| 27 and run it in parallel across all of the objects: | |
| 28 | |
| 29 Parallelizer(list_of_foos).bar('Hello') | |
| 30 | |
| 31 It can also handle (non-method) attributes of objects, so that this: | |
| 32 | |
| 33 for f in list_of_foos: | |
| 34 f.baz.myBazMethod() | |
| 35 | |
| 36 can be run in parallel with: | |
| 37 | |
| 38 Parallelizer(list_of_foos).baz.myBazMethod() | |
| 39 | |
| 40 Because it emulates the interface of the wrapped objects, a Parallelizer | |
| 41 can be passed to a method or function that takes objects of that type: | |
| 42 | |
| 43 def DoesSomethingWithFoo(the_foo): | |
| 44 the_foo.bar('Hello') | |
| 45 the_foo.bar('world') | |
| 46 the_foo.baz.myBazMethod | |
| 47 | |
| 48 DoesSomethingWithFoo(Parallelizer(list_of_foos)) | |
| 49 | |
| 50 Note that this class spins up a thread for each object. Using this class | |
| 51 to parallelize operations that are already fast will incur a net performance | |
| 52 penalty. | |
| 53 | |
| 54 """ | |
| 55 # pylint: disable=protected-access | |
| 56 | |
| 57 from devil.utils import reraiser_thread | |
| 58 from devil.utils import watchdog_timer | |
| 59 | |
| 60 _DEFAULT_TIMEOUT = 30 | |
| 61 _DEFAULT_RETRIES = 3 | |
| 62 | |
| 63 | |
| 64 class Parallelizer(object): | |
| 65 """Allows parallel execution of method calls across a group of objects.""" | |
| 66 | |
| 67 def __init__(self, objs): | |
| 68 assert (objs is not None and len(objs) > 0), ( | |
| 69 "Passed empty list to 'Parallelizer'") | |
| 70 self._orig_objs = objs | |
| 71 self._objs = objs | |
| 72 | |
| 73 def __getattr__(self, name): | |
| 74 """Emulate getting the |name| attribute of |self|. | |
| 75 | |
| 76 Args: | |
| 77 name: The name of the attribute to retrieve. | |
| 78 Returns: | |
| 79 A Parallelizer emulating the |name| attribute of |self|. | |
| 80 """ | |
| 81 self.pGet(None) | |
| 82 | |
| 83 r = type(self)(self._orig_objs) | |
| 84 r._objs = [getattr(o, name) for o in self._objs] | |
| 85 return r | |
| 86 | |
| 87 def __getitem__(self, index): | |
| 88 """Emulate getting the value of |self| at |index|. | |
| 89 | |
| 90 Returns: | |
| 91 A Parallelizer emulating the value of |self| at |index|. | |
| 92 """ | |
| 93 self.pGet(None) | |
| 94 | |
| 95 r = type(self)(self._orig_objs) | |
| 96 r._objs = [o[index] for o in self._objs] | |
| 97 return r | |
| 98 | |
| 99 def __call__(self, *args, **kwargs): | |
| 100 """Emulate calling |self| with |args| and |kwargs|. | |
| 101 | |
| 102 Note that this call is asynchronous. Call pFinish on the return value to | |
| 103 block until the call finishes. | |
| 104 | |
| 105 Returns: | |
| 106 A Parallelizer wrapping the ReraiserThreadGroup running the call in | |
| 107 parallel. | |
| 108 Raises: | |
| 109 AttributeError if the wrapped objects aren't callable. | |
| 110 """ | |
| 111 self.pGet(None) | |
| 112 | |
| 113 if not self._objs: | |
| 114 raise AttributeError('Nothing to call.') | |
| 115 for o in self._objs: | |
| 116 if not callable(o): | |
| 117 raise AttributeError("'%s' is not callable" % o.__name__) | |
| 118 | |
| 119 r = type(self)(self._orig_objs) | |
| 120 r._objs = reraiser_thread.ReraiserThreadGroup( | |
| 121 [reraiser_thread.ReraiserThread( | |
| 122 o, args=args, kwargs=kwargs, | |
| 123 name='%s.%s' % (str(d), o.__name__)) | |
| 124 for d, o in zip(self._orig_objs, self._objs)]) | |
| 125 r._objs.StartAll() # pylint: disable=W0212 | |
| 126 return r | |
| 127 | |
| 128 def pFinish(self, timeout): | |
| 129 """Finish any outstanding asynchronous operations. | |
| 130 | |
| 131 Args: | |
| 132 timeout: The maximum number of seconds to wait for an individual | |
| 133 result to return, or None to wait forever. | |
| 134 Returns: | |
| 135 self, now emulating the return values. | |
| 136 """ | |
| 137 self._assertNoShadow('pFinish') | |
| 138 if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup): | |
| 139 self._objs.JoinAll() | |
| 140 self._objs = self._objs.GetAllReturnValues( | |
| 141 watchdog_timer.WatchdogTimer(timeout)) | |
| 142 return self | |
| 143 | |
| 144 def pGet(self, timeout): | |
| 145 """Get the current wrapped objects. | |
| 146 | |
| 147 Args: | |
| 148 timeout: Same as |pFinish|. | |
| 149 Returns: | |
| 150 A list of the results, in order of the provided devices. | |
| 151 Raises: | |
| 152 Any exception raised by any of the called functions. | |
| 153 """ | |
| 154 self._assertNoShadow('pGet') | |
| 155 self.pFinish(timeout) | |
| 156 return self._objs | |
| 157 | |
| 158 def pMap(self, f, *args, **kwargs): | |
| 159 """Map a function across the current wrapped objects in parallel. | |
| 160 | |
| 161 This calls f(o, *args, **kwargs) for each o in the set of wrapped objects. | |
| 162 | |
| 163 Note that this call is asynchronous. Call pFinish on the return value to | |
| 164 block until the call finishes. | |
| 165 | |
| 166 Args: | |
| 167 f: The function to call. | |
| 168 args: The positional args to pass to f. | |
| 169 kwargs: The keyword args to pass to f. | |
| 170 Returns: | |
| 171 A Parallelizer wrapping the ReraiserThreadGroup running the map in | |
| 172 parallel. | |
| 173 """ | |
| 174 self._assertNoShadow('pMap') | |
| 175 r = type(self)(self._orig_objs) | |
| 176 r._objs = reraiser_thread.ReraiserThreadGroup( | |
| 177 [reraiser_thread.ReraiserThread( | |
| 178 f, args=tuple([o] + list(args)), kwargs=kwargs, | |
| 179 name='%s(%s)' % (f.__name__, d)) | |
| 180 for d, o in zip(self._orig_objs, self._objs)]) | |
| 181 r._objs.StartAll() # pylint: disable=W0212 | |
| 182 return r | |
| 183 | |
| 184 def _assertNoShadow(self, attr_name): | |
| 185 """Ensures that |attr_name| isn't shadowing part of the wrapped obejcts. | |
| 186 | |
| 187 If the wrapped objects _do_ have an |attr_name| attribute, it will be | |
| 188 inaccessible to clients. | |
| 189 | |
| 190 Args: | |
| 191 attr_name: The attribute to check. | |
| 192 Raises: | |
| 193 AssertionError if the wrapped objects have an attribute named 'attr_name' | |
| 194 or '_assertNoShadow'. | |
| 195 """ | |
| 196 if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup): | |
| 197 assert not hasattr(self._objs, '_assertNoShadow') | |
| 198 assert not hasattr(self._objs, attr_name) | |
| 199 else: | |
| 200 assert not any(hasattr(o, '_assertNoShadow') for o in self._objs) | |
| 201 assert not any(hasattr(o, attr_name) for o in self._objs) | |
| 202 | |
| 203 | |
| 204 class SyncParallelizer(Parallelizer): | |
| 205 """A Parallelizer that blocks on function calls.""" | |
| 206 | |
| 207 #override | |
| 208 def __call__(self, *args, **kwargs): | |
| 209 """Emulate calling |self| with |args| and |kwargs|. | |
| 210 | |
| 211 Note that this call is synchronous. | |
| 212 | |
| 213 Returns: | |
| 214 A Parallelizer emulating the value returned from calling |self| with | |
| 215 |args| and |kwargs|. | |
| 216 Raises: | |
| 217 AttributeError if the wrapped objects aren't callable. | |
| 218 """ | |
| 219 r = super(SyncParallelizer, self).__call__(*args, **kwargs) | |
| 220 r.pFinish(None) | |
| 221 return r | |
| 222 | |
| 223 #override | |
| 224 def pMap(self, f, *args, **kwargs): | |
| 225 """Map a function across the current wrapped objects in parallel. | |
| 226 | |
| 227 This calls f(o, *args, **kwargs) for each o in the set of wrapped objects. | |
| 228 | |
| 229 Note that this call is synchronous. | |
| 230 | |
| 231 Args: | |
| 232 f: The function to call. | |
| 233 args: The positional args to pass to f. | |
| 234 kwargs: The keyword args to pass to f. | |
| 235 Returns: | |
| 236 A Parallelizer wrapping the ReraiserThreadGroup running the map in | |
| 237 parallel. | |
| 238 """ | |
| 239 r = super(SyncParallelizer, self).pMap(f, *args, **kwargs) | |
| 240 r.pFinish(None) | |
| 241 return r | |
| 242 | |
| OLD | NEW |