Index: third_party/google-endpoints/future/backports/email/_header_value_parser.py |
diff --git a/third_party/google-endpoints/future/backports/email/_header_value_parser.py b/third_party/google-endpoints/future/backports/email/_header_value_parser.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..43957edc12f3a39f2a81f5928371a94fcf160d61 |
--- /dev/null |
+++ b/third_party/google-endpoints/future/backports/email/_header_value_parser.py |
@@ -0,0 +1,2965 @@ |
+"""Header value parser implementing various email-related RFC parsing rules. |
+ |
+The parsing methods defined in this module implement various email related |
+parsing rules. Principal among them is RFC 5322, which is the follow-on |
+to RFC 2822 and primarily a clarification of the former. It also implements |
+RFC 2047 encoded word decoding. |
+ |
+RFC 5322 goes to considerable trouble to maintain backward compatibility with |
+RFC 822 in the parse phase, while cleaning up the structure on the generation |
+phase. This parser supports correct RFC 5322 generation by tagging white space |
+as folding white space only when folding is allowed in the non-obsolete rule |
+sets. Actually, the parser is even more generous when accepting input than RFC |
+5322 mandates, following the spirit of Postel's Law, which RFC 5322 encourages. |
+Where possible deviations from the standard are annotated on the 'defects' |
+attribute of tokens that deviate. |
+ |
+The general structure of the parser follows RFC 5322, and uses its terminology |
+where there is a direct correspondence. Where the implementation requires a |
+somewhat different structure than that used by the formal grammar, new terms |
+that mimic the closest existing terms are used. Thus, it really helps to have |
+a copy of RFC 5322 handy when studying this code. |
+ |
+Input to the parser is a string that has already been unfolded according to |
+RFC 5322 rules. According to the RFC this unfolding is the very first step, and |
+this parser leaves the unfolding step to a higher level message parser, which |
+will have already detected the line breaks that need unfolding while |
+determining the beginning and end of each header. |
+ |
+The output of the parser is a TokenList object, which is a list subclass. A |
+TokenList is a recursive data structure. The terminal nodes of the structure |
+are Terminal objects, which are subclasses of str. These do not correspond |
+directly to terminal objects in the formal grammar, but are instead more |
+practical higher level combinations of true terminals. |
+ |
+All TokenList and Terminal objects have a 'value' attribute, which produces the |
+semantically meaningful value of that part of the parse subtree. The value of |
+all whitespace tokens (no matter how many sub-tokens they may contain) is a |
+single space, as per the RFC rules. This includes 'CFWS', which is herein |
+included in the general class of whitespace tokens. There is one exception to |
+the rule that whitespace tokens are collapsed into single spaces in values: in |
+the value of a 'bare-quoted-string' (a quoted-string with no leading or |
+trailing whitespace), any whitespace that appeared between the quotation marks |
+is preserved in the returned value. Note that in all Terminal strings quoted |
+pairs are turned into their unquoted values. |
+ |
+All TokenList and Terminal objects also have a string value, which attempts to |
+be a "canonical" representation of the RFC-compliant form of the substring that |
+produced the parsed subtree, including minimal use of quoted pair quoting. |
+Whitespace runs are not collapsed. |
+ |
+Comment tokens also have a 'content' attribute providing the string found |
+between the parens (including any nested comments) with whitespace preserved. |
+ |
+All TokenList and Terminal objects have a 'defects' attribute which is a |
+possibly empty list of all of the defects found while creating the token. Defects |
+may appear on any token in the tree, and a composite list of all defects in the |
+subtree is available through the 'all_defects' attribute of any node. (For |
+Terminal nodes x.defects == x.all_defects.) |
+ |
+Each object in a parse tree is called a 'token', and each has a 'token_type' |
+attribute that gives the name from the RFC 5322 grammar that it represents. |
+Not all RFC 5322 nodes are produced, and there is one non-RFC 5322 node that |
+may be produced: 'ptext'. A 'ptext' is a string of printable ascii characters. |
+It is returned in place of lists of (ctext/quoted-pair) and |
+(qtext/quoted-pair). |
+ |
+XXX: provide complete list of token types. |
+""" |
+from __future__ import print_function |
+from __future__ import unicode_literals |
+from __future__ import division |
+from __future__ import absolute_import |
+from future.builtins import int, range, str, super, list |
+ |
+import re |
+from collections import namedtuple, OrderedDict |
+ |
+from future.backports.urllib.parse import (unquote, unquote_to_bytes) |
+from future.backports.email import _encoded_words as _ew |
+from future.backports.email import errors |
+from future.backports.email import utils |
+ |
+# |
+# Useful constants and functions |
+# |
+ |
# Character sets that tell the tokenizer where a given kind of token stops.
# Each set is derived from the previous ones with explicit set-method calls.
WSP = {' ', '\t'}
CFWS_LEADER = WSP.union('(')
SPECIALS = set('()<>@,:;.\\"[]')
ATOM_ENDS = SPECIALS.union(WSP)
DOT_ATOM_ENDS = ATOM_ENDS.difference('.')
# '.', '"', and '(' do not end phrases in order to support obs-phrase
PHRASE_ENDS = SPECIALS.difference('."(')
TSPECIALS = SPECIALS.union('/?=').difference('.')
TOKEN_ENDS = TSPECIALS.union(WSP)
ASPECIALS = TSPECIALS.union("*'%")
ATTRIBUTE_ENDS = ASPECIALS.union(WSP)
EXTENDED_ATTRIBUTE_ENDS = ATTRIBUTE_ENDS.difference('%')
+ |
def quote_string(value):
    """Return str(value) wrapped in double quotes.

    Backslashes are doubled first, then every double quote is preceded by a
    backslash, so the result is a quoted-string with quoted-pair escaping.
    """
    escaped = str(value).replace('\\', '\\\\')
    escaped = escaped.replace('"', '\\"')
    return '"' + escaped + '"'
+ |
+# |
+# Accumulator for header folding |
+# |
+ |
+class _Folded(object): |
+ |
+ def __init__(self, maxlen, policy): |
+ self.maxlen = maxlen |
+ self.policy = policy |
+ self.lastlen = 0 |
+ self.stickyspace = None |
+ self.firstline = True |
+ self.done = [] |
+ self.current = list() # uses l.clear() |
+ |
+ def newline(self): |
+ self.done.extend(self.current) |
+ self.done.append(self.policy.linesep) |
+ self.current.clear() |
+ self.lastlen = 0 |
+ |
+ def finalize(self): |
+ if self.current: |
+ self.newline() |
+ |
+ def __str__(self): |
+ return ''.join(self.done) |
+ |
+ def append(self, stoken): |
+ self.current.append(stoken) |
+ |
+ def append_if_fits(self, token, stoken=None): |
+ if stoken is None: |
+ stoken = str(token) |
+ l = len(stoken) |
+ if self.stickyspace is not None: |
+ stickyspace_len = len(self.stickyspace) |
+ if self.lastlen + stickyspace_len + l <= self.maxlen: |
+ self.current.append(self.stickyspace) |
+ self.lastlen += stickyspace_len |
+ self.current.append(stoken) |
+ self.lastlen += l |
+ self.stickyspace = None |
+ self.firstline = False |
+ return True |
+ if token.has_fws: |
+ ws = token.pop_leading_fws() |
+ if ws is not None: |
+ self.stickyspace += str(ws) |
+ stickyspace_len += len(ws) |
+ token._fold(self) |
+ return True |
+ if stickyspace_len and l + 1 <= self.maxlen: |
+ margin = self.maxlen - l |
+ if 0 < margin < stickyspace_len: |
+ trim = stickyspace_len - margin |
+ self.current.append(self.stickyspace[:trim]) |
+ self.stickyspace = self.stickyspace[trim:] |
+ stickyspace_len = trim |
+ self.newline() |
+ self.current.append(self.stickyspace) |
+ self.current.append(stoken) |
+ self.lastlen = l + stickyspace_len |
+ self.stickyspace = None |
+ self.firstline = False |
+ return True |
+ if not self.firstline: |
+ self.newline() |
+ self.current.append(self.stickyspace) |
+ self.current.append(stoken) |
+ self.stickyspace = None |
+ self.firstline = False |
+ return True |
+ if self.lastlen + l <= self.maxlen: |
+ self.current.append(stoken) |
+ self.lastlen += l |
+ return True |
+ if l < self.maxlen: |
+ self.newline() |
+ self.current.append(stoken) |
+ self.lastlen = l |
+ return True |
+ return False |
+ |
+# |
+# TokenList and its subclasses |
+# |
+ |
class TokenList(list):
    """Base class for the non-terminal nodes of a parsed header value.

    A TokenList is a list of tokens.  Every instance has a ``defects`` list
    and a ``token_type`` (None here; subclasses override it).
    """

    token_type = None

    def __init__(self, *args, **kw):
        super(TokenList, self).__init__(*args, **kw)
        self.defects = []

    def __str__(self):
        return ''.join(str(x) for x in self)

    def __repr__(self):
        return '{}({})'.format(self.__class__.__name__,
                               super(TokenList, self).__repr__())

    @property
    def value(self):
        """Concatenation of the non-empty ``value`` of each contained token."""
        return ''.join(x.value for x in self if x.value)

    @property
    def all_defects(self):
        """Defects of this node followed by those of every contained token."""
        return sum((x.all_defects for x in self), self.defects)

    #
    # Folding API
    #
    # parts():
    #
    # return a list of objects that constitute the "higher level syntactic
    # objects" specified by the RFC as the best places to fold a header line.
    # The returned objects must include leading folding white space, even if
    # this means mutating the underlying parse tree of the object.  Each object
    # is only responsible for returning *its* parts, and should not drill down
    # to any lower level except as required to meet the leading folding white
    # space constraint.
    #
    # _fold(folded):
    #
    #   folded: the result accumulator.  This is an instance of _Folded.
    #       (XXX: I haven't finished factoring this out yet, the folding code
    #       pretty much uses this as a state object.) When the folded.current
    #       contains as much text as will fit, the _fold method should call
    #       folded.newline.
    #  folded.lastlen: the current length of the text stored in folded.current.
    #  folded.maxlen: The maximum number of characters that may appear on a
    #       folded line.  Differs from the policy setting in that "no limit" is
    #       represented by +inf, which means it can be used in the trivially
    #       logical fashion in comparisons.
    #
    # Currently no subclasses implement parts, and I think this will remain
    # true.  A subclass only needs to implement _fold when the generic version
    # isn't sufficient.  _fold will need to be implemented primarily when it is
    # possible for encoded words to appear in the specialized token-list, since
    # there is no generic algorithm that can know where exactly the encoded
    # words are allowed.  A _fold implementation is responsible for filling
    # lines in the same general way that the top level _fold does.  It may, and
    # should, call the _fold method of sub-objects in a similar fashion to that
    # of the top level _fold.
    #
    # XXX: I'm hoping it will be possible to factor the existing code further
    # to reduce redundancy and make the logic clearer.

    @property
    def parts(self):
        """Yield the fold units of this list.

        Tokens are grouped so that each yielded object starts at a token
        that begins with folding white space.  Trailing whitespace popped
        from a token starts the next group.  A group of one token is yielded
        as that token; larger groups are wrapped in this node's class.
        """
        klass = self.__class__
        this = []
        for token in self:
            if token.startswith_fws():
                if this:
                    yield this[0] if len(this)==1 else klass(this)
                    # Rebind rather than call clear(): ``this`` may be a
                    # plain list literal, which has no clear() on Python 2.
                    this = []
            end_ws = token.pop_trailing_ws()
            this.append(token)
            if end_ws:
                yield klass(this)
                this = [end_ws]
        if this:
            yield this[0] if len(this)==1 else klass(this)

    def startswith_fws(self):
        return self[0].startswith_fws()

    def pop_leading_fws(self):
        """Remove and return a leading 'fws' token, searching first children."""
        if self[0].token_type == 'fws':
            return self.pop(0)
        return self[0].pop_leading_fws()

    def pop_trailing_ws(self):
        """Remove and return a trailing 'cfws' token, searching last children."""
        if self[-1].token_type == 'cfws':
            return self.pop(-1)
        return self[-1].pop_trailing_ws()

    @property
    def has_fws(self):
        for part in self:
            if part.has_fws:
                return True
        return False

    def has_leading_comment(self):
        return self[0].has_leading_comment()

    @property
    def comments(self):
        """Flat list of the ``comments`` of every contained token."""
        comments = []
        for token in self:
            comments.extend(token.comments)
        return comments

    def fold(self, **_3to2kwargs):
        """Return this token list folded according to the ``policy`` keyword.

        ``policy`` is required and must be passed by keyword; a missing
        ``policy`` raises KeyError.
        """
        # max_line_length 0/None means no limit, ie: infinitely long.
        policy = _3to2kwargs.pop('policy')
        maxlen = policy.max_line_length or float("+inf")
        folded = _Folded(maxlen, policy)
        self._fold(folded)
        folded.finalize()
        return str(folded)

    def as_encoded_word(self, charset):
        """Return the content as an encoded word, keeping edge whitespace.

        This mutates the list: leading and trailing 'fws' are popped.
        """
        # This works only for things returned by 'parts', which include
        # the leading fws, if any, that should be used.
        res = []
        ws = self.pop_leading_fws()
        if ws:
            res.append(ws)
        trailer = self.pop(-1) if self[-1].token_type=='fws' else ''
        res.append(_ew.encode(str(self), charset))
        res.append(trailer)
        return ''.join(res)

    def cte_encode(self, charset, policy):
        res = []
        for part in self:
            res.append(part.cte_encode(charset, policy))
        return ''.join(res)

    def _fold(self, folded):
        """Generic folding: place each part on the accumulator *folded*."""
        for part in self.parts:
            tstr = str(part)
            try:
                tstr.encode('us-ascii')
            except UnicodeEncodeError:
                if any(isinstance(x, errors.UndecodableBytesDefect)
                        for x in part.all_defects):
                    charset = 'unknown-8bit'
                else:
                    # XXX: this should be a policy setting
                    charset = 'utf-8'
                tstr = part.cte_encode(charset, folded.policy)
            if folded.append_if_fits(part, tstr):
                continue
            # Peel off the leading whitespace if any and make it sticky, to
            # avoid infinite recursion.
            ws = part.pop_leading_fws()
            if ws is not None:
                # Use the whitespace that was just popped.  Popping part[0]
                # again here would drop the whitespace and turn the first
                # real token into the sticky space.
                folded.stickyspace = str(ws)
                if folded.append_if_fits(part):
                    continue
            if part.has_fws:
                part._fold(folded)
                continue
            # There are no fold points in this one; it is too long for a single
            # line and can't be split...we just have to put it on its own line.
            folded.append(tstr)
            folded.newline()

    def pprint(self, indent=''):
        """Print the tree dump produced by ppstr(), honouring *indent*."""
        print(self.ppstr(indent=indent))

    def ppstr(self, indent=''):
        """Return the tree dump as one string, each line prefixed by *indent*."""
        return '\n'.join(self._pp(indent=indent))

    def _pp(self, indent=''):
        """Yield the lines of an indented dump of this subtree."""
        yield '{}{}/{}('.format(
            indent,
            self.__class__.__name__,
            self.token_type)
        for token in self:
            if not hasattr(token, '_pp'):
                yield (indent + ' !! invalid element in token '
                                        'list: {!r}'.format(token))
            else:
                for line in token._pp(indent+' '):
                    yield line
        if self.defects:
            extra = ' Defects: {}'.format(self.defects)
        else:
            extra = ''
        yield '{}){}'.format(indent, extra)
