Index: tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py |
diff --git a/tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py b/tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py |
deleted file mode 100644 |
index be0cf49074d5d6bdfc84dbfc1cdeb321f3a3d3da..0000000000000000000000000000000000000000 |
--- a/tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py |
+++ /dev/null |
@@ -1,808 +0,0 @@ |
-# BER decoder |
-from pyasn1.type import tag, base, univ, char, useful, tagmap |
-from pyasn1.codec.ber import eoo |
-from pyasn1.compat.octets import oct2int, octs2ints, isOctetsType |
-from pyasn1 import debug, error |
- |
-class AbstractDecoder: |
- protoComponent = None |
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- raise error.PyAsn1Error('Decoder not implemented for %s' % (tagSet,)) |
- |
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,)) |
- |
-class AbstractSimpleDecoder(AbstractDecoder): |
- tagFormats = (tag.tagFormatSimple,) |
- def _createComponent(self, asn1Spec, tagSet, value=None): |
- if tagSet[0][1] not in self.tagFormats: |
- raise error.PyAsn1Error('Invalid tag format %r for %r' % (tagSet[0], self.protoComponent,)) |
- if asn1Spec is None: |
- return self.protoComponent.clone(value, tagSet) |
- elif value is None: |
- return asn1Spec |
- else: |
- return asn1Spec.clone(value) |
- |
-class AbstractConstructedDecoder(AbstractDecoder): |
- tagFormats = (tag.tagFormatConstructed,) |
- def _createComponent(self, asn1Spec, tagSet, value=None): |
- if tagSet[0][1] not in self.tagFormats: |
- raise error.PyAsn1Error('Invalid tag format %r for %r' % (tagSet[0], self.protoComponent,)) |
- if asn1Spec is None: |
- return self.protoComponent.clone(tagSet) |
- else: |
- return asn1Spec.clone() |
- |
-class EndOfOctetsDecoder(AbstractSimpleDecoder): |
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- return eoo.endOfOctets, substrate[length:] |
- |
-class ExplicitTagDecoder(AbstractSimpleDecoder): |
- protoComponent = univ.Any('') |
- tagFormats = (tag.tagFormatConstructed,) |
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- if substrateFun: |
- return substrateFun( |
- self._createComponent(asn1Spec, tagSet, ''), |
- substrate, length |
- ) |
- head, tail = substrate[:length], substrate[length:] |
- value, _ = decodeFun(head, asn1Spec, tagSet, length) |
- return value, tail |
- |
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- if substrateFun: |
- return substrateFun( |
- self._createComponent(asn1Spec, tagSet, ''), |
- substrate, length |
- ) |
- value, substrate = decodeFun(substrate, asn1Spec, tagSet, length) |
- terminator, substrate = decodeFun(substrate) |
- if eoo.endOfOctets.isSameTypeWith(terminator) and \ |
- terminator == eoo.endOfOctets: |
- return value, substrate |
- else: |
- raise error.PyAsn1Error('Missing end-of-octets terminator') |
- |
-explicitTagDecoder = ExplicitTagDecoder() |
- |
-class IntegerDecoder(AbstractSimpleDecoder): |
- protoComponent = univ.Integer(0) |
- precomputedValues = { |
- '\x00': 0, |
- '\x01': 1, |
- '\x02': 2, |
- '\x03': 3, |
- '\x04': 4, |
- '\x05': 5, |
- '\x06': 6, |
- '\x07': 7, |
- '\x08': 8, |
- '\x09': 9, |
- '\xff': -1, |
- '\xfe': -2, |
- '\xfd': -3, |
- '\xfc': -4, |
- '\xfb': -5 |
- } |
- |
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
- state, decodeFun, substrateFun): |
- head, tail = substrate[:length], substrate[length:] |
- if not head: |
- return self._createComponent(asn1Spec, tagSet, 0), tail |
- if head in self.precomputedValues: |
- value = self.precomputedValues[head] |
- else: |
- firstOctet = oct2int(head[0]) |
- if firstOctet & 0x80: |
- value = -1 |
- else: |
- value = 0 |
- for octet in head: |
- value = value << 8 | oct2int(octet) |
- return self._createComponent(asn1Spec, tagSet, value), tail |
- |
-class BooleanDecoder(IntegerDecoder): |
- protoComponent = univ.Boolean(0) |
- def _createComponent(self, asn1Spec, tagSet, value=None): |
- return IntegerDecoder._createComponent(self, asn1Spec, tagSet, value and 1 or 0) |
- |
-class BitStringDecoder(AbstractSimpleDecoder): |
- protoComponent = univ.BitString(()) |
- tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed) |
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
- state, decodeFun, substrateFun): |
- head, tail = substrate[:length], substrate[length:] |
- if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check? |
- if not head: |
- raise error.PyAsn1Error('Empty substrate') |
- trailingBits = oct2int(head[0]) |
- if trailingBits > 7: |
- raise error.PyAsn1Error( |
- 'Trailing bits overflow %s' % trailingBits |
- ) |
- head = head[1:] |
- lsb = p = 0; l = len(head)-1; b = () |
- while p <= l: |
- if p == l: |
- lsb = trailingBits |
- j = 7 |
- o = oct2int(head[p]) |
- while j >= lsb: |
- b = b + ((o>>j)&0x01,) |
- j = j - 1 |
- p = p + 1 |
- return self._createComponent(asn1Spec, tagSet, b), tail |
- r = self._createComponent(asn1Spec, tagSet, ()) |
- if substrateFun: |
- return substrateFun(r, substrate, length) |
- while head: |
- component, head = decodeFun(head) |
- r = r + component |
- return r, tail |
- |
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- r = self._createComponent(asn1Spec, tagSet, '') |
- if substrateFun: |
- return substrateFun(r, substrate, length) |
- while substrate: |
- component, substrate = decodeFun(substrate) |
- if eoo.endOfOctets.isSameTypeWith(component) and \ |
- component == eoo.endOfOctets: |
- break |
- r = r + component |
- else: |
- raise error.SubstrateUnderrunError( |
- 'No EOO seen before substrate ends' |
- ) |
- return r, substrate |
- |
-class OctetStringDecoder(AbstractSimpleDecoder): |
- protoComponent = univ.OctetString('') |
- tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed) |
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
- state, decodeFun, substrateFun): |
- head, tail = substrate[:length], substrate[length:] |
- if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check? |
- return self._createComponent(asn1Spec, tagSet, head), tail |
- r = self._createComponent(asn1Spec, tagSet, '') |
- if substrateFun: |
- return substrateFun(r, substrate, length) |
- while head: |
- component, head = decodeFun(head) |
- r = r + component |
- return r, tail |
- |
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- r = self._createComponent(asn1Spec, tagSet, '') |
- if substrateFun: |
- return substrateFun(r, substrate, length) |
- while substrate: |
- component, substrate = decodeFun(substrate) |
- if eoo.endOfOctets.isSameTypeWith(component) and \ |
- component == eoo.endOfOctets: |
- break |
- r = r + component |
- else: |
- raise error.SubstrateUnderrunError( |
- 'No EOO seen before substrate ends' |
- ) |
- return r, substrate |
- |
-class NullDecoder(AbstractSimpleDecoder): |
- protoComponent = univ.Null('') |
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- head, tail = substrate[:length], substrate[length:] |
- r = self._createComponent(asn1Spec, tagSet) |
- if head: |
- raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length) |
- return r, tail |
- |
-class ObjectIdentifierDecoder(AbstractSimpleDecoder): |
- protoComponent = univ.ObjectIdentifier(()) |
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
- state, decodeFun, substrateFun): |
- head, tail = substrate[:length], substrate[length:] |
- if not head: |
- raise error.PyAsn1Error('Empty substrate') |
- |
- # Get the first subid |
- subId = oct2int(head[0]) |
- oid = divmod(subId, 40) |
- |
- index = 1 |
- substrateLen = len(head) |
- while index < substrateLen: |
- subId = oct2int(head[index]) |
- index = index + 1 |
- if subId == 128: |
- # ASN.1 spec forbids leading zeros (0x80) in sub-ID OID |
- # encoding, tolerating it opens a vulnerability. |
- # See http://www.cosic.esat.kuleuven.be/publications/article-1432.pdf page 7 |
- raise error.PyAsn1Error('Invalid leading 0x80 in sub-OID') |
- elif subId > 128: |
- # Construct subid from a number of octets |
- nextSubId = subId |
- subId = 0 |
- while nextSubId >= 128: |
- subId = (subId << 7) + (nextSubId & 0x7F) |
- if index >= substrateLen: |
- raise error.SubstrateUnderrunError( |
- 'Short substrate for sub-OID past %s' % (oid,) |
- ) |
- nextSubId = oct2int(head[index]) |
- index = index + 1 |
- subId = (subId << 7) + nextSubId |
- oid = oid + (subId,) |
- return self._createComponent(asn1Spec, tagSet, oid), tail |
- |
-class RealDecoder(AbstractSimpleDecoder): |
- protoComponent = univ.Real() |
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- head, tail = substrate[:length], substrate[length:] |
- if not head: |
- return self._createComponent(asn1Spec, tagSet, 0.0), tail |
- fo = oct2int(head[0]); head = head[1:] |
- if fo & 0x80: # binary enoding |
- n = (fo & 0x03) + 1 |
- if n == 4: |
- n = oct2int(head[0]) |
- eo, head = head[:n], head[n:] |
- if not eo or not head: |
- raise error.PyAsn1Error('Real exponent screwed') |
- e = oct2int(eo[0]) & 0x80 and -1 or 0 |
- while eo: # exponent |
- e <<= 8 |
- e |= oct2int(eo[0]) |
- eo = eo[1:] |
- p = 0 |
- while head: # value |
- p <<= 8 |
- p |= oct2int(head[0]) |
- head = head[1:] |
- if fo & 0x40: # sign bit |
- p = -p |
- value = (p, 2, e) |
- elif fo & 0x40: # infinite value |
- value = fo & 0x01 and '-inf' or 'inf' |
- elif fo & 0xc0 == 0: # character encoding |
- try: |
- if fo & 0x3 == 0x1: # NR1 |
- value = (int(head), 10, 0) |
- elif fo & 0x3 == 0x2: # NR2 |
- value = float(head) |
- elif fo & 0x3 == 0x3: # NR3 |
- value = float(head) |
- else: |
- raise error.SubstrateUnderrunError( |
- 'Unknown NR (tag %s)' % fo |
- ) |
- except ValueError: |
- raise error.SubstrateUnderrunError( |
- 'Bad character Real syntax' |
- ) |
- else: |
- raise error.SubstrateUnderrunError( |
- 'Unknown encoding (tag %s)' % fo |
- ) |
- return self._createComponent(asn1Spec, tagSet, value), tail |
- |
-class SequenceDecoder(AbstractConstructedDecoder): |
- protoComponent = univ.Sequence() |
- def _getComponentTagMap(self, r, idx): |
- try: |
- return r.getComponentTagMapNearPosition(idx) |
- except error.PyAsn1Error: |
- return |
- |
- def _getComponentPositionByType(self, r, t, idx): |
- return r.getComponentPositionNearType(t, idx) |
- |
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- head, tail = substrate[:length], substrate[length:] |
- r = self._createComponent(asn1Spec, tagSet) |
- idx = 0 |
- if substrateFun: |
- return substrateFun(r, substrate, length) |
- while head: |
- asn1Spec = self._getComponentTagMap(r, idx) |
- component, head = decodeFun(head, asn1Spec) |
- idx = self._getComponentPositionByType( |
- r, component.getEffectiveTagSet(), idx |
- ) |
- r.setComponentByPosition(idx, component, asn1Spec is None) |
- idx = idx + 1 |
- r.setDefaultComponents() |
- r.verifySizeSpec() |
- return r, tail |
- |
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- r = self._createComponent(asn1Spec, tagSet) |
- if substrateFun: |
- return substrateFun(r, substrate, length) |
- idx = 0 |
- while substrate: |
- asn1Spec = self._getComponentTagMap(r, idx) |
- component, substrate = decodeFun(substrate, asn1Spec) |
- if eoo.endOfOctets.isSameTypeWith(component) and \ |
- component == eoo.endOfOctets: |
- break |
- idx = self._getComponentPositionByType( |
- r, component.getEffectiveTagSet(), idx |
- ) |
- r.setComponentByPosition(idx, component, asn1Spec is None) |
- idx = idx + 1 |
- else: |
- raise error.SubstrateUnderrunError( |
- 'No EOO seen before substrate ends' |
- ) |
- r.setDefaultComponents() |
- r.verifySizeSpec() |
- return r, substrate |
- |
-class SequenceOfDecoder(AbstractConstructedDecoder): |
- protoComponent = univ.SequenceOf() |
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- head, tail = substrate[:length], substrate[length:] |
- r = self._createComponent(asn1Spec, tagSet) |
- if substrateFun: |
- return substrateFun(r, substrate, length) |
- asn1Spec = r.getComponentType() |
- idx = 0 |
- while head: |
- component, head = decodeFun(head, asn1Spec) |
- r.setComponentByPosition(idx, component, asn1Spec is None) |
- idx = idx + 1 |
- r.verifySizeSpec() |
- return r, tail |
- |
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- r = self._createComponent(asn1Spec, tagSet) |
- if substrateFun: |
- return substrateFun(r, substrate, length) |
- asn1Spec = r.getComponentType() |
- idx = 0 |
- while substrate: |
- component, substrate = decodeFun(substrate, asn1Spec) |
- if eoo.endOfOctets.isSameTypeWith(component) and \ |
- component == eoo.endOfOctets: |
- break |
- r.setComponentByPosition(idx, component, asn1Spec is None) |
- idx = idx + 1 |
- else: |
- raise error.SubstrateUnderrunError( |
- 'No EOO seen before substrate ends' |
- ) |
- r.verifySizeSpec() |
- return r, substrate |
- |
-class SetDecoder(SequenceDecoder): |
- protoComponent = univ.Set() |
- def _getComponentTagMap(self, r, idx): |
- return r.getComponentTagMap() |
- |
- def _getComponentPositionByType(self, r, t, idx): |
- nextIdx = r.getComponentPositionByType(t) |
- if nextIdx is None: |
- return idx |
- else: |
- return nextIdx |
- |
-class SetOfDecoder(SequenceOfDecoder): |
- protoComponent = univ.SetOf() |
- |
-class ChoiceDecoder(AbstractConstructedDecoder): |
- protoComponent = univ.Choice() |
- tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed) |
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- head, tail = substrate[:length], substrate[length:] |
- r = self._createComponent(asn1Spec, tagSet) |
- if substrateFun: |
- return substrateFun(r, substrate, length) |
- if r.getTagSet() == tagSet: # explicitly tagged Choice |
- component, head = decodeFun( |
- head, r.getComponentTagMap() |
- ) |
- else: |
- component, head = decodeFun( |
- head, r.getComponentTagMap(), tagSet, length, state |
- ) |
- if isinstance(component, univ.Choice): |
- effectiveTagSet = component.getEffectiveTagSet() |
- else: |
- effectiveTagSet = component.getTagSet() |
- r.setComponentByType(effectiveTagSet, component, 0, asn1Spec is None) |
- return r, tail |
- |
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- r = self._createComponent(asn1Spec, tagSet) |
- if substrateFun: |
- return substrateFun(r, substrate, length) |
- if r.getTagSet() == tagSet: # explicitly tagged Choice |
- component, substrate = decodeFun(substrate, r.getComponentTagMap()) |
- eooMarker, substrate = decodeFun(substrate) # eat up EOO marker |
- if not eoo.endOfOctets.isSameTypeWith(eooMarker) or \ |
- eooMarker != eoo.endOfOctets: |
- raise error.PyAsn1Error('No EOO seen before substrate ends') |
- else: |
- component, substrate= decodeFun( |
- substrate, r.getComponentTagMap(), tagSet, length, state |
- ) |
- if isinstance(component, univ.Choice): |
- effectiveTagSet = component.getEffectiveTagSet() |
- else: |
- effectiveTagSet = component.getTagSet() |
- r.setComponentByType(effectiveTagSet, component, 0, asn1Spec is None) |
- return r, substrate |
- |
-class AnyDecoder(AbstractSimpleDecoder): |
- protoComponent = univ.Any() |
- tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed) |
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- if asn1Spec is None or \ |
- asn1Spec is not None and tagSet != asn1Spec.getTagSet(): |
- # untagged Any container, recover inner header substrate |
- length = length + len(fullSubstrate) - len(substrate) |
- substrate = fullSubstrate |
- if substrateFun: |
- return substrateFun(self._createComponent(asn1Spec, tagSet), |
- substrate, length) |
- head, tail = substrate[:length], substrate[length:] |
- return self._createComponent(asn1Spec, tagSet, value=head), tail |
- |
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
- length, state, decodeFun, substrateFun): |
- if asn1Spec is not None and tagSet == asn1Spec.getTagSet(): |
- # tagged Any type -- consume header substrate |
- header = '' |
- else: |
- # untagged Any, recover header substrate |
- header = fullSubstrate[:-len(substrate)] |
- |
- r = self._createComponent(asn1Spec, tagSet, header) |
- |
- # Any components do not inherit initial tag |
- asn1Spec = self.protoComponent |
- |
- if substrateFun: |
- return substrateFun(r, substrate, length) |
- while substrate: |
- component, substrate = decodeFun(substrate, asn1Spec) |
- if eoo.endOfOctets.isSameTypeWith(component) and \ |
- component == eoo.endOfOctets: |
- break |
- r = r + component |
- else: |
- raise error.SubstrateUnderrunError( |
- 'No EOO seen before substrate ends' |
- ) |
- return r, substrate |
- |
-# character string types |
-class UTF8StringDecoder(OctetStringDecoder): |
- protoComponent = char.UTF8String() |
-class NumericStringDecoder(OctetStringDecoder): |
- protoComponent = char.NumericString() |
-class PrintableStringDecoder(OctetStringDecoder): |
- protoComponent = char.PrintableString() |
-class TeletexStringDecoder(OctetStringDecoder): |
- protoComponent = char.TeletexString() |
-class VideotexStringDecoder(OctetStringDecoder): |
- protoComponent = char.VideotexString() |
-class IA5StringDecoder(OctetStringDecoder): |
- protoComponent = char.IA5String() |
-class GraphicStringDecoder(OctetStringDecoder): |
- protoComponent = char.GraphicString() |
-class VisibleStringDecoder(OctetStringDecoder): |
- protoComponent = char.VisibleString() |
-class GeneralStringDecoder(OctetStringDecoder): |
- protoComponent = char.GeneralString() |
-class UniversalStringDecoder(OctetStringDecoder): |
- protoComponent = char.UniversalString() |
-class BMPStringDecoder(OctetStringDecoder): |
- protoComponent = char.BMPString() |
- |
-# "useful" types |
-class GeneralizedTimeDecoder(OctetStringDecoder): |
- protoComponent = useful.GeneralizedTime() |
-class UTCTimeDecoder(OctetStringDecoder): |
- protoComponent = useful.UTCTime() |
- |
-tagMap = { |
- eoo.endOfOctets.tagSet: EndOfOctetsDecoder(), |
- univ.Integer.tagSet: IntegerDecoder(), |
- univ.Boolean.tagSet: BooleanDecoder(), |
- univ.BitString.tagSet: BitStringDecoder(), |
- univ.OctetString.tagSet: OctetStringDecoder(), |
- univ.Null.tagSet: NullDecoder(), |
- univ.ObjectIdentifier.tagSet: ObjectIdentifierDecoder(), |
- univ.Enumerated.tagSet: IntegerDecoder(), |
- univ.Real.tagSet: RealDecoder(), |
- univ.Sequence.tagSet: SequenceDecoder(), # conflicts with SequenceOf |
- univ.Set.tagSet: SetDecoder(), # conflicts with SetOf |
- univ.Choice.tagSet: ChoiceDecoder(), # conflicts with Any |
- # character string types |
- char.UTF8String.tagSet: UTF8StringDecoder(), |
- char.NumericString.tagSet: NumericStringDecoder(), |
- char.PrintableString.tagSet: PrintableStringDecoder(), |
- char.TeletexString.tagSet: TeletexStringDecoder(), |
- char.VideotexString.tagSet: VideotexStringDecoder(), |
- char.IA5String.tagSet: IA5StringDecoder(), |
- char.GraphicString.tagSet: GraphicStringDecoder(), |
- char.VisibleString.tagSet: VisibleStringDecoder(), |
- char.GeneralString.tagSet: GeneralStringDecoder(), |
- char.UniversalString.tagSet: UniversalStringDecoder(), |
- char.BMPString.tagSet: BMPStringDecoder(), |
- # useful types |
- useful.GeneralizedTime.tagSet: GeneralizedTimeDecoder(), |
- useful.UTCTime.tagSet: UTCTimeDecoder() |
- } |
- |
-# Type-to-codec map for ambiguous ASN.1 types |
-typeMap = { |
- univ.Set.typeId: SetDecoder(), |
- univ.SetOf.typeId: SetOfDecoder(), |
- univ.Sequence.typeId: SequenceDecoder(), |
- univ.SequenceOf.typeId: SequenceOfDecoder(), |
- univ.Choice.typeId: ChoiceDecoder(), |
- univ.Any.typeId: AnyDecoder() |
- } |
- |
-( stDecodeTag, stDecodeLength, stGetValueDecoder, stGetValueDecoderByAsn1Spec, |
- stGetValueDecoderByTag, stTryAsExplicitTag, stDecodeValue, |
- stDumpRawValue, stErrorCondition, stStop ) = [x for x in range(10)] |
- |
-class Decoder: |
- defaultErrorState = stErrorCondition |
-# defaultErrorState = stDumpRawValue |
- defaultRawDecoder = AnyDecoder() |
- def __init__(self, tagMap, typeMap={}): |
- self.__tagMap = tagMap |
- self.__typeMap = typeMap |
- self.__endOfOctetsTagSet = eoo.endOfOctets.getTagSet() |
- # Tag & TagSet objects caches |
- self.__tagCache = {} |
- self.__tagSetCache = {} |
- |
- def __call__(self, substrate, asn1Spec=None, tagSet=None, |
- length=None, state=stDecodeTag, recursiveFlag=1, |
- substrateFun=None): |
- if debug.logger & debug.flagDecoder: |
- debug.logger('decoder called at scope %s with state %d, working with up to %d octets of substrate: %s' % (debug.scope, state, len(substrate), debug.hexdump(substrate))) |
- fullSubstrate = substrate |
- while state != stStop: |
- if state == stDecodeTag: |
- # Decode tag |
- if not substrate: |
- raise error.SubstrateUnderrunError( |
- 'Short octet stream on tag decoding' |
- ) |
- if not isOctetsType(substrate) and \ |
- not isinstance(substrate, univ.OctetString): |
- raise error.PyAsn1Error('Bad octet stream type') |
- |
- firstOctet = substrate[0] |
- substrate = substrate[1:] |
- if firstOctet in self.__tagCache: |
- lastTag = self.__tagCache[firstOctet] |
- else: |
- t = oct2int(firstOctet) |
- tagClass = t&0xC0 |
- tagFormat = t&0x20 |
- tagId = t&0x1F |
- if tagId == 0x1F: |
- tagId = 0 |
- while 1: |
- if not substrate: |
- raise error.SubstrateUnderrunError( |
- 'Short octet stream on long tag decoding' |
- ) |
- t = oct2int(substrate[0]) |
- tagId = tagId << 7 | (t&0x7F) |
- substrate = substrate[1:] |
- if not t&0x80: |
- break |
- lastTag = tag.Tag( |
- tagClass=tagClass, tagFormat=tagFormat, tagId=tagId |
- ) |
- if tagId < 31: |
- # cache short tags |
- self.__tagCache[firstOctet] = lastTag |
- if tagSet is None: |
- if firstOctet in self.__tagSetCache: |
- tagSet = self.__tagSetCache[firstOctet] |
- else: |
- # base tag not recovered |
- tagSet = tag.TagSet((), lastTag) |
- if firstOctet in self.__tagCache: |
- self.__tagSetCache[firstOctet] = tagSet |
- else: |
- tagSet = lastTag + tagSet |
- state = stDecodeLength |
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('tag decoded into %r, decoding length' % tagSet) |
- if state == stDecodeLength: |
- # Decode length |
- if not substrate: |
- raise error.SubstrateUnderrunError( |
- 'Short octet stream on length decoding' |
- ) |
- firstOctet = oct2int(substrate[0]) |
- if firstOctet == 128: |
- size = 1 |
- length = -1 |
- elif firstOctet < 128: |
- length, size = firstOctet, 1 |
- else: |
- size = firstOctet & 0x7F |
- # encoded in size bytes |
- length = 0 |
- lengthString = substrate[1:size+1] |
- # missing check on maximum size, which shouldn't be a |
- # problem, we can handle more than is possible |
- if len(lengthString) != size: |
- raise error.SubstrateUnderrunError( |
- '%s<%s at %s' % |
- (size, len(lengthString), tagSet) |
- ) |
- for char in lengthString: |
- length = (length << 8) | oct2int(char) |
- size = size + 1 |
- substrate = substrate[size:] |
- if length != -1 and len(substrate) < length: |
- raise error.SubstrateUnderrunError( |
- '%d-octet short' % (length - len(substrate)) |
- ) |
- state = stGetValueDecoder |
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('value length decoded into %d, payload substrate is: %s' % (length, debug.hexdump(length == -1 and substrate or substrate[:length]))) |
- if state == stGetValueDecoder: |
- if asn1Spec is None: |
- state = stGetValueDecoderByTag |
- else: |
- state = stGetValueDecoderByAsn1Spec |
- # |
- # There're two ways of creating subtypes in ASN.1 what influences |
- # decoder operation. These methods are: |
- # 1) Either base types used in or no IMPLICIT tagging has been |
- # applied on subtyping. |
- # 2) Subtype syntax drops base type information (by means of |
- # IMPLICIT tagging. |
- # The first case allows for complete tag recovery from substrate |
- # while the second one requires original ASN.1 type spec for |
- # decoding. |
- # |
- # In either case a set of tags (tagSet) is coming from substrate |
- # in an incremental, tag-by-tag fashion (this is the case of |
- # EXPLICIT tag which is most basic). Outermost tag comes first |
- # from the wire. |
- # |
- if state == stGetValueDecoderByTag: |
- if tagSet in self.__tagMap: |
- concreteDecoder = self.__tagMap[tagSet] |
- else: |
- concreteDecoder = None |
- if concreteDecoder: |
- state = stDecodeValue |
- else: |
- _k = tagSet[:1] |
- if _k in self.__tagMap: |
- concreteDecoder = self.__tagMap[_k] |
- else: |
- concreteDecoder = None |
- if concreteDecoder: |
- state = stDecodeValue |
- else: |
- state = stTryAsExplicitTag |
- if debug.logger and debug.logger & debug.flagDecoder: |
- debug.logger('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state == stDecodeValue and 'value' or 'as explicit tag')) |
- debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__) |
- if state == stGetValueDecoderByAsn1Spec: |
- if isinstance(asn1Spec, (dict, tagmap.TagMap)): |
- if tagSet in asn1Spec: |
- __chosenSpec = asn1Spec[tagSet] |
- else: |
- __chosenSpec = None |
- if debug.logger and debug.logger & debug.flagDecoder: |
- debug.logger('candidate ASN.1 spec is a map of:') |
- for t, v in asn1Spec.getPosMap().items(): |
- debug.logger(' %r -> %s' % (t, v.__class__.__name__)) |
- if asn1Spec.getNegMap(): |
- debug.logger('but neither of: ') |
- for i in asn1Spec.getNegMap().items(): |
- debug.logger(' %r -> %s' % (t, v.__class__.__name__)) |
- debug.logger('new candidate ASN.1 spec is %s, chosen by %r' % (__chosenSpec is None and '<none>' or __chosenSpec.__class__.__name__, tagSet)) |
- else: |
- __chosenSpec = asn1Spec |
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__) |
- if __chosenSpec is not None and ( |
- tagSet == __chosenSpec.getTagSet() or \ |
- tagSet in __chosenSpec.getTagMap() |
- ): |
- # use base type for codec lookup to recover untagged types |
- baseTagSet = __chosenSpec.baseTagSet |
- if __chosenSpec.typeId is not None and \ |
- __chosenSpec.typeId in self.__typeMap: |
- # ambiguous type |
- concreteDecoder = self.__typeMap[__chosenSpec.typeId] |
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('value decoder chosen for an ambiguous type by type ID %s' % (__chosenSpec.typeId,)) |
- elif baseTagSet in self.__tagMap: |
- # base type or tagged subtype |
- concreteDecoder = self.__tagMap[baseTagSet] |
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('value decoder chosen by base %r' % (baseTagSet,)) |
- else: |
- concreteDecoder = None |
- if concreteDecoder: |
- asn1Spec = __chosenSpec |
- state = stDecodeValue |
- else: |
- state = stTryAsExplicitTag |
- elif tagSet == self.__endOfOctetsTagSet: |
- concreteDecoder = self.__tagMap[tagSet] |
- state = stDecodeValue |
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('end-of-octets found') |
- else: |
- concreteDecoder = None |
- state = stTryAsExplicitTag |
- if debug.logger and debug.logger & debug.flagDecoder: |
- debug.logger('codec %s chosen by ASN.1 spec, decoding %s' % (state == stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state == stDecodeValue and 'value' or 'as explicit tag')) |
- debug.scope.push(__chosenSpec is None and '?' or __chosenSpec.__class__.__name__) |
- if state == stTryAsExplicitTag: |
- if tagSet and \ |
- tagSet[0][1] == tag.tagFormatConstructed and \ |
- tagSet[0][0] != tag.tagClassUniversal: |
- # Assume explicit tagging |
- concreteDecoder = explicitTagDecoder |
- state = stDecodeValue |
- else: |
- concreteDecoder = None |
- state = self.defaultErrorState |
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state == stDecodeValue and 'value' or 'as failure')) |
- if state == stDumpRawValue: |
- concreteDecoder = self.defaultRawDecoder |
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__) |
- state = stDecodeValue |
- if state == stDecodeValue: |
- if recursiveFlag == 0 and not substrateFun: # legacy |
- substrateFun = lambda a,b,c: (a,b[:c]) |
- if length == -1: # indef length |
- value, substrate = concreteDecoder.indefLenValueDecoder( |
- fullSubstrate, substrate, asn1Spec, tagSet, length, |
- stGetValueDecoder, self, substrateFun |
- ) |
- else: |
- value, substrate = concreteDecoder.valueDecoder( |
- fullSubstrate, substrate, asn1Spec, tagSet, length, |
- stGetValueDecoder, self, substrateFun |
- ) |
- state = stStop |
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('codec %s yields type %s, value:\n%s\n...remaining substrate is: %s' % (concreteDecoder.__class__.__name__, value.__class__.__name__, value.prettyPrint(), substrate and debug.hexdump(substrate) or '<none>')) |
- if state == stErrorCondition: |
- raise error.PyAsn1Error( |
- '%r not in asn1Spec: %r' % (tagSet, asn1Spec) |
- ) |
- if debug.logger and debug.logger & debug.flagDecoder: |
- debug.scope.pop() |
- debug.logger('decoder left scope %s, call completed' % debug.scope) |
- return value, substrate |
- |
-decode = Decoder(tagMap, typeMap) |
- |
-# XXX |
-# non-recursive decoding; return position rather than substrate |