OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 import os, sys, time, logging | |
5 from cStringIO import StringIO | |
6 | |
7 from twisted.trial import unittest | |
8 | |
9 from twisted.python import log | |
10 from twisted.python import failure | |
11 | |
12 | |
13 class LogTest(unittest.TestCase): | |
14 | |
15 def setUp(self): | |
16 self.catcher = [] | |
17 log.addObserver(self.catcher.append) | |
18 | |
19 def tearDown(self): | |
20 log.removeObserver(self.catcher.append) | |
21 | |
22 def testObservation(self): | |
23 catcher = self.catcher | |
24 log.msg("test", testShouldCatch=True) | |
25 i = catcher.pop() | |
26 self.assertEquals(i["message"][0], "test") | |
27 self.assertEquals(i["testShouldCatch"], True) | |
28 self.failUnless(i.has_key("time")) | |
29 self.assertEquals(len(catcher), 0) | |
30 | |
31 def testContext(self): | |
32 catcher = self.catcher | |
33 log.callWithContext({"subsystem": "not the default", | |
34 "subsubsystem": "a", | |
35 "other": "c"}, | |
36 log.callWithContext, | |
37 {"subsubsystem": "b"}, log.msg, "foo", other="d") | |
38 i = catcher.pop() | |
39 self.assertEquals(i['subsubsystem'], 'b') | |
40 self.assertEquals(i['subsystem'], 'not the default') | |
41 self.assertEquals(i['other'], 'd') | |
42 self.assertEquals(i['message'][0], 'foo') | |
43 | |
44 def testErrors(self): | |
45 for e, ig in [("hello world","hello world"), | |
46 (KeyError(), KeyError), | |
47 (failure.Failure(RuntimeError()), RuntimeError)]: | |
48 log.err(e) | |
49 i = self.catcher.pop() | |
50 self.assertEquals(i['isError'], 1) | |
51 self.flushLoggedErrors(ig) | |
52 | |
53 def testErrorsWithWhy(self): | |
54 for e, ig in [("hello world","hello world"), | |
55 (KeyError(), KeyError), | |
56 (failure.Failure(RuntimeError()), RuntimeError)]: | |
57 log.err(e, 'foobar') | |
58 i = self.catcher.pop() | |
59 self.assertEquals(i['isError'], 1) | |
60 self.assertEquals(i['why'], 'foobar') | |
61 self.flushLoggedErrors(ig) | |
62 | |
63 | |
64 def testErroneousErrors(self): | |
65 L1 = [] | |
66 L2 = [] | |
67 log.addObserver(lambda events: L1.append(events)) | |
68 log.addObserver(lambda events: 1/0) | |
69 log.addObserver(lambda events: L2.append(events)) | |
70 log.msg("Howdy, y'all.") | |
71 | |
72 # XXX - use private _flushErrors so we don't also catch | |
73 # the deprecation warnings | |
74 excs = [f.type for f in log._flushErrors(ZeroDivisionError)] | |
75 self.assertEquals([ZeroDivisionError], excs) | |
76 | |
77 self.assertEquals(len(L1), 2) | |
78 self.assertEquals(len(L2), 2) | |
79 | |
80 self.assertEquals(L1[1]['message'], ("Howdy, y'all.",)) | |
81 self.assertEquals(L2[0]['message'], ("Howdy, y'all.",)) | |
82 | |
83 # The observer has been removed, there should be no exception | |
84 log.msg("Howdy, y'all.") | |
85 | |
86 self.assertEquals(len(L1), 3) | |
87 self.assertEquals(len(L2), 3) | |
88 self.assertEquals(L1[2]['message'], ("Howdy, y'all.",)) | |
89 self.assertEquals(L2[2]['message'], ("Howdy, y'all.",)) | |
90 | |
91 | |
92 class FakeFile(list): | |
93 def write(self, bytes): | |
94 self.append(bytes) | |
95 | |
96 def flush(self): | |
97 pass | |
98 | |
99 class EvilStr: | |
100 def __str__(self): | |
101 1/0 | |
102 | |
103 class EvilRepr: | |
104 def __str__(self): | |
105 return "Happy Evil Repr" | |
106 def __repr__(self): | |
107 1/0 | |
108 | |
109 class EvilReprStr(EvilStr, EvilRepr): | |
110 pass | |
111 | |
112 class LogPublisherTestCaseMixin: | |
113 def setUp(self): | |
114 """ | |
115 Add a log observer which records log events in C{self.out}. Also, | |
116 make sure the default string encoding is ASCII so that | |
117 L{testSingleUnicode} can test the behavior of logging unencodable | |
118 unicode messages. | |
119 """ | |
120 self.out = FakeFile() | |
121 self.lp = log.LogPublisher() | |
122 self.flo = log.FileLogObserver(self.out) | |
123 self.lp.addObserver(self.flo.emit) | |
124 | |
125 try: | |
126 str(u'\N{VULGAR FRACTION ONE HALF}') | |
127 except UnicodeEncodeError: | |
128 # This is the behavior we want - don't change anything. | |
129 self._origEncoding = None | |
130 else: | |
131 reload(sys) | |
132 self._origEncoding = sys.getdefaultencoding() | |
133 sys.setdefaultencoding('ascii') | |
134 | |
135 | |
136 def tearDown(self): | |
137 """ | |
138 Verify that everything written to the fake file C{self.out} was a | |
139 C{str}. Also, restore the default string encoding to its previous | |
140 setting, if it was modified by L{setUp}. | |
141 """ | |
142 for chunk in self.out: | |
143 self.failUnless(isinstance(chunk, str), "%r was not a string" % (chu
nk,)) | |
144 | |
145 if self._origEncoding is not None: | |
146 sys.setdefaultencoding(self._origEncoding) | |
147 del sys.setdefaultencoding | |
148 | |
149 | |
150 | |
151 class LogPublisherTestCase(LogPublisherTestCaseMixin, unittest.TestCase): | |
152 def testSingleString(self): | |
153 self.lp.msg("Hello, world.") | |
154 self.assertEquals(len(self.out), 1) | |
155 | |
156 | |
157 def testMultipleString(self): | |
158 # Test some stupid behavior that will be deprecated real soon. | |
159 # If you are reading this and trying to learn how the logging | |
160 # system works, *do not use this feature*. | |
161 self.lp.msg("Hello, ", "world.") | |
162 self.assertEquals(len(self.out), 1) | |
163 | |
164 | |
165 def testSingleUnicode(self): | |
166 self.lp.msg(u"Hello, \N{VULGAR FRACTION ONE HALF} world.") | |
167 self.assertEquals(len(self.out), 1) | |
168 self.assertIn('with str error Traceback', self.out[0]) | |
169 self.assertIn('UnicodeEncodeError', self.out[0]) | |
170 | |
171 | |
172 | |
173 class FileObserverTestCase(LogPublisherTestCaseMixin, unittest.TestCase): | |
174 def test_getTimezoneOffset(self): | |
175 """ | |
176 Attempt to verify that L{FileLogObserver.getTimezoneOffset} returns | |
177 correct values for the current C{TZ} environment setting. Do this | |
178 by setting C{TZ} to various well-known values and asserting that the | |
179 reported offset is correct. | |
180 """ | |
181 localDaylightTuple = (2006, 6, 30, 0, 0, 0, 4, 181, 1) | |
182 utcDaylightTimestamp = time.mktime(localDaylightTuple) | |
183 localStandardTuple = (2007, 1, 31, 0, 0, 0, 2, 31, 0) | |
184 utcStandardTimestamp = time.mktime(localStandardTuple) | |
185 | |
186 originalTimezone = os.environ.get('TZ', None) | |
187 try: | |
188 # Test something west of UTC | |
189 os.environ['TZ'] = 'America/New_York' | |
190 time.tzset() | |
191 self.assertEqual( | |
192 self.flo.getTimezoneOffset(utcDaylightTimestamp), | |
193 14400) | |
194 self.assertEqual( | |
195 self.flo.getTimezoneOffset(utcStandardTimestamp), | |
196 18000) | |
197 | |
198 # Test something east of UTC | |
199 os.environ['TZ'] = 'Europe/Berlin' | |
200 time.tzset() | |
201 self.assertEqual( | |
202 self.flo.getTimezoneOffset(utcDaylightTimestamp), | |
203 -7200) | |
204 self.assertEqual( | |
205 self.flo.getTimezoneOffset(utcStandardTimestamp), | |
206 -3600) | |
207 | |
208 # Test a timezone that doesn't have DST | |
209 os.environ['TZ'] = 'Africa/Johannesburg' | |
210 time.tzset() | |
211 self.assertEqual( | |
212 self.flo.getTimezoneOffset(utcDaylightTimestamp), | |
213 -7200) | |
214 self.assertEqual( | |
215 self.flo.getTimezoneOffset(utcStandardTimestamp), | |
216 -7200) | |
217 finally: | |
218 if originalTimezone is None: | |
219 del os.environ['TZ'] | |
220 else: | |
221 os.environ['TZ'] = originalTimezone | |
222 time.tzset() | |
223 if getattr(time, 'tzset', None) is None: | |
224 test_getTimezoneOffset.skip = ( | |
225 "Platform cannot change timezone, cannot verify correct offsets " | |
226 "in well-known timezones.") | |
227 | |
228 | |
229 def test_timeFormatting(self): | |
230 """ | |
231 Test the method of L{FileLogObserver} which turns a timestamp into a | |
232 human-readable string. | |
233 """ | |
234 # There is no function in the time module which converts a UTC time | |
235 # tuple to a timestamp. | |
236 when = time.mktime((2001, 2, 3, 4, 5, 6, 7, 8, 0)) - time.timezone | |
237 | |
238 # Pretend to be in US/Eastern for a moment | |
239 self.flo.getTimezoneOffset = lambda when: 18000 | |
240 self.assertEquals(self.flo.formatTime(when), '2001-02-02 23:05:06-0500') | |
241 | |
242 # Okay now we're in Eastern Europe somewhere | |
243 self.flo.getTimezoneOffset = lambda when: -3600 | |
244 self.assertEquals(self.flo.formatTime(when), '2001-02-03 05:05:06+0100') | |
245 | |
246 # And off in the Pacific or someplace like that | |
247 self.flo.getTimezoneOffset = lambda when: -39600 | |
248 self.assertEquals(self.flo.formatTime(when), '2001-02-03 15:05:06+1100') | |
249 | |
250 # One of those weird places with a half-hour offset timezone | |
251 self.flo.getTimezoneOffset = lambda when: 5400 | |
252 self.assertEquals(self.flo.formatTime(when), '2001-02-03 02:35:06-0130') | |
253 | |
254 # Half-hour offset in the other direction | |
255 self.flo.getTimezoneOffset = lambda when: -5400 | |
256 self.assertEquals(self.flo.formatTime(when), '2001-02-03 05:35:06+0130') | |
257 | |
258 # If a strftime-format string is present on the logger, it should | |
259 # use that instead. Note we don't assert anything about day, hour | |
260 # or minute because we cannot easily control what time.strftime() | |
261 # thinks the local timezone is. | |
262 self.flo.timeFormat = '%Y %m' | |
263 self.assertEquals(self.flo.formatTime(when), '2001 02') | |
264 | |
265 | |
266 def test_loggingAnObjectWithBroken__str__(self): | |
267 #HELLO, MCFLY | |
268 self.lp.msg(EvilStr()) | |
269 self.assertEquals(len(self.out), 1) | |
270 # Logging system shouldn't need to crap itself for this trivial case | |
271 self.assertNotIn('UNFORMATTABLE', self.out[0]) | |
272 | |
273 | |
274 def test_formattingAnObjectWithBroken__str__(self): | |
275 self.lp.msg(format='%(blat)s', blat=EvilStr()) | |
276 self.assertEquals(len(self.out), 1) | |
277 self.assertIn('Invalid format string or unformattable object', self.out[
0]) | |
278 | |
279 | |
280 def test_brokenSystem__str__(self): | |
281 self.lp.msg('huh', system=EvilStr()) | |
282 self.assertEquals(len(self.out), 1) | |
283 self.assertIn('Invalid format string or unformattable object', self.out[
0]) | |
284 | |
285 | |
286 def test_formattingAnObjectWithBroken__repr__Indirect(self): | |
287 self.lp.msg(format='%(blat)s', blat=[EvilRepr()]) | |
288 self.assertEquals(len(self.out), 1) | |
289 self.assertIn('UNFORMATTABLE OBJECT', self.out[0]) | |
290 | |
291 | |
292 def test_systemWithBroker__repr__Indirect(self): | |
293 self.lp.msg('huh', system=[EvilRepr()]) | |
294 self.assertEquals(len(self.out), 1) | |
295 self.assertIn('UNFORMATTABLE OBJECT', self.out[0]) | |
296 | |
297 | |
298 def test_simpleBrokenFormat(self): | |
299 self.lp.msg(format='hooj %s %s', blat=1) | |
300 self.assertEquals(len(self.out), 1) | |
301 self.assertIn('Invalid format string or unformattable object', self.out[
0]) | |
302 | |
303 | |
304 def test_ridiculousFormat(self): | |
305 self.lp.msg(format=42, blat=1) | |
306 self.assertEquals(len(self.out), 1) | |
307 self.assertIn('Invalid format string or unformattable object', self.out[
0]) | |
308 | |
309 | |
310 def test_evilFormat__repr__And__str__(self): | |
311 self.lp.msg(format=EvilReprStr(), blat=1) | |
312 self.assertEquals(len(self.out), 1) | |
313 self.assertIn('PATHOLOGICAL', self.out[0]) | |
314 | |
315 | |
316 def test_strangeEventDict(self): | |
317 """ | |
318 This kind of eventDict used to fail silently, so test it does. | |
319 """ | |
320 self.lp.msg(message='', isError=False) | |
321 self.assertEquals(len(self.out), 0) | |
322 | |
323 | |
324 class PythonLoggingObserverTestCase(unittest.TestCase): | |
325 """ | |
326 Test the bridge with python logging module. | |
327 """ | |
328 def setUp(self): | |
329 self.out = StringIO() | |
330 | |
331 rootLogger = logging.getLogger("") | |
332 self.originalLevel = rootLogger.getEffectiveLevel() | |
333 rootLogger.setLevel(logging.DEBUG) | |
334 self.hdlr = logging.StreamHandler(self.out) | |
335 fmt = logging.Formatter(logging.BASIC_FORMAT) | |
336 self.hdlr.setFormatter(fmt) | |
337 rootLogger.addHandler(self.hdlr) | |
338 | |
339 self.lp = log.LogPublisher() | |
340 self.obs = log.PythonLoggingObserver() | |
341 self.lp.addObserver(self.obs.emit) | |
342 | |
343 def tearDown(self): | |
344 rootLogger = logging.getLogger("") | |
345 rootLogger.removeHandler(self.hdlr) | |
346 rootLogger.setLevel(self.originalLevel) | |
347 logging.shutdown() | |
348 | |
349 def test_singleString(self): | |
350 """ | |
351 Test simple output, and default log level. | |
352 """ | |
353 self.lp.msg("Hello, world.") | |
354 self.assertIn("Hello, world.", self.out.getvalue()) | |
355 self.assertIn("INFO", self.out.getvalue()) | |
356 | |
357 def test_errorString(self): | |
358 """ | |
359 Test error output. | |
360 """ | |
361 self.lp.msg(failure=failure.Failure(ValueError("That is bad.")), isError
=True) | |
362 self.assertIn("ERROR", self.out.getvalue()) | |
363 | |
364 def test_formatString(self): | |
365 """ | |
366 Test logging with a format. | |
367 """ | |
368 self.lp.msg(format="%(bar)s oo %(foo)s", bar="Hello", foo="world") | |
369 self.assertIn("Hello oo world", self.out.getvalue()) | |
370 | |
371 def test_customLevel(self): | |
372 """ | |
373 Test the logLevel keyword for customizing level used. | |
374 """ | |
375 self.lp.msg("Spam egg.", logLevel=logging.DEBUG) | |
376 self.assertIn("Spam egg.", self.out.getvalue()) | |
377 self.assertIn("DEBUG", self.out.getvalue()) | |
378 self.out.reset() | |
379 self.lp.msg("Foo bar.", logLevel=logging.WARNING) | |
380 self.assertIn("Foo bar.", self.out.getvalue()) | |
381 self.assertIn("WARNING", self.out.getvalue()) | |
382 | |
383 def test_strangeEventDict(self): | |
384 """ | |
385 Verify that an event dictionary which is not an error and has an empty | |
386 message isn't recorded. | |
387 """ | |
388 self.lp.msg(message='', isError=False) | |
389 self.assertEquals(self.out.getvalue(), '') | |
390 | |
391 | |
392 class PythonLoggingIntegrationTestCase(unittest.TestCase): | |
393 """ | |
394 Test integration of python logging bridge. | |
395 """ | |
396 def test_startStopObserver(self): | |
397 """ | |
398 Test that start and stop methods of the observer actually register | |
399 and unregister to the log system. | |
400 """ | |
401 oldAddObserver = log.addObserver | |
402 oldRemoveObserver = log.removeObserver | |
403 l = [] | |
404 try: | |
405 log.addObserver = l.append | |
406 log.removeObserver = l.remove | |
407 obs = log.PythonLoggingObserver() | |
408 obs.start() | |
409 self.assertEquals(l[0], obs.emit) | |
410 obs.stop() | |
411 self.assertEquals(len(l), 0) | |
412 finally: | |
413 log.addObserver = oldAddObserver | |
414 log.removeObserver = oldRemoveObserver | |
415 | |
416 def test_inheritance(self): | |
417 """ | |
418 Test that we can inherit L{log.PythonLoggingObserver} and use super: | |
419 that's basically a validation that L{log.PythonLoggingObserver} is | |
420 new-style class. | |
421 """ | |
422 class MyObserver(log.PythonLoggingObserver): | |
423 def emit(self, eventDict): | |
424 super(MyObserver, self).emit(eventDict) | |
425 obs = MyObserver() | |
426 l = [] | |
427 oldEmit = log.PythonLoggingObserver.emit | |
428 try: | |
429 log.PythonLoggingObserver.emit = l.append | |
430 obs.emit('foo') | |
431 self.assertEquals(len(l), 1) | |
432 finally: | |
433 log.PythonLoggingObserver.emit = oldEmit | |
434 | |
OLD | NEW |