+ |
+ |
class WhiteSpaceTokenList(TokenList):
    """Token list whose semantic value is always a single space."""

    @property
    def value(self):
        return ' '

    @property
    def comments(self):
        """Content of each directly contained 'comment' token."""
        found = []
        for tok in self:
            if tok.token_type == 'comment':
                found.append(tok.content)
        return found
+ |
+ |
class UnstructuredTokenList(TokenList):
    """Token list for an unstructured header value."""

    token_type = 'unstructured'

    def _fold(self, folded):
        """Fold this value onto *folded*, using encoded words for non-ASCII."""
        if any(x.token_type=='encoded-word' for x in self):
            return self._fold_encoded(folded)
        # Here we can have either a pure ASCII string that may or may not
        # have surrogateescape encoded bytes, or a unicode string.
        last_ew = None
        for part in self.parts:
            tstr = str(part)
            is_ew = False
            try:
                tstr.encode('us-ascii')
            except UnicodeEncodeError:
                if any(isinstance(x, errors.UndecodableBytesDefect)
                       for x in part.all_defects):
                    charset = 'unknown-8bit'
                else:
                    charset = 'utf-8'
                if last_ew is not None:
                    # We've already done an EW, combine this one with it
                    # if there's room.
                    chunk = get_unstructured(
                        ''.join(folded.current[last_ew:]+[tstr])).as_encoded_word(charset)
                    oldlastlen = sum(len(x) for x in folded.current[:last_ew])
                    schunk = str(chunk)
                    lchunk = len(schunk)
                    if oldlastlen + lchunk <= folded.maxlen:
                        del folded.current[last_ew:]
                        folded.append(schunk)
                        folded.lastlen = oldlastlen + lchunk
                        continue
                tstr = part.as_encoded_word(charset)
                is_ew = True
            if folded.append_if_fits(part, tstr):
                if is_ew:
                    last_ew = len(folded.current) - 1
                continue
            if is_ew or last_ew:
                # It's too big to fit on the line, but since we've
                # got encoded words we can use encoded word folding.
                part._fold_as_ew(folded)
                continue
            # Peel off the leading whitespace if any and make it sticky, to
            # avoid infinite recursion.
            ws = part.pop_leading_fws()
            if ws is not None:
                folded.stickyspace = str(ws)
                if folded.append_if_fits(part):
                    continue
            if part.has_fws:
                # Must be _fold: the public fold() only accepts the keyword
                # argument ``policy`` and would raise TypeError here.
                part._fold(folded)
                continue
            # It can't be split...we just have to put it on its own line.
            folded.append(tstr)
            folded.newline()
            last_ew = None

    def cte_encode(self, charset, policy):
        """Return the value with non-ASCII parts converted to encoded words.

        Once one encoded word has been produced, every later non-ASCII part
        is merged with everything from that encoded word onwards into one
        encoded word, the same way Phrase.cte_encode does it.
        """
        res = []
        last_ew = None
        for part in self:
            spart = str(part)
            try:
                spart.encode('us-ascii')
                res.append(spart)
            except UnicodeEncodeError:
                if last_ew is None:
                    # Remember where the first encoded word sits so later
                    # non-ASCII parts can be merged into it.
                    last_ew = len(res)
                    res.append(part.cte_encode(charset, policy))
                else:
                    tl = get_unstructured(''.join(res[last_ew:] + [spart]))
                    # Replace (not append) so the merged text is not emitted
                    # twice; as_encoded_word requires the charset.
                    res[last_ew:] = [tl.as_encoded_word(charset)]
        return ''.join(res)
+ |
+ |
class Phrase(TokenList):
    """Token list for a phrase (a sequence of words)."""

    token_type = 'phrase'

    def _fold(self, folded):
        """Fold this phrase onto *folded*, merging encoded words if possible."""
        # As with Unstructured, we can have pure ASCII with or without
        # surrogateescape encoded bytes, or we could have unicode.  But this
        # case is more complicated, since we have to deal with the various
        # sub-token types and how they can be composed in the face of
        # unicode-that-needs-CTE-encoding, and the fact that if a token has a
        # comment that becomes a barrier across which we can't compose encoded
        # words.
        last_ew = None
        for part in self.parts:
            tstr = str(part)
            has_ew = False
            try:
                tstr.encode('us-ascii')
            except UnicodeEncodeError:
                if any(isinstance(x, errors.UndecodableBytesDefect)
                        for x in part.all_defects):
                    charset = 'unknown-8bit'
                else:
                    charset = 'utf-8'
                if last_ew is not None and not part.has_leading_comment():
                    # We've already done an EW, let's see if we can combine
                    # this one with it.  The last_ew logic ensures that all we
                    # have at this point is atoms, no comments or quoted
                    # strings.  So we can treat the text between the last
                    # encoded word and the content of this token as
                    # unstructured text, and things will work correctly.  But
                    # we have to strip off any trailing comment on this token
                    # first, and if it is a quoted string we have to pull out
                    # the content (we're encoding it, so it no longer needs to
                    # be quoted).
                    # NOTE(review): tstr was computed before these mutations,
                    # so the combined chunk still uses the unmodified text and
                    # the popped comment is not re-added -- confirm intent.
                    if part[-1].token_type == 'cfws' and part.comments:
                        part.pop(-1)
                    for i, token in enumerate(part):
                        if token.token_type == 'bare-quoted-string':
                            part[i] = UnstructuredTokenList(token[:])
                    chunk = get_unstructured(
                        ''.join(folded.current[last_ew:]+[tstr])).as_encoded_word(charset)
                    schunk = str(chunk)
                    lchunk = len(schunk)
                    # last_ew is an index into folded.current, not a length:
                    # measure the text that precedes the encoded word, as
                    # UnstructuredTokenList._fold does.
                    oldlastlen = sum(len(x) for x in folded.current[:last_ew])
                    if oldlastlen + lchunk <= folded.maxlen:
                        del folded.current[last_ew:]
                        folded.append(schunk)
                        folded.lastlen = oldlastlen + lchunk
                        continue
                tstr = part.as_encoded_word(charset)
                has_ew = True
            if folded.append_if_fits(part, tstr):
                if has_ew and not part.comments:
                    last_ew = len(folded.current) - 1
                elif part.comments or part.token_type == 'quoted-string':
                    # If a comment is involved we can't combine EWs.  And if a
                    # quoted string is involved, it's not worth the effort to
                    # try to combine them.
                    last_ew = None
                continue
            part._fold(folded)

    def cte_encode(self, charset, policy):
        """Return the phrase with non-ASCII parts converted to encoded words."""
        res = []
        last_ew = None
        is_ew = False
        for part in self:
            spart = str(part)
            try:
                spart.encode('us-ascii')
                res.append(spart)
            except UnicodeEncodeError:
                is_ew = True
                if last_ew is None:
                    if not part.comments:
                        last_ew = len(res)
                    res.append(part.cte_encode(charset, policy))
                elif not part.has_leading_comment():
                    # NOTE(review): spart was computed before these
                    # mutations, so they do not change the merged text.
                    if part[-1].token_type == 'cfws' and part.comments:
                        part.pop(-1)
                    for i, token in enumerate(part):
                        if token.token_type == 'bare-quoted-string':
                            part[i] = UnstructuredTokenList(token[:])
                    tl = get_unstructured(''.join(res[last_ew:] + [spart]))
                    res[last_ew:] = [tl.as_encoded_word(charset)]
                # NOTE(review): a non-ASCII part with a leading comment that
                # follows an encoded word is not added to res at all.
            if part.comments or (not is_ew and part.token_type == 'quoted-string'):
                last_ew = None
        return ''.join(res)
+ |
class Word(TokenList):
    """Token list tagged with the 'word' token type."""

    token_type = 'word'
+ |
+ |
class CFWSList(WhiteSpaceTokenList):
    """Whitespace token list tagged with the 'cfws' token type."""

    token_type = 'cfws'

    def has_leading_comment(self):
        """Return True when this list contains at least one comment."""
        return True if self.comments else False
+ |
+ |
class Atom(TokenList):
    """Token list tagged with the 'atom' token type."""

    token_type = 'atom'
+ |
+ |
class Token(TokenList):
    """Token list tagged with the 'token' token type."""

    token_type = 'token'
+ |
+ |
class EncodedWord(TokenList):
    """Token list for an encoded word.

    ``cte``, ``charset`` and ``lang`` default to None at class level.
    """

    token_type = 'encoded-word'
    cte = None
    charset = None
    lang = None

    @property
    def encoded(self):
        """Return ``cte`` if set, otherwise this content encoded with ``charset``."""
        if self.cte is not None:
            return self.cte
        # The freshly encoded form must be returned; without the return the
        # property evaluated to None whenever ``cte`` was not set.
        return _ew.encode(str(self), self.charset)
+ |
+ |
+ |
class QuotedString(TokenList):
    """Token list for a quoted-string, possibly surrounded by other tokens."""

    token_type = 'quoted-string'

    @property
    def content(self):
        """Value of the first 'bare-quoted-string' child, or None."""
        return next((tok.value for tok in self
                     if tok.token_type == 'bare-quoted-string'), None)

    @property
    def quoted_value(self):
        """Join the children, keeping bare quoted strings in quoted form."""
        return ''.join(
            str(tok) if tok.token_type == 'bare-quoted-string' else tok.value
            for tok in self)

    @property
    def stripped_value(self):
        """Value of the first 'bare-quoted-string' child, or None."""
        return next((tok.value for tok in self
                     if tok.token_type == 'bare-quoted-string'), None)
+ |
+ |
class BareQuotedString(QuotedString):
    """The part of a quoted-string that sits between the quotation marks."""

    token_type = 'bare-quoted-string'

    def __str__(self):
        inner = ''.join(str(tok) for tok in self)
        return quote_string(inner)

    @property
    def value(self):
        """The unquoted text: the string forms of the children joined."""
        pieces = [str(tok) for tok in self]
        return ''.join(pieces)
+ |
+ |
class Comment(WhiteSpaceTokenList):
    """Whitespace token list for a parenthesised comment."""

    token_type = 'comment'

    def __str__(self):
        return '(' + ''.join(self.quote(x) for x in self) + ')'

    def quote(self, value):
        """Return *value* as text that is safe inside a comment.

        Nested comments are rendered as-is.  In anything else backslashes
        and parentheses are backslash-escaped.
        """
        if value.token_type == 'comment':
            return str(value)
        # Raw strings: '\(' and '\)' are invalid escape sequences and trigger
        # a warning on current Python versions.
        return str(value).replace('\\', '\\\\').replace(
                                  '(', r'\(').replace(
                                  ')', r'\)')

    @property
    def content(self):
        """The text between the parentheses, without re-escaping."""
        return ''.join(str(x) for x in self)

    @property
    def comments(self):
        return [self.content]
+ |
class AddressList(TokenList):
    """Token list for an address-list."""

    token_type = 'address-list'

    @property
    def addresses(self):
        """The direct children whose token type is 'address'."""
        found = []
        for tok in self:
            if tok.token_type == 'address':
                found.append(tok)
        return found

    @property
    def mailboxes(self):
        """Concatenated ``mailboxes`` of every 'address' child."""
        collected = []
        for tok in self:
            if tok.token_type == 'address':
                collected = collected + tok.mailboxes
        return collected

    @property
    def all_mailboxes(self):
        """Concatenated ``all_mailboxes`` of every 'address' child."""
        collected = []
        for tok in self:
            if tok.token_type == 'address':
                collected = collected + tok.all_mailboxes
        return collected
+ |
+ |
class Address(TokenList):
    """Token list for an address; all properties look at the first child."""

    token_type = 'address'

    @property
    def display_name(self):
        """Display name of a group child; None for any other child."""
        first = self[0]
        if first.token_type != 'group':
            return None
        return first.display_name

    @property
    def mailboxes(self):
        """Valid mailboxes: an invalid mailbox contributes nothing."""
        first = self[0]
        kind = first.token_type
        if kind == 'mailbox':
            return [first]
        if kind == 'invalid-mailbox':
            return []
        return first.mailboxes

    @property
    def all_mailboxes(self):
        """Mailboxes including an invalid one."""
        first = self[0]
        if first.token_type in ('mailbox', 'invalid-mailbox'):
            return [first]
        return first.all_mailboxes
+ |
class MailboxList(TokenList):
    """Token list for a mailbox-list."""

    token_type = 'mailbox-list'

    @property
    def mailboxes(self):
        """The direct children whose token type is 'mailbox'."""
        return list(filter(lambda tok: tok.token_type == 'mailbox', self))

    @property
    def all_mailboxes(self):
        """The direct children that are mailboxes, valid or invalid."""
        wanted = ('mailbox', 'invalid-mailbox')
        return list(filter(lambda tok: tok.token_type in wanted, self))
+ |
+ |
class GroupList(TokenList):
    """Token list for a group-list."""

    token_type = 'group-list'

    @property
    def mailboxes(self):
        """``mailboxes`` of a leading mailbox-list child, else an empty list."""
        if self and self[0].token_type == 'mailbox-list':
            return self[0].mailboxes
        return []

    @property
    def all_mailboxes(self):
        """``all_mailboxes`` of a leading mailbox-list child, else an empty list."""
        if self and self[0].token_type == 'mailbox-list':
            return self[0].all_mailboxes
        return []
