Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Unified Diff: tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py

Issue 1493973002: Remove telemetry/third_party/gsutilz (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@gsutil_changes
Patch Set: rebase Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py
diff --git a/tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py b/tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py
deleted file mode 100644
index be0cf49074d5d6bdfc84dbfc1cdeb321f3a3d3da..0000000000000000000000000000000000000000
--- a/tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py
+++ /dev/null
@@ -1,808 +0,0 @@
-# BER decoder
-from pyasn1.type import tag, base, univ, char, useful, tagmap
-from pyasn1.codec.ber import eoo
-from pyasn1.compat.octets import oct2int, octs2ints, isOctetsType
-from pyasn1 import debug, error
-
-class AbstractDecoder:
- protoComponent = None
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- raise error.PyAsn1Error('Decoder not implemented for %s' % (tagSet,))
-
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,))
-
-class AbstractSimpleDecoder(AbstractDecoder):
- tagFormats = (tag.tagFormatSimple,)
- def _createComponent(self, asn1Spec, tagSet, value=None):
- if tagSet[0][1] not in self.tagFormats:
- raise error.PyAsn1Error('Invalid tag format %r for %r' % (tagSet[0], self.protoComponent,))
- if asn1Spec is None:
- return self.protoComponent.clone(value, tagSet)
- elif value is None:
- return asn1Spec
- else:
- return asn1Spec.clone(value)
-
-class AbstractConstructedDecoder(AbstractDecoder):
- tagFormats = (tag.tagFormatConstructed,)
- def _createComponent(self, asn1Spec, tagSet, value=None):
- if tagSet[0][1] not in self.tagFormats:
- raise error.PyAsn1Error('Invalid tag format %r for %r' % (tagSet[0], self.protoComponent,))
- if asn1Spec is None:
- return self.protoComponent.clone(tagSet)
- else:
- return asn1Spec.clone()
-
-class EndOfOctetsDecoder(AbstractSimpleDecoder):
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- return eoo.endOfOctets, substrate[length:]
-
-class ExplicitTagDecoder(AbstractSimpleDecoder):
- protoComponent = univ.Any('')
- tagFormats = (tag.tagFormatConstructed,)
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- if substrateFun:
- return substrateFun(
- self._createComponent(asn1Spec, tagSet, ''),
- substrate, length
- )
- head, tail = substrate[:length], substrate[length:]
- value, _ = decodeFun(head, asn1Spec, tagSet, length)
- return value, tail
-
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- if substrateFun:
- return substrateFun(
- self._createComponent(asn1Spec, tagSet, ''),
- substrate, length
- )
- value, substrate = decodeFun(substrate, asn1Spec, tagSet, length)
- terminator, substrate = decodeFun(substrate)
- if eoo.endOfOctets.isSameTypeWith(terminator) and \
- terminator == eoo.endOfOctets:
- return value, substrate
- else:
- raise error.PyAsn1Error('Missing end-of-octets terminator')
-
-explicitTagDecoder = ExplicitTagDecoder()
-
-class IntegerDecoder(AbstractSimpleDecoder):
- protoComponent = univ.Integer(0)
- precomputedValues = {
- '\x00': 0,
- '\x01': 1,
- '\x02': 2,
- '\x03': 3,
- '\x04': 4,
- '\x05': 5,
- '\x06': 6,
- '\x07': 7,
- '\x08': 8,
- '\x09': 9,
- '\xff': -1,
- '\xfe': -2,
- '\xfd': -3,
- '\xfc': -4,
- '\xfb': -5
- }
-
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun, substrateFun):
- head, tail = substrate[:length], substrate[length:]
- if not head:
- return self._createComponent(asn1Spec, tagSet, 0), tail
- if head in self.precomputedValues:
- value = self.precomputedValues[head]
- else:
- firstOctet = oct2int(head[0])
- if firstOctet & 0x80:
- value = -1
- else:
- value = 0
- for octet in head:
- value = value << 8 | oct2int(octet)
- return self._createComponent(asn1Spec, tagSet, value), tail
-
-class BooleanDecoder(IntegerDecoder):
- protoComponent = univ.Boolean(0)
- def _createComponent(self, asn1Spec, tagSet, value=None):
- return IntegerDecoder._createComponent(self, asn1Spec, tagSet, value and 1 or 0)
-
-class BitStringDecoder(AbstractSimpleDecoder):
- protoComponent = univ.BitString(())
- tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed)
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun, substrateFun):
- head, tail = substrate[:length], substrate[length:]
- if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check?
- if not head:
- raise error.PyAsn1Error('Empty substrate')
- trailingBits = oct2int(head[0])
- if trailingBits > 7:
- raise error.PyAsn1Error(
- 'Trailing bits overflow %s' % trailingBits
- )
- head = head[1:]
- lsb = p = 0; l = len(head)-1; b = ()
- while p <= l:
- if p == l:
- lsb = trailingBits
- j = 7
- o = oct2int(head[p])
- while j >= lsb:
- b = b + ((o>>j)&0x01,)
- j = j - 1
- p = p + 1
- return self._createComponent(asn1Spec, tagSet, b), tail
- r = self._createComponent(asn1Spec, tagSet, ())
- if substrateFun:
- return substrateFun(r, substrate, length)
- while head:
- component, head = decodeFun(head)
- r = r + component
- return r, tail
-
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- r = self._createComponent(asn1Spec, tagSet, '')
- if substrateFun:
- return substrateFun(r, substrate, length)
- while substrate:
- component, substrate = decodeFun(substrate)
- if eoo.endOfOctets.isSameTypeWith(component) and \
- component == eoo.endOfOctets:
- break
- r = r + component
- else:
- raise error.SubstrateUnderrunError(
- 'No EOO seen before substrate ends'
- )
- return r, substrate
-
-class OctetStringDecoder(AbstractSimpleDecoder):
- protoComponent = univ.OctetString('')
- tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed)
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun, substrateFun):
- head, tail = substrate[:length], substrate[length:]
- if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check?
- return self._createComponent(asn1Spec, tagSet, head), tail
- r = self._createComponent(asn1Spec, tagSet, '')
- if substrateFun:
- return substrateFun(r, substrate, length)
- while head:
- component, head = decodeFun(head)
- r = r + component
- return r, tail
-
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- r = self._createComponent(asn1Spec, tagSet, '')
- if substrateFun:
- return substrateFun(r, substrate, length)
- while substrate:
- component, substrate = decodeFun(substrate)
- if eoo.endOfOctets.isSameTypeWith(component) and \
- component == eoo.endOfOctets:
- break
- r = r + component
- else:
- raise error.SubstrateUnderrunError(
- 'No EOO seen before substrate ends'
- )
- return r, substrate
-
-class NullDecoder(AbstractSimpleDecoder):
- protoComponent = univ.Null('')
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- head, tail = substrate[:length], substrate[length:]
- r = self._createComponent(asn1Spec, tagSet)
- if head:
- raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length)
- return r, tail
-
-class ObjectIdentifierDecoder(AbstractSimpleDecoder):
- protoComponent = univ.ObjectIdentifier(())
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun, substrateFun):
- head, tail = substrate[:length], substrate[length:]
- if not head:
- raise error.PyAsn1Error('Empty substrate')
-
- # Get the first subid
- subId = oct2int(head[0])
- oid = divmod(subId, 40)
-
- index = 1
- substrateLen = len(head)
- while index < substrateLen:
- subId = oct2int(head[index])
- index = index + 1
- if subId == 128:
- # ASN.1 spec forbids leading zeros (0x80) in sub-ID OID
- # encoding, tolerating it opens a vulnerability.
- # See http://www.cosic.esat.kuleuven.be/publications/article-1432.pdf page 7
- raise error.PyAsn1Error('Invalid leading 0x80 in sub-OID')
- elif subId > 128:
- # Construct subid from a number of octets
- nextSubId = subId
- subId = 0
- while nextSubId >= 128:
- subId = (subId << 7) + (nextSubId & 0x7F)
- if index >= substrateLen:
- raise error.SubstrateUnderrunError(
- 'Short substrate for sub-OID past %s' % (oid,)
- )
- nextSubId = oct2int(head[index])
- index = index + 1
- subId = (subId << 7) + nextSubId
- oid = oid + (subId,)
- return self._createComponent(asn1Spec, tagSet, oid), tail
-
-class RealDecoder(AbstractSimpleDecoder):
- protoComponent = univ.Real()
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- head, tail = substrate[:length], substrate[length:]
- if not head:
- return self._createComponent(asn1Spec, tagSet, 0.0), tail
- fo = oct2int(head[0]); head = head[1:]
- if fo & 0x80: # binary enoding
- n = (fo & 0x03) + 1
- if n == 4:
- n = oct2int(head[0])
- eo, head = head[:n], head[n:]
- if not eo or not head:
- raise error.PyAsn1Error('Real exponent screwed')
- e = oct2int(eo[0]) & 0x80 and -1 or 0
- while eo: # exponent
- e <<= 8
- e |= oct2int(eo[0])
- eo = eo[1:]
- p = 0
- while head: # value
- p <<= 8
- p |= oct2int(head[0])
- head = head[1:]
- if fo & 0x40: # sign bit
- p = -p
- value = (p, 2, e)
- elif fo & 0x40: # infinite value
- value = fo & 0x01 and '-inf' or 'inf'
- elif fo & 0xc0 == 0: # character encoding
- try:
- if fo & 0x3 == 0x1: # NR1
- value = (int(head), 10, 0)
- elif fo & 0x3 == 0x2: # NR2
- value = float(head)
- elif fo & 0x3 == 0x3: # NR3
- value = float(head)
- else:
- raise error.SubstrateUnderrunError(
- 'Unknown NR (tag %s)' % fo
- )
- except ValueError:
- raise error.SubstrateUnderrunError(
- 'Bad character Real syntax'
- )
- else:
- raise error.SubstrateUnderrunError(
- 'Unknown encoding (tag %s)' % fo
- )
- return self._createComponent(asn1Spec, tagSet, value), tail
-
-class SequenceDecoder(AbstractConstructedDecoder):
- protoComponent = univ.Sequence()
- def _getComponentTagMap(self, r, idx):
- try:
- return r.getComponentTagMapNearPosition(idx)
- except error.PyAsn1Error:
- return
-
- def _getComponentPositionByType(self, r, t, idx):
- return r.getComponentPositionNearType(t, idx)
-
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- head, tail = substrate[:length], substrate[length:]
- r = self._createComponent(asn1Spec, tagSet)
- idx = 0
- if substrateFun:
- return substrateFun(r, substrate, length)
- while head:
- asn1Spec = self._getComponentTagMap(r, idx)
- component, head = decodeFun(head, asn1Spec)
- idx = self._getComponentPositionByType(
- r, component.getEffectiveTagSet(), idx
- )
- r.setComponentByPosition(idx, component, asn1Spec is None)
- idx = idx + 1
- r.setDefaultComponents()
- r.verifySizeSpec()
- return r, tail
-
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- r = self._createComponent(asn1Spec, tagSet)
- if substrateFun:
- return substrateFun(r, substrate, length)
- idx = 0
- while substrate:
- asn1Spec = self._getComponentTagMap(r, idx)
- component, substrate = decodeFun(substrate, asn1Spec)
- if eoo.endOfOctets.isSameTypeWith(component) and \
- component == eoo.endOfOctets:
- break
- idx = self._getComponentPositionByType(
- r, component.getEffectiveTagSet(), idx
- )
- r.setComponentByPosition(idx, component, asn1Spec is None)
- idx = idx + 1
- else:
- raise error.SubstrateUnderrunError(
- 'No EOO seen before substrate ends'
- )
- r.setDefaultComponents()
- r.verifySizeSpec()
- return r, substrate
-
-class SequenceOfDecoder(AbstractConstructedDecoder):
- protoComponent = univ.SequenceOf()
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- head, tail = substrate[:length], substrate[length:]
- r = self._createComponent(asn1Spec, tagSet)
- if substrateFun:
- return substrateFun(r, substrate, length)
- asn1Spec = r.getComponentType()
- idx = 0
- while head:
- component, head = decodeFun(head, asn1Spec)
- r.setComponentByPosition(idx, component, asn1Spec is None)
- idx = idx + 1
- r.verifySizeSpec()
- return r, tail
-
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- r = self._createComponent(asn1Spec, tagSet)
- if substrateFun:
- return substrateFun(r, substrate, length)
- asn1Spec = r.getComponentType()
- idx = 0
- while substrate:
- component, substrate = decodeFun(substrate, asn1Spec)
- if eoo.endOfOctets.isSameTypeWith(component) and \
- component == eoo.endOfOctets:
- break
- r.setComponentByPosition(idx, component, asn1Spec is None)
- idx = idx + 1
- else:
- raise error.SubstrateUnderrunError(
- 'No EOO seen before substrate ends'
- )
- r.verifySizeSpec()
- return r, substrate
-
-class SetDecoder(SequenceDecoder):
- protoComponent = univ.Set()
- def _getComponentTagMap(self, r, idx):
- return r.getComponentTagMap()
-
- def _getComponentPositionByType(self, r, t, idx):
- nextIdx = r.getComponentPositionByType(t)
- if nextIdx is None:
- return idx
- else:
- return nextIdx
-
-class SetOfDecoder(SequenceOfDecoder):
- protoComponent = univ.SetOf()
-
-class ChoiceDecoder(AbstractConstructedDecoder):
- protoComponent = univ.Choice()
- tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed)
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- head, tail = substrate[:length], substrate[length:]
- r = self._createComponent(asn1Spec, tagSet)
- if substrateFun:
- return substrateFun(r, substrate, length)
- if r.getTagSet() == tagSet: # explicitly tagged Choice
- component, head = decodeFun(
- head, r.getComponentTagMap()
- )
- else:
- component, head = decodeFun(
- head, r.getComponentTagMap(), tagSet, length, state
- )
- if isinstance(component, univ.Choice):
- effectiveTagSet = component.getEffectiveTagSet()
- else:
- effectiveTagSet = component.getTagSet()
- r.setComponentByType(effectiveTagSet, component, 0, asn1Spec is None)
- return r, tail
-
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- r = self._createComponent(asn1Spec, tagSet)
- if substrateFun:
- return substrateFun(r, substrate, length)
- if r.getTagSet() == tagSet: # explicitly tagged Choice
- component, substrate = decodeFun(substrate, r.getComponentTagMap())
- eooMarker, substrate = decodeFun(substrate) # eat up EOO marker
- if not eoo.endOfOctets.isSameTypeWith(eooMarker) or \
- eooMarker != eoo.endOfOctets:
- raise error.PyAsn1Error('No EOO seen before substrate ends')
- else:
- component, substrate= decodeFun(
- substrate, r.getComponentTagMap(), tagSet, length, state
- )
- if isinstance(component, univ.Choice):
- effectiveTagSet = component.getEffectiveTagSet()
- else:
- effectiveTagSet = component.getTagSet()
- r.setComponentByType(effectiveTagSet, component, 0, asn1Spec is None)
- return r, substrate
-
-class AnyDecoder(AbstractSimpleDecoder):
- protoComponent = univ.Any()
- tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed)
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- if asn1Spec is None or \
- asn1Spec is not None and tagSet != asn1Spec.getTagSet():
- # untagged Any container, recover inner header substrate
- length = length + len(fullSubstrate) - len(substrate)
- substrate = fullSubstrate
- if substrateFun:
- return substrateFun(self._createComponent(asn1Spec, tagSet),
- substrate, length)
- head, tail = substrate[:length], substrate[length:]
- return self._createComponent(asn1Spec, tagSet, value=head), tail
-
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
- if asn1Spec is not None and tagSet == asn1Spec.getTagSet():
- # tagged Any type -- consume header substrate
- header = ''
- else:
- # untagged Any, recover header substrate
- header = fullSubstrate[:-len(substrate)]
-
- r = self._createComponent(asn1Spec, tagSet, header)
-
- # Any components do not inherit initial tag
- asn1Spec = self.protoComponent
-
- if substrateFun:
- return substrateFun(r, substrate, length)
- while substrate:
- component, substrate = decodeFun(substrate, asn1Spec)
- if eoo.endOfOctets.isSameTypeWith(component) and \
- component == eoo.endOfOctets:
- break
- r = r + component
- else:
- raise error.SubstrateUnderrunError(
- 'No EOO seen before substrate ends'
- )
- return r, substrate
-
-# character string types
-class UTF8StringDecoder(OctetStringDecoder):
- protoComponent = char.UTF8String()
-class NumericStringDecoder(OctetStringDecoder):
- protoComponent = char.NumericString()
-class PrintableStringDecoder(OctetStringDecoder):
- protoComponent = char.PrintableString()
-class TeletexStringDecoder(OctetStringDecoder):
- protoComponent = char.TeletexString()
-class VideotexStringDecoder(OctetStringDecoder):
- protoComponent = char.VideotexString()
-class IA5StringDecoder(OctetStringDecoder):
- protoComponent = char.IA5String()
-class GraphicStringDecoder(OctetStringDecoder):
- protoComponent = char.GraphicString()
-class VisibleStringDecoder(OctetStringDecoder):
- protoComponent = char.VisibleString()
-class GeneralStringDecoder(OctetStringDecoder):
- protoComponent = char.GeneralString()
-class UniversalStringDecoder(OctetStringDecoder):
- protoComponent = char.UniversalString()
-class BMPStringDecoder(OctetStringDecoder):
- protoComponent = char.BMPString()
-
-# "useful" types
-class GeneralizedTimeDecoder(OctetStringDecoder):
- protoComponent = useful.GeneralizedTime()
-class UTCTimeDecoder(OctetStringDecoder):
- protoComponent = useful.UTCTime()
-
-tagMap = {
- eoo.endOfOctets.tagSet: EndOfOctetsDecoder(),
- univ.Integer.tagSet: IntegerDecoder(),
- univ.Boolean.tagSet: BooleanDecoder(),
- univ.BitString.tagSet: BitStringDecoder(),
- univ.OctetString.tagSet: OctetStringDecoder(),
- univ.Null.tagSet: NullDecoder(),
- univ.ObjectIdentifier.tagSet: ObjectIdentifierDecoder(),
- univ.Enumerated.tagSet: IntegerDecoder(),
- univ.Real.tagSet: RealDecoder(),
- univ.Sequence.tagSet: SequenceDecoder(), # conflicts with SequenceOf
- univ.Set.tagSet: SetDecoder(), # conflicts with SetOf
- univ.Choice.tagSet: ChoiceDecoder(), # conflicts with Any
- # character string types
- char.UTF8String.tagSet: UTF8StringDecoder(),
- char.NumericString.tagSet: NumericStringDecoder(),
- char.PrintableString.tagSet: PrintableStringDecoder(),
- char.TeletexString.tagSet: TeletexStringDecoder(),
- char.VideotexString.tagSet: VideotexStringDecoder(),
- char.IA5String.tagSet: IA5StringDecoder(),
- char.GraphicString.tagSet: GraphicStringDecoder(),
- char.VisibleString.tagSet: VisibleStringDecoder(),
- char.GeneralString.tagSet: GeneralStringDecoder(),
- char.UniversalString.tagSet: UniversalStringDecoder(),
- char.BMPString.tagSet: BMPStringDecoder(),
- # useful types
- useful.GeneralizedTime.tagSet: GeneralizedTimeDecoder(),
- useful.UTCTime.tagSet: UTCTimeDecoder()
- }
-
-# Type-to-codec map for ambiguous ASN.1 types
-typeMap = {
- univ.Set.typeId: SetDecoder(),
- univ.SetOf.typeId: SetOfDecoder(),
- univ.Sequence.typeId: SequenceDecoder(),
- univ.SequenceOf.typeId: SequenceOfDecoder(),
- univ.Choice.typeId: ChoiceDecoder(),
- univ.Any.typeId: AnyDecoder()
- }
-
-( stDecodeTag, stDecodeLength, stGetValueDecoder, stGetValueDecoderByAsn1Spec,
- stGetValueDecoderByTag, stTryAsExplicitTag, stDecodeValue,
- stDumpRawValue, stErrorCondition, stStop ) = [x for x in range(10)]
-
-class Decoder:
- defaultErrorState = stErrorCondition
-# defaultErrorState = stDumpRawValue
- defaultRawDecoder = AnyDecoder()
- def __init__(self, tagMap, typeMap={}):
- self.__tagMap = tagMap
- self.__typeMap = typeMap
- self.__endOfOctetsTagSet = eoo.endOfOctets.getTagSet()
- # Tag & TagSet objects caches
- self.__tagCache = {}
- self.__tagSetCache = {}
-
- def __call__(self, substrate, asn1Spec=None, tagSet=None,
- length=None, state=stDecodeTag, recursiveFlag=1,
- substrateFun=None):
- if debug.logger & debug.flagDecoder:
- debug.logger('decoder called at scope %s with state %d, working with up to %d octets of substrate: %s' % (debug.scope, state, len(substrate), debug.hexdump(substrate)))
- fullSubstrate = substrate
- while state != stStop:
- if state == stDecodeTag:
- # Decode tag
- if not substrate:
- raise error.SubstrateUnderrunError(
- 'Short octet stream on tag decoding'
- )
- if not isOctetsType(substrate) and \
- not isinstance(substrate, univ.OctetString):
- raise error.PyAsn1Error('Bad octet stream type')
-
- firstOctet = substrate[0]
- substrate = substrate[1:]
- if firstOctet in self.__tagCache:
- lastTag = self.__tagCache[firstOctet]
- else:
- t = oct2int(firstOctet)
- tagClass = t&0xC0
- tagFormat = t&0x20
- tagId = t&0x1F
- if tagId == 0x1F:
- tagId = 0
- while 1:
- if not substrate:
- raise error.SubstrateUnderrunError(
- 'Short octet stream on long tag decoding'
- )
- t = oct2int(substrate[0])
- tagId = tagId << 7 | (t&0x7F)
- substrate = substrate[1:]
- if not t&0x80:
- break
- lastTag = tag.Tag(
- tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
- )
- if tagId < 31:
- # cache short tags
- self.__tagCache[firstOctet] = lastTag
- if tagSet is None:
- if firstOctet in self.__tagSetCache:
- tagSet = self.__tagSetCache[firstOctet]
- else:
- # base tag not recovered
- tagSet = tag.TagSet((), lastTag)
- if firstOctet in self.__tagCache:
- self.__tagSetCache[firstOctet] = tagSet
- else:
- tagSet = lastTag + tagSet
- state = stDecodeLength
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('tag decoded into %r, decoding length' % tagSet)
- if state == stDecodeLength:
- # Decode length
- if not substrate:
- raise error.SubstrateUnderrunError(
- 'Short octet stream on length decoding'
- )
- firstOctet = oct2int(substrate[0])
- if firstOctet == 128:
- size = 1
- length = -1
- elif firstOctet < 128:
- length, size = firstOctet, 1
- else:
- size = firstOctet & 0x7F
- # encoded in size bytes
- length = 0
- lengthString = substrate[1:size+1]
- # missing check on maximum size, which shouldn't be a
- # problem, we can handle more than is possible
- if len(lengthString) != size:
- raise error.SubstrateUnderrunError(
- '%s<%s at %s' %
- (size, len(lengthString), tagSet)
- )
- for char in lengthString:
- length = (length << 8) | oct2int(char)
- size = size + 1
- substrate = substrate[size:]
- if length != -1 and len(substrate) < length:
- raise error.SubstrateUnderrunError(
- '%d-octet short' % (length - len(substrate))
- )
- state = stGetValueDecoder
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('value length decoded into %d, payload substrate is: %s' % (length, debug.hexdump(length == -1 and substrate or substrate[:length])))
- if state == stGetValueDecoder:
- if asn1Spec is None:
- state = stGetValueDecoderByTag
- else:
- state = stGetValueDecoderByAsn1Spec
- #
- # There're two ways of creating subtypes in ASN.1 what influences
- # decoder operation. These methods are:
- # 1) Either base types used in or no IMPLICIT tagging has been
- # applied on subtyping.
- # 2) Subtype syntax drops base type information (by means of
- # IMPLICIT tagging.
- # The first case allows for complete tag recovery from substrate
- # while the second one requires original ASN.1 type spec for
- # decoding.
- #
- # In either case a set of tags (tagSet) is coming from substrate
- # in an incremental, tag-by-tag fashion (this is the case of
- # EXPLICIT tag which is most basic). Outermost tag comes first
- # from the wire.
- #
- if state == stGetValueDecoderByTag:
- if tagSet in self.__tagMap:
- concreteDecoder = self.__tagMap[tagSet]
- else:
- concreteDecoder = None
- if concreteDecoder:
- state = stDecodeValue
- else:
- _k = tagSet[:1]
- if _k in self.__tagMap:
- concreteDecoder = self.__tagMap[_k]
- else:
- concreteDecoder = None
- if concreteDecoder:
- state = stDecodeValue
- else:
- state = stTryAsExplicitTag
- if debug.logger and debug.logger & debug.flagDecoder:
- debug.logger('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state == stDecodeValue and 'value' or 'as explicit tag'))
- debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__)
- if state == stGetValueDecoderByAsn1Spec:
- if isinstance(asn1Spec, (dict, tagmap.TagMap)):
- if tagSet in asn1Spec:
- __chosenSpec = asn1Spec[tagSet]
- else:
- __chosenSpec = None
- if debug.logger and debug.logger & debug.flagDecoder:
- debug.logger('candidate ASN.1 spec is a map of:')
- for t, v in asn1Spec.getPosMap().items():
- debug.logger(' %r -> %s' % (t, v.__class__.__name__))
- if asn1Spec.getNegMap():
- debug.logger('but neither of: ')
- for i in asn1Spec.getNegMap().items():
- debug.logger(' %r -> %s' % (t, v.__class__.__name__))
- debug.logger('new candidate ASN.1 spec is %s, chosen by %r' % (__chosenSpec is None and '<none>' or __chosenSpec.__class__.__name__, tagSet))
- else:
- __chosenSpec = asn1Spec
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)
- if __chosenSpec is not None and (
- tagSet == __chosenSpec.getTagSet() or \
- tagSet in __chosenSpec.getTagMap()
- ):
- # use base type for codec lookup to recover untagged types
- baseTagSet = __chosenSpec.baseTagSet
- if __chosenSpec.typeId is not None and \
- __chosenSpec.typeId in self.__typeMap:
- # ambiguous type
- concreteDecoder = self.__typeMap[__chosenSpec.typeId]
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('value decoder chosen for an ambiguous type by type ID %s' % (__chosenSpec.typeId,))
- elif baseTagSet in self.__tagMap:
- # base type or tagged subtype
- concreteDecoder = self.__tagMap[baseTagSet]
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('value decoder chosen by base %r' % (baseTagSet,))
- else:
- concreteDecoder = None
- if concreteDecoder:
- asn1Spec = __chosenSpec
- state = stDecodeValue
- else:
- state = stTryAsExplicitTag
- elif tagSet == self.__endOfOctetsTagSet:
- concreteDecoder = self.__tagMap[tagSet]
- state = stDecodeValue
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('end-of-octets found')
- else:
- concreteDecoder = None
- state = stTryAsExplicitTag
- if debug.logger and debug.logger & debug.flagDecoder:
- debug.logger('codec %s chosen by ASN.1 spec, decoding %s' % (state == stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state == stDecodeValue and 'value' or 'as explicit tag'))
- debug.scope.push(__chosenSpec is None and '?' or __chosenSpec.__class__.__name__)
- if state == stTryAsExplicitTag:
- if tagSet and \
- tagSet[0][1] == tag.tagFormatConstructed and \
- tagSet[0][0] != tag.tagClassUniversal:
- # Assume explicit tagging
- concreteDecoder = explicitTagDecoder
- state = stDecodeValue
- else:
- concreteDecoder = None
- state = self.defaultErrorState
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state == stDecodeValue and 'value' or 'as failure'))
- if state == stDumpRawValue:
- concreteDecoder = self.defaultRawDecoder
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)
- state = stDecodeValue
- if state == stDecodeValue:
- if recursiveFlag == 0 and not substrateFun: # legacy
- substrateFun = lambda a,b,c: (a,b[:c])
- if length == -1: # indef length
- value, substrate = concreteDecoder.indefLenValueDecoder(
- fullSubstrate, substrate, asn1Spec, tagSet, length,
- stGetValueDecoder, self, substrateFun
- )
- else:
- value, substrate = concreteDecoder.valueDecoder(
- fullSubstrate, substrate, asn1Spec, tagSet, length,
- stGetValueDecoder, self, substrateFun
- )
- state = stStop
- debug.logger and debug.logger & debug.flagDecoder and debug.logger('codec %s yields type %s, value:\n%s\n...remaining substrate is: %s' % (concreteDecoder.__class__.__name__, value.__class__.__name__, value.prettyPrint(), substrate and debug.hexdump(substrate) or '<none>'))
- if state == stErrorCondition:
- raise error.PyAsn1Error(
- '%r not in asn1Spec: %r' % (tagSet, asn1Spec)
- )
- if debug.logger and debug.logger & debug.flagDecoder:
- debug.scope.pop()
- debug.logger('decoder left scope %s, call completed' % debug.scope)
- return value, substrate
-
-decode = Decoder(tagMap, typeMap)
-
-# XXX
-# non-recursive decoding; return position rather than substrate

Powered by Google App Engine
This is Rietveld 408576698