| Index: third_party/twisted_8_1/twisted/test/test_log.py
|
| diff --git a/third_party/twisted_8_1/twisted/test/test_log.py b/third_party/twisted_8_1/twisted/test/test_log.py
|
| deleted file mode 100644
|
| index ba48f49a6025c9f2532f5375e15a10215249e58a..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/test/test_log.py
|
| +++ /dev/null
|
| @@ -1,434 +0,0 @@
|
| -# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -import os, sys, time, logging
|
| -from cStringIO import StringIO
|
| -
|
| -from twisted.trial import unittest
|
| -
|
| -from twisted.python import log
|
| -from twisted.python import failure
|
| -
|
| -
|
| -class LogTest(unittest.TestCase):
|
| -
|
| - def setUp(self):
|
| - self.catcher = []
|
| - log.addObserver(self.catcher.append)
|
| -
|
| - def tearDown(self):
|
| - log.removeObserver(self.catcher.append)
|
| -
|
| - def testObservation(self):
|
| - catcher = self.catcher
|
| - log.msg("test", testShouldCatch=True)
|
| - i = catcher.pop()
|
| - self.assertEquals(i["message"][0], "test")
|
| - self.assertEquals(i["testShouldCatch"], True)
|
| - self.failUnless(i.has_key("time"))
|
| - self.assertEquals(len(catcher), 0)
|
| -
|
| - def testContext(self):
|
| - catcher = self.catcher
|
| - log.callWithContext({"subsystem": "not the default",
|
| - "subsubsystem": "a",
|
| - "other": "c"},
|
| - log.callWithContext,
|
| - {"subsubsystem": "b"}, log.msg, "foo", other="d")
|
| - i = catcher.pop()
|
| - self.assertEquals(i['subsubsystem'], 'b')
|
| - self.assertEquals(i['subsystem'], 'not the default')
|
| - self.assertEquals(i['other'], 'd')
|
| - self.assertEquals(i['message'][0], 'foo')
|
| -
|
| - def testErrors(self):
|
| - for e, ig in [("hello world","hello world"),
|
| - (KeyError(), KeyError),
|
| - (failure.Failure(RuntimeError()), RuntimeError)]:
|
| - log.err(e)
|
| - i = self.catcher.pop()
|
| - self.assertEquals(i['isError'], 1)
|
| - self.flushLoggedErrors(ig)
|
| -
|
| - def testErrorsWithWhy(self):
|
| - for e, ig in [("hello world","hello world"),
|
| - (KeyError(), KeyError),
|
| - (failure.Failure(RuntimeError()), RuntimeError)]:
|
| - log.err(e, 'foobar')
|
| - i = self.catcher.pop()
|
| - self.assertEquals(i['isError'], 1)
|
| - self.assertEquals(i['why'], 'foobar')
|
| - self.flushLoggedErrors(ig)
|
| -
|
| -
|
| - def testErroneousErrors(self):
|
| - L1 = []
|
| - L2 = []
|
| - log.addObserver(lambda events: L1.append(events))
|
| - log.addObserver(lambda events: 1/0)
|
| - log.addObserver(lambda events: L2.append(events))
|
| - log.msg("Howdy, y'all.")
|
| -
|
| - # XXX - use private _flushErrors so we don't also catch
|
| - # the deprecation warnings
|
| - excs = [f.type for f in log._flushErrors(ZeroDivisionError)]
|
| - self.assertEquals([ZeroDivisionError], excs)
|
| -
|
| - self.assertEquals(len(L1), 2)
|
| - self.assertEquals(len(L2), 2)
|
| -
|
| - self.assertEquals(L1[1]['message'], ("Howdy, y'all.",))
|
| - self.assertEquals(L2[0]['message'], ("Howdy, y'all.",))
|
| -
|
| - # The observer has been removed, there should be no exception
|
| - log.msg("Howdy, y'all.")
|
| -
|
| - self.assertEquals(len(L1), 3)
|
| - self.assertEquals(len(L2), 3)
|
| - self.assertEquals(L1[2]['message'], ("Howdy, y'all.",))
|
| - self.assertEquals(L2[2]['message'], ("Howdy, y'all.",))
|
| -
|
| -
|
| -class FakeFile(list):
|
| - def write(self, bytes):
|
| - self.append(bytes)
|
| -
|
| - def flush(self):
|
| - pass
|
| -
|
| -class EvilStr:
|
| - def __str__(self):
|
| - 1/0
|
| -
|
| -class EvilRepr:
|
| - def __str__(self):
|
| - return "Happy Evil Repr"
|
| - def __repr__(self):
|
| - 1/0
|
| -
|
| -class EvilReprStr(EvilStr, EvilRepr):
|
| - pass
|
| -
|
| -class LogPublisherTestCaseMixin:
|
| - def setUp(self):
|
| - """
|
| - Add a log observer which records log events in C{self.out}. Also,
|
| - make sure the default string encoding is ASCII so that
|
| - L{testSingleUnicode} can test the behavior of logging unencodable
|
| - unicode messages.
|
| - """
|
| - self.out = FakeFile()
|
| - self.lp = log.LogPublisher()
|
| - self.flo = log.FileLogObserver(self.out)
|
| - self.lp.addObserver(self.flo.emit)
|
| -
|
| - try:
|
| - str(u'\N{VULGAR FRACTION ONE HALF}')
|
| - except UnicodeEncodeError:
|
| - # This is the behavior we want - don't change anything.
|
| - self._origEncoding = None
|
| - else:
|
| - reload(sys)
|
| - self._origEncoding = sys.getdefaultencoding()
|
| - sys.setdefaultencoding('ascii')
|
| -
|
| -
|
| - def tearDown(self):
|
| - """
|
| - Verify that everything written to the fake file C{self.out} was a
|
| - C{str}. Also, restore the default string encoding to its previous
|
| - setting, if it was modified by L{setUp}.
|
| - """
|
| - for chunk in self.out:
|
| - self.failUnless(isinstance(chunk, str), "%r was not a string" % (chunk,))
|
| -
|
| - if self._origEncoding is not None:
|
| - sys.setdefaultencoding(self._origEncoding)
|
| - del sys.setdefaultencoding
|
| -
|
| -
|
| -
|
| -class LogPublisherTestCase(LogPublisherTestCaseMixin, unittest.TestCase):
|
| - def testSingleString(self):
|
| - self.lp.msg("Hello, world.")
|
| - self.assertEquals(len(self.out), 1)
|
| -
|
| -
|
| - def testMultipleString(self):
|
| - # Test some stupid behavior that will be deprecated real soon.
|
| - # If you are reading this and trying to learn how the logging
|
| - # system works, *do not use this feature*.
|
| - self.lp.msg("Hello, ", "world.")
|
| - self.assertEquals(len(self.out), 1)
|
| -
|
| -
|
| - def testSingleUnicode(self):
|
| - self.lp.msg(u"Hello, \N{VULGAR FRACTION ONE HALF} world.")
|
| - self.assertEquals(len(self.out), 1)
|
| - self.assertIn('with str error Traceback', self.out[0])
|
| - self.assertIn('UnicodeEncodeError', self.out[0])
|
| -
|
| -
|
| -
|
| -class FileObserverTestCase(LogPublisherTestCaseMixin, unittest.TestCase):
|
| - def test_getTimezoneOffset(self):
|
| - """
|
| - Attempt to verify that L{FileLogObserver.getTimezoneOffset} returns
|
| - correct values for the current C{TZ} environment setting. Do this
|
| - by setting C{TZ} to various well-known values and asserting that the
|
| - reported offset is correct.
|
| - """
|
| - localDaylightTuple = (2006, 6, 30, 0, 0, 0, 4, 181, 1)
|
| - utcDaylightTimestamp = time.mktime(localDaylightTuple)
|
| - localStandardTuple = (2007, 1, 31, 0, 0, 0, 2, 31, 0)
|
| - utcStandardTimestamp = time.mktime(localStandardTuple)
|
| -
|
| - originalTimezone = os.environ.get('TZ', None)
|
| - try:
|
| - # Test something west of UTC
|
| - os.environ['TZ'] = 'America/New_York'
|
| - time.tzset()
|
| - self.assertEqual(
|
| - self.flo.getTimezoneOffset(utcDaylightTimestamp),
|
| - 14400)
|
| - self.assertEqual(
|
| - self.flo.getTimezoneOffset(utcStandardTimestamp),
|
| - 18000)
|
| -
|
| - # Test something east of UTC
|
| - os.environ['TZ'] = 'Europe/Berlin'
|
| - time.tzset()
|
| - self.assertEqual(
|
| - self.flo.getTimezoneOffset(utcDaylightTimestamp),
|
| - -7200)
|
| - self.assertEqual(
|
| - self.flo.getTimezoneOffset(utcStandardTimestamp),
|
| - -3600)
|
| -
|
| - # Test a timezone that doesn't have DST
|
| - os.environ['TZ'] = 'Africa/Johannesburg'
|
| - time.tzset()
|
| - self.assertEqual(
|
| - self.flo.getTimezoneOffset(utcDaylightTimestamp),
|
| - -7200)
|
| - self.assertEqual(
|
| - self.flo.getTimezoneOffset(utcStandardTimestamp),
|
| - -7200)
|
| - finally:
|
| - if originalTimezone is None:
|
| - del os.environ['TZ']
|
| - else:
|
| - os.environ['TZ'] = originalTimezone
|
| - time.tzset()
|
| - if getattr(time, 'tzset', None) is None:
|
| - test_getTimezoneOffset.skip = (
|
| - "Platform cannot change timezone, cannot verify correct offsets "
|
| - "in well-known timezones.")
|
| -
|
| -
|
| - def test_timeFormatting(self):
|
| - """
|
| - Test the method of L{FileLogObserver} which turns a timestamp into a
|
| - human-readable string.
|
| - """
|
| - # There is no function in the time module which converts a UTC time
|
| - # tuple to a timestamp.
|
| - when = time.mktime((2001, 2, 3, 4, 5, 6, 7, 8, 0)) - time.timezone
|
| -
|
| - # Pretend to be in US/Eastern for a moment
|
| - self.flo.getTimezoneOffset = lambda when: 18000
|
| - self.assertEquals(self.flo.formatTime(when), '2001-02-02 23:05:06-0500')
|
| -
|
| - # Okay now we're in Eastern Europe somewhere
|
| - self.flo.getTimezoneOffset = lambda when: -3600
|
| - self.assertEquals(self.flo.formatTime(when), '2001-02-03 05:05:06+0100')
|
| -
|
| - # And off in the Pacific or someplace like that
|
| - self.flo.getTimezoneOffset = lambda when: -39600
|
| - self.assertEquals(self.flo.formatTime(when), '2001-02-03 15:05:06+1100')
|
| -
|
| - # One of those weird places with a half-hour offset timezone
|
| - self.flo.getTimezoneOffset = lambda when: 5400
|
| - self.assertEquals(self.flo.formatTime(when), '2001-02-03 02:35:06-0130')
|
| -
|
| - # Half-hour offset in the other direction
|
| - self.flo.getTimezoneOffset = lambda when: -5400
|
| - self.assertEquals(self.flo.formatTime(when), '2001-02-03 05:35:06+0130')
|
| -
|
| - # If a strftime-format string is present on the logger, it should
|
| - # use that instead. Note we don't assert anything about day, hour
|
| - # or minute because we cannot easily control what time.strftime()
|
| - # thinks the local timezone is.
|
| - self.flo.timeFormat = '%Y %m'
|
| - self.assertEquals(self.flo.formatTime(when), '2001 02')
|
| -
|
| -
|
| - def test_loggingAnObjectWithBroken__str__(self):
|
| - #HELLO, MCFLY
|
| - self.lp.msg(EvilStr())
|
| - self.assertEquals(len(self.out), 1)
|
| - # Logging system shouldn't need to crap itself for this trivial case
|
| - self.assertNotIn('UNFORMATTABLE', self.out[0])
|
| -
|
| -
|
| - def test_formattingAnObjectWithBroken__str__(self):
|
| - self.lp.msg(format='%(blat)s', blat=EvilStr())
|
| - self.assertEquals(len(self.out), 1)
|
| - self.assertIn('Invalid format string or unformattable object', self.out[0])
|
| -
|
| -
|
| - def test_brokenSystem__str__(self):
|
| - self.lp.msg('huh', system=EvilStr())
|
| - self.assertEquals(len(self.out), 1)
|
| - self.assertIn('Invalid format string or unformattable object', self.out[0])
|
| -
|
| -
|
| - def test_formattingAnObjectWithBroken__repr__Indirect(self):
|
| - self.lp.msg(format='%(blat)s', blat=[EvilRepr()])
|
| - self.assertEquals(len(self.out), 1)
|
| - self.assertIn('UNFORMATTABLE OBJECT', self.out[0])
|
| -
|
| -
|
| - def test_systemWithBroker__repr__Indirect(self):
|
| - self.lp.msg('huh', system=[EvilRepr()])
|
| - self.assertEquals(len(self.out), 1)
|
| - self.assertIn('UNFORMATTABLE OBJECT', self.out[0])
|
| -
|
| -
|
| - def test_simpleBrokenFormat(self):
|
| - self.lp.msg(format='hooj %s %s', blat=1)
|
| - self.assertEquals(len(self.out), 1)
|
| - self.assertIn('Invalid format string or unformattable object', self.out[0])
|
| -
|
| -
|
| - def test_ridiculousFormat(self):
|
| - self.lp.msg(format=42, blat=1)
|
| - self.assertEquals(len(self.out), 1)
|
| - self.assertIn('Invalid format string or unformattable object', self.out[0])
|
| -
|
| -
|
| - def test_evilFormat__repr__And__str__(self):
|
| - self.lp.msg(format=EvilReprStr(), blat=1)
|
| - self.assertEquals(len(self.out), 1)
|
| - self.assertIn('PATHOLOGICAL', self.out[0])
|
| -
|
| -
|
| - def test_strangeEventDict(self):
|
| - """
|
| - This kind of eventDict used to fail silently, so test it does.
|
| - """
|
| - self.lp.msg(message='', isError=False)
|
| - self.assertEquals(len(self.out), 0)
|
| -
|
| -
|
| -class PythonLoggingObserverTestCase(unittest.TestCase):
|
| - """
|
| - Test the bridge with python logging module.
|
| - """
|
| - def setUp(self):
|
| - self.out = StringIO()
|
| -
|
| - rootLogger = logging.getLogger("")
|
| - self.originalLevel = rootLogger.getEffectiveLevel()
|
| - rootLogger.setLevel(logging.DEBUG)
|
| - self.hdlr = logging.StreamHandler(self.out)
|
| - fmt = logging.Formatter(logging.BASIC_FORMAT)
|
| - self.hdlr.setFormatter(fmt)
|
| - rootLogger.addHandler(self.hdlr)
|
| -
|
| - self.lp = log.LogPublisher()
|
| - self.obs = log.PythonLoggingObserver()
|
| - self.lp.addObserver(self.obs.emit)
|
| -
|
| - def tearDown(self):
|
| - rootLogger = logging.getLogger("")
|
| - rootLogger.removeHandler(self.hdlr)
|
| - rootLogger.setLevel(self.originalLevel)
|
| - logging.shutdown()
|
| -
|
| - def test_singleString(self):
|
| - """
|
| - Test simple output, and default log level.
|
| - """
|
| - self.lp.msg("Hello, world.")
|
| - self.assertIn("Hello, world.", self.out.getvalue())
|
| - self.assertIn("INFO", self.out.getvalue())
|
| -
|
| - def test_errorString(self):
|
| - """
|
| - Test error output.
|
| - """
|
| - self.lp.msg(failure=failure.Failure(ValueError("That is bad.")), isError=True)
|
| - self.assertIn("ERROR", self.out.getvalue())
|
| -
|
| - def test_formatString(self):
|
| - """
|
| - Test logging with a format.
|
| - """
|
| - self.lp.msg(format="%(bar)s oo %(foo)s", bar="Hello", foo="world")
|
| - self.assertIn("Hello oo world", self.out.getvalue())
|
| -
|
| - def test_customLevel(self):
|
| - """
|
| - Test the logLevel keyword for customizing level used.
|
| - """
|
| - self.lp.msg("Spam egg.", logLevel=logging.DEBUG)
|
| - self.assertIn("Spam egg.", self.out.getvalue())
|
| - self.assertIn("DEBUG", self.out.getvalue())
|
| - self.out.reset()
|
| - self.lp.msg("Foo bar.", logLevel=logging.WARNING)
|
| - self.assertIn("Foo bar.", self.out.getvalue())
|
| - self.assertIn("WARNING", self.out.getvalue())
|
| -
|
| - def test_strangeEventDict(self):
|
| - """
|
| - Verify that an event dictionary which is not an error and has an empty
|
| - message isn't recorded.
|
| - """
|
| - self.lp.msg(message='', isError=False)
|
| - self.assertEquals(self.out.getvalue(), '')
|
| -
|
| -
|
| -class PythonLoggingIntegrationTestCase(unittest.TestCase):
|
| - """
|
| - Test integration of python logging bridge.
|
| - """
|
| - def test_startStopObserver(self):
|
| - """
|
| - Test that start and stop methods of the observer actually register
|
| - and unregister to the log system.
|
| - """
|
| - oldAddObserver = log.addObserver
|
| - oldRemoveObserver = log.removeObserver
|
| - l = []
|
| - try:
|
| - log.addObserver = l.append
|
| - log.removeObserver = l.remove
|
| - obs = log.PythonLoggingObserver()
|
| - obs.start()
|
| - self.assertEquals(l[0], obs.emit)
|
| - obs.stop()
|
| - self.assertEquals(len(l), 0)
|
| - finally:
|
| - log.addObserver = oldAddObserver
|
| - log.removeObserver = oldRemoveObserver
|
| -
|
| - def test_inheritance(self):
|
| - """
|
| - Test that we can inherit L{log.PythonLoggingObserver} and use super:
|
| - that's basically a validation that L{log.PythonLoggingObserver} is
|
| - new-style class.
|
| - """
|
| - class MyObserver(log.PythonLoggingObserver):
|
| - def emit(self, eventDict):
|
| - super(MyObserver, self).emit(eventDict)
|
| - obs = MyObserver()
|
| - l = []
|
| - oldEmit = log.PythonLoggingObserver.emit
|
| - try:
|
| - log.PythonLoggingObserver.emit = l.append
|
| - obs.emit('foo')
|
| - self.assertEquals(len(l), 1)
|
| - finally:
|
| - log.PythonLoggingObserver.emit = oldEmit
|
| -
|
|
|