OLD | NEW |
(Empty) | |
| 1 # BER decoder |
| 2 from pyasn1.type import tag, univ, char, useful, tagmap |
| 3 from pyasn1.codec.ber import eoo |
| 4 from pyasn1.compat.octets import oct2int, isOctetsType |
| 5 from pyasn1 import debug, error |
| 6 |
| 7 class AbstractDecoder: |
| 8 protoComponent = None |
| 9 def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 10 length, state, decodeFun, substrateFun): |
| 11 raise error.PyAsn1Error('Decoder not implemented for %s' % (tagSet,)) |
| 12 |
| 13 def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 14 length, state, decodeFun, substrateFun): |
| 15 raise error.PyAsn1Error('Indefinite length mode decoder not implemented
for %s' % (tagSet,)) |
| 16 |
| 17 class AbstractSimpleDecoder(AbstractDecoder): |
| 18 tagFormats = (tag.tagFormatSimple,) |
| 19 def _createComponent(self, asn1Spec, tagSet, value=None): |
| 20 if tagSet[0][1] not in self.tagFormats: |
| 21 raise error.PyAsn1Error('Invalid tag format %s for %s' % (tagSet[0],
self.protoComponent.prettyPrintType())) |
| 22 if asn1Spec is None: |
| 23 return self.protoComponent.clone(value, tagSet) |
| 24 elif value is None: |
| 25 return asn1Spec |
| 26 else: |
| 27 return asn1Spec.clone(value) |
| 28 |
| 29 class AbstractConstructedDecoder(AbstractDecoder): |
| 30 tagFormats = (tag.tagFormatConstructed,) |
| 31 def _createComponent(self, asn1Spec, tagSet, value=None): |
| 32 if tagSet[0][1] not in self.tagFormats: |
| 33 raise error.PyAsn1Error('Invalid tag format %s for %s' % (tagSet[0],
self.protoComponent.prettyPrintType())) |
| 34 if asn1Spec is None: |
| 35 return self.protoComponent.clone(tagSet) |
| 36 else: |
| 37 return asn1Spec.clone() |
| 38 |
| 39 class ExplicitTagDecoder(AbstractSimpleDecoder): |
| 40 protoComponent = univ.Any('') |
| 41 tagFormats = (tag.tagFormatConstructed,) |
| 42 def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 43 length, state, decodeFun, substrateFun): |
| 44 if substrateFun: |
| 45 return substrateFun( |
| 46 self._createComponent(asn1Spec, tagSet, ''), |
| 47 substrate, length |
| 48 ) |
| 49 head, tail = substrate[:length], substrate[length:] |
| 50 value, _ = decodeFun(head, asn1Spec, tagSet, length) |
| 51 return value, tail |
| 52 |
| 53 def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 54 length, state, decodeFun, substrateFun): |
| 55 if substrateFun: |
| 56 return substrateFun( |
| 57 self._createComponent(asn1Spec, tagSet, ''), |
| 58 substrate, length |
| 59 ) |
| 60 value, substrate = decodeFun(substrate, asn1Spec, tagSet, length) |
| 61 terminator, substrate = decodeFun(substrate, allowEoo=True) |
| 62 if eoo.endOfOctets.isSameTypeWith(terminator) and \ |
| 63 terminator == eoo.endOfOctets: |
| 64 return value, substrate |
| 65 else: |
| 66 raise error.PyAsn1Error('Missing end-of-octets terminator') |
| 67 |
| 68 explicitTagDecoder = ExplicitTagDecoder() |
| 69 |
| 70 class IntegerDecoder(AbstractSimpleDecoder): |
| 71 protoComponent = univ.Integer(0) |
| 72 precomputedValues = { |
| 73 '\x00': 0, |
| 74 '\x01': 1, |
| 75 '\x02': 2, |
| 76 '\x03': 3, |
| 77 '\x04': 4, |
| 78 '\x05': 5, |
| 79 '\x06': 6, |
| 80 '\x07': 7, |
| 81 '\x08': 8, |
| 82 '\x09': 9, |
| 83 '\xff': -1, |
| 84 '\xfe': -2, |
| 85 '\xfd': -3, |
| 86 '\xfc': -4, |
| 87 '\xfb': -5 |
| 88 } |
| 89 |
| 90 def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
| 91 state, decodeFun, substrateFun): |
| 92 head, tail = substrate[:length], substrate[length:] |
| 93 if not head: |
| 94 return self._createComponent(asn1Spec, tagSet, 0), tail |
| 95 if head in self.precomputedValues: |
| 96 value = self.precomputedValues[head] |
| 97 else: |
| 98 firstOctet = oct2int(head[0]) |
| 99 if firstOctet & 0x80: |
| 100 value = -1 |
| 101 else: |
| 102 value = 0 |
| 103 for octet in head: |
| 104 value = value << 8 | oct2int(octet) |
| 105 return self._createComponent(asn1Spec, tagSet, value), tail |
| 106 |
| 107 class BooleanDecoder(IntegerDecoder): |
| 108 protoComponent = univ.Boolean(0) |
| 109 def _createComponent(self, asn1Spec, tagSet, value=None): |
| 110 return IntegerDecoder._createComponent(self, asn1Spec, tagSet, value and
1 or 0) |
| 111 |
| 112 class BitStringDecoder(AbstractSimpleDecoder): |
| 113 protoComponent = univ.BitString(()) |
| 114 tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed) |
| 115 def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
| 116 state, decodeFun, substrateFun): |
| 117 head, tail = substrate[:length], substrate[length:] |
| 118 if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check? |
| 119 if not head: |
| 120 raise error.PyAsn1Error('Empty substrate') |
| 121 trailingBits = oct2int(head[0]) |
| 122 if trailingBits > 7: |
| 123 raise error.PyAsn1Error( |
| 124 'Trailing bits overflow %s' % trailingBits |
| 125 ) |
| 126 head = head[1:] |
| 127 lsb = p = 0; l = len(head)-1; b = [] |
| 128 while p <= l: |
| 129 if p == l: |
| 130 lsb = trailingBits |
| 131 j = 7 |
| 132 o = oct2int(head[p]) |
| 133 while j >= lsb: |
| 134 b.append((o>>j)&0x01) |
| 135 j = j - 1 |
| 136 p = p + 1 |
| 137 return self._createComponent(asn1Spec, tagSet, b), tail |
| 138 r = self._createComponent(asn1Spec, tagSet, ()) |
| 139 if substrateFun: |
| 140 return substrateFun(r, substrate, length) |
| 141 while head: |
| 142 component, head = decodeFun(head, self.protoComponent) |
| 143 r = r + component |
| 144 return r, tail |
| 145 |
| 146 def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 147 length, state, decodeFun, substrateFun): |
| 148 r = self._createComponent(asn1Spec, tagSet, '') |
| 149 if substrateFun: |
| 150 return substrateFun(r, substrate, length) |
| 151 while substrate: |
| 152 component, substrate = decodeFun(substrate, self.protoComponent, |
| 153 allowEoo=True) |
| 154 if eoo.endOfOctets.isSameTypeWith(component) and \ |
| 155 component == eoo.endOfOctets: |
| 156 break |
| 157 r = r + component |
| 158 else: |
| 159 raise error.SubstrateUnderrunError( |
| 160 'No EOO seen before substrate ends' |
| 161 ) |
| 162 return r, substrate |
| 163 |
| 164 class OctetStringDecoder(AbstractSimpleDecoder): |
| 165 protoComponent = univ.OctetString('') |
| 166 tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed) |
| 167 def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
| 168 state, decodeFun, substrateFun): |
| 169 head, tail = substrate[:length], substrate[length:] |
| 170 if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check? |
| 171 return self._createComponent(asn1Spec, tagSet, head), tail |
| 172 r = self._createComponent(asn1Spec, tagSet, '') |
| 173 if substrateFun: |
| 174 return substrateFun(r, substrate, length) |
| 175 while head: |
| 176 component, head = decodeFun(head, self.protoComponent) |
| 177 r = r + component |
| 178 return r, tail |
| 179 |
| 180 def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 181 length, state, decodeFun, substrateFun): |
| 182 r = self._createComponent(asn1Spec, tagSet, '') |
| 183 if substrateFun: |
| 184 return substrateFun(r, substrate, length) |
| 185 while substrate: |
| 186 component, substrate = decodeFun(substrate, self.protoComponent, |
| 187 allowEoo=True) |
| 188 if eoo.endOfOctets.isSameTypeWith(component) and \ |
| 189 component == eoo.endOfOctets: |
| 190 break |
| 191 r = r + component |
| 192 else: |
| 193 raise error.SubstrateUnderrunError( |
| 194 'No EOO seen before substrate ends' |
| 195 ) |
| 196 return r, substrate |
| 197 |
| 198 class NullDecoder(AbstractSimpleDecoder): |
| 199 protoComponent = univ.Null('') |
| 200 def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 201 length, state, decodeFun, substrateFun): |
| 202 head, tail = substrate[:length], substrate[length:] |
| 203 r = self._createComponent(asn1Spec, tagSet) |
| 204 if head: |
| 205 raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % l
ength) |
| 206 return r, tail |
| 207 |
| 208 class ObjectIdentifierDecoder(AbstractSimpleDecoder): |
| 209 protoComponent = univ.ObjectIdentifier(()) |
| 210 def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
| 211 state, decodeFun, substrateFun): |
| 212 head, tail = substrate[:length], substrate[length:] |
| 213 if not head: |
| 214 raise error.PyAsn1Error('Empty substrate') |
| 215 |
| 216 oid = () |
| 217 index = 0 |
| 218 substrateLen = len(head) |
| 219 while index < substrateLen: |
| 220 subId = oct2int(head[index]) |
| 221 index += 1 |
| 222 if subId < 128: |
| 223 oid = oid + (subId,) |
| 224 elif subId > 128: |
| 225 # Construct subid from a number of octets |
| 226 nextSubId = subId |
| 227 subId = 0 |
| 228 while nextSubId >= 128: |
| 229 subId = (subId << 7) + (nextSubId & 0x7F) |
| 230 if index >= substrateLen: |
| 231 raise error.SubstrateUnderrunError( |
| 232 'Short substrate for sub-OID past %s' % (oid,) |
| 233 ) |
| 234 nextSubId = oct2int(head[index]) |
| 235 index += 1 |
| 236 oid = oid + ((subId << 7) + nextSubId,) |
| 237 elif subId == 128: |
| 238 # ASN.1 spec forbids leading zeros (0x80) in OID |
| 239 # encoding, tolerating it opens a vulnerability. See |
| 240 # http://www.cosic.esat.kuleuven.be/publications/article-1432.pd
f |
| 241 # page 7 |
| 242 raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding') |
| 243 |
| 244 # Decode two leading arcs |
| 245 if 0 <= oid[0] <= 39: |
| 246 oid = (0,) + oid |
| 247 elif 40 <= oid[0] <= 79: |
| 248 oid = (1, oid[0]-40) + oid[1:] |
| 249 elif oid[0] >= 80: |
| 250 oid = (2, oid[0]-80) + oid[1:] |
| 251 else: |
| 252 raise error.PyAsn1Error('Malformed first OID octet: %s' % head[0]) |
| 253 |
| 254 return self._createComponent(asn1Spec, tagSet, oid), tail |
| 255 |
| 256 class RealDecoder(AbstractSimpleDecoder): |
| 257 protoComponent = univ.Real() |
| 258 def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 259 length, state, decodeFun, substrateFun): |
| 260 head, tail = substrate[:length], substrate[length:] |
| 261 if not head: |
| 262 return self._createComponent(asn1Spec, tagSet, 0.0), tail |
| 263 fo = oct2int(head[0]); head = head[1:] |
| 264 if fo & 0x80: # binary encoding |
| 265 if not head: |
| 266 raise error.PyAsn1Error("Incomplete floating-point value") |
| 267 n = (fo & 0x03) + 1 |
| 268 if n == 4: |
| 269 n = oct2int(head[0]) |
| 270 head = head[1:] |
| 271 eo, head = head[:n], head[n:] |
| 272 if not eo or not head: |
| 273 raise error.PyAsn1Error('Real exponent screwed') |
| 274 e = oct2int(eo[0]) & 0x80 and -1 or 0 |
| 275 while eo: # exponent |
| 276 e <<= 8 |
| 277 e |= oct2int(eo[0]) |
| 278 eo = eo[1:] |
| 279 b = fo >> 4 & 0x03 # base bits |
| 280 if b > 2: |
| 281 raise error.PyAsn1Error('Illegal Real base') |
| 282 if b == 1: # encbase = 8 |
| 283 e *= 3 |
| 284 elif b == 2: # encbase = 16 |
| 285 e *= 4 |
| 286 p = 0 |
| 287 while head: # value |
| 288 p <<= 8 |
| 289 p |= oct2int(head[0]) |
| 290 head = head[1:] |
| 291 if fo & 0x40: # sign bit |
| 292 p = -p |
| 293 sf = fo >> 2 & 0x03 # scale bits |
| 294 p *= 2**sf |
| 295 value = (p, 2, e) |
| 296 elif fo & 0x40: # infinite value |
| 297 value = fo & 0x01 and '-inf' or 'inf' |
| 298 elif fo & 0xc0 == 0: # character encoding |
| 299 if not head: |
| 300 raise error.PyAsn1Error("Incomplete floating-point value") |
| 301 try: |
| 302 if fo & 0x3 == 0x1: # NR1 |
| 303 value = (int(head), 10, 0) |
| 304 elif fo & 0x3 == 0x2: # NR2 |
| 305 value = float(head) |
| 306 elif fo & 0x3 == 0x3: # NR3 |
| 307 value = float(head) |
| 308 else: |
| 309 raise error.SubstrateUnderrunError( |
| 310 'Unknown NR (tag %s)' % fo |
| 311 ) |
| 312 except ValueError: |
| 313 raise error.SubstrateUnderrunError( |
| 314 'Bad character Real syntax' |
| 315 ) |
| 316 else: |
| 317 raise error.SubstrateUnderrunError( |
| 318 'Unknown encoding (tag %s)' % fo |
| 319 ) |
| 320 return self._createComponent(asn1Spec, tagSet, value), tail |
| 321 |
| 322 class SequenceDecoder(AbstractConstructedDecoder): |
| 323 protoComponent = univ.Sequence() |
| 324 def _getComponentTagMap(self, r, idx): |
| 325 try: |
| 326 return r.getComponentTagMapNearPosition(idx) |
| 327 except error.PyAsn1Error: |
| 328 return |
| 329 |
| 330 def _getComponentPositionByType(self, r, t, idx): |
| 331 return r.getComponentPositionNearType(t, idx) |
| 332 |
| 333 def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 334 length, state, decodeFun, substrateFun): |
| 335 head, tail = substrate[:length], substrate[length:] |
| 336 r = self._createComponent(asn1Spec, tagSet) |
| 337 idx = 0 |
| 338 if substrateFun: |
| 339 return substrateFun(r, substrate, length) |
| 340 while head: |
| 341 asn1Spec = self._getComponentTagMap(r, idx) |
| 342 component, head = decodeFun(head, asn1Spec) |
| 343 idx = self._getComponentPositionByType( |
| 344 r, component.getEffectiveTagSet(), idx |
| 345 ) |
| 346 r.setComponentByPosition(idx, component, asn1Spec is None) |
| 347 idx = idx + 1 |
| 348 r.setDefaultComponents() |
| 349 r.verifySizeSpec() |
| 350 return r, tail |
| 351 |
| 352 def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 353 length, state, decodeFun, substrateFun): |
| 354 r = self._createComponent(asn1Spec, tagSet) |
| 355 if substrateFun: |
| 356 return substrateFun(r, substrate, length) |
| 357 idx = 0 |
| 358 while substrate: |
| 359 asn1Spec = self._getComponentTagMap(r, idx) |
| 360 component, substrate = decodeFun(substrate, asn1Spec, allowEoo=True) |
| 361 if eoo.endOfOctets.isSameTypeWith(component) and \ |
| 362 component == eoo.endOfOctets: |
| 363 break |
| 364 idx = self._getComponentPositionByType( |
| 365 r, component.getEffectiveTagSet(), idx |
| 366 ) |
| 367 r.setComponentByPosition(idx, component, asn1Spec is None) |
| 368 idx = idx + 1 |
| 369 else: |
| 370 raise error.SubstrateUnderrunError( |
| 371 'No EOO seen before substrate ends' |
| 372 ) |
| 373 r.setDefaultComponents() |
| 374 r.verifySizeSpec() |
| 375 return r, substrate |
| 376 |
| 377 class SequenceOfDecoder(AbstractConstructedDecoder): |
| 378 protoComponent = univ.SequenceOf() |
| 379 def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 380 length, state, decodeFun, substrateFun): |
| 381 head, tail = substrate[:length], substrate[length:] |
| 382 r = self._createComponent(asn1Spec, tagSet) |
| 383 if substrateFun: |
| 384 return substrateFun(r, substrate, length) |
| 385 asn1Spec = r.getComponentType() |
| 386 idx = 0 |
| 387 while head: |
| 388 component, head = decodeFun(head, asn1Spec) |
| 389 r.setComponentByPosition(idx, component, asn1Spec is None) |
| 390 idx = idx + 1 |
| 391 r.verifySizeSpec() |
| 392 return r, tail |
| 393 |
| 394 def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 395 length, state, decodeFun, substrateFun): |
| 396 r = self._createComponent(asn1Spec, tagSet) |
| 397 if substrateFun: |
| 398 return substrateFun(r, substrate, length) |
| 399 asn1Spec = r.getComponentType() |
| 400 idx = 0 |
| 401 while substrate: |
| 402 component, substrate = decodeFun(substrate, asn1Spec, allowEoo=True) |
| 403 if eoo.endOfOctets.isSameTypeWith(component) and \ |
| 404 component == eoo.endOfOctets: |
| 405 break |
| 406 r.setComponentByPosition(idx, component, asn1Spec is None) |
| 407 idx = idx + 1 |
| 408 else: |
| 409 raise error.SubstrateUnderrunError( |
| 410 'No EOO seen before substrate ends' |
| 411 ) |
| 412 r.verifySizeSpec() |
| 413 return r, substrate |
| 414 |
| 415 class SetDecoder(SequenceDecoder): |
| 416 protoComponent = univ.Set() |
| 417 def _getComponentTagMap(self, r, idx): |
| 418 return r.getComponentTagMap() |
| 419 |
| 420 def _getComponentPositionByType(self, r, t, idx): |
| 421 nextIdx = r.getComponentPositionByType(t) |
| 422 if nextIdx is None: |
| 423 return idx |
| 424 else: |
| 425 return nextIdx |
| 426 |
| 427 class SetOfDecoder(SequenceOfDecoder): |
| 428 protoComponent = univ.SetOf() |
| 429 |
| 430 class ChoiceDecoder(AbstractConstructedDecoder): |
| 431 protoComponent = univ.Choice() |
| 432 tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed) |
| 433 def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 434 length, state, decodeFun, substrateFun): |
| 435 head, tail = substrate[:length], substrate[length:] |
| 436 r = self._createComponent(asn1Spec, tagSet) |
| 437 if substrateFun: |
| 438 return substrateFun(r, substrate, length) |
| 439 if r.getTagSet() == tagSet: # explicitly tagged Choice |
| 440 component, head = decodeFun( |
| 441 head, r.getComponentTagMap() |
| 442 ) |
| 443 else: |
| 444 component, head = decodeFun( |
| 445 head, r.getComponentTagMap(), tagSet, length, state |
| 446 ) |
| 447 if isinstance(component, univ.Choice): |
| 448 effectiveTagSet = component.getEffectiveTagSet() |
| 449 else: |
| 450 effectiveTagSet = component.getTagSet() |
| 451 r.setComponentByType(effectiveTagSet, component, 0, asn1Spec is None) |
| 452 return r, tail |
| 453 |
| 454 def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 455 length, state, decodeFun, substrateFun): |
| 456 r = self._createComponent(asn1Spec, tagSet) |
| 457 if substrateFun: |
| 458 return substrateFun(r, substrate, length) |
| 459 if r.getTagSet() == tagSet: # explicitly tagged Choice |
| 460 component, substrate = decodeFun(substrate, r.getComponentTagMap()) |
| 461 # eat up EOO marker |
| 462 eooMarker, substrate = decodeFun(substrate, allowEoo=True) |
| 463 if not eoo.endOfOctets.isSameTypeWith(eooMarker) or \ |
| 464 eooMarker != eoo.endOfOctets: |
| 465 raise error.PyAsn1Error('No EOO seen before substrate ends') |
| 466 else: |
| 467 component, substrate= decodeFun( |
| 468 substrate, r.getComponentTagMap(), tagSet, length, state |
| 469 ) |
| 470 if isinstance(component, univ.Choice): |
| 471 effectiveTagSet = component.getEffectiveTagSet() |
| 472 else: |
| 473 effectiveTagSet = component.getTagSet() |
| 474 r.setComponentByType(effectiveTagSet, component, 0, asn1Spec is None) |
| 475 return r, substrate |
| 476 |
| 477 class AnyDecoder(AbstractSimpleDecoder): |
| 478 protoComponent = univ.Any() |
| 479 tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed) |
| 480 def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 481 length, state, decodeFun, substrateFun): |
| 482 if asn1Spec is None or \ |
| 483 asn1Spec is not None and tagSet != asn1Spec.getTagSet(): |
| 484 # untagged Any container, recover inner header substrate |
| 485 length = length + len(fullSubstrate) - len(substrate) |
| 486 substrate = fullSubstrate |
| 487 if substrateFun: |
| 488 return substrateFun(self._createComponent(asn1Spec, tagSet), |
| 489 substrate, length) |
| 490 head, tail = substrate[:length], substrate[length:] |
| 491 return self._createComponent(asn1Spec, tagSet, value=head), tail |
| 492 |
| 493 def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
| 494 length, state, decodeFun, substrateFun): |
| 495 if asn1Spec is not None and tagSet == asn1Spec.getTagSet(): |
| 496 # tagged Any type -- consume header substrate |
| 497 header = '' |
| 498 else: |
| 499 # untagged Any, recover header substrate |
| 500 header = fullSubstrate[:-len(substrate)] |
| 501 |
| 502 r = self._createComponent(asn1Spec, tagSet, header) |
| 503 |
| 504 # Any components do not inherit initial tag |
| 505 asn1Spec = self.protoComponent |
| 506 |
| 507 if substrateFun: |
| 508 return substrateFun(r, substrate, length) |
| 509 while substrate: |
| 510 component, substrate = decodeFun(substrate, asn1Spec, allowEoo=True) |
| 511 if eoo.endOfOctets.isSameTypeWith(component) and \ |
| 512 component == eoo.endOfOctets: |
| 513 break |
| 514 r = r + component |
| 515 else: |
| 516 raise error.SubstrateUnderrunError( |
| 517 'No EOO seen before substrate ends' |
| 518 ) |
| 519 return r, substrate |
| 520 |
| 521 # character string types |
| 522 class UTF8StringDecoder(OctetStringDecoder): |
| 523 protoComponent = char.UTF8String() |
| 524 class NumericStringDecoder(OctetStringDecoder): |
| 525 protoComponent = char.NumericString() |
| 526 class PrintableStringDecoder(OctetStringDecoder): |
| 527 protoComponent = char.PrintableString() |
| 528 class TeletexStringDecoder(OctetStringDecoder): |
| 529 protoComponent = char.TeletexString() |
| 530 class VideotexStringDecoder(OctetStringDecoder): |
| 531 protoComponent = char.VideotexString() |
| 532 class IA5StringDecoder(OctetStringDecoder): |
| 533 protoComponent = char.IA5String() |
| 534 class GraphicStringDecoder(OctetStringDecoder): |
| 535 protoComponent = char.GraphicString() |
| 536 class VisibleStringDecoder(OctetStringDecoder): |
| 537 protoComponent = char.VisibleString() |
| 538 class GeneralStringDecoder(OctetStringDecoder): |
| 539 protoComponent = char.GeneralString() |
| 540 class UniversalStringDecoder(OctetStringDecoder): |
| 541 protoComponent = char.UniversalString() |
| 542 class BMPStringDecoder(OctetStringDecoder): |
| 543 protoComponent = char.BMPString() |
| 544 |
| 545 # "useful" types |
| 546 class ObjectDescriptorDecoder(OctetStringDecoder): |
| 547 protoComponent = useful.ObjectDescriptor() |
| 548 class GeneralizedTimeDecoder(OctetStringDecoder): |
| 549 protoComponent = useful.GeneralizedTime() |
| 550 class UTCTimeDecoder(OctetStringDecoder): |
| 551 protoComponent = useful.UTCTime() |
| 552 |
| 553 tagMap = { |
| 554 univ.Integer.tagSet: IntegerDecoder(), |
| 555 univ.Boolean.tagSet: BooleanDecoder(), |
| 556 univ.BitString.tagSet: BitStringDecoder(), |
| 557 univ.OctetString.tagSet: OctetStringDecoder(), |
| 558 univ.Null.tagSet: NullDecoder(), |
| 559 univ.ObjectIdentifier.tagSet: ObjectIdentifierDecoder(), |
| 560 univ.Enumerated.tagSet: IntegerDecoder(), |
| 561 univ.Real.tagSet: RealDecoder(), |
| 562 univ.Sequence.tagSet: SequenceDecoder(), # conflicts with SequenceOf |
| 563 univ.Set.tagSet: SetDecoder(), # conflicts with SetOf |
| 564 univ.Choice.tagSet: ChoiceDecoder(), # conflicts with Any |
| 565 # character string types |
| 566 char.UTF8String.tagSet: UTF8StringDecoder(), |
| 567 char.NumericString.tagSet: NumericStringDecoder(), |
| 568 char.PrintableString.tagSet: PrintableStringDecoder(), |
| 569 char.TeletexString.tagSet: TeletexStringDecoder(), |
| 570 char.VideotexString.tagSet: VideotexStringDecoder(), |
| 571 char.IA5String.tagSet: IA5StringDecoder(), |
| 572 char.GraphicString.tagSet: GraphicStringDecoder(), |
| 573 char.VisibleString.tagSet: VisibleStringDecoder(), |
| 574 char.GeneralString.tagSet: GeneralStringDecoder(), |
| 575 char.UniversalString.tagSet: UniversalStringDecoder(), |
| 576 char.BMPString.tagSet: BMPStringDecoder(), |
| 577 # useful types |
| 578 useful.ObjectDescriptor.tagSet: ObjectDescriptorDecoder(), |
| 579 useful.GeneralizedTime.tagSet: GeneralizedTimeDecoder(), |
| 580 useful.UTCTime.tagSet: UTCTimeDecoder() |
| 581 } |
| 582 |
| 583 # Type-to-codec map for ambiguous ASN.1 types |
| 584 typeMap = { |
| 585 univ.Set.typeId: SetDecoder(), |
| 586 univ.SetOf.typeId: SetOfDecoder(), |
| 587 univ.Sequence.typeId: SequenceDecoder(), |
| 588 univ.SequenceOf.typeId: SequenceOfDecoder(), |
| 589 univ.Choice.typeId: ChoiceDecoder(), |
| 590 univ.Any.typeId: AnyDecoder() |
| 591 } |
| 592 |
| 593 ( stDecodeTag, stDecodeLength, stGetValueDecoder, stGetValueDecoderByAsn1Spec, |
| 594 stGetValueDecoderByTag, stTryAsExplicitTag, stDecodeValue, |
| 595 stDumpRawValue, stErrorCondition, stStop ) = [x for x in range(10)] |
| 596 |
| 597 class Decoder: |
| 598 defaultErrorState = stErrorCondition |
| 599 # defaultErrorState = stDumpRawValue |
| 600 defaultRawDecoder = AnyDecoder() |
| 601 supportIndefLength = True |
| 602 def __init__(self, tagMap, typeMap={}): |
| 603 self.__tagMap = tagMap |
| 604 self.__typeMap = typeMap |
| 605 # Tag & TagSet objects caches |
| 606 self.__tagCache = {} |
| 607 self.__tagSetCache = {} |
| 608 |
| 609 def __call__(self, substrate, asn1Spec=None, tagSet=None, |
| 610 length=None, state=stDecodeTag, recursiveFlag=1, |
| 611 substrateFun=None, allowEoo=False): |
| 612 if debug.logger & debug.flagDecoder: |
| 613 debug.logger('decoder called at scope %s with state %d, working with
up to %d octets of substrate: %s' % (debug.scope, state, len(substrate), debug.
hexdump(substrate))) |
| 614 fullSubstrate = substrate |
| 615 while state != stStop: |
| 616 if state == stDecodeTag: |
| 617 if not substrate: |
| 618 raise error.SubstrateUnderrunError( |
| 619 'Short octet stream on tag decoding' |
| 620 ) |
| 621 if not isOctetsType(substrate) and \ |
| 622 not isinstance(substrate, univ.OctetString): |
| 623 raise error.PyAsn1Error('Bad octet stream type') |
| 624 # Decode tag |
| 625 firstOctet = substrate[0] |
| 626 substrate = substrate[1:] |
| 627 if firstOctet in self.__tagCache: |
| 628 lastTag = self.__tagCache[firstOctet] |
| 629 else: |
| 630 t = oct2int(firstOctet) |
| 631 # Look for end-of-octets sentinel |
| 632 if t == 0: |
| 633 if substrate and oct2int(substrate[0]) == 0: |
| 634 if allowEoo and self.supportIndefLength: |
| 635 debug.logger and debug.logger & debug.flagDecode
r and debug.logger('end-of-octets sentinel found') |
| 636 value, substrate = eoo.endOfOctets, substrate[1:
] |
| 637 state = stStop |
| 638 continue |
| 639 else: |
| 640 raise error.PyAsn1Error('Unexpected end-of-conte
nts sentinel') |
| 641 else: |
| 642 raise error.PyAsn1Error('Zero tag encountered') |
| 643 tagClass = t&0xC0 |
| 644 tagFormat = t&0x20 |
| 645 tagId = t&0x1F |
| 646 if tagId == 0x1F: |
| 647 tagId = 0 |
| 648 while 1: |
| 649 if not substrate: |
| 650 raise error.SubstrateUnderrunError( |
| 651 'Short octet stream on long tag decoding' |
| 652 ) |
| 653 t = oct2int(substrate[0]) |
| 654 tagId = tagId << 7 | (t&0x7F) |
| 655 substrate = substrate[1:] |
| 656 if not t&0x80: |
| 657 break |
| 658 lastTag = tag.Tag( |
| 659 tagClass=tagClass, tagFormat=tagFormat, tagId=tagId |
| 660 ) |
| 661 if tagId < 31: |
| 662 # cache short tags |
| 663 self.__tagCache[firstOctet] = lastTag |
| 664 if tagSet is None: |
| 665 if firstOctet in self.__tagSetCache: |
| 666 tagSet = self.__tagSetCache[firstOctet] |
| 667 else: |
| 668 # base tag not recovered |
| 669 tagSet = tag.TagSet((), lastTag) |
| 670 if firstOctet in self.__tagCache: |
| 671 self.__tagSetCache[firstOctet] = tagSet |
| 672 else: |
| 673 tagSet = lastTag + tagSet |
| 674 state = stDecodeLength |
| 675 debug.logger and debug.logger & debug.flagDecoder and debug.logg
er('tag decoded into %s, decoding length' % tagSet) |
| 676 if state == stDecodeLength: |
| 677 # Decode length |
| 678 if not substrate: |
| 679 raise error.SubstrateUnderrunError( |
| 680 'Short octet stream on length decoding' |
| 681 ) |
| 682 firstOctet = oct2int(substrate[0]) |
| 683 if firstOctet == 128: |
| 684 size = 1 |
| 685 length = -1 |
| 686 elif firstOctet < 128: |
| 687 length, size = firstOctet, 1 |
| 688 else: |
| 689 size = firstOctet & 0x7F |
| 690 # encoded in size bytes |
| 691 length = 0 |
| 692 lengthString = substrate[1:size+1] |
| 693 # missing check on maximum size, which shouldn't be a |
| 694 # problem, we can handle more than is possible |
| 695 if len(lengthString) != size: |
| 696 raise error.SubstrateUnderrunError( |
| 697 '%s<%s at %s' % |
| 698 (size, len(lengthString), tagSet) |
| 699 ) |
| 700 for char in lengthString: |
| 701 length = (length << 8) | oct2int(char) |
| 702 size = size + 1 |
| 703 substrate = substrate[size:] |
| 704 if length != -1 and len(substrate) < length: |
| 705 raise error.SubstrateUnderrunError( |
| 706 '%d-octet short' % (length - len(substrate)) |
| 707 ) |
| 708 if length == -1 and not self.supportIndefLength: |
| 709 error.PyAsn1Error('Indefinite length encoding not supported
by this codec') |
| 710 state = stGetValueDecoder |
| 711 debug.logger and debug.logger & debug.flagDecoder and debug.logg
er('value length decoded into %d, payload substrate is: %s' % (length, debug.hex
dump(length == -1 and substrate or substrate[:length]))) |
| 712 if state == stGetValueDecoder: |
| 713 if asn1Spec is None: |
| 714 state = stGetValueDecoderByTag |
| 715 else: |
| 716 state = stGetValueDecoderByAsn1Spec |
| 717 # |
| 718 # There're two ways of creating subtypes in ASN.1 what influences |
| 719 # decoder operation. These methods are: |
| 720 # 1) Either base types used in or no IMPLICIT tagging has been |
| 721 # applied on subtyping. |
| 722 # 2) Subtype syntax drops base type information (by means of |
| 723 # IMPLICIT tagging. |
| 724 # The first case allows for complete tag recovery from substrate |
| 725 # while the second one requires original ASN.1 type spec for |
| 726 # decoding. |
| 727 # |
| 728 # In either case a set of tags (tagSet) is coming from substrate |
| 729 # in an incremental, tag-by-tag fashion (this is the case of |
| 730 # EXPLICIT tag which is most basic). Outermost tag comes first |
| 731 # from the wire. |
| 732 # |
| 733 if state == stGetValueDecoderByTag: |
| 734 if tagSet in self.__tagMap: |
| 735 concreteDecoder = self.__tagMap[tagSet] |
| 736 else: |
| 737 concreteDecoder = None |
| 738 if concreteDecoder: |
| 739 state = stDecodeValue |
| 740 else: |
| 741 _k = tagSet[:1] |
| 742 if _k in self.__tagMap: |
| 743 concreteDecoder = self.__tagMap[_k] |
| 744 else: |
| 745 concreteDecoder = None |
| 746 if concreteDecoder: |
| 747 state = stDecodeValue |
| 748 else: |
| 749 state = stTryAsExplicitTag |
| 750 if debug.logger and debug.logger & debug.flagDecoder: |
| 751 debug.logger('codec %s chosen by a built-in type, decoding %
s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state
== stDecodeValue and 'value' or 'as explicit tag')) |
| 752 debug.scope.push(concreteDecoder is None and '?' or concrete
Decoder.protoComponent.__class__.__name__) |
| 753 if state == stGetValueDecoderByAsn1Spec: |
| 754 if isinstance(asn1Spec, (dict, tagmap.TagMap)): |
| 755 if tagSet in asn1Spec: |
| 756 __chosenSpec = asn1Spec[tagSet] |
| 757 else: |
| 758 __chosenSpec = None |
| 759 if debug.logger and debug.logger & debug.flagDecoder: |
| 760 debug.logger('candidate ASN.1 spec is a map of:') |
| 761 for t, v in asn1Spec.getPosMap().items(): |
| 762 debug.logger(' %s -> %s' % (t, v.__class__.__name__
)) |
| 763 if asn1Spec.getNegMap(): |
| 764 debug.logger('but neither of: ') |
| 765 for t, v in asn1Spec.getNegMap().items(): |
| 766 debug.logger(' %s -> %s' % (t, v.__class__.__na
me__)) |
| 767 debug.logger('new candidate ASN.1 spec is %s, chosen by
%s' % (__chosenSpec is None and '<none>' or __chosenSpec.prettyPrintType(), tagS
et)) |
| 768 else: |
| 769 __chosenSpec = asn1Spec |
| 770 debug.logger and debug.logger & debug.flagDecoder and debug.
logger('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__) |
| 771 if __chosenSpec is not None and ( |
| 772 tagSet == __chosenSpec.getTagSet() or \ |
| 773 tagSet in __chosenSpec.getTagMap() |
| 774 ): |
| 775 # use base type for codec lookup to recover untagged types |
| 776 baseTagSet = __chosenSpec.baseTagSet |
| 777 if __chosenSpec.typeId is not None and \ |
| 778 __chosenSpec.typeId in self.__typeMap: |
| 779 # ambiguous type |
| 780 concreteDecoder = self.__typeMap[__chosenSpec.typeId] |
| 781 debug.logger and debug.logger & debug.flagDecoder and de
bug.logger('value decoder chosen for an ambiguous type by type ID %s' % (__chose
nSpec.typeId,)) |
| 782 elif baseTagSet in self.__tagMap: |
| 783 # base type or tagged subtype |
| 784 concreteDecoder = self.__tagMap[baseTagSet] |
| 785 debug.logger and debug.logger & debug.flagDecoder and de
bug.logger('value decoder chosen by base %s' % (baseTagSet,)) |
| 786 else: |
| 787 concreteDecoder = None |
| 788 if concreteDecoder: |
| 789 asn1Spec = __chosenSpec |
| 790 state = stDecodeValue |
| 791 else: |
| 792 state = stTryAsExplicitTag |
| 793 else: |
| 794 concreteDecoder = None |
| 795 state = stTryAsExplicitTag |
| 796 if debug.logger and debug.logger & debug.flagDecoder: |
| 797 debug.logger('codec %s chosen by ASN.1 spec, decoding %s' %
(state == stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", stat
e == stDecodeValue and 'value' or 'as explicit tag')) |
| 798 debug.scope.push(__chosenSpec is None and '?' or __chosenSpe
c.__class__.__name__) |
| 799 if state == stTryAsExplicitTag: |
| 800 if tagSet and \ |
| 801 tagSet[0][1] == tag.tagFormatConstructed and \ |
| 802 tagSet[0][0] != tag.tagClassUniversal: |
| 803 # Assume explicit tagging |
| 804 concreteDecoder = explicitTagDecoder |
| 805 state = stDecodeValue |
| 806 else: |
| 807 concreteDecoder = None |
| 808 state = self.defaultErrorState |
| 809 debug.logger and debug.logger & debug.flagDecoder and debug.logg
er('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class
__.__name__ or "<none>", state == stDecodeValue and 'value' or 'as failure')) |
| 810 if state == stDumpRawValue: |
| 811 concreteDecoder = self.defaultRawDecoder |
| 812 debug.logger and debug.logger & debug.flagDecoder and debug.logg
er('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__) |
| 813 state = stDecodeValue |
| 814 if state == stDecodeValue: |
| 815 if recursiveFlag == 0 and not substrateFun: # legacy |
| 816 substrateFun = lambda a,b,c: (a,b[:c]) |
| 817 if length == -1: # indef length |
| 818 value, substrate = concreteDecoder.indefLenValueDecoder( |
| 819 fullSubstrate, substrate, asn1Spec, tagSet, length, |
| 820 stGetValueDecoder, self, substrateFun |
| 821 ) |
| 822 else: |
| 823 value, substrate = concreteDecoder.valueDecoder( |
| 824 fullSubstrate, substrate, asn1Spec, tagSet, length, |
| 825 stGetValueDecoder, self, substrateFun |
| 826 ) |
| 827 state = stStop |
| 828 debug.logger and debug.logger & debug.flagDecoder and debug.logg
er('codec %s yields type %s, value:\n%s\n...remaining substrate is: %s' % (concr
eteDecoder.__class__.__name__, value.__class__.__name__, value.prettyPrint(), su
bstrate and debug.hexdump(substrate) or '<none>')) |
| 829 if state == stErrorCondition: |
| 830 raise error.PyAsn1Error( |
| 831 '%s not in asn1Spec: %s' % (tagSet, asn1Spec) |
| 832 ) |
| 833 if debug.logger and debug.logger & debug.flagDecoder: |
| 834 debug.scope.pop() |
| 835 debug.logger('decoder left scope %s, call completed' % debug.scope) |
| 836 return value, substrate |
| 837 |
| 838 decode = Decoder(tagMap, typeMap) |
| 839 |
| 840 # XXX |
| 841 # non-recursive decoding; return position rather than substrate |
OLD | NEW |