+ |
+ |
class Group(TokenList):
    """Token list for a group; the third child is inspected for mailboxes."""

    token_type = "group"

    @property
    def mailboxes(self):
        """``mailboxes`` of the third child when it is a group-list, else []."""
        candidate = self[2]
        return candidate.mailboxes if candidate.token_type == 'group-list' else []

    @property
    def all_mailboxes(self):
        """``all_mailboxes`` of the third child when it is a group-list, else []."""
        candidate = self[2]
        return (candidate.all_mailboxes
                if candidate.token_type == 'group-list' else [])

    @property
    def display_name(self):
        """Display name taken from the first child."""
        head = self[0]
        return head.display_name
+ |
+ |
class NameAddr(TokenList):
    """Token list for a name-addr; address parts come from the last child."""

    token_type = 'name-addr'

    @property
    def display_name(self):
        """Display name of the first child, or None when there is one child."""
        return self[0].display_name if len(self) != 1 else None

    @property
    def local_part(self):
        tail = self[-1]
        return tail.local_part

    @property
    def domain(self):
        tail = self[-1]
        return tail.domain

    @property
    def route(self):
        tail = self[-1]
        return tail.route

    @property
    def addr_spec(self):
        tail = self[-1]
        return tail.addr_spec
+ |
+ |
class AngleAddr(TokenList):
    """Token list for an angle-addr."""

    token_type = 'angle-addr'

    @property
    def local_part(self):
        """Local part of the first 'addr-spec' child, or None."""
        return next((tok.local_part for tok in self
                     if tok.token_type == 'addr-spec'), None)

    @property
    def domain(self):
        """Domain of the first 'addr-spec' child, or None."""
        return next((tok.domain for tok in self
                     if tok.token_type == 'addr-spec'), None)

    @property
    def route(self):
        """Domains of the first 'obs-route' child, or None."""
        return next((tok.domains for tok in self
                     if tok.token_type == 'obs-route'), None)

    @property
    def addr_spec(self):
        """addr_spec of the first 'addr-spec' child, or '<>' if there is none."""
        matches = (tok for tok in self if tok.token_type == 'addr-spec')
        for tok in matches:
            return tok.addr_spec
        return '<>'
+ |
+ |
class ObsRoute(TokenList):
    """Token list for an obs-route."""

    token_type = 'obs-route'

    @property
    def domains(self):
        """The ``domain`` of each 'domain' child, in order."""
        names = []
        for tok in self:
            if tok.token_type == 'domain':
                names.append(tok.domain)
        return names
+ |
+ |
class Mailbox(TokenList):
    """Token list for a mailbox; every property delegates to the first child."""

    token_type = 'mailbox'

    @property
    def display_name(self):
        """Display name when the first child is a name-addr, else None."""
        inner = self[0]
        if inner.token_type != 'name-addr':
            return None
        return inner.display_name

    @property
    def local_part(self):
        inner = self[0]
        return inner.local_part

    @property
    def domain(self):
        inner = self[0]
        return inner.domain

    @property
    def route(self):
        """Route when the first child is a name-addr, else None."""
        inner = self[0]
        if inner.token_type != 'name-addr':
            return None
        return inner.route

    @property
    def addr_spec(self):
        inner = self[0]
        return inner.addr_spec
+ |
+ |
class InvalidMailbox(TokenList):
    """Token list for an unparsable mailbox; every address property is None."""

    token_type = 'invalid-mailbox'

    @property
    def display_name(self):
        return None

    # All of these share the one property object defined above.
    local_part = display_name
    domain = display_name
    route = display_name
    addr_spec = display_name
+ |
+ |
class Domain(TokenList):
    """Token list for a domain."""

    token_type = 'domain'

    @property
    def domain(self):
        """The token list value with all whitespace removed."""
        pieces = super(Domain, self).value.split()
        return ''.join(pieces)
+ |
+ |
class DotAtom(TokenList):
    """Token list tagged with the 'dot-atom' token type."""

    token_type = 'dot-atom'
+ |
+ |
class DotAtomText(TokenList):
    """Token list tagged with the 'dot-atom-text' token type."""

    token_type = 'dot-atom-text'
+ |
+ |
class AddrSpec(TokenList):
    """Token list for an addr-spec: local part, then optionally '@' and domain."""

    token_type = 'addr-spec'

    @property
    def local_part(self):
        head = self[0]
        return head.local_part

    @property
    def domain(self):
        """Domain of the last child, or None with fewer than three children."""
        return self[-1].domain if len(self) >= 3 else None

    @property
    def value(self):
        """Value with whitespace around the separator stripped."""
        if len(self) >= 3:
            left = self[0].value.rstrip()
            right = self[2].value.lstrip()
            return left + self[1].value + right
        return self[0].value

    @property
    def addr_spec(self):
        """Address text; the local part is quoted if it needs quoting."""
        chars = set(self.local_part)
        # Any character from DOT_ATOM_ENDS forces quoting.
        if len(chars - DOT_ATOM_ENDS) < len(chars):
            local = quote_string(self.local_part)
        else:
            local = self.local_part
        if self.domain is None:
            return local
        return local + '@' + self.domain
+ |
+ |
class ObsLocalPart(TokenList):
    """Token list tagged with the 'obs-local-part' token type."""

    token_type = 'obs-local-part'
+ |
+ |
class DisplayName(Phrase):
    """Phrase used as the display name of an address."""

    token_type = 'display-name'

    @property
    def display_name(self):
        """The value of this phrase with CFWS trimmed from both ends."""
        trimmed = TokenList(self)
        # Front: drop a leading cfws token, or strip it from the first child.
        if trimmed[0].token_type == 'cfws':
            trimmed.pop(0)
        elif trimmed[0][0].token_type == 'cfws':
            trimmed[0] = TokenList(trimmed[0][1:])
        # Back: same treatment for the last token.
        if trimmed[-1].token_type == 'cfws':
            trimmed.pop()
        elif trimmed[-1][-1].token_type == 'cfws':
            trimmed[-1] = TokenList(trimmed[-1][:-1])
        return trimmed.value

    @property
    def value(self):
        """The phrase value, quoted if it has defects or a quoted string."""
        must_quote = True
        if not self.defects:
            must_quote = False
            for tok in self:
                if tok.token_type == 'quoted-string':
                    must_quote = True
        if not must_quote:
            return super(DisplayName, self).value
        lead = trail = ''
        if self[0].token_type=='cfws' or self[0][0].token_type=='cfws':
            lead = ' '
        if self[-1].token_type=='cfws' or self[-1][-1].token_type=='cfws':
            trail = ' '
        return lead+quote_string(self.display_name)+trail
+ |
+ |
class LocalPart(TokenList):
    """Token list for the local part of an address."""

    token_type = 'local-part'

    @property
    def value(self):
        """Quoted value for a quoted-string child, plain value otherwise."""
        first = self[0]
        if first.token_type == "quoted-string":
            return first.quoted_value
        return first.value

    @property
    def local_part(self):
        """The local part with whitespace at the ends and around dots removed."""
        # DOT sentinels at both ends let the edges be handled like dots.
        cleaned = [DOT]
        prev = DOT
        prev_is_list = False
        for tok in self[0] + [DOT]:
            if tok.token_type == 'cfws':
                continue
            if (prev_is_list and tok.token_type == 'dot'
                    and prev[-1].token_type == 'cfws'):
                # Whitespace before a dot: strip it from the previous token.
                cleaned[-1] = TokenList(prev[:-1])
            tok_is_list = isinstance(tok, TokenList)
            if (tok_is_list and prev.token_type == 'dot'
                    and tok[0].token_type == 'cfws'):
                # Whitespace after a dot: strip it from this token.
                cleaned.append(TokenList(tok[1:]))
            else:
                cleaned.append(tok)
            prev = cleaned[-1]
            prev_is_list = tok_is_list
        return TokenList(cleaned[1:-1]).value
+ |
+ |
class DomainLiteral(TokenList):
    """Token list for a domain-literal."""

    token_type = 'domain-literal'

    @property
    def domain(self):
        """The token list value with all whitespace removed."""
        pieces = super(DomainLiteral, self).value.split()
        return ''.join(pieces)

    @property
    def ip(self):
        """Value of the first 'ptext' child, or None."""
        return next((tok.value for tok in self
                     if tok.token_type == 'ptext'), None)
+ |
+ |
class MIMEVersion(TokenList):
    """Token list for a MIME-Version value; major/minor default to None."""

    token_type = 'mime-version'
    minor = None
    major = None
+ |
+ |
class Parameter(TokenList):
    """Token list for a single MIME parameter."""

    token_type = 'parameter'
    sectioned = False
    extended = False
    charset = 'us-ascii'

    @property
    def section_number(self):
        """Section number of a sectioned parameter, otherwise 0."""
        # Because the first token, the attribute (name) eats CFWS, the second
        # token is always the section if there is one.
        if not self.sectioned:
            return 0
        return self[1].number

    @property
    def param_value(self):
        """Stripped value of the first value token found, or ''.

        A value token is looked for directly among the children and also
        inside a bare-quoted-string nested in a quoted-string child.
        """
        # This is part of the "handle quoted extended parameters" hack.
        for child in self:
            kind = child.token_type
            if kind == 'value':
                return child.stripped_value
            if kind != 'quoted-string':
                continue
            for quoted in child:
                if quoted.token_type != 'bare-quoted-string':
                    continue
                for inner in quoted:
                    if inner.token_type == 'value':
                        return inner.stripped_value
        return ''
+ |
+ |
class InvalidParameter(Parameter):
    """Parameter tagged with the 'invalid-parameter' token type."""

    token_type = 'invalid-parameter'
+ |
+ |
class Attribute(TokenList):
    """Token list for a parameter attribute (name)."""

    token_type = 'attribute'

    @property
    def stripped_value(self):
        """Value of the first child whose type ends in 'attrtext', or None."""
        return next((tok.value for tok in self
                     if tok.token_type.endswith('attrtext')), None)
+ |
class Section(TokenList):
    """Token list for a parameter section marker; ``number`` defaults to None."""

    token_type = 'section'
    number = None
+ |
+ |
class Value(TokenList):
    """Token list for a parameter value."""

    token_type = 'value'

    @property
    def stripped_value(self):
        """The value without surrounding CFWS or quoting where applicable."""
        # Skip one leading cfws token, if present.
        first = self[0]
        target = self[1] if first.token_type == 'cfws' else first
        strippable = ('quoted-string', 'attribute', 'extended-attribute')
        if not target.token_type.endswith(strippable):
            return self.value
        return target.stripped_value
+ |
+ |
class MimeParameters(TokenList):
    """Token list for the parameter section of a MIME header value."""

    token_type = 'mime-parameters'

    @property
    def params(self):
        """Yield ``(name, value)`` for each distinct parameter name.

        The sections of a multi-part parameter are joined in section-number
        order.  Extended values are percent-decoded and decoded with the
        charset of the first section.  Numbering and decoding problems are
        recorded on the ``defects`` of the parameter concerned.
        """
        # The RFC specifically states that the ordering of parameters is not
        # guaranteed and may be reordered by the transport layer.  So we have
        # to assume the RFC 2231 pieces can come in any order.  However, we
        # output them in the order that we first see a given name, which gives
        # us a stable __str__.
        params = OrderedDict()
        for token in self:
            if not token.token_type.endswith('parameter'):
                continue
            if token[0].token_type != 'attribute':
                continue
            name = token[0].value.strip()
            if name not in params:
                params[name] = []
            params[name].append((token.section_number, token))
        for name, parts in params.items():
            # Sort on the section number only.  Sorting the bare tuples would
            # compare the token objects whenever two numbers are equal; the
            # stable sort keeps such duplicates in the order they were seen.
            parts = sorted(parts, key=lambda item: item[0])
            # XXX: there might be more recovery we could do here if, for
            # example, this is really a case of a duplicate attribute name.
            value_parts = []
            charset = parts[0][1].charset
            for i, (section_number, param) in enumerate(parts):
                if section_number != i:
                    param.defects.append(errors.InvalidHeaderDefect(
                        "inconsistent multipart parameter numbering"))
                value = param.param_value
                if param.extended:
                    try:
                        value = unquote_to_bytes(value)
                    except UnicodeEncodeError:
                        # source had surrogate escaped bytes.  What we do now
                        # is a bit of an open question.  I'm not sure this is
                        # the best choice, but it is what the old algorithm did
                        value = unquote(value, encoding='latin-1')
                    else:
                        try:
                            value = value.decode(charset, 'surrogateescape')
                        except LookupError:
                            # XXX: there should really be a custom defect for
                            # unknown character set to make it easy to find,
                            # because otherwise unknown charset is a silent
                            # failure.
                            value = value.decode('us-ascii', 'surrogateescape')
                        if utils._has_surrogates(value):
                            param.defects.append(errors.UndecodableBytesDefect())
                value_parts.append(value)
            value = ''.join(value_parts)
            yield name, value

    def __str__(self):
        """Render as ' name="value"; ...', or '' when there are no parameters."""
        params = []
        for name, value in self.params:
            if value:
                params.append('{}={}'.format(name, quote_string(value)))
            else:
                params.append(name)
        params = '; '.join(params)
        return ' ' + params if params else ''
+ |
+ |
class ParameterizedHeaderValue(TokenList):
    """Base for header values that may end in a 'mime-parameters' token."""

    @property
    def params(self):
        """Return the params of the last 'mime-parameters' child, else {}."""
        found = (child.params for child in reversed(self)
                 if child.token_type == 'mime-parameters')
        return next(found, {})

    @property
    def parts(self):
        """Return the parts used for folding this value."""
        has_trailing_params = (
            self and self[-1].token_type == 'mime-parameters')
        if not has_trailing_params:
            return TokenList(self).parts
        # We don't want to start a new line if all of the params don't fit
        # after the value, so unwrap the parameter list.
        return TokenList(self[:-1] + self[-1])
+ |
+ |
class ContentType(ParameterizedHeaderValue):
    """Parameterized header value labelled 'content-type'."""

    token_type = 'content-type'
    # Class-level defaults: 'text/plain' unless an instance overrides them.
    maintype = 'text'
    subtype = 'plain'
