Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(409)

Unified Diff: tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py

Issue 1264873003: Add gsutil/third_party to telemetry/third_party/gsutilz/third_party. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Remove httplib2 Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py
diff --git a/tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py b/tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py
new file mode 100644
index 0000000000000000000000000000000000000000..be0cf49074d5d6bdfc84dbfc1cdeb321f3a3d3da
--- /dev/null
+++ b/tools/telemetry/third_party/gsutilz/third_party/pyasn1/pyasn1/codec/ber/decoder.py
@@ -0,0 +1,808 @@
+# BER decoder
+from pyasn1.type import tag, base, univ, char, useful, tagmap
+from pyasn1.codec.ber import eoo
+from pyasn1.compat.octets import oct2int, octs2ints, isOctetsType
+from pyasn1 import debug, error
+
+class AbstractDecoder:
+ protoComponent = None
+ def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ raise error.PyAsn1Error('Decoder not implemented for %s' % (tagSet,))
+
+ def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,))
+
+class AbstractSimpleDecoder(AbstractDecoder):
+ tagFormats = (tag.tagFormatSimple,)
+ def _createComponent(self, asn1Spec, tagSet, value=None):
+ if tagSet[0][1] not in self.tagFormats:
+ raise error.PyAsn1Error('Invalid tag format %r for %r' % (tagSet[0], self.protoComponent,))
+ if asn1Spec is None:
+ return self.protoComponent.clone(value, tagSet)
+ elif value is None:
+ return asn1Spec
+ else:
+ return asn1Spec.clone(value)
+
+class AbstractConstructedDecoder(AbstractDecoder):
+ tagFormats = (tag.tagFormatConstructed,)
+ def _createComponent(self, asn1Spec, tagSet, value=None):
+ if tagSet[0][1] not in self.tagFormats:
+ raise error.PyAsn1Error('Invalid tag format %r for %r' % (tagSet[0], self.protoComponent,))
+ if asn1Spec is None:
+ return self.protoComponent.clone(tagSet)
+ else:
+ return asn1Spec.clone()
+
+class EndOfOctetsDecoder(AbstractSimpleDecoder):
+ def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ return eoo.endOfOctets, substrate[length:]
+
+class ExplicitTagDecoder(AbstractSimpleDecoder):
+ protoComponent = univ.Any('')
+ tagFormats = (tag.tagFormatConstructed,)
+ def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ if substrateFun:
+ return substrateFun(
+ self._createComponent(asn1Spec, tagSet, ''),
+ substrate, length
+ )
+ head, tail = substrate[:length], substrate[length:]
+ value, _ = decodeFun(head, asn1Spec, tagSet, length)
+ return value, tail
+
+ def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ if substrateFun:
+ return substrateFun(
+ self._createComponent(asn1Spec, tagSet, ''),
+ substrate, length
+ )
+ value, substrate = decodeFun(substrate, asn1Spec, tagSet, length)
+ terminator, substrate = decodeFun(substrate)
+ if eoo.endOfOctets.isSameTypeWith(terminator) and \
+ terminator == eoo.endOfOctets:
+ return value, substrate
+ else:
+ raise error.PyAsn1Error('Missing end-of-octets terminator')
+
+explicitTagDecoder = ExplicitTagDecoder()
+
+class IntegerDecoder(AbstractSimpleDecoder):
+ protoComponent = univ.Integer(0)
+ precomputedValues = {
+ '\x00': 0,
+ '\x01': 1,
+ '\x02': 2,
+ '\x03': 3,
+ '\x04': 4,
+ '\x05': 5,
+ '\x06': 6,
+ '\x07': 7,
+ '\x08': 8,
+ '\x09': 9,
+ '\xff': -1,
+ '\xfe': -2,
+ '\xfd': -3,
+ '\xfc': -4,
+ '\xfb': -5
+ }
+
+ def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
+ state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ if not head:
+ return self._createComponent(asn1Spec, tagSet, 0), tail
+ if head in self.precomputedValues:
+ value = self.precomputedValues[head]
+ else:
+ firstOctet = oct2int(head[0])
+ if firstOctet & 0x80:
+ value = -1
+ else:
+ value = 0
+ for octet in head:
+ value = value << 8 | oct2int(octet)
+ return self._createComponent(asn1Spec, tagSet, value), tail
+
+class BooleanDecoder(IntegerDecoder):
+ protoComponent = univ.Boolean(0)
+ def _createComponent(self, asn1Spec, tagSet, value=None):
+ return IntegerDecoder._createComponent(self, asn1Spec, tagSet, value and 1 or 0)
+
+class BitStringDecoder(AbstractSimpleDecoder):
+ protoComponent = univ.BitString(())
+ tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed)
+ def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
+ state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check?
+ if not head:
+ raise error.PyAsn1Error('Empty substrate')
+ trailingBits = oct2int(head[0])
+ if trailingBits > 7:
+ raise error.PyAsn1Error(
+ 'Trailing bits overflow %s' % trailingBits
+ )
+ head = head[1:]
+ lsb = p = 0; l = len(head)-1; b = ()
+ while p <= l:
+ if p == l:
+ lsb = trailingBits
+ j = 7
+ o = oct2int(head[p])
+ while j >= lsb:
+ b = b + ((o>>j)&0x01,)
+ j = j - 1
+ p = p + 1
+ return self._createComponent(asn1Spec, tagSet, b), tail
+ r = self._createComponent(asn1Spec, tagSet, ())
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ while head:
+ component, head = decodeFun(head)
+ r = r + component
+ return r, tail
+
+ def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ r = self._createComponent(asn1Spec, tagSet, '')
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ while substrate:
+ component, substrate = decodeFun(substrate)
+ if eoo.endOfOctets.isSameTypeWith(component) and \
+ component == eoo.endOfOctets:
+ break
+ r = r + component
+ else:
+ raise error.SubstrateUnderrunError(
+ 'No EOO seen before substrate ends'
+ )
+ return r, substrate
+
+class OctetStringDecoder(AbstractSimpleDecoder):
+ protoComponent = univ.OctetString('')
+ tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed)
+ def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
+ state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check?
+ return self._createComponent(asn1Spec, tagSet, head), tail
+ r = self._createComponent(asn1Spec, tagSet, '')
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ while head:
+ component, head = decodeFun(head)
+ r = r + component
+ return r, tail
+
+ def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ r = self._createComponent(asn1Spec, tagSet, '')
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ while substrate:
+ component, substrate = decodeFun(substrate)
+ if eoo.endOfOctets.isSameTypeWith(component) and \
+ component == eoo.endOfOctets:
+ break
+ r = r + component
+ else:
+ raise error.SubstrateUnderrunError(
+ 'No EOO seen before substrate ends'
+ )
+ return r, substrate
+
+class NullDecoder(AbstractSimpleDecoder):
+ protoComponent = univ.Null('')
+ def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ r = self._createComponent(asn1Spec, tagSet)
+ if head:
+ raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length)
+ return r, tail
+
+class ObjectIdentifierDecoder(AbstractSimpleDecoder):
+ protoComponent = univ.ObjectIdentifier(())
+ def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
+ state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ if not head:
+ raise error.PyAsn1Error('Empty substrate')
+
+ # Get the first subid
+ subId = oct2int(head[0])
+ oid = divmod(subId, 40)
+
+ index = 1
+ substrateLen = len(head)
+ while index < substrateLen:
+ subId = oct2int(head[index])
+ index = index + 1
+ if subId == 128:
+ # ASN.1 spec forbids leading zeros (0x80) in sub-ID OID
+ # encoding, tolerating it opens a vulnerability.
+ # See http://www.cosic.esat.kuleuven.be/publications/article-1432.pdf page 7
+ raise error.PyAsn1Error('Invalid leading 0x80 in sub-OID')
+ elif subId > 128:
+ # Construct subid from a number of octets
+ nextSubId = subId
+ subId = 0
+ while nextSubId >= 128:
+ subId = (subId << 7) + (nextSubId & 0x7F)
+ if index >= substrateLen:
+ raise error.SubstrateUnderrunError(
+ 'Short substrate for sub-OID past %s' % (oid,)
+ )
+ nextSubId = oct2int(head[index])
+ index = index + 1
+ subId = (subId << 7) + nextSubId
+ oid = oid + (subId,)
+ return self._createComponent(asn1Spec, tagSet, oid), tail
+
+class RealDecoder(AbstractSimpleDecoder):
+ protoComponent = univ.Real()
+ def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ if not head:
+ return self._createComponent(asn1Spec, tagSet, 0.0), tail
+ fo = oct2int(head[0]); head = head[1:]
+ if fo & 0x80: # binary enoding
+ n = (fo & 0x03) + 1
+ if n == 4:
+ n = oct2int(head[0])
+ eo, head = head[:n], head[n:]
+ if not eo or not head:
+ raise error.PyAsn1Error('Real exponent screwed')
+ e = oct2int(eo[0]) & 0x80 and -1 or 0
+ while eo: # exponent
+ e <<= 8
+ e |= oct2int(eo[0])
+ eo = eo[1:]
+ p = 0
+ while head: # value
+ p <<= 8
+ p |= oct2int(head[0])
+ head = head[1:]
+ if fo & 0x40: # sign bit
+ p = -p
+ value = (p, 2, e)
+ elif fo & 0x40: # infinite value
+ value = fo & 0x01 and '-inf' or 'inf'
+ elif fo & 0xc0 == 0: # character encoding
+ try:
+ if fo & 0x3 == 0x1: # NR1
+ value = (int(head), 10, 0)
+ elif fo & 0x3 == 0x2: # NR2
+ value = float(head)
+ elif fo & 0x3 == 0x3: # NR3
+ value = float(head)
+ else:
+ raise error.SubstrateUnderrunError(
+ 'Unknown NR (tag %s)' % fo
+ )
+ except ValueError:
+ raise error.SubstrateUnderrunError(
+ 'Bad character Real syntax'
+ )
+ else:
+ raise error.SubstrateUnderrunError(
+ 'Unknown encoding (tag %s)' % fo
+ )
+ return self._createComponent(asn1Spec, tagSet, value), tail
+
+class SequenceDecoder(AbstractConstructedDecoder):
+ protoComponent = univ.Sequence()
+ def _getComponentTagMap(self, r, idx):
+ try:
+ return r.getComponentTagMapNearPosition(idx)
+ except error.PyAsn1Error:
+ return
+
+ def _getComponentPositionByType(self, r, t, idx):
+ return r.getComponentPositionNearType(t, idx)
+
+ def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ r = self._createComponent(asn1Spec, tagSet)
+ idx = 0
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ while head:
+ asn1Spec = self._getComponentTagMap(r, idx)
+ component, head = decodeFun(head, asn1Spec)
+ idx = self._getComponentPositionByType(
+ r, component.getEffectiveTagSet(), idx
+ )
+ r.setComponentByPosition(idx, component, asn1Spec is None)
+ idx = idx + 1
+ r.setDefaultComponents()
+ r.verifySizeSpec()
+ return r, tail
+
+ def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ r = self._createComponent(asn1Spec, tagSet)
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ idx = 0
+ while substrate:
+ asn1Spec = self._getComponentTagMap(r, idx)
+ component, substrate = decodeFun(substrate, asn1Spec)
+ if eoo.endOfOctets.isSameTypeWith(component) and \
+ component == eoo.endOfOctets:
+ break
+ idx = self._getComponentPositionByType(
+ r, component.getEffectiveTagSet(), idx
+ )
+ r.setComponentByPosition(idx, component, asn1Spec is None)
+ idx = idx + 1
+ else:
+ raise error.SubstrateUnderrunError(
+ 'No EOO seen before substrate ends'
+ )
+ r.setDefaultComponents()
+ r.verifySizeSpec()
+ return r, substrate
+
+class SequenceOfDecoder(AbstractConstructedDecoder):
+ protoComponent = univ.SequenceOf()
+ def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ r = self._createComponent(asn1Spec, tagSet)
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ asn1Spec = r.getComponentType()
+ idx = 0
+ while head:
+ component, head = decodeFun(head, asn1Spec)
+ r.setComponentByPosition(idx, component, asn1Spec is None)
+ idx = idx + 1
+ r.verifySizeSpec()
+ return r, tail
+
+ def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ r = self._createComponent(asn1Spec, tagSet)
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ asn1Spec = r.getComponentType()
+ idx = 0
+ while substrate:
+ component, substrate = decodeFun(substrate, asn1Spec)
+ if eoo.endOfOctets.isSameTypeWith(component) and \
+ component == eoo.endOfOctets:
+ break
+ r.setComponentByPosition(idx, component, asn1Spec is None)
+ idx = idx + 1
+ else:
+ raise error.SubstrateUnderrunError(
+ 'No EOO seen before substrate ends'
+ )
+ r.verifySizeSpec()
+ return r, substrate
+
+class SetDecoder(SequenceDecoder):
+ protoComponent = univ.Set()
+ def _getComponentTagMap(self, r, idx):
+ return r.getComponentTagMap()
+
+ def _getComponentPositionByType(self, r, t, idx):
+ nextIdx = r.getComponentPositionByType(t)
+ if nextIdx is None:
+ return idx
+ else:
+ return nextIdx
+
+class SetOfDecoder(SequenceOfDecoder):
+ protoComponent = univ.SetOf()
+
+class ChoiceDecoder(AbstractConstructedDecoder):
+ protoComponent = univ.Choice()
+ tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed)
+ def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ r = self._createComponent(asn1Spec, tagSet)
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ if r.getTagSet() == tagSet: # explicitly tagged Choice
+ component, head = decodeFun(
+ head, r.getComponentTagMap()
+ )
+ else:
+ component, head = decodeFun(
+ head, r.getComponentTagMap(), tagSet, length, state
+ )
+ if isinstance(component, univ.Choice):
+ effectiveTagSet = component.getEffectiveTagSet()
+ else:
+ effectiveTagSet = component.getTagSet()
+ r.setComponentByType(effectiveTagSet, component, 0, asn1Spec is None)
+ return r, tail
+
+ def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ r = self._createComponent(asn1Spec, tagSet)
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ if r.getTagSet() == tagSet: # explicitly tagged Choice
+ component, substrate = decodeFun(substrate, r.getComponentTagMap())
+ eooMarker, substrate = decodeFun(substrate) # eat up EOO marker
+ if not eoo.endOfOctets.isSameTypeWith(eooMarker) or \
+ eooMarker != eoo.endOfOctets:
+ raise error.PyAsn1Error('No EOO seen before substrate ends')
+ else:
+ component, substrate= decodeFun(
+ substrate, r.getComponentTagMap(), tagSet, length, state
+ )
+ if isinstance(component, univ.Choice):
+ effectiveTagSet = component.getEffectiveTagSet()
+ else:
+ effectiveTagSet = component.getTagSet()
+ r.setComponentByType(effectiveTagSet, component, 0, asn1Spec is None)
+ return r, substrate
+
+class AnyDecoder(AbstractSimpleDecoder):
+ protoComponent = univ.Any()
+ tagFormats = (tag.tagFormatSimple, tag.tagFormatConstructed)
+ def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ if asn1Spec is None or \
+ asn1Spec is not None and tagSet != asn1Spec.getTagSet():
+ # untagged Any container, recover inner header substrate
+ length = length + len(fullSubstrate) - len(substrate)
+ substrate = fullSubstrate
+ if substrateFun:
+ return substrateFun(self._createComponent(asn1Spec, tagSet),
+ substrate, length)
+ head, tail = substrate[:length], substrate[length:]
+ return self._createComponent(asn1Spec, tagSet, value=head), tail
+
+ def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
+ length, state, decodeFun, substrateFun):
+ if asn1Spec is not None and tagSet == asn1Spec.getTagSet():
+ # tagged Any type -- consume header substrate
+ header = ''
+ else:
+ # untagged Any, recover header substrate
+ header = fullSubstrate[:-len(substrate)]
+
+ r = self._createComponent(asn1Spec, tagSet, header)
+
+ # Any components do not inherit initial tag
+ asn1Spec = self.protoComponent
+
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ while substrate:
+ component, substrate = decodeFun(substrate, asn1Spec)
+ if eoo.endOfOctets.isSameTypeWith(component) and \
+ component == eoo.endOfOctets:
+ break
+ r = r + component
+ else:
+ raise error.SubstrateUnderrunError(
+ 'No EOO seen before substrate ends'
+ )
+ return r, substrate
+
+# character string types
+class UTF8StringDecoder(OctetStringDecoder):
+ protoComponent = char.UTF8String()
+class NumericStringDecoder(OctetStringDecoder):
+ protoComponent = char.NumericString()
+class PrintableStringDecoder(OctetStringDecoder):
+ protoComponent = char.PrintableString()
+class TeletexStringDecoder(OctetStringDecoder):
+ protoComponent = char.TeletexString()
+class VideotexStringDecoder(OctetStringDecoder):
+ protoComponent = char.VideotexString()
+class IA5StringDecoder(OctetStringDecoder):
+ protoComponent = char.IA5String()
+class GraphicStringDecoder(OctetStringDecoder):
+ protoComponent = char.GraphicString()
+class VisibleStringDecoder(OctetStringDecoder):
+ protoComponent = char.VisibleString()
+class GeneralStringDecoder(OctetStringDecoder):
+ protoComponent = char.GeneralString()
+class UniversalStringDecoder(OctetStringDecoder):
+ protoComponent = char.UniversalString()
+class BMPStringDecoder(OctetStringDecoder):
+ protoComponent = char.BMPString()
+
+# "useful" types
+class GeneralizedTimeDecoder(OctetStringDecoder):
+ protoComponent = useful.GeneralizedTime()
+class UTCTimeDecoder(OctetStringDecoder):
+ protoComponent = useful.UTCTime()
+
+tagMap = {
+ eoo.endOfOctets.tagSet: EndOfOctetsDecoder(),
+ univ.Integer.tagSet: IntegerDecoder(),
+ univ.Boolean.tagSet: BooleanDecoder(),
+ univ.BitString.tagSet: BitStringDecoder(),
+ univ.OctetString.tagSet: OctetStringDecoder(),
+ univ.Null.tagSet: NullDecoder(),
+ univ.ObjectIdentifier.tagSet: ObjectIdentifierDecoder(),
+ univ.Enumerated.tagSet: IntegerDecoder(),
+ univ.Real.tagSet: RealDecoder(),
+ univ.Sequence.tagSet: SequenceDecoder(), # conflicts with SequenceOf
+ univ.Set.tagSet: SetDecoder(), # conflicts with SetOf
+ univ.Choice.tagSet: ChoiceDecoder(), # conflicts with Any
+ # character string types
+ char.UTF8String.tagSet: UTF8StringDecoder(),
+ char.NumericString.tagSet: NumericStringDecoder(),
+ char.PrintableString.tagSet: PrintableStringDecoder(),
+ char.TeletexString.tagSet: TeletexStringDecoder(),
+ char.VideotexString.tagSet: VideotexStringDecoder(),
+ char.IA5String.tagSet: IA5StringDecoder(),
+ char.GraphicString.tagSet: GraphicStringDecoder(),
+ char.VisibleString.tagSet: VisibleStringDecoder(),
+ char.GeneralString.tagSet: GeneralStringDecoder(),
+ char.UniversalString.tagSet: UniversalStringDecoder(),
+ char.BMPString.tagSet: BMPStringDecoder(),
+ # useful types
+ useful.GeneralizedTime.tagSet: GeneralizedTimeDecoder(),
+ useful.UTCTime.tagSet: UTCTimeDecoder()
+ }
+
+# Type-to-codec map for ambiguous ASN.1 types
+typeMap = {
+ univ.Set.typeId: SetDecoder(),
+ univ.SetOf.typeId: SetOfDecoder(),
+ univ.Sequence.typeId: SequenceDecoder(),
+ univ.SequenceOf.typeId: SequenceOfDecoder(),
+ univ.Choice.typeId: ChoiceDecoder(),
+ univ.Any.typeId: AnyDecoder()
+ }
+
+( stDecodeTag, stDecodeLength, stGetValueDecoder, stGetValueDecoderByAsn1Spec,
+ stGetValueDecoderByTag, stTryAsExplicitTag, stDecodeValue,
+ stDumpRawValue, stErrorCondition, stStop ) = [x for x in range(10)]
+
+class Decoder:
+ defaultErrorState = stErrorCondition
+# defaultErrorState = stDumpRawValue
+ defaultRawDecoder = AnyDecoder()
+ def __init__(self, tagMap, typeMap={}):
+ self.__tagMap = tagMap
+ self.__typeMap = typeMap
+ self.__endOfOctetsTagSet = eoo.endOfOctets.getTagSet()
+ # Tag & TagSet objects caches
+ self.__tagCache = {}
+ self.__tagSetCache = {}
+
+ def __call__(self, substrate, asn1Spec=None, tagSet=None,
+ length=None, state=stDecodeTag, recursiveFlag=1,
+ substrateFun=None):
+ if debug.logger & debug.flagDecoder:
+ debug.logger('decoder called at scope %s with state %d, working with up to %d octets of substrate: %s' % (debug.scope, state, len(substrate), debug.hexdump(substrate)))
+ fullSubstrate = substrate
+ while state != stStop:
+ if state == stDecodeTag:
+ # Decode tag
+ if not substrate:
+ raise error.SubstrateUnderrunError(
+ 'Short octet stream on tag decoding'
+ )
+ if not isOctetsType(substrate) and \
+ not isinstance(substrate, univ.OctetString):
+ raise error.PyAsn1Error('Bad octet stream type')
+
+ firstOctet = substrate[0]
+ substrate = substrate[1:]
+ if firstOctet in self.__tagCache:
+ lastTag = self.__tagCache[firstOctet]
+ else:
+ t = oct2int(firstOctet)
+ tagClass = t&0xC0
+ tagFormat = t&0x20
+ tagId = t&0x1F
+ if tagId == 0x1F:
+ tagId = 0
+ while 1:
+ if not substrate:
+ raise error.SubstrateUnderrunError(
+ 'Short octet stream on long tag decoding'
+ )
+ t = oct2int(substrate[0])
+ tagId = tagId << 7 | (t&0x7F)
+ substrate = substrate[1:]
+ if not t&0x80:
+ break
+ lastTag = tag.Tag(
+ tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
+ )
+ if tagId < 31:
+ # cache short tags
+ self.__tagCache[firstOctet] = lastTag
+ if tagSet is None:
+ if firstOctet in self.__tagSetCache:
+ tagSet = self.__tagSetCache[firstOctet]
+ else:
+ # base tag not recovered
+ tagSet = tag.TagSet((), lastTag)
+ if firstOctet in self.__tagCache:
+ self.__tagSetCache[firstOctet] = tagSet
+ else:
+ tagSet = lastTag + tagSet
+ state = stDecodeLength
+ debug.logger and debug.logger & debug.flagDecoder and debug.logger('tag decoded into %r, decoding length' % tagSet)
+ if state == stDecodeLength:
+ # Decode length
+ if not substrate:
+ raise error.SubstrateUnderrunError(
+ 'Short octet stream on length decoding'
+ )
+ firstOctet = oct2int(substrate[0])
+ if firstOctet == 128:
+ size = 1
+ length = -1
+ elif firstOctet < 128:
+ length, size = firstOctet, 1
+ else:
+ size = firstOctet & 0x7F
+ # encoded in size bytes
+ length = 0
+ lengthString = substrate[1:size+1]
+ # missing check on maximum size, which shouldn't be a
+ # problem, we can handle more than is possible
+ if len(lengthString) != size:
+ raise error.SubstrateUnderrunError(
+ '%s<%s at %s' %
+ (size, len(lengthString), tagSet)
+ )
+ for char in lengthString:
+ length = (length << 8) | oct2int(char)
+ size = size + 1
+ substrate = substrate[size:]
+ if length != -1 and len(substrate) < length:
+ raise error.SubstrateUnderrunError(
+ '%d-octet short' % (length - len(substrate))
+ )
+ state = stGetValueDecoder
+ debug.logger and debug.logger & debug.flagDecoder and debug.logger('value length decoded into %d, payload substrate is: %s' % (length, debug.hexdump(length == -1 and substrate or substrate[:length])))
+ if state == stGetValueDecoder:
+ if asn1Spec is None:
+ state = stGetValueDecoderByTag
+ else:
+ state = stGetValueDecoderByAsn1Spec
+ #
+ # There're two ways of creating subtypes in ASN.1 what influences
+ # decoder operation. These methods are:
+ # 1) Either base types used in or no IMPLICIT tagging has been
+ # applied on subtyping.
+ # 2) Subtype syntax drops base type information (by means of
+ # IMPLICIT tagging.
+ # The first case allows for complete tag recovery from substrate
+ # while the second one requires original ASN.1 type spec for
+ # decoding.
+ #
+ # In either case a set of tags (tagSet) is coming from substrate
+ # in an incremental, tag-by-tag fashion (this is the case of
+ # EXPLICIT tag which is most basic). Outermost tag comes first
+ # from the wire.
+ #
+ if state == stGetValueDecoderByTag:
+ if tagSet in self.__tagMap:
+ concreteDecoder = self.__tagMap[tagSet]
+ else:
+ concreteDecoder = None
+ if concreteDecoder:
+ state = stDecodeValue
+ else:
+ _k = tagSet[:1]
+ if _k in self.__tagMap:
+ concreteDecoder = self.__tagMap[_k]
+ else:
+ concreteDecoder = None
+ if concreteDecoder:
+ state = stDecodeValue
+ else:
+ state = stTryAsExplicitTag
+ if debug.logger and debug.logger & debug.flagDecoder:
+ debug.logger('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state == stDecodeValue and 'value' or 'as explicit tag'))
+ debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__)
+ if state == stGetValueDecoderByAsn1Spec:
+ if isinstance(asn1Spec, (dict, tagmap.TagMap)):
+ if tagSet in asn1Spec:
+ __chosenSpec = asn1Spec[tagSet]
+ else:
+ __chosenSpec = None
+ if debug.logger and debug.logger & debug.flagDecoder:
+ debug.logger('candidate ASN.1 spec is a map of:')
+ for t, v in asn1Spec.getPosMap().items():
+ debug.logger(' %r -> %s' % (t, v.__class__.__name__))
+ if asn1Spec.getNegMap():
+ debug.logger('but neither of: ')
+ for i in asn1Spec.getNegMap().items():
+ debug.logger(' %r -> %s' % (t, v.__class__.__name__))
+ debug.logger('new candidate ASN.1 spec is %s, chosen by %r' % (__chosenSpec is None and '<none>' or __chosenSpec.__class__.__name__, tagSet))
+ else:
+ __chosenSpec = asn1Spec
+ debug.logger and debug.logger & debug.flagDecoder and debug.logger('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)
+ if __chosenSpec is not None and (
+ tagSet == __chosenSpec.getTagSet() or \
+ tagSet in __chosenSpec.getTagMap()
+ ):
+ # use base type for codec lookup to recover untagged types
+ baseTagSet = __chosenSpec.baseTagSet
+ if __chosenSpec.typeId is not None and \
+ __chosenSpec.typeId in self.__typeMap:
+ # ambiguous type
+ concreteDecoder = self.__typeMap[__chosenSpec.typeId]
+ debug.logger and debug.logger & debug.flagDecoder and debug.logger('value decoder chosen for an ambiguous type by type ID %s' % (__chosenSpec.typeId,))
+ elif baseTagSet in self.__tagMap:
+ # base type or tagged subtype
+ concreteDecoder = self.__tagMap[baseTagSet]
+ debug.logger and debug.logger & debug.flagDecoder and debug.logger('value decoder chosen by base %r' % (baseTagSet,))
+ else:
+ concreteDecoder = None
+ if concreteDecoder:
+ asn1Spec = __chosenSpec
+ state = stDecodeValue
+ else:
+ state = stTryAsExplicitTag
+ elif tagSet == self.__endOfOctetsTagSet:
+ concreteDecoder = self.__tagMap[tagSet]
+ state = stDecodeValue
+ debug.logger and debug.logger & debug.flagDecoder and debug.logger('end-of-octets found')
+ else:
+ concreteDecoder = None
+ state = stTryAsExplicitTag
+ if debug.logger and debug.logger & debug.flagDecoder:
+ debug.logger('codec %s chosen by ASN.1 spec, decoding %s' % (state == stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state == stDecodeValue and 'value' or 'as explicit tag'))
+ debug.scope.push(__chosenSpec is None and '?' or __chosenSpec.__class__.__name__)
+ if state == stTryAsExplicitTag:
+ if tagSet and \
+ tagSet[0][1] == tag.tagFormatConstructed and \
+ tagSet[0][0] != tag.tagClassUniversal:
+ # Assume explicit tagging
+ concreteDecoder = explicitTagDecoder
+ state = stDecodeValue
+ else:
+ concreteDecoder = None
+ state = self.defaultErrorState
+ debug.logger and debug.logger & debug.flagDecoder and debug.logger('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state == stDecodeValue and 'value' or 'as failure'))
+ if state == stDumpRawValue:
+ concreteDecoder = self.defaultRawDecoder
+ debug.logger and debug.logger & debug.flagDecoder and debug.logger('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)
+ state = stDecodeValue
+ if state == stDecodeValue:
+ if recursiveFlag == 0 and not substrateFun: # legacy
+ substrateFun = lambda a,b,c: (a,b[:c])
+ if length == -1: # indef length
+ value, substrate = concreteDecoder.indefLenValueDecoder(
+ fullSubstrate, substrate, asn1Spec, tagSet, length,
+ stGetValueDecoder, self, substrateFun
+ )
+ else:
+ value, substrate = concreteDecoder.valueDecoder(
+ fullSubstrate, substrate, asn1Spec, tagSet, length,
+ stGetValueDecoder, self, substrateFun
+ )
+ state = stStop
+ debug.logger and debug.logger & debug.flagDecoder and debug.logger('codec %s yields type %s, value:\n%s\n...remaining substrate is: %s' % (concreteDecoder.__class__.__name__, value.__class__.__name__, value.prettyPrint(), substrate and debug.hexdump(substrate) or '<none>'))
+ if state == stErrorCondition:
+ raise error.PyAsn1Error(
+ '%r not in asn1Spec: %r' % (tagSet, asn1Spec)
+ )
+ if debug.logger and debug.logger & debug.flagDecoder:
+ debug.scope.pop()
+ debug.logger('decoder left scope %s, call completed' % debug.scope)
+ return value, substrate
+
+decode = Decoder(tagMap, typeMap)
+
+# XXX
+# non-recursive decoding; return position rather than substrate

Powered by Google App Engine
This is Rietveld 408576698