OLD | NEW |
(Empty) | |
| 1 # Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 |
| 2 # For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt |
| 3 |
| 4 """Tests for concurrency libraries.""" |
| 5 |
| 6 import threading |
| 7 |
| 8 import coverage |
| 9 from coverage import env |
| 10 from coverage.files import abs_file |
| 11 |
| 12 from tests.coveragetest import CoverageTest |
| 13 |
| 14 |
| 15 # These libraries aren't always available, we'll skip tests if they aren't. |
| 16 |
| 17 try: |
| 18 import eventlet |
| 19 except ImportError: |
| 20 eventlet = None |
| 21 |
| 22 try: |
| 23 import gevent |
| 24 except ImportError: |
| 25 gevent = None |
| 26 |
| 27 import greenlet |
| 28 |
| 29 |
| 30 def line_count(s): |
| 31 """How many non-blank non-comment lines are in `s`?""" |
| 32 def code_line(l): |
| 33 """Is this a code line? Not blank, and not a full-line comment.""" |
| 34 return l.strip() and not l.strip().startswith('#') |
| 35 return sum(1 for l in s.splitlines() if code_line(l)) |
| 36 |
| 37 |
| 38 class ConcurrencyTest(CoverageTest): |
| 39 """Tests of the concurrency support in coverage.py.""" |
| 40 |
| 41 LIMIT = 1000 |
| 42 |
| 43 # The code common to all the concurrency models. |
| 44 COMMON = """ |
| 45 class Producer(threading.Thread): |
| 46 def __init__(self, q): |
| 47 threading.Thread.__init__(self) |
| 48 self.q = q |
| 49 |
| 50 def run(self): |
| 51 for i in range({LIMIT}): |
| 52 self.q.put(i) |
| 53 self.q.put(None) |
| 54 |
| 55 class Consumer(threading.Thread): |
| 56 def __init__(self, q): |
| 57 threading.Thread.__init__(self) |
| 58 self.q = q |
| 59 |
| 60 def run(self): |
| 61 sum = 0 |
| 62 while True: |
| 63 i = self.q.get() |
| 64 if i is None: |
| 65 print(sum) |
| 66 break |
| 67 sum += i |
| 68 |
| 69 q = queue.Queue() |
| 70 c = Consumer(q) |
| 71 p = Producer(q) |
| 72 c.start() |
| 73 p.start() |
| 74 |
| 75 p.join() |
| 76 c.join() |
| 77 """.format(LIMIT=LIMIT) |
| 78 |
| 79 # Import the things to use threads. |
| 80 if env.PY2: |
| 81 THREAD = """\ |
| 82 import threading |
| 83 import Queue as queue |
| 84 """ + COMMON |
| 85 else: |
| 86 THREAD = """\ |
| 87 import threading |
| 88 import queue |
| 89 """ + COMMON |
| 90 |
| 91 # Import the things to use eventlet. |
| 92 EVENTLET = """\ |
| 93 import eventlet.green.threading as threading |
| 94 import eventlet.queue as queue |
| 95 """ + COMMON |
| 96 |
| 97 # Import the things to use gevent. |
| 98 GEVENT = """\ |
| 99 from gevent import monkey |
| 100 monkey.patch_thread() |
| 101 import threading |
| 102 import gevent.queue as queue |
| 103 """ + COMMON |
| 104 |
| 105 # Uncomplicated code that doesn't use any of the concurrency stuff, to test |
| 106 # the simple case under each of the regimes. |
| 107 SIMPLE = """\ |
| 108 total = 0 |
| 109 for i in range({LIMIT}): |
| 110 total += i |
| 111 print(total) |
| 112 """.format(LIMIT=LIMIT) |
| 113 |
| 114 def try_some_code(self, code, concurrency, the_module, expected_out=None): |
| 115 """Run some concurrency testing code and see that it was all covered. |
| 116 |
| 117 `code` is the Python code to execute. `concurrency` is the name of |
| 118 the concurrency regime to test it under. `the_module` is the imported |
| 119 module that must be available for this to work at all. `expected_out` |
| 120 is the text we expect the code to produce. |
| 121 |
| 122 """ |
| 123 |
| 124 self.make_file("try_it.py", code) |
| 125 |
| 126 cmd = "coverage run --concurrency=%s try_it.py" % concurrency |
| 127 out = self.run_command(cmd) |
| 128 |
| 129 if not the_module: |
| 130 # We don't even have the underlying module installed, we expect |
| 131 # coverage to alert us to this fact. |
| 132 expected_out = ( |
| 133 "Couldn't trace with concurrency=%s, " |
| 134 "the module isn't installed.\n" % concurrency |
| 135 ) |
| 136 self.assertEqual(out, expected_out) |
| 137 elif env.C_TRACER or concurrency == "thread": |
| 138 # We can fully measure the code if we are using the C tracer, which |
| 139 # can support all the concurrency, or if we are using threads. |
| 140 if expected_out is None: |
| 141 expected_out = "%d\n" % (sum(range(self.LIMIT))) |
| 142 self.assertEqual(out, expected_out) |
| 143 |
| 144 # Read the coverage file and see that try_it.py has all its lines |
| 145 # executed. |
| 146 data = coverage.CoverageData() |
| 147 data.read_file(".coverage") |
| 148 |
| 149 # If the test fails, it's helpful to see this info: |
| 150 fname = abs_file("try_it.py") |
| 151 linenos = data.lines(fname) |
| 152 print("{0}: {1}".format(len(linenos), linenos)) |
| 153 print_simple_annotation(code, linenos) |
| 154 |
| 155 lines = line_count(code) |
| 156 self.assertEqual(data.line_counts()['try_it.py'], lines) |
| 157 else: |
| 158 expected_out = ( |
| 159 "Can't support concurrency=%s with PyTracer, " |
| 160 "only threads are supported\n" % concurrency |
| 161 ) |
| 162 self.assertEqual(out, expected_out) |
| 163 |
| 164 def test_threads(self): |
| 165 self.try_some_code(self.THREAD, "thread", threading) |
| 166 |
| 167 def test_threads_simple_code(self): |
| 168 self.try_some_code(self.SIMPLE, "thread", threading) |
| 169 |
| 170 def test_eventlet(self): |
| 171 self.try_some_code(self.EVENTLET, "eventlet", eventlet) |
| 172 |
| 173 def test_eventlet_simple_code(self): |
| 174 self.try_some_code(self.SIMPLE, "eventlet", eventlet) |
| 175 |
| 176 def test_gevent(self): |
| 177 self.try_some_code(self.GEVENT, "gevent", gevent) |
| 178 |
| 179 def test_gevent_simple_code(self): |
| 180 self.try_some_code(self.SIMPLE, "gevent", gevent) |
| 181 |
| 182 def test_greenlet(self): |
| 183 GREENLET = """\ |
| 184 from greenlet import greenlet |
| 185 |
| 186 def test1(x, y): |
| 187 z = gr2.switch(x+y) |
| 188 print(z) |
| 189 |
| 190 def test2(u): |
| 191 print(u) |
| 192 gr1.switch(42) |
| 193 |
| 194 gr1 = greenlet(test1) |
| 195 gr2 = greenlet(test2) |
| 196 gr1.switch("hello", " world") |
| 197 """ |
| 198 self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n") |
| 199 |
| 200 def test_greenlet_simple_code(self): |
| 201 self.try_some_code(self.SIMPLE, "greenlet", greenlet) |
| 202 |
| 203 def test_bug_330(self): |
| 204 BUG_330 = """\ |
| 205 from weakref import WeakKeyDictionary |
| 206 import eventlet |
| 207 |
| 208 def do(): |
| 209 eventlet.sleep(.01) |
| 210 |
| 211 gts = WeakKeyDictionary() |
| 212 for _ in range(100): |
| 213 gts[eventlet.spawn(do)] = True |
| 214 eventlet.sleep(.005) |
| 215 |
| 216 eventlet.sleep(.1) |
| 217 print(len(gts)) |
| 218 """ |
| 219 self.try_some_code(BUG_330, "eventlet", eventlet, "0\n") |
| 220 |
| 221 |
| 222 class MultiprocessingTest(CoverageTest): |
| 223 """Test support of the multiprocessing module.""" |
| 224 |
| 225 def setUp(self): |
| 226 super(MultiprocessingTest, self).setUp() |
| 227 # Currently, this doesn't work on Windows, something about pickling |
| 228 # the monkey-patched Process class? |
| 229 if env.WINDOWS: |
| 230 self.skip("Multiprocessing support doesn't work on Windows") |
| 231 |
| 232 def test_multiprocessing(self): |
| 233 self.make_file("multi.py", """\ |
| 234 import multiprocessing |
| 235 import os |
| 236 import time |
| 237 |
| 238 def func(x): |
| 239 # Need to pause, or the tasks go too quick, and some processes |
| 240 # in the pool don't get any work, and then don't record data. |
| 241 time.sleep(0.02) |
| 242 # Use different lines in different subprocesses. |
| 243 if x % 2: |
| 244 y = x*x |
| 245 else: |
| 246 y = x*x*x |
| 247 return os.getpid(), y |
| 248 |
| 249 if __name__ == "__main__": |
| 250 pool = multiprocessing.Pool(3) |
| 251 inputs = range(30) |
| 252 outputs = pool.imap_unordered(func, inputs) |
| 253 pids = set() |
| 254 total = 0 |
| 255 for pid, sq in outputs: |
| 256 pids.add(pid) |
| 257 total += sq |
| 258 print("%d pids, total = %d" % (len(pids), total)) |
| 259 pool.close() |
| 260 pool.join() |
| 261 """) |
| 262 |
| 263 out = self.run_command( |
| 264 "coverage run --concurrency=multiprocessing multi.py" |
| 265 ) |
| 266 total = sum(x*x if x%2 else x*x*x for x in range(30)) |
| 267 self.assertEqual(out.rstrip(), "3 pids, total = %d" % total) |
| 268 |
| 269 self.run_command("coverage combine") |
| 270 out = self.run_command("coverage report -m") |
| 271 last_line = self.squeezed_lines(out)[-1] |
| 272 self.assertEqual(last_line, "multi.py 21 0 100%") |
| 273 |
| 274 |
| 275 def print_simple_annotation(code, linenos): |
| 276 """Print the lines in `code` with X for each line number in `linenos`.""" |
| 277 for lineno, line in enumerate(code.splitlines(), start=1): |
| 278 print(" {0} {1}".format("X" if lineno in linenos else " ", line)) |
OLD | NEW |