+ |
+ |
class ContentDisposition(ParameterizedHeaderValue):
    """Parameterized header value labelled 'content-disposition'."""

    token_type = 'content-disposition'
    # Class-level default; None until an instance assigns a disposition.
    content_disposition = None
+ |
+ |
class ContentTransferEncoding(TokenList):
    """Token list labelled 'content-transfer-encoding'."""

    token_type = 'content-transfer-encoding'
    # Class-level default encoding name.
    cte = '7bit'
+ |
+ |
class HeaderLabel(TokenList):
    """Token list labelled 'header-label'."""

    token_type = 'header-label'
+ |
+ |
class Header(TokenList):
    """Token list labelled 'header'."""

    token_type = 'header'

    def _fold(self, folded):
        """Fold this header into *folded*, consuming this token list.

        The list is expected to hold a first token, an optional cfws token
        and exactly one further token; anything left over raises ValueError.
        """
        first = self.pop(0)
        folded.append(str(first))
        folded.lastlen = len(folded.current[0])
        # The first line of the header is different from all others: we don't
        # want to start a new object on a new line if it has any fold points in
        # it that would allow part of it to be on the first header line.
        # Further, if the first fold point would fit on the new line, we want
        # to do that, but if it doesn't we want to put it on the first line.
        # Folded supports this via the stickyspace attribute.  If this
        # attribute is not None, it does the special handling.
        if self[0].token_type == 'cfws':
            folded.stickyspace = str(self.pop(0))
        else:
            folded.stickyspace = ''
        body = self.pop(0)
        if self:
            raise ValueError("Malformed Header token list")
        body._fold(folded)
+ |
+ |
+# |
+# Terminal classes and instances |
+# |
+ |
class Terminal(str):
    """A leaf of the parse tree: a string tagged with a token type.

    Each instance carries a ``token_type`` label and its own ``defects``
    list.  Several methods exist only to terminate recursions over a tree.
    """

    def __new__(cls, value, token_type):
        terminal = super(Terminal, cls).__new__(cls, value)
        terminal.defects = []
        terminal.token_type = token_type
        return terminal

    def __repr__(self):
        text_repr = super(Terminal, self).__repr__()
        return "{}({})".format(self.__class__.__name__, text_repr)

    @property
    def all_defects(self):
        """Return a copy of this terminal's defects list."""
        return list(self.defects)

    def _pp(self, indent=''):
        """Return a one-element list holding the pretty-printed line."""
        if self.defects:
            defect_suffix = ' {}'.format(self.defects)
        else:
            defect_suffix = ''
        line = "{}{}/{}({}){}".format(
            indent,
            self.__class__.__name__,
            self.token_type,
            super(Terminal, self).__repr__(),
            defect_suffix,
            )
        return [line]

    def cte_encode(self, charset, policy):
        """Return the text as-is if it is ASCII, else as an encoded word."""
        text = str(self)
        try:
            text.encode('us-ascii')
        except UnicodeEncodeError:
            return _ew.encode(text, charset)
        return text

    def pop_trailing_ws(self):
        # This terminates the recursion.
        return None

    def pop_leading_fws(self):
        # This terminates the recursion.
        return None

    @property
    def comments(self):
        """A terminal holds no comments; always an empty list."""
        return []

    def has_leading_comment(self):
        return False

    def __getnewargs__(self):
        # Arguments needed to rebuild the instance through __new__.
        return (str(self), self.token_type)
+ |
+ |
class WhiteSpaceTerminal(Terminal):
    """Terminal whose semantic value is always a single space."""

    @property
    def value(self):
        # The stored text is ignored here; the value is one space.
        return ' '

    def startswith_fws(self):
        return True

    has_fws = True
+ |
+ |
class ValueTerminal(Terminal):
    """Terminal whose semantic value is its own text."""

    @property
    def value(self):
        # The terminal is itself a string, so it is its own value.
        return self

    def startswith_fws(self):
        return False

    has_fws = False

    def as_encoded_word(self, charset):
        """Return the text encoded via _ew.encode using *charset*."""
        return _ew.encode(str(self), charset)
+ |
+ |
class EWWhiteSpaceTerminal(WhiteSpaceTerminal):
    """Whitespace terminal that reads as the empty string.

    get_unstructured substitutes it for the whitespace found between two
    encoded words.  The original text stays available via ``encoded``.
    """

    @property
    def value(self):
        return ''

    @property
    def encoded(self):
        # Slicing returns the underlying text as stored.
        return self[:]

    def __str__(self):
        return ''

    has_fws = True
+ |
+ |
# XXX: these need to become classes that are instantiated per use, so that
# a program cannot change them in one parse tree and thereby corrupt other
# parse trees that share the same object.  There should probably be tests
# for that, too.
DOT = ValueTerminal('.', 'dot')
ListSeparator = ValueTerminal(',', 'list-separator')
RouteComponentMarker = ValueTerminal('@', 'route-component-marker')
+ |
+# |
+# Parser |
+# |
+ |
+"""Parse strings according to RFC822/2047/2822/5322 rules. |
+ |
+This is a stateless parser. Each get_XXX function accepts a string and |
+returns either a Terminal or a TokenList representing the RFC object named |
+by the method and a string containing the remaining unparsed characters |
+from the input. Thus a parser method consumes the next syntactic construct |
+of a given type and returns a token representing the construct plus the |
+unparsed remainder of the input string. |
+ |
+For example, if the first element of a structured header is a 'phrase', |
+then: |
+ |
+ phrase, value = get_phrase(value) |
+ |
+returns the complete phrase from the start of the string value, plus any |
+characters left in the string after the phrase is removed. |
+ |
+""" |
+ |
# Pre-compiled helpers shared by the parser functions.  Each character set
# is dropped into a regex character class, so backslashes and ']' must be
# escaped first.  The ']' replacement is a raw string: '\]' in a normal
# string literal is an invalid escape sequence and triggers a warning.

# Splits on runs of WSP, keeping the whitespace run as its own element.
_wsp_splitter = re.compile(r'([{}]+)'.format(''.join(WSP))).split
# Matches a leading run of characters that are not in ATOM_ENDS.
_non_atom_end_matcher = re.compile(r"[^{}]+".format(
    ''.join(ATOM_ENDS).replace('\\', '\\\\').replace(']', r'\]'))).match
# Finds every ASCII control character, space or DEL in a string.
_non_printable_finder = re.compile(r"[\x00-\x20\x7F]").findall
# Matches a leading run of characters that are not in TOKEN_ENDS.
_non_token_end_matcher = re.compile(r"[^{}]+".format(
    ''.join(TOKEN_ENDS).replace('\\', '\\\\').replace(']', r'\]'))).match
# Matches a leading run of characters that are not in ATTRIBUTE_ENDS.
_non_attribute_end_matcher = re.compile(r"[^{}]+".format(
    ''.join(ATTRIBUTE_ENDS).replace('\\', '\\\\').replace(']', r'\]'))).match
# Matches a leading run of characters not in EXTENDED_ATTRIBUTE_ENDS.
_non_extended_attribute_end_matcher = re.compile(r"[^{}]+".format(
    ''.join(EXTENDED_ATTRIBUTE_ENDS).replace(
        '\\', '\\\\').replace(']', r'\]'))).match
+ |
def _validate_xtext(xtext):
    """Record defects on *xtext* for non-printables and surrogates.

    A NonPrintableDefect is added when the token contains any character
    matched by _non_printable_finder, and an UndecodableBytesDefect when
    utils._has_surrogates reports surrogates.  Nothing is returned.
    """
    found = _non_printable_finder(xtext)
    if found:
        xtext.defects.append(errors.NonPrintableDefect(found))
    if not utils._has_surrogates(xtext):
        return
    xtext.defects.append(errors.UndecodableBytesDefect(
        "Non-ASCII characters found in header token"))
+ |
def _get_ptext_to_endchars(value, endchars):
    """Scan printables/quoted-pairs until endchars and return unquoted ptext.

    This function turns a run of qcontent, ccontent-without-comments, or
    dtext-with-quoted-printables into a single string by unquoting any
    quoted printables.  Scanning stops at the first unescaped character
    found in *endchars* or at the first run of whitespace.

    Returns a 3-tuple: the unquoted string, the remaining value, and a flag
    that is set when an escaped backslash was decoded.

    An empty leading fragment (empty *value*, or *value* starting with
    whitespace) yields an empty string and the unchanged value rather than
    failing on an unbound loop index.
    """
    pieces = list(_wsp_splitter(value, 1))
    fragment, remainder = pieces[0], pieces[1:]
    vchars = []
    escape = False
    had_qp = False
    # Index in fragment where scanning stopped; defaults to "consumed all".
    stop = len(fragment)
    for pos, char in enumerate(fragment):
        if char == '\\':
            if escape:
                escape = False
                had_qp = True
            else:
                # First backslash of a pair: remember it and skip it.
                escape = True
                continue
        if escape:
            # The character after a backslash is taken literally.
            escape = False
        elif char in endchars:
            stop = pos
            break
        vchars.append(char)
    return ''.join(vchars), ''.join([fragment[stop:]] + remainder), had_qp
+ |
def _decode_ew_run(value):
    """ Decode a run of RFC2047 encoded words.

        _decode_ew_run(value) -> (text, value, defects)

    Scans the supplied value for a run of tokens that look like they are RFC
    2047 encoded words, decodes those words into text according to RFC 2047
    rules (whitespace between encoded words is discarded), and returns the text
    and the remaining value (including any leading whitespace on the remaining
    value), as well as a list of any defects encountered while decoding.  The
    input value may not have any leading whitespace.

    """
    decoded = []
    defects = []
    pending_ws = ''
    while value:
        pieces = _wsp_splitter(value, 1)
        if len(pieces) == 3:
            word, ws, value = pieces
        else:
            # No whitespace left: the rest of the input is the last word.
            word, ws, value = value, '', ''
        if not word.startswith('=?') or not word.endswith('?='):
            # Not an encoded word; hand it back together with the whitespace
            # that preceded it.
            return ''.join(decoded), pending_ws + word + ws + value, defects
        text, charset, lang, word_defects = _ew.decode(word)
        decoded.append(text)
        defects.extend(word_defects)
        pending_ws = ws
    return ''.join(decoded), pending_ws, defects
+ |
def get_fws(value):
    """FWS = 1*WSP

    This isn't the RFC definition.  We're using fws to represent tokens where
    folding can be done, but when we are parsing the *un*folding has already
    been done so we don't need to watch out for CRLF.

    Returns a WhiteSpaceTerminal of type 'fws' holding the leading whitespace
    of *value*, plus the rest of *value*.
    """
    rest = value.lstrip()
    ws_length = len(value) - len(rest)
    return WhiteSpaceTerminal(value[:ws_length], 'fws'), rest
+ |
def get_encoded_word(value):
    """ encoded-word = "=?" charset "?" encoding "?" encoded-text "?="

    Returns an EncodedWord token holding the decoded text (split into fws and
    vtext terminals) plus the remaining value.  Raises HeaderParseError when
    *value* does not start with a well formed encoded word.
    """
    ew = EncodedWord()
    if not value.startswith('=?'):
        raise errors.HeaderParseError(
            "expected encoded word but found {}".format(value))
    tok, terminator, after = value[2:].partition('?=')
    if not terminator:
        # There is no closing '?=' anywhere in the value.
        raise errors.HeaderParseError(
            "expected encoded word but found {}".format(value))
    if after[:2].isdigit():
        # The '?=' that was found is followed by two digits, so take it as
        # part of the payload and extend the word to the next '?='.
        extra, _, after = after.partition('?=')
        tok = tok + '?=' + extra
    if len(tok.split()) > 1:
        ew.defects.append(errors.InvalidHeaderDefect(
            "whitespace inside encoded word"))
    ew.cte = value
    value = after
    try:
        text, charset, lang, defects = _ew.decode('=?' + tok + '?=')
    except ValueError:
        raise errors.HeaderParseError(
            "encoded word format invalid: '{}'".format(ew.cte))
    ew.charset = charset
    ew.lang = lang
    ew.defects.extend(defects)
    # Split the decoded text into alternating whitespace and text terminals.
    while text:
        if text[0] in WSP:
            token, text = get_fws(text)
            ew.append(token)
            continue
        pieces = list(_wsp_splitter(text, 1))
        vtext = ValueTerminal(pieces[0], 'vtext')
        _validate_xtext(vtext)
        ew.append(vtext)
        text = ''.join(pieces[1:])
    return ew, value
+ |
def get_unstructured(value):
    """unstructured = (*([FWS] vchar) *WSP) / obs-unstruct
       obs-unstruct = *((*LF *CR *(obs-utext) *LF *CR)) / FWS)
       obs-utext = %d0 / obs-NO-WS-CTL / LF / CR

       obs-NO-WS-CTL is control characters except WSP/CR/LF.

    So, basically, we have printable runs, plus control characters or nulls in
    the obsolete syntax, separated by whitespace.  Since RFC 2047 uses the
    obsolete syntax in its specification, but requires whitespace on either
    side of the encoded words, I can see no reason to need to separate the
    non-printable-non-whitespace from the printable runs if they occur, so we
    parse this into xtext tokens separated by WSP tokens.

    Because an 'unstructured' value must by definition constitute the entire
    value, this 'get' routine does not return a remaining value, only the
    parsed TokenList.

    """
    # XXX: but what about bare CR and LF?  They might signal the start or
    # end of an encoded word.  YAGNI for now, since our current parsers
    # will never send us strings with bare CR or LF.

    unstructured = UnstructuredTokenList()
    while value:
        if value[0] in WSP:
            token, value = get_fws(value)
            unstructured.append(token)
            continue
        if value.startswith('=?'):
            try:
                token, value = get_encoded_word(value)
            except errors.HeaderParseError:
                # Not a real encoded word; treat it as plain text below.
                pass
            else:
                preceded_by_ws = True
                if (len(unstructured) > 0
                        and unstructured[-1].token_type != 'fws'):
                    unstructured.defects.append(errors.InvalidHeaderDefect(
                        "missing whitespace before encoded word"))
                    preceded_by_ws = False
                if (preceded_by_ws and len(unstructured) > 1
                        and unstructured[-2].token_type == 'encoded-word'):
                    # Whitespace between two encoded words is not part of
                    # the decoded text, so swap in a terminal that reads as
                    # the empty string.
                    unstructured[-1] = EWWhiteSpaceTerminal(
                        unstructured[-1], 'fws')
                unstructured.append(token)
                continue
        pieces = list(_wsp_splitter(value, 1))
        vtext = ValueTerminal(pieces[0], 'vtext')
        _validate_xtext(vtext)
        unstructured.append(vtext)
        value = ''.join(pieces[1:])
    return unstructured
