OLD | NEW |
(Empty) | |
| 1 # BER encoder |
| 2 from pyasn1.type import base, tag, univ, char, useful |
| 3 from pyasn1.codec.ber import eoo |
| 4 from pyasn1.compat.octets import int2oct, oct2int, ints2octs, null, str2octs |
| 5 from pyasn1 import debug, error |
| 6 |
| 7 class Error(Exception): pass |
| 8 |
| 9 class AbstractItemEncoder: |
| 10 supportIndefLenMode = 1 |
| 11 def encodeTag(self, t, isConstructed): |
| 12 tagClass, tagFormat, tagId = t.asTuple() # this is a hotspot |
| 13 v = tagClass | tagFormat |
| 14 if isConstructed: |
| 15 v = v|tag.tagFormatConstructed |
| 16 if tagId < 31: |
| 17 return int2oct(v|tagId) |
| 18 else: |
| 19 s = int2oct(tagId&0x7f) |
| 20 tagId = tagId >> 7 |
| 21 while tagId: |
| 22 s = int2oct(0x80|(tagId&0x7f)) + s |
| 23 tagId = tagId >> 7 |
| 24 return int2oct(v|0x1F) + s |
| 25 |
| 26 def encodeLength(self, length, defMode): |
| 27 if not defMode and self.supportIndefLenMode: |
| 28 return int2oct(0x80) |
| 29 if length < 0x80: |
| 30 return int2oct(length) |
| 31 else: |
| 32 substrate = null |
| 33 while length: |
| 34 substrate = int2oct(length&0xff) + substrate |
| 35 length = length >> 8 |
| 36 substrateLen = len(substrate) |
| 37 if substrateLen > 126: |
| 38 raise Error('Length octets overflow (%d)' % substrateLen) |
| 39 return int2oct(0x80 | substrateLen) + substrate |
| 40 |
| 41 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 42 raise Error('Not implemented') |
| 43 |
| 44 def _encodeEndOfOctets(self, encodeFun, defMode): |
| 45 if defMode or not self.supportIndefLenMode: |
| 46 return null |
| 47 else: |
| 48 return encodeFun(eoo.endOfOctets, defMode) |
| 49 |
| 50 def encode(self, encodeFun, value, defMode, maxChunkSize): |
| 51 substrate, isConstructed = self.encodeValue( |
| 52 encodeFun, value, defMode, maxChunkSize |
| 53 ) |
| 54 tagSet = value.getTagSet() |
| 55 if tagSet: |
| 56 if not isConstructed: # primitive form implies definite mode |
| 57 defMode = 1 |
| 58 return self.encodeTag( |
| 59 tagSet[-1], isConstructed |
| 60 ) + self.encodeLength( |
| 61 len(substrate), defMode |
| 62 ) + substrate + self._encodeEndOfOctets(encodeFun, defMode) |
| 63 else: |
| 64 return substrate # untagged value |
| 65 |
| 66 class EndOfOctetsEncoder(AbstractItemEncoder): |
| 67 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 68 return null, 0 |
| 69 |
| 70 class ExplicitlyTaggedItemEncoder(AbstractItemEncoder): |
| 71 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 72 if isinstance(value, base.AbstractConstructedAsn1Item): |
| 73 value = value.clone(tagSet=value.getTagSet()[:-1], |
| 74 cloneValueFlag=1) |
| 75 else: |
| 76 value = value.clone(tagSet=value.getTagSet()[:-1]) |
| 77 return encodeFun(value, defMode, maxChunkSize), 1 |
| 78 |
| 79 explicitlyTaggedItemEncoder = ExplicitlyTaggedItemEncoder() |
| 80 |
| 81 class BooleanEncoder(AbstractItemEncoder): |
| 82 supportIndefLenMode = 0 |
| 83 _true = ints2octs((1,)) |
| 84 _false = ints2octs((0,)) |
| 85 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 86 return value and self._true or self._false, 0 |
| 87 |
| 88 class IntegerEncoder(AbstractItemEncoder): |
| 89 supportIndefLenMode = 0 |
| 90 supportCompactZero = False |
| 91 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 92 if value == 0: # shortcut for zero value |
| 93 if self.supportCompactZero: |
| 94 # this seems to be a correct way for encoding zeros |
| 95 return null, 0 |
| 96 else: |
| 97 # this seems to be a widespread way for encoding zeros |
| 98 return ints2octs((0,)), 0 |
| 99 octets = [] |
| 100 value = int(value) # to save on ops on asn1 type |
| 101 while 1: |
| 102 octets.insert(0, value & 0xff) |
| 103 if value == 0 or value == -1: |
| 104 break |
| 105 value = value >> 8 |
| 106 if value == 0 and octets[0] & 0x80: |
| 107 octets.insert(0, 0) |
| 108 while len(octets) > 1 and \ |
| 109 (octets[0] == 0 and octets[1] & 0x80 == 0 or \ |
| 110 octets[0] == 0xff and octets[1] & 0x80 != 0): |
| 111 del octets[0] |
| 112 return ints2octs(octets), 0 |
| 113 |
| 114 class BitStringEncoder(AbstractItemEncoder): |
| 115 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 116 if not maxChunkSize or len(value) <= maxChunkSize*8: |
| 117 out_len = (len(value) + 7) // 8 |
| 118 out_list = out_len * [0] |
| 119 j = 7 |
| 120 i = -1 |
| 121 for val in value: |
| 122 j += 1 |
| 123 if j == 8: |
| 124 i += 1 |
| 125 j = 0 |
| 126 out_list[i] = out_list[i] | val << (7-j) |
| 127 return int2oct(7-j) + ints2octs(out_list), 0 |
| 128 else: |
| 129 pos = 0; substrate = null |
| 130 while 1: |
| 131 # count in octets |
| 132 v = value.clone(value[pos*8:pos*8+maxChunkSize*8]) |
| 133 if not v: |
| 134 break |
| 135 substrate = substrate + encodeFun(v, defMode, maxChunkSize) |
| 136 pos = pos + maxChunkSize |
| 137 return substrate, 1 |
| 138 |
| 139 class OctetStringEncoder(AbstractItemEncoder): |
| 140 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 141 if not maxChunkSize or len(value) <= maxChunkSize: |
| 142 return value.asOctets(), 0 |
| 143 else: |
| 144 pos = 0; substrate = null |
| 145 while 1: |
| 146 v = value.clone(value[pos:pos+maxChunkSize]) |
| 147 if not v: |
| 148 break |
| 149 substrate = substrate + encodeFun(v, defMode, maxChunkSize) |
| 150 pos = pos + maxChunkSize |
| 151 return substrate, 1 |
| 152 |
| 153 class NullEncoder(AbstractItemEncoder): |
| 154 supportIndefLenMode = 0 |
| 155 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 156 return null, 0 |
| 157 |
| 158 class ObjectIdentifierEncoder(AbstractItemEncoder): |
| 159 supportIndefLenMode = 0 |
| 160 precomputedValues = { |
| 161 (1, 3, 6, 1, 2): (43, 6, 1, 2), |
| 162 (1, 3, 6, 1, 4): (43, 6, 1, 4) |
| 163 } |
| 164 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 165 oid = value.asTuple() |
| 166 if oid[:5] in self.precomputedValues: |
| 167 octets = self.precomputedValues[oid[:5]] |
| 168 oid = oid[5:] |
| 169 else: |
| 170 if len(oid) < 2: |
| 171 raise error.PyAsn1Error('Short OID %s' % (value,)) |
| 172 |
| 173 octets = () |
| 174 |
| 175 # Build the first twos |
| 176 if oid[0] == 0 and 0 <= oid[1] <= 39: |
| 177 oid = (oid[1],) + oid[2:] |
| 178 elif oid[0] == 1 and 0 <= oid[1] <= 39: |
| 179 oid = (oid[1] + 40,) + oid[2:] |
| 180 elif oid[0] == 2: |
| 181 oid = (oid[1] + 80,) + oid[2:] |
| 182 else: |
| 183 raise error.PyAsn1Error( |
| 184 'Impossible initial arcs %s at %s' % (oid[:2], value) |
| 185 ) |
| 186 |
| 187 # Cycle through subIds |
| 188 for subId in oid: |
| 189 if subId > -1 and subId < 128: |
| 190 # Optimize for the common case |
| 191 octets = octets + (subId & 0x7f,) |
| 192 elif subId < 0: |
| 193 raise error.PyAsn1Error( |
| 194 'Negative OID arc %s at %s' % (subId, value) |
| 195 ) |
| 196 else: |
| 197 # Pack large Sub-Object IDs |
| 198 res = (subId & 0x7f,) |
| 199 subId = subId >> 7 |
| 200 while subId > 0: |
| 201 res = (0x80 | (subId & 0x7f),) + res |
| 202 subId = subId >> 7 |
| 203 # Add packed Sub-Object ID to resulted Object ID |
| 204 octets += res |
| 205 |
| 206 return ints2octs(octets), 0 |
| 207 |
| 208 class RealEncoder(AbstractItemEncoder): |
| 209 supportIndefLenMode = 0 |
| 210 binEncBase = 2 # set to None to choose encoding base automatically |
| 211 def _dropFloatingPoint(self, m, encbase, e): |
| 212 ms, es = 1, 1 |
| 213 if m < 0: |
| 214 ms = -1 # mantissa sign |
| 215 if e < 0: |
| 216 es = -1 # exponenta sign |
| 217 m *= ms |
| 218 if encbase == 8: |
| 219 m = m*2**(abs(e) % 3 * es) |
| 220 e = abs(e) // 3 * es |
| 221 elif encbase == 16: |
| 222 m = m*2**(abs(e) % 4 * es) |
| 223 e = abs(e) // 4 * es |
| 224 |
| 225 while 1: |
| 226 if int(m) != m: |
| 227 m *= encbase |
| 228 e -= 1 |
| 229 continue |
| 230 break |
| 231 return ms, int(m), encbase, e |
| 232 |
| 233 def _chooseEncBase(self, value): |
| 234 m, b, e = value |
| 235 base = [2, 8, 16] |
| 236 if value.binEncBase in base: |
| 237 return self._dropFloatingPoint(m, value.binEncBase, e) |
| 238 elif self.binEncBase in base: |
| 239 return self._dropFloatingPoint(m, self.binEncBase, e) |
| 240 # auto choosing base 2/8/16 |
| 241 mantissa = [m, m, m] |
| 242 exponenta = [e, e, e] |
| 243 encbase = 2 |
| 244 e = float('inf') |
| 245 for i in range(3): |
| 246 sign, mantissa[i], base[i], exponenta[i] = \ |
| 247 self._dropFloatingPoint(mantissa[i], base[i], exponenta[i]) |
| 248 if abs(exponenta[i]) < abs(e) or \ |
| 249 (abs(exponenta[i]) == abs(e) and mantissa[i] < m): |
| 250 e = exponenta[i] |
| 251 m = int(mantissa[i]) |
| 252 encbase = base[i] |
| 253 return sign, m, encbase, e |
| 254 |
| 255 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 256 if value.isPlusInfinity(): |
| 257 return int2oct(0x40), 0 |
| 258 if value.isMinusInfinity(): |
| 259 return int2oct(0x41), 0 |
| 260 m, b, e = value |
| 261 if not m: |
| 262 return null, 0 |
| 263 if b == 10: |
| 264 return str2octs('\x03%dE%s%d' % (m, e == 0 and '+' or '', e)), 0 |
| 265 elif b == 2: |
| 266 fo = 0x80 # binary encoding |
| 267 ms, m, encbase, e = self._chooseEncBase(value) |
| 268 if ms < 0: # mantissa sign |
| 269 fo = fo | 0x40 # sign bit |
| 270 # exponenta & mantissa normalization |
| 271 if encbase == 2: |
| 272 while m & 0x1 == 0: |
| 273 m >>= 1 |
| 274 e += 1 |
| 275 elif encbase == 8: |
| 276 while m & 0x7 == 0: |
| 277 m >>= 3 |
| 278 e += 1 |
| 279 fo |= 0x10 |
| 280 else: # encbase = 16 |
| 281 while m & 0xf == 0: |
| 282 m >>= 4 |
| 283 e += 1 |
| 284 fo |= 0x20 |
| 285 sf = 0 # scale factor |
| 286 while m & 0x1 == 0: |
| 287 m >>= 1 |
| 288 sf += 1 |
| 289 if sf > 3: |
| 290 raise error.PyAsn1Error('Scale factor overflow') # bug if raised |
| 291 fo |= sf << 2 |
| 292 eo = null |
| 293 if e == 0 or e == -1: |
| 294 eo = int2oct(e&0xff) |
| 295 else: |
| 296 while e not in (0, -1): |
| 297 eo = int2oct(e&0xff) + eo |
| 298 e >>= 8 |
| 299 if e == 0 and eo and oct2int(eo[0]) & 0x80: |
| 300 eo = int2oct(0) + eo |
| 301 if e == -1 and eo and not (oct2int(eo[0]) & 0x80): |
| 302 eo = int2oct(0xff) + eo |
| 303 n = len(eo) |
| 304 if n > 0xff: |
| 305 raise error.PyAsn1Error('Real exponent overflow') |
| 306 if n == 1: |
| 307 pass |
| 308 elif n == 2: |
| 309 fo |= 1 |
| 310 elif n == 3: |
| 311 fo |= 2 |
| 312 else: |
| 313 fo |= 3 |
| 314 eo = int2oct(n&0xff) + eo |
| 315 po = null |
| 316 while m: |
| 317 po = int2oct(m&0xff) + po |
| 318 m >>= 8 |
| 319 substrate = int2oct(fo) + eo + po |
| 320 return substrate, 0 |
| 321 else: |
| 322 raise error.PyAsn1Error('Prohibited Real base %s' % b) |
| 323 |
| 324 class SequenceEncoder(AbstractItemEncoder): |
| 325 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 326 value.setDefaultComponents() |
| 327 value.verifySizeSpec() |
| 328 substrate = null; idx = len(value) |
| 329 while idx > 0: |
| 330 idx = idx - 1 |
| 331 if value[idx] is None: # Optional component |
| 332 continue |
| 333 component = value.getDefaultComponentByPosition(idx) |
| 334 if component is not None and component == value[idx]: |
| 335 continue |
| 336 substrate = encodeFun( |
| 337 value[idx], defMode, maxChunkSize |
| 338 ) + substrate |
| 339 return substrate, 1 |
| 340 |
| 341 class SequenceOfEncoder(AbstractItemEncoder): |
| 342 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 343 value.verifySizeSpec() |
| 344 substrate = null; idx = len(value) |
| 345 while idx > 0: |
| 346 idx = idx - 1 |
| 347 substrate = encodeFun( |
| 348 value[idx], defMode, maxChunkSize |
| 349 ) + substrate |
| 350 return substrate, 1 |
| 351 |
| 352 class ChoiceEncoder(AbstractItemEncoder): |
| 353 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 354 return encodeFun(value.getComponent(), defMode, maxChunkSize), 1 |
| 355 |
| 356 class AnyEncoder(OctetStringEncoder): |
| 357 def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
| 358 return value.asOctets(), defMode == 0 |
| 359 |
| 360 tagMap = { |
| 361 eoo.endOfOctets.tagSet: EndOfOctetsEncoder(), |
| 362 univ.Boolean.tagSet: BooleanEncoder(), |
| 363 univ.Integer.tagSet: IntegerEncoder(), |
| 364 univ.BitString.tagSet: BitStringEncoder(), |
| 365 univ.OctetString.tagSet: OctetStringEncoder(), |
| 366 univ.Null.tagSet: NullEncoder(), |
| 367 univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(), |
| 368 univ.Enumerated.tagSet: IntegerEncoder(), |
| 369 univ.Real.tagSet: RealEncoder(), |
| 370 # Sequence & Set have same tags as SequenceOf & SetOf |
| 371 univ.SequenceOf.tagSet: SequenceOfEncoder(), |
| 372 univ.SetOf.tagSet: SequenceOfEncoder(), |
| 373 univ.Choice.tagSet: ChoiceEncoder(), |
| 374 # character string types |
| 375 char.UTF8String.tagSet: OctetStringEncoder(), |
| 376 char.NumericString.tagSet: OctetStringEncoder(), |
| 377 char.PrintableString.tagSet: OctetStringEncoder(), |
| 378 char.TeletexString.tagSet: OctetStringEncoder(), |
| 379 char.VideotexString.tagSet: OctetStringEncoder(), |
| 380 char.IA5String.tagSet: OctetStringEncoder(), |
| 381 char.GraphicString.tagSet: OctetStringEncoder(), |
| 382 char.VisibleString.tagSet: OctetStringEncoder(), |
| 383 char.GeneralString.tagSet: OctetStringEncoder(), |
| 384 char.UniversalString.tagSet: OctetStringEncoder(), |
| 385 char.BMPString.tagSet: OctetStringEncoder(), |
| 386 # useful types |
| 387 useful.ObjectDescriptor.tagSet: OctetStringEncoder(), |
| 388 useful.GeneralizedTime.tagSet: OctetStringEncoder(), |
| 389 useful.UTCTime.tagSet: OctetStringEncoder() |
| 390 } |
| 391 |
| 392 # Type-to-codec map for ambiguous ASN.1 types |
| 393 typeMap = { |
| 394 univ.Set.typeId: SequenceEncoder(), |
| 395 univ.SetOf.typeId: SequenceOfEncoder(), |
| 396 univ.Sequence.typeId: SequenceEncoder(), |
| 397 univ.SequenceOf.typeId: SequenceOfEncoder(), |
| 398 univ.Choice.typeId: ChoiceEncoder(), |
| 399 univ.Any.typeId: AnyEncoder() |
| 400 } |
| 401 |
| 402 class Encoder: |
| 403 supportIndefLength = True |
| 404 def __init__(self, tagMap, typeMap={}): |
| 405 self.__tagMap = tagMap |
| 406 self.__typeMap = typeMap |
| 407 |
| 408 def __call__(self, value, defMode=True, maxChunkSize=0): |
| 409 if not defMode and not self.supportIndefLength: |
| 410 raise error.PyAsn1Error('Indefinite length encoding not supported by
this codec') |
| 411 debug.logger & debug.flagEncoder and debug.logger('encoder called in %sd
ef mode, chunk size %s for type %s, value:\n%s' % (not defMode and 'in' or '', m
axChunkSize, value.prettyPrintType(), value.prettyPrint())) |
| 412 tagSet = value.getTagSet() |
| 413 if len(tagSet) > 1: |
| 414 concreteEncoder = explicitlyTaggedItemEncoder |
| 415 else: |
| 416 if value.typeId is not None and value.typeId in self.__typeMap: |
| 417 concreteEncoder = self.__typeMap[value.typeId] |
| 418 elif tagSet in self.__tagMap: |
| 419 concreteEncoder = self.__tagMap[tagSet] |
| 420 else: |
| 421 tagSet = value.baseTagSet |
| 422 if tagSet in self.__tagMap: |
| 423 concreteEncoder = self.__tagMap[tagSet] |
| 424 else: |
| 425 raise Error('No encoder for %s' % (value,)) |
| 426 debug.logger & debug.flagEncoder and debug.logger('using value codec %s
chosen by %s' % (concreteEncoder.__class__.__name__, tagSet)) |
| 427 substrate = concreteEncoder.encode( |
| 428 self, value, defMode, maxChunkSize |
| 429 ) |
| 430 debug.logger & debug.flagEncoder and debug.logger('built %s octets of su
bstrate: %s\nencoder completed' % (len(substrate), debug.hexdump(substrate))) |
| 431 return substrate |
| 432 |
| 433 encode = Encoder(tagMap, typeMap) |
OLD | NEW |