| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 | |
| 5 """Test cases for the NMEA GPS protocol""" | |
| 6 | |
| 7 import StringIO | |
| 8 | |
| 9 from twisted.trial import unittest | |
| 10 from twisted.internet import reactor, protocol | |
| 11 from twisted.python import reflect | |
| 12 | |
| 13 from twisted.protocols.gps import nmea | |
| 14 | |
| 15 class StringIOWithNoClose(StringIO.StringIO): | |
| 16 def close(self): | |
| 17 pass | |
| 18 | |
| 19 class ResultHarvester: | |
| 20 def __init__(self): | |
| 21 self.results = [] | |
| 22 | |
| 23 def __call__(self, *args): | |
| 24 self.results.append(args) | |
| 25 | |
| 26 def performTest(self, function, *args, **kwargs): | |
| 27 l = len(self.results) | |
| 28 try: | |
| 29 function(*args, **kwargs) | |
| 30 except Exception, e: | |
| 31 self.results.append(e) | |
| 32 if l == len(self.results): | |
| 33 self.results.append(NotImplementedError()) | |
| 34 | |
| 35 class NMEATester(nmea.NMEAReceiver): | |
| 36 ignore_invalid_sentence = 0 | |
| 37 ignore_checksum_mismatch = 0 | |
| 38 ignore_unknown_sentencetypes = 0 | |
| 39 convert_dates_before_y2k = 1 | |
| 40 | |
| 41 def connectionMade(self): | |
| 42 self.resultHarvester = ResultHarvester() | |
| 43 for fn in reflect.prefixedMethodNames(self.__class__, 'decode_'): | |
| 44 setattr(self, 'handle_' + fn, self.resultHarvester) | |
| 45 | |
| 46 class NMEAReceiverTestCase(unittest.TestCase): | |
| 47 messages = ( | |
| 48 # fix - signal acquired | |
| 49 "$GPGGA,231713.0,3910.413,N,07641.994,W,1,05,1.35,00044,M,-033,M,,*69", | |
| 50 # fix - signal not acquired | |
| 51 "$GPGGA,235947.000,0000.0000,N,00000.0000,E,0,00,0.0,0.0,M,,,,0000*00", | |
| 52 # junk | |
| 53 "lkjasdfkl!@#(*$!@(*#(ASDkfjasdfLMASDCVKAW!@#($)!(@#)(*", | |
| 54 # fix - signal acquired (invalid checksum) | |
| 55 "$GPGGA,231713.0,3910.413,N,07641.994,W,1,05,1.35,00044,M,-033,M,,*68", | |
| 56 # invalid sentence | |
| 57 "$GPGGX,231713.0,3910.413,N,07641.994,W,1,05,1.35,00044,M,-033,M,,*68", | |
| 58 # position acquired | |
| 59 "$GPGLL,4250.5589,S,14718.5084,E,092204.999,A*2D", | |
| 60 # position not acquired | |
| 61 "$GPGLL,0000.0000,N,00000.0000,E,235947.000,V*2D", | |
| 62 # active satellites (no fix) | |
| 63 "$GPGSA,A,1,,,,,,,,,,,,,0.0,0.0,0.0*30", | |
| 64 # active satellites | |
| 65 "$GPGSA,A,3,01,20,19,13,,,,,,,,,40.4,24.4,32.2*0A", | |
| 66 # positiontime (no fix) | |
| 67 "$GPRMC,235947.000,V,0000.0000,N,00000.0000,E,,,041299,,*1D", | |
| 68 # positiontime | |
| 69 "$GPRMC,092204.999,A,4250.5589,S,14718.5084,E,0.00,89.68,211200,,*25", | |
| 70 # course over ground (no fix - not implemented) | |
| 71 "$GPVTG,,T,,M,,N,,K*4E", | |
| 72 # course over ground (not implemented) | |
| 73 "$GPVTG,89.68,T,,M,0.00,N,0.0,K*5F", | |
| 74 ) | |
| 75 results = ( | |
| 76 (83833.0, 39.17355, -76.6999, nmea.POSFIX_SPS, 5, 1.35, (44.0, 'M'), (-3
3.0, 'M'), None), | |
| 77 (86387.0, 0.0, 0.0, 0, 0, 0.0, (0.0, 'M'), None, None), | |
| 78 nmea.InvalidSentence(), | |
| 79 nmea.InvalidChecksum(), | |
| 80 nmea.InvalidSentence(), | |
| 81 (-42.842648333333337, 147.30847333333332, 33724.999000000003, 1), | |
| 82 (0.0, 0.0, 86387.0, 0), | |
| 83 ((None, None, None, None, None, None, None, None, None, None, None, None
), (nmea.MODE_AUTO, nmea.MODE_NOFIX), 0.0, 0.0, 0.0), | |
| 84 ((1, 20, 19, 13, None, None, None, None, None, None, None, None), (nmea.
MODE_AUTO, nmea.MODE_3D), 40.4, 24.4, 32.2), | |
| 85 (0.0, 0.0, None, None, 86387.0, (1999, 12, 4), None), | |
| 86 (-42.842648333333337, 147.30847333333332, 0.0, 89.68, 33724.999, (2000,
12, 21), None), | |
| 87 NotImplementedError(), | |
| 88 NotImplementedError(), | |
| 89 ) | |
| 90 def testGPSMessages(self): | |
| 91 dummy = NMEATester() | |
| 92 dummy.makeConnection(protocol.FileWrapper(StringIOWithNoClose())) | |
| 93 for line in self.messages: | |
| 94 dummy.resultHarvester.performTest(dummy.lineReceived, line) | |
| 95 def munge(myTuple): | |
| 96 if type(myTuple) != type(()): | |
| 97 return | |
| 98 newTuple = [] | |
| 99 for v in myTuple: | |
| 100 if type(v) == type(1.1): | |
| 101 v = float(int(v * 10000.0)) * 0.0001 | |
| 102 newTuple.append(v) | |
| 103 return tuple(newTuple) | |
| 104 for (message, expectedResult, actualResult) in zip(self.messages, self.r
esults, dummy.resultHarvester.results): | |
| 105 expectedResult = munge(expectedResult) | |
| 106 actualResult = munge(actualResult) | |
| 107 if isinstance(expectedResult, Exception): | |
| 108 if isinstance(actualResult, Exception): | |
| 109 self.failUnlessEqual(expectedResult.__class__, actualResult.
__class__, "\nInput:\n%s\nExpected:\n%s.%s\nResults:\n%s.%s\n" % (message, expec
tedResult.__class__.__module__, expectedResult.__class__.__name__, actualResult.
__class__.__module__, actualResult.__class__.__name__)) | |
| 110 else: | |
| 111 self.failUnlessEqual(1, 0, "\nInput:\n%s\nExpected:\n%s.%s\n
Results:\n%r\n" % (message, expectedResult.__class__.__module__, expectedResult.
__class__.__name__, actualResult)) | |
| 112 else: | |
| 113 self.failUnlessEqual(expectedResult, actualResult, "\nInput:\n%s\n
Expected: %r\nResults: %r\n" % (message, expectedResult, actualResult)) | |
| 114 | |
| 115 testCases = [NMEAReceiverTestCase] | |
| OLD | NEW |