+ |
def get_qp_ctext(value):
    r"""ctext = <printable ascii except \ ( )>

    This is not the RFC ctext, since we are handling nested comments in comment
    and unquoting quoted-pairs here.  We allow anything except the '()'
    characters, but if we find any ASCII other than the RFC defined printable
    ASCII, a NonPrintableDefect is added to the token's defects list.  Since
    quoted pairs are converted to their unquoted values, what is returned is
    a 'ptext' token.  In this case it is a WhiteSpaceTerminal, so its value
    is ' '.

    """
    text, value, _ = _get_ptext_to_endchars(value, '()')
    token = WhiteSpaceTerminal(text, 'ptext')
    _validate_xtext(token)
    return token, value
+ |
def get_qcontent(value):
    """qcontent = qtext / quoted-pair

    We allow anything except the DQUOTE character, but if we find any ASCII
    other than the RFC defined printable ASCII, a NonPrintableDefect is
    added to the token's defects list.  Any quoted pairs are converted to their
    unquoted values, so what is returned is a 'ptext' token.  In this case it
    is a ValueTerminal.

    """
    text, value, _ = _get_ptext_to_endchars(value, '"')
    token = ValueTerminal(text, 'ptext')
    _validate_xtext(token)
    return token, value
+ |
def get_atext(value):
    """atext = <matches _atext_matcher>

    We allow any non-ATOM_ENDS in atext.  The token is passed through
    _validate_xtext, which records defects for non-printable characters.
    Raises HeaderParseError when *value* does not start with such a run.
    """
    match = _non_atom_end_matcher(value)
    if match is None:
        raise errors.HeaderParseError(
            "expected atext but found '{}'".format(value))
    token = ValueTerminal(match.group(), 'atext')
    _validate_xtext(token)
    # The matcher is anchored at the start, so end() is the run's length.
    return token, value[match.end():]
+ |
def get_bare_quoted_string(value):
    """bare-quoted-string = DQUOTE *([FWS] qcontent) [FWS] DQUOTE

    A quoted-string without the leading or trailing white space.  Its
    value is the text between the quote marks, with whitespace
    preserved and quoted pairs decoded.

    Raises HeaderParseError when *value* is empty or does not start with a
    double quote.  A missing closing quote is recorded as a defect on the
    returned token rather than raised.
    """
    # Test for emptiness first so that empty input is reported as a parse
    # error instead of an IndexError.
    if not value or value[0] != '"':
        raise errors.HeaderParseError(
            "expected '\"' but found '{}'".format(value))
    bare_quoted_string = BareQuotedString()
    value = value[1:]
    while value and value[0] != '"':
        if value[0] in WSP:
            token, value = get_fws(value)
        else:
            token, value = get_qcontent(value)
        bare_quoted_string.append(token)
    if not value:
        bare_quoted_string.defects.append(errors.InvalidHeaderDefect(
            "end of header inside quoted string"))
        return bare_quoted_string, value
    # Drop the closing quote.
    return bare_quoted_string, value[1:]
+ |
def get_comment(value):
    """comment = "(" *([FWS] ccontent) [FWS] ")"
       ccontent = ctext / quoted-pair / comment

    We handle nested comments here, and quoted-pair in our qp-ctext routine.
    A missing closing parenthesis is recorded as a defect on the returned
    Comment token.
    """
    if value and value[0] != '(':
        raise errors.HeaderParseError(
            "expected '(' but found '{}'".format(value))
    comment = Comment()
    value = value[1:]
    while value and value[0] != ")":
        lead = value[0]
        if lead in WSP:
            piece, value = get_fws(value)
        elif lead == '(':
            # Nested comment: recurse.
            piece, value = get_comment(value)
        else:
            piece, value = get_qp_ctext(value)
        comment.append(piece)
    if value:
        # Drop the closing parenthesis.
        return comment, value[1:]
    comment.defects.append(errors.InvalidHeaderDefect(
        "end of header inside comment"))
    return comment, value
+ |
def get_cfws(value):
    """CFWS = (1*([FWS] comment) [FWS]) / FWS

    Collects leading whitespace runs and comments into a CFWSList and
    returns it with the remaining value.
    """
    cfws = CFWSList()
    while value and value[0] in CFWS_LEADER:
        reader = get_fws if value[0] in WSP else get_comment
        piece, value = reader(value)
        cfws.append(piece)
    return cfws, value
+ |
def get_quoted_string(value):
    """quoted-string = [CFWS] <bare-quoted-string> [CFWS]

    'bare-quoted-string' is an intermediate class defined by this
    parser and not by the RFC grammar.  It is the quoted string
    without any attached CFWS.
    """
    quoted_string = QuotedString()
    if value and value[0] in CFWS_LEADER:
        leading, value = get_cfws(value)
        quoted_string.append(leading)
    body, value = get_bare_quoted_string(value)
    quoted_string.append(body)
    if value and value[0] in CFWS_LEADER:
        trailing, value = get_cfws(value)
        quoted_string.append(trailing)
    return quoted_string, value
+ |
def get_atom(value):
    """atom = [CFWS] 1*atext [CFWS]

    Raises HeaderParseError when no atext follows the optional leading CFWS.
    """
    atom = Atom()
    if value and value[0] in CFWS_LEADER:
        leading, value = get_cfws(value)
        atom.append(leading)
    if value and value[0] in ATOM_ENDS:
        raise errors.HeaderParseError(
            "expected atom but found '{}'".format(value))
    text, value = get_atext(value)
    atom.append(text)
    if value and value[0] in CFWS_LEADER:
        trailing, value = get_cfws(value)
        atom.append(trailing)
    return atom, value
+ |
def get_dot_atom_text(value):
    """ dot-text = 1*atext *("." 1*atext)

    Raises HeaderParseError when *value* does not start with atext or when
    the parsed text would end in a dot.
    """
    if not value or value[0] in ATOM_ENDS:
        raise errors.HeaderParseError("expected atom at a start of "
            "dot-atom-text but found '{}'".format(value))
    dot_atom_text = DotAtomText()
    while value and value[0] not in ATOM_ENDS:
        atext, value = get_atext(value)
        dot_atom_text.append(atext)
        if value and value[0] == '.':
            dot_atom_text.append(DOT)
            value = value[1:]
    # A dot must always be followed by more atext.
    if dot_atom_text[-1] is DOT:
        raise errors.HeaderParseError("expected atom at end of dot-atom-text "
            "but found '{}'".format('.'+value))
    return dot_atom_text, value
+ |
def get_dot_atom(value):
    """ dot-atom = [CFWS] dot-atom-text [CFWS]

    Raises HeaderParseError (from get_dot_atom_text) when no dot-atom-text
    follows the optional leading CFWS, including for empty input.
    """
    dot_atom = DotAtom()
    # Guard against empty input, as get_atom and get_quoted_string do, so
    # the failure is a HeaderParseError rather than an IndexError.
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        dot_atom.append(token)
    token, value = get_dot_atom_text(value)
    dot_atom.append(token)
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        dot_atom.append(token)
    return dot_atom, value
+ |
def get_word(value):
    """word = atom / quoted-string

    Either atom or quoted-string may start with CFWS.  We have to peel off this
    CFWS first to determine which type of word to parse.  Afterward we splice
    the leading CFWS, if any, into the parsed sub-token.

    If neither an atom or a quoted-string is found before the next special, a
    HeaderParseError is raised.  This includes input that is empty or that
    holds nothing but CFWS.

    The token returned is either an Atom or a QuotedString, as appropriate.
    This means the 'word' level of the formal grammar is not represented in the
    parse tree; this is because having that extra layer when manipulating the
    parse tree is more confusing than it is helpful.

    """
    if value and value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
    else:
        leader = None
    if not value:
        # Nothing left to parse: report it as a parse error instead of
        # letting value[0] raise IndexError.
        raise errors.HeaderParseError(
            "Expected 'atom' or 'quoted-string' but found nothing.")
    if value[0] == '"':
        token, value = get_quoted_string(value)
    elif value[0] in SPECIALS:
        raise errors.HeaderParseError("Expected 'atom' or 'quoted-string' "
                                      "but found '{}'".format(value))
    else:
        token, value = get_atom(value)
    if leader is not None:
        token[:0] = [leader]
    return token, value
+ |
def get_phrase(value):
    """ phrase = 1*word / obs-phrase
        obs-phrase = word *(word / "." / CFWS)

    This means a phrase can be a sequence of words, periods, and CFWS in any
    order as long as it starts with at least one word.  If anything other than
    words is detected, an ObsoleteHeaderDefect is added to the token's defect
    list.  We also accept a phrase that starts with CFWS followed by a dot;
    this is registered as an InvalidHeaderDefect, since it is not supported by
    even the obsolete grammar.

    """
    phrase = Phrase()
    try:
        word, value = get_word(value)
    except errors.HeaderParseError:
        phrase.defects.append(errors.InvalidHeaderDefect(
            "phrase does not start with word"))
    else:
        phrase.append(word)
    while value and value[0] not in PHRASE_ENDS:
        if value[0] == '.':
            phrase.append(DOT)
            phrase.defects.append(errors.ObsoleteHeaderDefect(
                "period in 'phrase'"))
            value = value[1:]
            continue
        try:
            piece, value = get_word(value)
        except errors.HeaderParseError:
            # Only a stray comment/whitespace is tolerated here.
            if value[0] not in CFWS_LEADER:
                raise
            piece, value = get_cfws(value)
            phrase.defects.append(errors.ObsoleteHeaderDefect(
                "comment found without atom"))
        phrase.append(piece)
    return phrase, value
+ |
def get_local_part(value):
    """ local-part = dot-atom / quoted-string / obs-local-part

    Returns a LocalPart token and the remaining value.  Raises
    HeaderParseError when *value* is empty or holds only CFWS, or when
    nothing usable as a local part is found.  Obsolete or invalid forms are
    accepted and recorded as defects on the returned token.
    """
    local_part = LocalPart()
    leader = None
    # Guard the index so empty input reaches the HeaderParseError below
    # instead of raising IndexError.
    if value and value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
    if not value:
        raise errors.HeaderParseError(
            "expected local-part but found '{}'".format(value))
    try:
        token, value = get_dot_atom(value)
    except errors.HeaderParseError:
        try:
            token, value = get_word(value)
        except errors.HeaderParseError:
            if value[0] != '\\' and value[0] in PHRASE_ENDS:
                raise
            # Let the obs-local-part parser below deal with the input.
            token = TokenList()
    if leader is not None:
        token[:0] = [leader]
    local_part.append(token)
    if value and (value[0] == '\\' or value[0] not in PHRASE_ENDS):
        # More local-part text follows: reparse everything, including what
        # was consumed so far, as an obs-local-part.
        obs_local_part, value = get_obs_local_part(str(local_part) + value)
        if obs_local_part.token_type == 'invalid-obs-local-part':
            local_part.defects.append(errors.InvalidHeaderDefect(
                "local-part is not dot-atom, quoted-string, or obs-local-part"))
        else:
            local_part.defects.append(errors.ObsoleteHeaderDefect(
                "local-part is not a dot-atom (contains CFWS)"))
        local_part[0] = obs_local_part
    try:
        local_part.value.encode('ascii')
    except UnicodeEncodeError:
        local_part.defects.append(errors.NonASCIILocalPartDefect(
            "local-part contains non-ASCII characters)"))
    return local_part, value
+ |
def get_obs_local_part(value):
    """ obs-local-part = word *("." word)

    Parses words, dots, stray backslashes and CFWS until a PHRASE_ENDS
    character other than a backslash is reached.  Structural problems are
    recorded as defects; if any defect was recorded the returned token's
    type is changed to 'invalid-obs-local-part'.
    """
    obs_local_part = ObsLocalPart()
    # True while the most recent non-whitespace token was a dot, so that a
    # repeated dot can be detected.
    last_non_ws_was_dot = False
    while value and (value[0]=='\\' or value[0] not in PHRASE_ENDS):
        if value[0] == '.':
            if last_non_ws_was_dot:
                obs_local_part.defects.append(errors.InvalidHeaderDefect(
                    "invalid repeated '.'"))
            obs_local_part.append(DOT)
            last_non_ws_was_dot = True
            value = value[1:]
            continue
        elif value[0]=='\\':
            # A backslash is kept as its own token and flagged as a defect.
            obs_local_part.append(ValueTerminal(value[0],
                                                 'misplaced-special'))
            value = value[1:]
            obs_local_part.defects.append(errors.InvalidHeaderDefect(
                "'\\' character outside of quoted-string/ccontent"))
            last_non_ws_was_dot = False
            continue
        # Anything else must follow a dot unless it is the first token.
        if obs_local_part and obs_local_part[-1].token_type != 'dot':
            obs_local_part.defects.append(errors.InvalidHeaderDefect(
                "missing '.' between words"))
        try:
            token, value = get_word(value)
            last_non_ws_was_dot = False
        except errors.HeaderParseError:
            # Not a word: accept bare CFWS, re-raise for anything else.
            if value[0] not in CFWS_LEADER:
                raise
            token, value = get_cfws(value)
        obs_local_part.append(token)
    # A dot may not be the first or last token, ignoring one CFWS token at
    # either end.
    if (obs_local_part[0].token_type == 'dot' or
            obs_local_part[0].token_type=='cfws' and
            obs_local_part[1].token_type=='dot'):
        obs_local_part.defects.append(errors.InvalidHeaderDefect(
            "Invalid leading '.' in local part"))
    if (obs_local_part[-1].token_type == 'dot' or
            obs_local_part[-1].token_type=='cfws' and
            obs_local_part[-2].token_type=='dot'):
        obs_local_part.defects.append(errors.InvalidHeaderDefect(
            "Invalid trailing '.' in local part"))
    if obs_local_part.defects:
        obs_local_part.token_type = 'invalid-obs-local-part'
    return obs_local_part, value
