| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 import os, sys, time, logging | |
| 5 from cStringIO import StringIO | |
| 6 | |
| 7 from twisted.trial import unittest | |
| 8 | |
| 9 from twisted.python import log | |
| 10 from twisted.python import failure | |
| 11 | |
| 12 | |
| 13 class LogTest(unittest.TestCase): | |
| 14 | |
| 15 def setUp(self): | |
| 16 self.catcher = [] | |
| 17 log.addObserver(self.catcher.append) | |
| 18 | |
| 19 def tearDown(self): | |
| 20 log.removeObserver(self.catcher.append) | |
| 21 | |
| 22 def testObservation(self): | |
| 23 catcher = self.catcher | |
| 24 log.msg("test", testShouldCatch=True) | |
| 25 i = catcher.pop() | |
| 26 self.assertEquals(i["message"][0], "test") | |
| 27 self.assertEquals(i["testShouldCatch"], True) | |
| 28 self.failUnless(i.has_key("time")) | |
| 29 self.assertEquals(len(catcher), 0) | |
| 30 | |
| 31 def testContext(self): | |
| 32 catcher = self.catcher | |
| 33 log.callWithContext({"subsystem": "not the default", | |
| 34 "subsubsystem": "a", | |
| 35 "other": "c"}, | |
| 36 log.callWithContext, | |
| 37 {"subsubsystem": "b"}, log.msg, "foo", other="d") | |
| 38 i = catcher.pop() | |
| 39 self.assertEquals(i['subsubsystem'], 'b') | |
| 40 self.assertEquals(i['subsystem'], 'not the default') | |
| 41 self.assertEquals(i['other'], 'd') | |
| 42 self.assertEquals(i['message'][0], 'foo') | |
| 43 | |
| 44 def testErrors(self): | |
| 45 for e, ig in [("hello world","hello world"), | |
| 46 (KeyError(), KeyError), | |
| 47 (failure.Failure(RuntimeError()), RuntimeError)]: | |
| 48 log.err(e) | |
| 49 i = self.catcher.pop() | |
| 50 self.assertEquals(i['isError'], 1) | |
| 51 self.flushLoggedErrors(ig) | |
| 52 | |
| 53 def testErrorsWithWhy(self): | |
| 54 for e, ig in [("hello world","hello world"), | |
| 55 (KeyError(), KeyError), | |
| 56 (failure.Failure(RuntimeError()), RuntimeError)]: | |
| 57 log.err(e, 'foobar') | |
| 58 i = self.catcher.pop() | |
| 59 self.assertEquals(i['isError'], 1) | |
| 60 self.assertEquals(i['why'], 'foobar') | |
| 61 self.flushLoggedErrors(ig) | |
| 62 | |
| 63 | |
| 64 def testErroneousErrors(self): | |
| 65 L1 = [] | |
| 66 L2 = [] | |
| 67 log.addObserver(lambda events: L1.append(events)) | |
| 68 log.addObserver(lambda events: 1/0) | |
| 69 log.addObserver(lambda events: L2.append(events)) | |
| 70 log.msg("Howdy, y'all.") | |
| 71 | |
| 72 # XXX - use private _flushErrors so we don't also catch | |
| 73 # the deprecation warnings | |
| 74 excs = [f.type for f in log._flushErrors(ZeroDivisionError)] | |
| 75 self.assertEquals([ZeroDivisionError], excs) | |
| 76 | |
| 77 self.assertEquals(len(L1), 2) | |
| 78 self.assertEquals(len(L2), 2) | |
| 79 | |
| 80 self.assertEquals(L1[1]['message'], ("Howdy, y'all.",)) | |
| 81 self.assertEquals(L2[0]['message'], ("Howdy, y'all.",)) | |
| 82 | |
| 83 # The observer has been removed, there should be no exception | |
| 84 log.msg("Howdy, y'all.") | |
| 85 | |
| 86 self.assertEquals(len(L1), 3) | |
| 87 self.assertEquals(len(L2), 3) | |
| 88 self.assertEquals(L1[2]['message'], ("Howdy, y'all.",)) | |
| 89 self.assertEquals(L2[2]['message'], ("Howdy, y'all.",)) | |
| 90 | |
| 91 | |
| 92 class FakeFile(list): | |
| 93 def write(self, bytes): | |
| 94 self.append(bytes) | |
| 95 | |
| 96 def flush(self): | |
| 97 pass | |
| 98 | |
| 99 class EvilStr: | |
| 100 def __str__(self): | |
| 101 1/0 | |
| 102 | |
| 103 class EvilRepr: | |
| 104 def __str__(self): | |
| 105 return "Happy Evil Repr" | |
| 106 def __repr__(self): | |
| 107 1/0 | |
| 108 | |
| 109 class EvilReprStr(EvilStr, EvilRepr): | |
| 110 pass | |
| 111 | |
| 112 class LogPublisherTestCaseMixin: | |
| 113 def setUp(self): | |
| 114 """ | |
| 115 Add a log observer which records log events in C{self.out}. Also, | |
| 116 make sure the default string encoding is ASCII so that | |
| 117 L{testSingleUnicode} can test the behavior of logging unencodable | |
| 118 unicode messages. | |
| 119 """ | |
| 120 self.out = FakeFile() | |
| 121 self.lp = log.LogPublisher() | |
| 122 self.flo = log.FileLogObserver(self.out) | |
| 123 self.lp.addObserver(self.flo.emit) | |
| 124 | |
| 125 try: | |
| 126 str(u'\N{VULGAR FRACTION ONE HALF}') | |
| 127 except UnicodeEncodeError: | |
| 128 # This is the behavior we want - don't change anything. | |
| 129 self._origEncoding = None | |
| 130 else: | |
| 131 reload(sys) | |
| 132 self._origEncoding = sys.getdefaultencoding() | |
| 133 sys.setdefaultencoding('ascii') | |
| 134 | |
| 135 | |
| 136 def tearDown(self): | |
| 137 """ | |
| 138 Verify that everything written to the fake file C{self.out} was a | |
| 139 C{str}. Also, restore the default string encoding to its previous | |
| 140 setting, if it was modified by L{setUp}. | |
| 141 """ | |
| 142 for chunk in self.out: | |
| 143 self.failUnless(isinstance(chunk, str), "%r was not a string" % (chu
nk,)) | |
| 144 | |
| 145 if self._origEncoding is not None: | |
| 146 sys.setdefaultencoding(self._origEncoding) | |
| 147 del sys.setdefaultencoding | |
| 148 | |
| 149 | |
| 150 | |
| 151 class LogPublisherTestCase(LogPublisherTestCaseMixin, unittest.TestCase): | |
| 152 def testSingleString(self): | |
| 153 self.lp.msg("Hello, world.") | |
| 154 self.assertEquals(len(self.out), 1) | |
| 155 | |
| 156 | |
| 157 def testMultipleString(self): | |
| 158 # Test some stupid behavior that will be deprecated real soon. | |
| 159 # If you are reading this and trying to learn how the logging | |
| 160 # system works, *do not use this feature*. | |
| 161 self.lp.msg("Hello, ", "world.") | |
| 162 self.assertEquals(len(self.out), 1) | |
| 163 | |
| 164 | |
| 165 def testSingleUnicode(self): | |
| 166 self.lp.msg(u"Hello, \N{VULGAR FRACTION ONE HALF} world.") | |
| 167 self.assertEquals(len(self.out), 1) | |
| 168 self.assertIn('with str error Traceback', self.out[0]) | |
| 169 self.assertIn('UnicodeEncodeError', self.out[0]) | |
| 170 | |
| 171 | |
| 172 | |
| 173 class FileObserverTestCase(LogPublisherTestCaseMixin, unittest.TestCase): | |
| 174 def test_getTimezoneOffset(self): | |
| 175 """ | |
| 176 Attempt to verify that L{FileLogObserver.getTimezoneOffset} returns | |
| 177 correct values for the current C{TZ} environment setting. Do this | |
| 178 by setting C{TZ} to various well-known values and asserting that the | |
| 179 reported offset is correct. | |
| 180 """ | |
| 181 localDaylightTuple = (2006, 6, 30, 0, 0, 0, 4, 181, 1) | |
| 182 utcDaylightTimestamp = time.mktime(localDaylightTuple) | |
| 183 localStandardTuple = (2007, 1, 31, 0, 0, 0, 2, 31, 0) | |
| 184 utcStandardTimestamp = time.mktime(localStandardTuple) | |
| 185 | |
| 186 originalTimezone = os.environ.get('TZ', None) | |
| 187 try: | |
| 188 # Test something west of UTC | |
| 189 os.environ['TZ'] = 'America/New_York' | |
| 190 time.tzset() | |
| 191 self.assertEqual( | |
| 192 self.flo.getTimezoneOffset(utcDaylightTimestamp), | |
| 193 14400) | |
| 194 self.assertEqual( | |
| 195 self.flo.getTimezoneOffset(utcStandardTimestamp), | |
| 196 18000) | |
| 197 | |
| 198 # Test something east of UTC | |
| 199 os.environ['TZ'] = 'Europe/Berlin' | |
| 200 time.tzset() | |
| 201 self.assertEqual( | |
| 202 self.flo.getTimezoneOffset(utcDaylightTimestamp), | |
| 203 -7200) | |
| 204 self.assertEqual( | |
| 205 self.flo.getTimezoneOffset(utcStandardTimestamp), | |
| 206 -3600) | |
| 207 | |
| 208 # Test a timezone that doesn't have DST | |
| 209 os.environ['TZ'] = 'Africa/Johannesburg' | |
| 210 time.tzset() | |
| 211 self.assertEqual( | |
| 212 self.flo.getTimezoneOffset(utcDaylightTimestamp), | |
| 213 -7200) | |
| 214 self.assertEqual( | |
| 215 self.flo.getTimezoneOffset(utcStandardTimestamp), | |
| 216 -7200) | |
| 217 finally: | |
| 218 if originalTimezone is None: | |
| 219 del os.environ['TZ'] | |
| 220 else: | |
| 221 os.environ['TZ'] = originalTimezone | |
| 222 time.tzset() | |
| 223 if getattr(time, 'tzset', None) is None: | |
| 224 test_getTimezoneOffset.skip = ( | |
| 225 "Platform cannot change timezone, cannot verify correct offsets " | |
| 226 "in well-known timezones.") | |
| 227 | |
| 228 | |
| 229 def test_timeFormatting(self): | |
| 230 """ | |
| 231 Test the method of L{FileLogObserver} which turns a timestamp into a | |
| 232 human-readable string. | |
| 233 """ | |
| 234 # There is no function in the time module which converts a UTC time | |
| 235 # tuple to a timestamp. | |
| 236 when = time.mktime((2001, 2, 3, 4, 5, 6, 7, 8, 0)) - time.timezone | |
| 237 | |
| 238 # Pretend to be in US/Eastern for a moment | |
| 239 self.flo.getTimezoneOffset = lambda when: 18000 | |
| 240 self.assertEquals(self.flo.formatTime(when), '2001-02-02 23:05:06-0500') | |
| 241 | |
| 242 # Okay now we're in Eastern Europe somewhere | |
| 243 self.flo.getTimezoneOffset = lambda when: -3600 | |
| 244 self.assertEquals(self.flo.formatTime(when), '2001-02-03 05:05:06+0100') | |
| 245 | |
| 246 # And off in the Pacific or someplace like that | |
| 247 self.flo.getTimezoneOffset = lambda when: -39600 | |
| 248 self.assertEquals(self.flo.formatTime(when), '2001-02-03 15:05:06+1100') | |
| 249 | |
| 250 # One of those weird places with a half-hour offset timezone | |
| 251 self.flo.getTimezoneOffset = lambda when: 5400 | |
| 252 self.assertEquals(self.flo.formatTime(when), '2001-02-03 02:35:06-0130') | |
| 253 | |
| 254 # Half-hour offset in the other direction | |
| 255 self.flo.getTimezoneOffset = lambda when: -5400 | |
| 256 self.assertEquals(self.flo.formatTime(when), '2001-02-03 05:35:06+0130') | |
| 257 | |
| 258 # If a strftime-format string is present on the logger, it should | |
| 259 # use that instead. Note we don't assert anything about day, hour | |
| 260 # or minute because we cannot easily control what time.strftime() | |
| 261 # thinks the local timezone is. | |
| 262 self.flo.timeFormat = '%Y %m' | |
| 263 self.assertEquals(self.flo.formatTime(when), '2001 02') | |
| 264 | |
| 265 | |
| 266 def test_loggingAnObjectWithBroken__str__(self): | |
| 267 #HELLO, MCFLY | |
| 268 self.lp.msg(EvilStr()) | |
| 269 self.assertEquals(len(self.out), 1) | |
| 270 # Logging system shouldn't need to crap itself for this trivial case | |
| 271 self.assertNotIn('UNFORMATTABLE', self.out[0]) | |
| 272 | |
| 273 | |
| 274 def test_formattingAnObjectWithBroken__str__(self): | |
| 275 self.lp.msg(format='%(blat)s', blat=EvilStr()) | |
| 276 self.assertEquals(len(self.out), 1) | |
| 277 self.assertIn('Invalid format string or unformattable object', self.out[
0]) | |
| 278 | |
| 279 | |
| 280 def test_brokenSystem__str__(self): | |
| 281 self.lp.msg('huh', system=EvilStr()) | |
| 282 self.assertEquals(len(self.out), 1) | |
| 283 self.assertIn('Invalid format string or unformattable object', self.out[
0]) | |
| 284 | |
| 285 | |
| 286 def test_formattingAnObjectWithBroken__repr__Indirect(self): | |
| 287 self.lp.msg(format='%(blat)s', blat=[EvilRepr()]) | |
| 288 self.assertEquals(len(self.out), 1) | |
| 289 self.assertIn('UNFORMATTABLE OBJECT', self.out[0]) | |
| 290 | |
| 291 | |
| 292 def test_systemWithBroker__repr__Indirect(self): | |
| 293 self.lp.msg('huh', system=[EvilRepr()]) | |
| 294 self.assertEquals(len(self.out), 1) | |
| 295 self.assertIn('UNFORMATTABLE OBJECT', self.out[0]) | |
| 296 | |
| 297 | |
| 298 def test_simpleBrokenFormat(self): | |
| 299 self.lp.msg(format='hooj %s %s', blat=1) | |
| 300 self.assertEquals(len(self.out), 1) | |
| 301 self.assertIn('Invalid format string or unformattable object', self.out[
0]) | |
| 302 | |
| 303 | |
| 304 def test_ridiculousFormat(self): | |
| 305 self.lp.msg(format=42, blat=1) | |
| 306 self.assertEquals(len(self.out), 1) | |
| 307 self.assertIn('Invalid format string or unformattable object', self.out[
0]) | |
| 308 | |
| 309 | |
| 310 def test_evilFormat__repr__And__str__(self): | |
| 311 self.lp.msg(format=EvilReprStr(), blat=1) | |
| 312 self.assertEquals(len(self.out), 1) | |
| 313 self.assertIn('PATHOLOGICAL', self.out[0]) | |
| 314 | |
| 315 | |
| 316 def test_strangeEventDict(self): | |
| 317 """ | |
| 318 This kind of eventDict used to fail silently, so test it does. | |
| 319 """ | |
| 320 self.lp.msg(message='', isError=False) | |
| 321 self.assertEquals(len(self.out), 0) | |
| 322 | |
| 323 | |
| 324 class PythonLoggingObserverTestCase(unittest.TestCase): | |
| 325 """ | |
| 326 Test the bridge with python logging module. | |
| 327 """ | |
| 328 def setUp(self): | |
| 329 self.out = StringIO() | |
| 330 | |
| 331 rootLogger = logging.getLogger("") | |
| 332 self.originalLevel = rootLogger.getEffectiveLevel() | |
| 333 rootLogger.setLevel(logging.DEBUG) | |
| 334 self.hdlr = logging.StreamHandler(self.out) | |
| 335 fmt = logging.Formatter(logging.BASIC_FORMAT) | |
| 336 self.hdlr.setFormatter(fmt) | |
| 337 rootLogger.addHandler(self.hdlr) | |
| 338 | |
| 339 self.lp = log.LogPublisher() | |
| 340 self.obs = log.PythonLoggingObserver() | |
| 341 self.lp.addObserver(self.obs.emit) | |
| 342 | |
| 343 def tearDown(self): | |
| 344 rootLogger = logging.getLogger("") | |
| 345 rootLogger.removeHandler(self.hdlr) | |
| 346 rootLogger.setLevel(self.originalLevel) | |
| 347 logging.shutdown() | |
| 348 | |
| 349 def test_singleString(self): | |
| 350 """ | |
| 351 Test simple output, and default log level. | |
| 352 """ | |
| 353 self.lp.msg("Hello, world.") | |
| 354 self.assertIn("Hello, world.", self.out.getvalue()) | |
| 355 self.assertIn("INFO", self.out.getvalue()) | |
| 356 | |
| 357 def test_errorString(self): | |
| 358 """ | |
| 359 Test error output. | |
| 360 """ | |
| 361 self.lp.msg(failure=failure.Failure(ValueError("That is bad.")), isError
=True) | |
| 362 self.assertIn("ERROR", self.out.getvalue()) | |
| 363 | |
| 364 def test_formatString(self): | |
| 365 """ | |
| 366 Test logging with a format. | |
| 367 """ | |
| 368 self.lp.msg(format="%(bar)s oo %(foo)s", bar="Hello", foo="world") | |
| 369 self.assertIn("Hello oo world", self.out.getvalue()) | |
| 370 | |
| 371 def test_customLevel(self): | |
| 372 """ | |
| 373 Test the logLevel keyword for customizing level used. | |
| 374 """ | |
| 375 self.lp.msg("Spam egg.", logLevel=logging.DEBUG) | |
| 376 self.assertIn("Spam egg.", self.out.getvalue()) | |
| 377 self.assertIn("DEBUG", self.out.getvalue()) | |
| 378 self.out.reset() | |
| 379 self.lp.msg("Foo bar.", logLevel=logging.WARNING) | |
| 380 self.assertIn("Foo bar.", self.out.getvalue()) | |
| 381 self.assertIn("WARNING", self.out.getvalue()) | |
| 382 | |
| 383 def test_strangeEventDict(self): | |
| 384 """ | |
| 385 Verify that an event dictionary which is not an error and has an empty | |
| 386 message isn't recorded. | |
| 387 """ | |
| 388 self.lp.msg(message='', isError=False) | |
| 389 self.assertEquals(self.out.getvalue(), '') | |
| 390 | |
| 391 | |
| 392 class PythonLoggingIntegrationTestCase(unittest.TestCase): | |
| 393 """ | |
| 394 Test integration of python logging bridge. | |
| 395 """ | |
| 396 def test_startStopObserver(self): | |
| 397 """ | |
| 398 Test that start and stop methods of the observer actually register | |
| 399 and unregister to the log system. | |
| 400 """ | |
| 401 oldAddObserver = log.addObserver | |
| 402 oldRemoveObserver = log.removeObserver | |
| 403 l = [] | |
| 404 try: | |
| 405 log.addObserver = l.append | |
| 406 log.removeObserver = l.remove | |
| 407 obs = log.PythonLoggingObserver() | |
| 408 obs.start() | |
| 409 self.assertEquals(l[0], obs.emit) | |
| 410 obs.stop() | |
| 411 self.assertEquals(len(l), 0) | |
| 412 finally: | |
| 413 log.addObserver = oldAddObserver | |
| 414 log.removeObserver = oldRemoveObserver | |
| 415 | |
| 416 def test_inheritance(self): | |
| 417 """ | |
| 418 Test that we can inherit L{log.PythonLoggingObserver} and use super: | |
| 419 that's basically a validation that L{log.PythonLoggingObserver} is | |
| 420 new-style class. | |
| 421 """ | |
| 422 class MyObserver(log.PythonLoggingObserver): | |
| 423 def emit(self, eventDict): | |
| 424 super(MyObserver, self).emit(eventDict) | |
| 425 obs = MyObserver() | |
| 426 l = [] | |
| 427 oldEmit = log.PythonLoggingObserver.emit | |
| 428 try: | |
| 429 log.PythonLoggingObserver.emit = l.append | |
| 430 obs.emit('foo') | |
| 431 self.assertEquals(len(l), 1) | |
| 432 finally: | |
| 433 log.PythonLoggingObserver.emit = oldEmit | |
| 434 | |
| OLD | NEW |