+ |
def get_dtext(value):
    r""" dtext = <printable ascii except \ [ ]> / obs-dtext
        obs-dtext = obs-NO-WS-CTL / quoted-pair

    We allow anything except the excluded characters, but if we find any
    ASCII other than the RFC defined printable ASCII, a NonPrintableDefect is
    added to the token's defects list.  Quoted pairs are converted to their
    unquoted values, so what is returned is a ptext token, in this case a
    ValueTerminal.  If there were quoted-printables, an ObsoleteHeaderDefect is
    added to the returned token's defect list.

    """
    text, value, had_qp = _get_ptext_to_endchars(value, '[]')
    token = ValueTerminal(text, 'ptext')
    if had_qp:
        token.defects.append(errors.ObsoleteHeaderDefect(
            "quoted printable found in domain-literal"))
    _validate_xtext(token)
    return token, value
+ |
+def _check_for_early_dl_end(value, domain_literal): |
+ if value: |
+ return False |
+ domain_literal.append(errors.InvalidHeaderDefect( |
+ "end of input inside domain-literal")) |
+ domain_literal.append(ValueTerminal(']', 'domain-literal-end')) |
+ return True |
+ |
def get_domain_literal(value):
    """ domain-literal = [CFWS] "[" *([FWS] dtext) [FWS] "]" [CFWS]

    Returns a DomainLiteral token and the remaining value.  Raises
    HeaderParseError when no '[' follows the optional leading CFWS or when
    the literal is not closed by ']'.  If the input ends inside the literal,
    _check_for_early_dl_end closes the token and it is returned as-is.
    """
    domain_literal = DomainLiteral()
    if value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        domain_literal.append(token)
    if not value:
        raise errors.HeaderParseError("expected domain-literal")
    if value[0] != '[':
        raise errors.HeaderParseError("expected '[' at start of domain-literal "
                "but found '{}'".format(value))
    value = value[1:]
    # NOTE(review): when the input ends right after '[', this returns before
    # the 'domain-literal-start' terminal is appended -- confirm intended.
    if _check_for_early_dl_end(value, domain_literal):
        return domain_literal, value
    domain_literal.append(ValueTerminal('[', 'domain-literal-start'))
    if value[0] in WSP:
        token, value = get_fws(value)
        domain_literal.append(token)
    token, value = get_dtext(value)
    domain_literal.append(token)
    if _check_for_early_dl_end(value, domain_literal):
        return domain_literal, value
    if value[0] in WSP:
        token, value = get_fws(value)
        domain_literal.append(token)
    if _check_for_early_dl_end(value, domain_literal):
        return domain_literal, value
    if value[0] != ']':
        raise errors.HeaderParseError("expected ']' at end of domain-literal "
                "but found '{}'".format(value))
    domain_literal.append(ValueTerminal(']', 'domain-literal-end'))
    value = value[1:]
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        domain_literal.append(token)
    return domain_literal, value
+ |
def get_domain(value):
    """ domain = dot-atom / domain-literal / obs-domain
        obs-domain = atom *("." atom))

    Returns a Domain token and the remaining value.  Raises HeaderParseError
    when *value* is empty or holds only CFWS, or when no domain can be
    parsed.  An obs-domain is accepted and recorded as an
    ObsoleteHeaderDefect.
    """
    domain = Domain()
    leader = None
    # Guard the index: get_addr_spec passes everything after the '@', which
    # is empty for input such as 'user@'.  That must surface as the
    # HeaderParseError below, not as an IndexError.
    if value and value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
    if not value:
        raise errors.HeaderParseError(
            "expected domain but found '{}'".format(value))
    if value[0] == '[':
        token, value = get_domain_literal(value)
        if leader is not None:
            token[:0] = [leader]
        domain.append(token)
        return domain, value
    try:
        token, value = get_dot_atom(value)
    except errors.HeaderParseError:
        token, value = get_atom(value)
    if leader is not None:
        token[:0] = [leader]
    domain.append(token)
    if value and value[0] == '.':
        domain.defects.append(errors.ObsoleteHeaderDefect(
            "domain is not a dot-atom (contains CFWS)"))
        if domain[0].token_type == 'dot-atom':
            # Flatten the dot-atom so its pieces sit next to the atoms and
            # dots appended below.
            domain[:] = domain[0]
        while value and value[0] == '.':
            domain.append(DOT)
            token, value = get_atom(value[1:])
            domain.append(token)
    return domain, value
+ |
def get_addr_spec(value):
    """ addr-spec = local-part "@" domain

    When no '@' follows the local part, the token is returned with only the
    local part and an InvalidHeaderDefect.
    """
    addr_spec = AddrSpec()
    local_part, value = get_local_part(value)
    addr_spec.append(local_part)
    if value and value[0] == '@':
        addr_spec.append(ValueTerminal('@', 'address-at-symbol'))
        domain, value = get_domain(value[1:])
        addr_spec.append(domain)
        return addr_spec, value
    addr_spec.defects.append(errors.InvalidHeaderDefect(
        "add-spec local part with no domain"))
    return addr_spec, value
+ |
def get_obs_route(value):
    """ obs-route = obs-domain-list ":"
        obs-domain-list = *(CFWS / ",") "@" domain *("," [CFWS] ["@" domain])

        Returns an obs-route token with the appropriate sub-tokens (that is,
        there is no obs-domain-list in the parse tree).

        Raises HeaderParseError when the route does not start with '@domain'
        or is not terminated by ':'.
    """
    obs_route = ObsRoute()
    while value and (value[0] == ',' or value[0] in CFWS_LEADER):
        if value[0] in CFWS_LEADER:
            token, value = get_cfws(value)
            obs_route.append(token)
        elif value[0] == ',':
            obs_route.append(ListSeparator)
            value = value[1:]
    if not value or value[0] != '@':
        raise errors.HeaderParseError(
            "expected obs-route domain but found '{}'".format(value))
    obs_route.append(RouteComponentMarker)
    token, value = get_domain(value[1:])
    obs_route.append(token)
    while value and value[0] == ',':
        obs_route.append(ListSeparator)
        value = value[1:]
        if not value:
            break
        if value[0] in CFWS_LEADER:
            token, value = get_cfws(value)
            obs_route.append(token)
        # The CFWS may have consumed the rest of the input, so check that
        # something is left before indexing.  The end-of-header error below
        # then reports the problem.
        if value and value[0] == '@':
            obs_route.append(RouteComponentMarker)
            token, value = get_domain(value[1:])
            obs_route.append(token)
    if not value:
        raise errors.HeaderParseError("end of header while parsing obs-route")
    if value[0] != ':':
        raise errors.HeaderParseError("expected ':' marking end of "
            "obs-route but found '{}'".format(value))
    obs_route.append(ValueTerminal(':', 'end-of-obs-route-marker'))
    return obs_route, value[1:]
+ |
def get_angle_addr(value):
    """ angle-addr = [CFWS] "<" addr-spec ">" [CFWS] / obs-angle-addr
        obs-angle-addr = [CFWS] "<" obs-route addr-spec ">" [CFWS]

    Returns an AngleAddr token and the remaining value.  Raises
    HeaderParseError when *value* does not start (after optional CFWS) with
    '<', when the input ends right after the '<', or when neither an
    addr-spec nor an obs-route can be parsed.  A null address ('<>'), an
    obsolete route and a missing closing '>' are accepted and recorded as
    defects.
    """
    angle_addr = AngleAddr()
    # Guard the index so empty input reaches the HeaderParseError below.
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        angle_addr.append(token)
    if not value or value[0] != '<':
        raise errors.HeaderParseError(
            "expected angle-addr but found '{}'".format(value))
    angle_addr.append(ValueTerminal('<', 'angle-addr-start'))
    value = value[1:]
    if not value:
        # The header ended right after '<'; there is nothing to index or to
        # hand on to the addr-spec parser.
        raise errors.HeaderParseError(
            "expected addr-spec or obs-route but found '{}'".format(value))
    # Although it is not legal per RFC5322, SMTP uses '<>' in certain
    # circumstances.
    if value[0] == '>':
        angle_addr.append(ValueTerminal('>', 'angle-addr-end'))
        angle_addr.defects.append(errors.InvalidHeaderDefect(
            "null addr-spec in angle-addr"))
        value = value[1:]
        return angle_addr, value
    try:
        token, value = get_addr_spec(value)
    except errors.HeaderParseError:
        try:
            token, value = get_obs_route(value)
            angle_addr.defects.append(errors.ObsoleteHeaderDefect(
                "obsolete route specification in angle-addr"))
        except errors.HeaderParseError:
            raise errors.HeaderParseError(
                "expected addr-spec or obs-route but found '{}'".format(value))
        angle_addr.append(token)
        # The addr-spec proper follows the obsolete route.
        token, value = get_addr_spec(value)
    angle_addr.append(token)
    if value and value[0] == '>':
        value = value[1:]
    else:
        angle_addr.defects.append(errors.InvalidHeaderDefect(
            "missing trailing '>' on angle-addr"))
    # The closing terminal is added even when it was missing from the input.
    angle_addr.append(ValueTerminal('>', 'angle-addr-end'))
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        angle_addr.append(token)
    return angle_addr, value
+ |
def get_display_name(value):
    """ display-name = phrase

    Because this is simply a name-rule, we don't return a display-name
    token containing a phrase, but rather a display-name token with
    the content of the phrase.

    """
    phrase, value = get_phrase(value)
    display_name = DisplayName()
    # Copy the phrase's children and defects rather than nesting the phrase.
    display_name.extend(list(phrase))
    display_name.defects = list(phrase.defects)
    return display_name, value
+ |
+ |
def get_name_addr(value):
    """ name-addr = [display-name] angle-addr

    Parse a name-addr from the start of value and return a tuple of the
    NameAddr token list and the unparsed remainder of value.

    HeaderParseError is raised if value is empty, consists only of CFWS,
    starts with a character that cannot begin a phrase, or ends right after
    the display name; errors from parsing the angle-addr propagate as well.

    """
    name_addr = NameAddr()
    if not value:
        # Without this guard the value[0] lookups below raise IndexError,
        # which callers (that only catch HeaderParseError) do not expect.
        raise errors.HeaderParseError(
            "expected name-addr but found '{}'".format(value))
    # Both the optional display name and the angle-addr can start with cfws.
    leader = None
    if value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
        if not value:
            raise errors.HeaderParseError(
                "expected name-addr but found '{}'".format(leader))
    if value[0] != '<':
        if value[0] in PHRASE_ENDS:
            raise errors.HeaderParseError(
                "expected name-addr but found '{}'".format(value))
        token, value = get_display_name(value)
        if not value:
            raise errors.HeaderParseError(
                "expected name-addr but found '{}'".format(token))
        if leader is not None:
            # The leading CFWS belongs to the first element of the display
            # name; clear leader so it is not also added to the angle-addr.
            token[0][:0] = [leader]
            leader = None
        name_addr.append(token)
    token, value = get_angle_addr(value)
    if leader is not None:
        # No display name was present, so the CFWS leads the angle-addr.
        token[:0] = [leader]
    name_addr.append(token)
    return name_addr, value
+ |
def get_mailbox(value):
    """ mailbox = name-addr / addr-spec

    Returns a (Mailbox, remaining value) tuple.  The Mailbox is re-typed as
    'invalid-mailbox' when the parsed token carries any InvalidHeaderDefect.
    Raises HeaderParseError when neither alternative can be parsed.

    """
    # There is no way to tell the two forms apart up front, so attempt the
    # name-addr form and fall back to a bare addr-spec.
    try:
        parsed, value = get_name_addr(value)
    except errors.HeaderParseError:
        try:
            parsed, value = get_addr_spec(value)
        except errors.HeaderParseError:
            raise errors.HeaderParseError(
                "expected mailbox but found '{}'".format(value))
    mailbox = Mailbox()
    for defect in parsed.all_defects:
        if isinstance(defect, errors.InvalidHeaderDefect):
            mailbox.token_type = 'invalid-mailbox'
            break
    mailbox.append(parsed)
    return mailbox, value
+ |
def get_invalid_mailbox(value, endchars):
    """ Consume value up to (not including) the first char in endchars.

    This lies outside the formal grammar.  The text is collected into an
    InvalidMailbox TokenList, which acts like a Mailbox whose data
    attributes are None.  Characters from PHRASE_ENDS become
    'misplaced-special' terminals; everything else is parsed as phrases.

    """
    invalid_mailbox = InvalidMailbox()
    while value:
        first = value[0]
        if first in endchars:
            break
        if first in PHRASE_ENDS:
            piece = ValueTerminal(first, 'misplaced-special')
            value = value[1:]
        else:
            piece, value = get_phrase(value)
        invalid_mailbox.append(piece)
    return invalid_mailbox, value
+ |
def get_mailbox_list(value):
    """ mailbox-list = (mailbox *("," mailbox)) / obs-mbox-list
        obs-mbox-list = *([CFWS] ",") mailbox *("," [mailbox / CFWS])

    This routine deliberately steps outside the formal grammar to get better
    error handling: the list only ends at the end of the value or at a ';'
    (the group terminator).  Anything that does not parse as a mailbox is
    turned into an InvalidMailbox token so that parsing can continue with
    the remaining entries.  A list whose entries are all null is allowed
    here and is dealt with at a higher level.

    """
    mailbox_list = MailboxList()
    while value and value[0] != ';':
        try:
            entry, value = get_mailbox(value)
        except errors.HeaderParseError:
            leader = None
            had_cfws = value[0] in CFWS_LEADER
            if had_cfws:
                leader, value = get_cfws(value)
                is_empty = not value or value[0] in ',;'
            else:
                is_empty = value[0] == ','
            if is_empty:
                # A null list element; only the CFWS (if any) is recorded.
                if had_cfws:
                    mailbox_list.append(leader)
                mailbox_list.defects.append(errors.ObsoleteHeaderDefect(
                    "empty element in mailbox-list"))
            else:
                bad, value = get_invalid_mailbox(value, ',;')
                if leader is not None:
                    bad[:0] = [leader]
                mailbox_list.append(bad)
                mailbox_list.defects.append(errors.InvalidHeaderDefect(
                    "invalid mailbox in mailbox-list"))
        else:
            mailbox_list.append(entry)
        if value and value[0] not in ',;':
            # Junk follows the entry: mark the entry invalid (its mailbox
            # info remains available) and absorb the junk into it.
            last = mailbox_list[-1]
            last.token_type = 'invalid-mailbox'
            junk, value = get_invalid_mailbox(value, ',;')
            last.extend(junk)
            mailbox_list.defects.append(errors.InvalidHeaderDefect(
                "invalid mailbox in mailbox-list"))
        if value and value[0] == ',':
            mailbox_list.append(ListSeparator)
            value = value[1:]
    return mailbox_list, value
+ |
+ |
def get_group_list(value):
    """ group-list = mailbox-list / CFWS / obs-group-list
        obs-group-list = 1*([CFWS] ",") [CFWS]

    Returns a (GroupList, remaining value) tuple.  An empty value, or a
    value that ends after leading CFWS, is reported through an
    InvalidHeaderDefect rather than an exception.

    """
    group_list = GroupList()
    if not value:
        group_list.defects.append(errors.InvalidHeaderDefect(
            "end of header before group-list"))
        return group_list, value
    leader = None
    if value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
        if not value or value[0] == ';':
            if not value:
                # This should never happen in email parsing, since CFWS-only
                # is a legal alternative to group-list in a group, which is
                # the only place group-list appears.
                group_list.defects.append(errors.InvalidHeaderDefect(
                    "end of header in group-list"))
            group_list.append(leader)
            return group_list, value
    mailboxes, value = get_mailbox_list(value)
    if len(mailboxes.all_mailboxes) == 0:
        # Only empty entries were found: flatten the list's tokens directly
        # into the group-list.
        if leader is not None:
            group_list.append(leader)
        group_list.extend(mailboxes)
        group_list.defects.append(errors.ObsoleteHeaderDefect(
            "group-list with empty entries"))
    else:
        if leader is not None:
            mailboxes[:0] = [leader]
        group_list.append(mailboxes)
    return group_list, value
+ |
def get_group(value):
    """ group = display-name ":" [group-list] ";" [CFWS]

    Parse a group from the start of value and return a tuple of the Group
    token list and the unparsed remainder of value.

    If the header ends before the closing ';', an InvalidHeaderDefect is
    registered and the terminator is added to the token list anyway.
    HeaderParseError is raised if the display name is not followed by ':'
    or if something other than ';' follows the group-list.

    """
    group = Group()
    token, value = get_display_name(value)
    if not value or value[0] != ':':
        raise errors.HeaderParseError("expected ':' at end of group "
            "display name but found '{}'".format(value))
    group.append(token)
    group.append(ValueTerminal(':', 'group-display-name-terminator'))
    value = value[1:]
    if value and value[0] == ';':
        # Empty group: no group-list at all.
        group.append(ValueTerminal(';', 'group-terminator'))
        return group, value[1:]
    token, value = get_group_list(value)
    group.append(token)
    if not value:
        group.defects.append(errors.InvalidHeaderDefect(
            "end of header in group"))
    # This must be an elif: indexing an empty value here would raise
    # IndexError right after the defect above has been recorded.
    elif value[0] != ';':
        raise errors.HeaderParseError(
            "expected ';' at end of group but found {}".format(value))
    group.append(ValueTerminal(';', 'group-terminator'))
    value = value[1:]
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        group.append(token)
    return group, value
+ |
def get_address(value):
    """ address = mailbox / group

    An address is therefore either a single mailbox or a group, i.e. a list
    of mailboxes.  That is why the returned Address object exposes a
    'mailboxes' attribute that presents a lone mailbox as a one-element
    list.  To tell the two cases apart, look at the single element the
    Address contains: it is either a mailbox token or a group token.

    Raises HeaderParseError if value parses as neither.

    """
    # mailbox and group (particularly with the obsolete forms allowed) begin
    # almost identically, and the grammar gives no early hint; only on
    # reaching '@', '<' or ':' is the kind known.  So each parser is simply
    # tried in turn: group first, then mailbox.  Parsing a phrase and
    # branching on the character after it might be faster, but would be a
    # premature optimization.
    try:
        parsed, value = get_group(value)
    except errors.HeaderParseError:
        try:
            parsed, value = get_mailbox(value)
        except errors.HeaderParseError:
            raise errors.HeaderParseError(
                "expected address but found '{}'".format(value))
    address = Address()
    address.append(parsed)
    return address, value
+ |
def get_address_list(value):
    """ address_list = (address *("," address)) / obs-addr-list
        obs-addr-list = *([CFWS] ",") address *("," [address / CFWS])

    Unlike the formal grammar, parsing continues to the end of the input on
    the assumption that the whole input is an address-list.  That always
    holds in email parsing, and it lets invalid addresses be skipped so that
    later valid ones are still parsed.

    """
    address_list = AddressList()
    while value:
        try:
            entry, value = get_address(value)
        except errors.HeaderParseError:
            leader = None
            # Message of the obsolete "null entry" defect, if one applies.
            empty_msg = None
            if value[0] in CFWS_LEADER:
                leader, value = get_cfws(value)
                if not value or value[0] == ',':
                    address_list.append(leader)
                    empty_msg = "address-list entry with no content"
            elif value[0] == ',':
                empty_msg = "empty element in address-list"
            if empty_msg is not None:
                address_list.defects.append(
                    errors.ObsoleteHeaderDefect(empty_msg))
            else:
                bad, value = get_invalid_mailbox(value, ',')
                if leader is not None:
                    bad[:0] = [leader]
                address_list.append(Address([bad]))
                address_list.defects.append(errors.InvalidHeaderDefect(
                    "invalid address in address-list"))
        else:
            address_list.append(entry)
        if value and value[0] != ',':
            # Junk follows the address: mark its mailbox invalid (the
            # mailbox info remains available) and absorb the junk into it.
            last = address_list[-1][0]
            last.token_type = 'invalid-mailbox'
            junk, value = get_invalid_mailbox(value, ',')
            last.extend(junk)
            address_list.defects.append(errors.InvalidHeaderDefect(
                "invalid address in address-list"))
        if value:
            # Only a ',' can be left at this point.
            address_list.append(ValueTerminal(',', 'list-separator'))
            value = value[1:]
    return address_list, value
+ |
+# |
+# XXX: As I begin to add additional header parsers, I'm realizing we probably |
+# have two levels of parser routines: the get_XXX methods that get a token in |
+# the grammar, and parse_XXX methods that parse an entire field value. So |
+# get_address_list above should really be a parse_ method, as probably should |
+# be get_unstructured. |
+# |
+ |
def parse_mime_version(value):
    """ mime-version = [CFWS] 1*digit [CFWS] "." [CFWS] 1*digit [CFWS]

    Returns a MIMEVersion token list.  Its 'major' and 'minor' attributes
    are set only when the corresponding number parses as digits; problems
    are recorded as defects instead of being raised.

    """
    # The [CFWS] is implicit in the RFC 2045 BNF.
    # XXX: This routine is a bit verbose, should factor out a get_int method.
    mime_version = MIMEVersion()
    if not value:
        mime_version.defects.append(errors.HeaderMissingRequiredValue(
            "Missing MIME version number (eg: 1.0)"))
        return mime_version
    if value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        mime_version.append(cfws)
        if not value:
            mime_version.defects.append(errors.HeaderMissingRequiredValue(
                "Expected MIME version number but found only CFWS"))

    # Major number: everything up to a '.' or the start of CFWS.
    end = 0
    while (end < len(value) and value[end] != '.'
            and value[end] not in CFWS_LEADER):
        end += 1
    major, value = value[:end], value[end:]
    if major.isdigit():
        mime_version.major = int(major)
        mime_version.append(ValueTerminal(major, 'digits'))
    else:
        mime_version.defects.append(errors.InvalidHeaderDefect(
            "Expected MIME major version number but found {!r}".format(major)))
        mime_version.append(ValueTerminal(major, 'xtext'))
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        mime_version.append(cfws)

    if not value or value[0] != '.':
        if mime_version.major is not None:
            mime_version.defects.append(errors.InvalidHeaderDefect(
                "Incomplete MIME version; found only major number"))
        if value:
            mime_version.append(ValueTerminal(value, 'xtext'))
        return mime_version
    mime_version.append(ValueTerminal('.', 'version-separator'))
    value = value[1:]
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        mime_version.append(cfws)
    if not value:
        if mime_version.major is not None:
            mime_version.defects.append(errors.InvalidHeaderDefect(
                "Incomplete MIME version; found only major number"))
        return mime_version

    # Minor number: everything up to the start of CFWS.
    end = 0
    while end < len(value) and value[end] not in CFWS_LEADER:
        end += 1
    minor, value = value[:end], value[end:]
    if minor.isdigit():
        mime_version.minor = int(minor)
        mime_version.append(ValueTerminal(minor, 'digits'))
    else:
        mime_version.defects.append(errors.InvalidHeaderDefect(
            "Expected MIME minor version number but found {!r}".format(minor)))
        mime_version.append(ValueTerminal(minor, 'xtext'))
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        mime_version.append(cfws)
    if value:
        mime_version.defects.append(errors.InvalidHeaderDefect(
            "Excess non-CFWS text after MIME version"))
        mime_version.append(ValueTerminal(value, 'xtext'))
    return mime_version
+ |
def get_invalid_parameter(value):
    """ Consume value up to (not including) the next ';'.

    This lies outside the formal grammar.  The text is collected into an
    InvalidParameter TokenList, which acts like a Parameter whose data
    attributes are None.  Characters from PHRASE_ENDS become
    'misplaced-special' terminals; everything else is parsed as phrases.

    """
    invalid_parameter = InvalidParameter()
    while value:
        first = value[0]
        if first == ';':
            break
        if first in PHRASE_ENDS:
            piece = ValueTerminal(first, 'misplaced-special')
            value = value[1:]
        else:
            piece, value = get_phrase(value)
        invalid_parameter.append(piece)
    return invalid_parameter, value
+ |
def get_ttext(value):
    """ttext = <matches _ttext_matcher>

    Any run of non-TOKEN_ENDS characters is accepted as ttext; characters
    that are not really valid ttext are reported as defects on the returned
    token.  Defects are registered for *any* non-printables, even though the
    RFC doesn't exclude all of them, following the spirit of RFC 5322.

    Raises HeaderParseError if value does not start with such a run.

    """
    match = _non_token_end_matcher(value)
    if match is None:
        raise errors.HeaderParseError(
            "expected ttext but found '{}'".format(value))
    text = match.group()
    ttext = ValueTerminal(text, 'ttext')
    _validate_xtext(ttext)
    return ttext, value[len(text):]
+ |
def get_token(value):
    """token = [CFWS] 1*ttext [CFWS]

    In RFC terms ttext is any US-ASCII character other than space, ctls and
    tspecials.  Tabs are excluded here as well, although the RFC does not
    exclude them.

    The surrounding CFWS is implied by the RFC without being spelled out in
    its BNF.

    """
    mtoken = Token()
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        mtoken.append(cfws)
    if value and value[0] in TOKEN_ENDS:
        raise errors.HeaderParseError(
            "expected token but found '{}'".format(value))
    ttext, value = get_ttext(value)
    mtoken.append(ttext)
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        mtoken.append(cfws)
    return mtoken, value
+ |
def get_attrtext(value):
    """attrtext = 1*(any non-ATTRIBUTE_ENDS character)

    Any run of non-ATTRIBUTE_ENDS characters is accepted as attrtext;
    characters that are not really valid attrtext are reported as defects on
    the returned token.  Defects are registered for *any* non-printables,
    even though the RFC doesn't exclude all of them, following the spirit of
    RFC 5322.

    Raises HeaderParseError if value does not start with such a run.

    """
    match = _non_attribute_end_matcher(value)
    if match is None:
        raise errors.HeaderParseError(
            "expected attrtext but found {!r}".format(value))
    text = match.group()
    attrtext = ValueTerminal(text, 'attrtext')
    _validate_xtext(attrtext)
    return attrtext, value[len(text):]
+ |
def get_attribute(value):
    """ [CFWS] 1*attrtext [CFWS]

    Here the BNF spells out the CFWS, and, as elsewhere, the run of
    characters itself is held in a value terminal.  In RFC terms attrtext is
    the set of token characters minus '*', "'", and '%'.  As with token, tab
    is also treated as excluded.

    """
    attribute = Attribute()
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        attribute.append(cfws)
    if value and value[0] in ATTRIBUTE_ENDS:
        raise errors.HeaderParseError(
            "expected token but found '{}'".format(value))
    attrtext, value = get_attrtext(value)
    attribute.append(attrtext)
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        attribute.append(cfws)
    return attribute, value
+ |
def get_extended_attrtext(value):
    """attrtext = 1*(any non-ATTRIBUTE_ENDS character plus '%')

    A dedicated routine so that a value containing % escapes comes back as
    one string, which is decoded as a single string later on.

    Raises HeaderParseError if value does not start with such a run.

    """
    match = _non_extended_attribute_end_matcher(value)
    if match is None:
        raise errors.HeaderParseError(
            "expected extended attrtext but found {!r}".format(value))
    text = match.group()
    attrtext = ValueTerminal(text, 'extended-attrtext')
    _validate_xtext(attrtext)
    return attrtext, value[len(text):]
+ |
def get_extended_attribute(value):
    """ [CFWS] 1*extended_attrtext [CFWS]

    Same as the non-extended version, except that % characters are allowed
    so that an encoded value is picked up as a single string.

    """
    # XXX: should we have an ExtendedAttribute TokenList?
    attribute = Attribute()
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        attribute.append(cfws)
    if value and value[0] in EXTENDED_ATTRIBUTE_ENDS:
        raise errors.HeaderParseError(
            "expected token but found '{}'".format(value))
    attrtext, value = get_extended_attrtext(value)
    attribute.append(attrtext)
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        attribute.append(cfws)
    return attribute, value
+ |
def get_section(value):
    """ '*' digits

    The formal BNF is more complicated because leading 0s are not allowed.  We
    check for that and add a defect.  We also assume no CFWS is allowed between
    the '*' and the digits, though the RFC is not crystal clear on that.
    The caller should already have dealt with leading CFWS.

    Returns a tuple of the Section token list (with its 'number' attribute
    set to the parsed integer) and the unparsed remainder of value.  Raises
    HeaderParseError if value does not start with '*' followed by a digit.

    """
    section = Section()
    if not value or value[0] != '*':
        raise errors.HeaderParseError("Expected section but found {}".format(
                                        value))
    section.append(ValueTerminal('*', 'section-marker'))
    value = value[1:]
    if not value or not value[0].isdigit():
        raise errors.HeaderParseError("Expected section number but "
                                      "found {}".format(value))
    digits = ''
    while value and value[0].isdigit():
        digits += value[0]
        value = value[1:]
    if digits[0] == '0' and digits != '0':
        # A leading zero is only a defect; the number is still used.
        section.defects.append(errors.InvalidHeaderDefect(
            "section number has an invalid leading 0"))
    section.number = int(digits)
    section.append(ValueTerminal(digits, 'digits'))
    return section, value
+ |
+ |
def get_value(value):
    """ quoted-string / attribute

    Returns a (Value, remaining value) tuple.  Leading CFWS is folded into
    the front of the parsed token.  Raises HeaderParseError if value is
    empty or holds nothing but CFWS.

    """
    if not value:
        raise errors.HeaderParseError("Expected value but found end of string")
    v = Value()
    leader = None
    if value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
        if not value:
            raise errors.HeaderParseError("Expected value but found "
                                          "only {}".format(leader))
    # A leading double quote selects the quoted-string form.
    parse = get_quoted_string if value[0] == '"' else get_extended_attribute
    parsed, value = parse(value)
    if leader is not None:
        parsed[:0] = [leader]
    v.append(parsed)
    return v, value
+ |
def get_parameter(value):
    """ attribute [section] ["*"] [CFWS] "=" value

    The CFWS is implied by the RFC but not made explicit in the BNF.  This
    simplified form of the BNF from the RFC is made to conform with the RFC BNF
    through some extra checks.  We do it this way because it makes both error
    recovery and working with the resulting parse tree easier.

    Returns a tuple of the Parameter token list and the unparsed remainder of
    value.  A parameter that has a name but no value is returned with an
    InvalidHeaderDefect.  HeaderParseError is raised when the parameter is
    incomplete, is not followed by '=', or lacks the RFC 2231 charset/lang
    delimiters at a point where they are required.
    """
    # It is possible CFWS would also be implicitly allowed between the section
    # and the 'extended-attribute' marker (the '*') , but we've never seen that
    # in the wild and we will therefore ignore the possibility.
    param = Parameter()
    token, value = get_attribute(value)
    param.append(token)
    if not value or value[0] == ';':
        param.defects.append(errors.InvalidHeaderDefect("Parameter contains "
            "name ({}) but no value".format(token)))
        return param, value
    if value[0] == '*':
        try:
            token, value = get_section(value)
            param.sectioned = True
            param.append(token)
        except errors.HeaderParseError:
            # Not a section; the '*' may still be the extended marker.
            pass
        if not value:
            raise errors.HeaderParseError("Incomplete parameter")
        if value[0] == '*':
            param.append(ValueTerminal('*', 'extended-parameter-marker'))
            value = value[1:]
            param.extended = True
    # value can be empty here if the extended marker was the last character,
    # so it must be checked before it is indexed.
    if not value or value[0] != '=':
        raise errors.HeaderParseError("Parameter not followed by '='")
    param.append(ValueTerminal('=', 'parameter-separator'))
    value = value[1:]
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        param.append(token)
    # remainder holds the text after a quoted extended value, to be handed
    # back to the caller; appendto is where the value tokens are collected.
    remainder = None
    appendto = param
    if param.extended and value and value[0] == '"':
        # Now for some serious hackery to handle the common invalid case of
        # double quotes around an extended value.  We also accept (with defect)
        # a value marked as encoded that isn't really.
        qstring, remainder = get_quoted_string(value)
        inner_value = qstring.stripped_value
        semi_valid = False
        if param.section_number == 0:
            if inner_value and inner_value[0] == "'":
                semi_valid = True
            else:
                token, rest = get_attrtext(inner_value)
                if rest and rest[0] == "'":
                    semi_valid = True
        else:
            try:
                token, rest = get_extended_attrtext(inner_value)
            except errors.HeaderParseError:
                # Only a parse failure means "not semi-valid"; any other
                # exception is a real error and is allowed to propagate.
                pass
            else:
                if not rest:
                    semi_valid = True
        if semi_valid:
            param.defects.append(errors.InvalidHeaderDefect(
                "Quoted string value for extended parameter is invalid"))
            param.append(qstring)
            for t in qstring:
                if t.token_type == 'bare-quoted-string':
                    # Empty the quoted string's content; the re-parsed value
                    # tokens are collected there instead of on param.
                    t[:] = []
                    appendto = t
                    break
            value = inner_value
        else:
            remainder = None
            param.defects.append(errors.InvalidHeaderDefect(
                "Parameter marked as extended but appears to have a "
                "quoted string value that is non-encoded"))
    if value and value[0] == "'":
        token = None
    else:
        token, value = get_value(value)
    if not param.extended or param.section_number > 0:
        if not value or value[0] != "'":
            appendto.append(token)
            if remainder is not None:
                assert not value, value
                value = remainder
            return param, value
        param.defects.append(errors.InvalidHeaderDefect(
            "Apparent initial-extended-value but attribute "
            "was not marked as extended or was not initial section"))
    if not value:
        # Assume the charset/lang is missing and the token is the value.
        param.defects.append(errors.InvalidHeaderDefect(
            "Missing required charset/lang delimiters"))
        appendto.append(token)
        if remainder is None:
            return param, value
    else:
        if token is not None:
            for t in token:
                if t.token_type == 'extended-attrtext':
                    break
            # NOTE(review): the original had the no-op statement
            # "t.token_type == 'attrtext'" here, presumably meant as an
            # assignment.  It is dropped rather than "fixed" so that the
            # resulting parse tree stays exactly as before.
            appendto.append(t)
            param.charset = t.value
        if value[0] != "'":
            raise errors.HeaderParseError("Expected RFC2231 char/lang encoding "
                                          "delimiter, but found {!r}".format(value))
        appendto.append(ValueTerminal("'", 'RFC2231 delimiter'))
        value = value[1:]
        if value and value[0] != "'":
            token, value = get_attrtext(value)
            appendto.append(token)
            param.lang = token.value
            if not value or value[0] != "'":
                raise errors.HeaderParseError("Expected RFC2231 char/lang encoding "
                                  "delimiter, but found {}".format(value))
        appendto.append(ValueTerminal("'", 'RFC2231 delimiter'))
        value = value[1:]
    if remainder is not None:
        # Treat the rest of value as bare quoted string content.
        v = Value()
        while value:
            if value[0] in WSP:
                token, value = get_fws(value)
            else:
                token, value = get_qcontent(value)
            v.append(token)
        token = v
    else:
        token, value = get_value(value)
    appendto.append(token)
    if remainder is not None:
        assert not value, value
        value = remainder
    return param, value
+ |
def parse_mime_parameters(value):
    """ parameter *( ";" parameter )

    The BNF above signals that this routine is to be called only once the
    leading ';' has been found and handled.  The formal RFC grammar has no
    matching rule, but having the set of parameters as a TokenList of its
    own is more convenient.

    This is a 'parse' routine because it consumes the remaining value, yet
    it is never used on a full header: it parses whatever follows the
    non-parameter value of a specific MIME header.

    """
    mime_parameters = MimeParameters()
    while value:
        try:
            parameter, value = get_parameter(value)
        except errors.HeaderParseError:
            leader = None
            if value[0] in CFWS_LEADER:
                leader, value = get_cfws(value)
            if not value:
                mime_parameters.append(leader)
                return mime_parameters
            if value[0] != ';':
                invalid, value = get_invalid_parameter(value)
                if leader:
                    invalid[:0] = [leader]
                mime_parameters.append(invalid)
                mime_parameters.defects.append(errors.InvalidHeaderDefect(
                    "invalid parameter {!r}".format(invalid)))
            else:
                if leader is not None:
                    mime_parameters.append(leader)
                mime_parameters.defects.append(errors.InvalidHeaderDefect(
                    "parameter entry with no content"))
        else:
            mime_parameters.append(parameter)
        if value and value[0] != ';':
            # Junk follows an otherwise valid parameter: flag the parameter
            # as invalid (it still has a value) and absorb the junk into it.
            last = mime_parameters[-1]
            last.token_type = 'invalid-parameter'
            junk, value = get_invalid_parameter(value)
            last.extend(junk)
            mime_parameters.defects.append(errors.InvalidHeaderDefect(
                "parameter with invalid trailing text {!r}".format(junk)))
        if value:
            # Only a ';' can be left at this point.
            mime_parameters.append(ValueTerminal(';', 'parameter-separator'))
            value = value[1:]
    return mime_parameters
+ |
def _find_mime_parameters(tokenlist, value):
    """Make a best effort to locate the parameters of an invalid MIME header

    Everything before the first ';' is appended to tokenlist as phrases and
    'misplaced-special' terminals.  If a ';' is found, it and the parsed
    parameters following it are appended as well.

    """
    while value:
        first = value[0]
        if first == ';':
            break
        if first in PHRASE_ENDS:
            piece = ValueTerminal(first, 'misplaced-special')
            value = value[1:]
        else:
            piece, value = get_phrase(value)
        tokenlist.append(piece)
    if value:
        tokenlist.append(ValueTerminal(';', 'parameter-separator'))
        tokenlist.append(parse_mime_parameters(value[1:]))
+ |
def parse_content_type_header(value):
    """ maintype "/" subtype *( ";" parameter )

    Both maintype and subtype are tokens.  In theory they could be validated
    against the official IANA list plus x-token, but no such check is made.

    Returns a ContentType token list; problems are recorded as defects.
    """
    ctype = ContentType()
    if not value:
        ctype.defects.append(errors.HeaderMissingRequiredValue(
            "Missing content type specification"))
        return ctype
    try:
        maintype, value = get_token(value)
    except errors.HeaderParseError:
        ctype.defects.append(errors.InvalidHeaderDefect(
            "Expected content maintype but found {!r}".format(value)))
        _find_mime_parameters(ctype, value)
        return ctype
    ctype.append(maintype)
    # XXX: To follow the formal grammar strictly, maintype and subtype
    # would be specialized TokenLists.  Probably not worth it.
    if not value or value[0] != '/':
        ctype.defects.append(errors.InvalidHeaderDefect(
            "Invalid content type"))
        if value:
            _find_mime_parameters(ctype, value)
        return ctype
    ctype.maintype = maintype.value.strip().lower()
    ctype.append(ValueTerminal('/', 'content-type-separator'))
    value = value[1:]
    try:
        subtype, value = get_token(value)
    except errors.HeaderParseError:
        ctype.defects.append(errors.InvalidHeaderDefect(
            "Expected content subtype but found {!r}".format(value)))
        _find_mime_parameters(ctype, value)
        return ctype
    ctype.append(subtype)
    ctype.subtype = subtype.value.strip().lower()
    if not value:
        return ctype
    if value[0] == ';':
        ctype.append(ValueTerminal(';', 'parameter-separator'))
        ctype.append(parse_mime_parameters(value[1:]))
        return ctype
    ctype.defects.append(errors.InvalidHeaderDefect(
        "Only parameters are valid after content type, but "
        "found {!r}".format(value)))
    # Per the RFC a syntactically invalid content-type must be treated as
    # text/plain.  Perhaps this should be postel'd, but probably only if
    # the subtype value were being checked against IANA.
    del ctype.maintype, ctype.subtype
    _find_mime_parameters(ctype, value)
    return ctype
+ |
def parse_content_disposition_header(value):
    """ disposition-type *( ";" parameter )

    Returns a ContentDisposition token list.  Nothing is raised for invalid
    input: a missing or unparsable disposition type, or non-parameter text
    after it, is recorded as a defect on the returned token list.  When the
    disposition type parses, it is stored stripped and lowercased in the
    'content_disposition' attribute.

    """
    disp_header = ContentDisposition()
    if not value:
        disp_header.defects.append(errors.HeaderMissingRequiredValue(
            "Missing content disposition"))
        return disp_header
    try:
        token, value = get_token(value)
    except errors.HeaderParseError:
        # The defect belongs on disp_header; the original referenced an
        # undefined name here and raised NameError instead.
        disp_header.defects.append(errors.InvalidHeaderDefect(
            "Expected content disposition but found {!r}".format(value)))
        _find_mime_parameters(disp_header, value)
        return disp_header
    disp_header.append(token)
    disp_header.content_disposition = token.value.strip().lower()
    if not value:
        return disp_header
    if value[0] != ';':
        disp_header.defects.append(errors.InvalidHeaderDefect(
            "Only parameters are valid after content disposition, but "
            "found {!r}".format(value)))
        _find_mime_parameters(disp_header, value)
        return disp_header
    disp_header.append(ValueTerminal(';', 'parameter-separator'))
    disp_header.append(parse_mime_parameters(value[1:]))
    return disp_header
+ |
def parse_content_transfer_encoding_header(value):
    """ mechanism

    Returns a ContentTransferEncoding token list.  Nothing is raised for
    invalid input: a missing or unparsable mechanism, or extra text after
    it, is recorded as a defect on the returned token list.  When the
    mechanism parses, it is stored stripped and lowercased in the 'cte'
    attribute.

    """
    # We should probably validate the values, since the list is fixed.
    cte_header = ContentTransferEncoding()
    if not value:
        cte_header.defects.append(errors.HeaderMissingRequiredValue(
            "Missing content transfer encoding"))
        return cte_header
    try:
        token, value = get_token(value)
    except errors.HeaderParseError:
        # The defect belongs on cte_header; the original referenced an
        # undefined name here and raised NameError instead.
        cte_header.defects.append(errors.InvalidHeaderDefect(
            "Expected content transfer encoding but found {!r}".format(value)))
    else:
        cte_header.append(token)
        cte_header.cte = token.value.strip().lower()
    if not value:
        return cte_header
    # Whatever is left is junk; keep it in the token list so no text is
    # lost, registering one defect per piece consumed.
    while value:
        cte_header.defects.append(errors.InvalidHeaderDefect(
            "Extra text after content transfer encoding"))
        if value[0] in PHRASE_ENDS:
            cte_header.append(ValueTerminal(value[0], 'misplaced-special'))
            value = value[1:]
        else:
            token, value = get_phrase(value)
            cte_header.append(token)
    return cte_header