Index: third_party/logilab/astroid/scoped_nodes.py |
diff --git a/third_party/logilab/astroid/scoped_nodes.py b/third_party/logilab/astroid/scoped_nodes.py |
index f9ec7b774f86c4821ff457b8eb19100ab3217d62..db39b8b42161097cbf05f500752b38fdfedb26a6 100644 |
--- a/third_party/logilab/astroid/scoped_nodes.py |
+++ b/third_party/logilab/astroid/scoped_nodes.py |
@@ -24,6 +24,7 @@ from __future__ import with_statement |
__doctype__ = "restructuredtext en" |
import sys |
+import warnings |
from itertools import chain |
try: |
from io import BytesIO |
@@ -35,7 +36,7 @@ from logilab.common.compat import builtins |
from logilab.common.decorators import cached, cachedproperty |
from astroid.exceptions import NotFoundError, \ |
- AstroidBuildingException, InferenceError |
+ AstroidBuildingException, InferenceError, ResolveError |
from astroid.node_classes import Const, DelName, DelAttr, \ |
Dict, From, List, Pass, Raise, Return, Tuple, Yield, YieldFrom, \ |
LookupMixIn, const_factory as cf, unpack_infer, Name, CallFunc |
@@ -49,6 +50,46 @@ from astroid.manager import AstroidManager |
ITER_METHODS = ('__iter__', '__getitem__') |
PY3K = sys.version_info >= (3, 0) |
+def _c3_merge(sequences): |
+ """Merges MROs in *sequences* to a single MRO using the C3 algorithm. |
+ |
+ Adapted from http://www.python.org/download/releases/2.3/mro/. |
+ |
+ """ |
+ result = [] |
+ while True: |
+ sequences = [s for s in sequences if s] # purge empty sequences |
+ if not sequences: |
+ return result |
+ for s1 in sequences: # find merge candidates among seq heads |
+ candidate = s1[0] |
+ for s2 in sequences: |
+ if candidate in s2[1:]: |
+ candidate = None |
+ break # reject the current head, it appears later |
+ else: |
+ break |
+ if not candidate: |
+ # Show all the remaining bases, which were considered as |
+ # candidates for the next mro sequence. |
+ bases = ["({})".format(", ".join(base.name |
+ for base in subsequence)) |
+ for subsequence in sequences] |
+ raise ResolveError("Cannot create a consistent method resolution " |
+ "order for bases %s" % ", ".join(bases)) |
+ result.append(candidate) |
+ # remove the chosen candidate |
+ for seq in sequences: |
+ if seq[0] == candidate: |
+ del seq[0] |
+ |
+ |
+def _verify_duplicates_mro(sequences): |
+ for sequence in sequences: |
+ names = [node.qname() for node in sequence] |
+ if len(names) != len(set(names)): |
+ raise ResolveError('Duplicates found in the mro.') |
+ |
def remove_nodes(func, cls): |
def wrapper(*args, **kwargs): |
@@ -257,14 +298,37 @@ class Module(LocalsDictNodeNG): |
self.body = [] |
self.future_imports = set() |
- @cachedproperty |
- def file_stream(self): |
+ def _get_stream(self): |
if self.file_bytes is not None: |
return BytesIO(self.file_bytes) |
if self.file is not None: |
- return open(self.file, 'rb') |
+ stream = open(self.file, 'rb') |
+ return stream |
return None |
+ @property |
+ def file_stream(self): |
+ warnings.warn("file_stream property is deprecated and " |
+ "it is slated for removal in astroid 1.6." |
+ "Use the new method 'stream' instead.", |
+ PendingDeprecationWarning, |
+ stacklevel=2) |
+ return self._get_stream() |
+ |
+ def stream(self): |
+ """Get a stream to the underlying file or bytes.""" |
+ return self._get_stream() |
+ |
+ def close(self): |
+ """Close the underlying file streams.""" |
+ warnings.warn("close method is deprecated and it is " |
+ "slated for removal in astroid 1.6, along " |
+ "with 'file_stream' property. " |
+ "Its behaviour is replaced by managing each " |
+ "file stream returned by the 'stream' method.", |
+ PendingDeprecationWarning, |
+ stacklevel=2) |
+ |
def block_range(self, lineno): |
"""return block line numbers. |
@@ -505,50 +569,28 @@ else: |
# Function ################################################################### |
def _infer_decorator_callchain(node): |
- """ Detect decorator call chaining and see if the |
- end result is a static or a classmethod. |
+ """Detect decorator call chaining and see if the end result is a |
+ static or a classmethod. |
""" |
- current = node |
- while True: |
- if isinstance(current, CallFunc): |
- try: |
- current = next(current.func.infer()) |
- except InferenceError: |
- return |
- elif isinstance(current, Function): |
- if not current.parent: |
- return |
- try: |
- # TODO: We don't handle multiple inference results right now, |
- # because there's no flow to reason when the return |
- # is what we are looking for, a static or a class method. |
- result = next(current.infer_call_result(current.parent)) |
- if current is result: |
- # This will lead to an infinite loop, where a decorator |
- # returns itself. |
- return |
- except (StopIteration, InferenceError): |
- return |
- if isinstance(result, (Function, CallFunc)): |
- current = result |
- else: |
- if isinstance(result, Instance): |
- result = result._proxied |
- if isinstance(result, Class): |
- if (result.name == 'classmethod' and |
- result.root().name == BUILTINS): |
- return 'classmethod' |
- elif (result.name == 'staticmethod' and |
- result.root().name == BUILTINS): |
- return 'staticmethod' |
- else: |
- return |
- else: |
- # We aren't interested in anything else returned, |
- # so go back to the function type inference. |
- return |
- else: |
- return |
+ if not isinstance(node, Function): |
+ return |
+ if not node.parent: |
+ return |
+ try: |
+ # TODO: We don't handle multiple inference results right now, |
+ # because there's no flow to reason when the return |
+ # is what we are looking for, a static or a class method. |
+ result = next(node.infer_call_result(node.parent)) |
+ except (StopIteration, InferenceError): |
+ return |
+ if isinstance(result, Instance): |
+ result = result._proxied |
+ if isinstance(result, Class): |
+ if result.is_subtype_of('%s.classmethod' % BUILTINS): |
+ return 'classmethod' |
+ if result.is_subtype_of('%s.staticmethod' % BUILTINS): |
+ return 'staticmethod' |
+ |
def _function_type(self): |
""" |
@@ -561,25 +603,34 @@ def _function_type(self): |
if self.decorators: |
for node in self.decorators.nodes: |
if isinstance(node, CallFunc): |
- _type = _infer_decorator_callchain(node) |
- if _type is None: |
+ # Handle the following case: |
+ # @some_decorator(arg1, arg2) |
+ # def func(...) |
+ # |
+ try: |
+ current = next(node.func.infer()) |
+ except InferenceError: |
continue |
- else: |
+ _type = _infer_decorator_callchain(current) |
+ if _type is not None: |
return _type |
- if not isinstance(node, Name): |
- continue |
+ |
try: |
for infered in node.infer(): |
+ # Check to see if this returns a static or a class method. |
+ _type = _infer_decorator_callchain(infered) |
+ if _type is not None: |
+ return _type |
+ |
if not isinstance(infered, Class): |
continue |
for ancestor in infered.ancestors(): |
- if isinstance(ancestor, Class): |
- if (ancestor.name == 'classmethod' and |
- ancestor.root().name == BUILTINS): |
- return 'classmethod' |
- elif (ancestor.name == 'staticmethod' and |
- ancestor.root().name == BUILTINS): |
- return 'staticmethod' |
+ if not isinstance(ancestor, Class): |
+ continue |
+ if ancestor.is_subtype_of('%s.classmethod' % BUILTINS): |
+ return 'classmethod' |
+ elif ancestor.is_subtype_of('%s.staticmethod' % BUILTINS): |
+ return 'staticmethod' |
except InferenceError: |
pass |
return self._type |
@@ -763,8 +814,8 @@ class Function(Statement, Lambda): |
# but does not contribute to the inheritance structure itself. We inject |
# a fake class into the hierarchy here for several well-known metaclass |
# generators, and filter it out later. |
- if (self.name == 'with_metaclass' and |
- len(self.args.args) == 1 and |
+ if (self.name == 'with_metaclass' and |
+ len(self.args.args) == 1 and |
self.args.vararg is not None): |
metaclass = next(caller.args[0].infer(context)) |
if isinstance(metaclass, Class): |
@@ -1328,7 +1379,8 @@ class Class(Statement, LocalsDictNodeNG, FilterStmtsMixin): |
if infered is YES: |
continue |
if (not isinstance(infered, Const) or |
- not isinstance(infered.value, str)): |
+ not isinstance(infered.value, |
+ six.string_types)): |
continue |
if not infered.value: |
continue |
@@ -1339,5 +1391,69 @@ class Class(Statement, LocalsDictNodeNG, FilterStmtsMixin): |
# Cached, because inferring them all the time is expensive |
@cached |
def slots(self): |
- """ Return all the slots for this node. """ |
- return list(self._islots()) |
+ """Get all the slots for this node. |
+ |
+ If the class doesn't define any slot, through `__slots__` |
+ variable, then this function will return a None. |
+ Also, it will return None in the case the slots weren't inferred. |
+ Otherwise, it will return a list of slot names. |
+ """ |
+ slots = self._islots() |
+ try: |
+ first = next(slots) |
+ except StopIteration: |
+ # The class doesn't have a __slots__ definition. |
+ return None |
+ return [first] + list(slots) |
+ |
+ def _inferred_bases(self, recurs=True, context=None): |
+ # TODO(cpopa): really similar with .ancestors, |
+ # but the difference is when one base is inferred, |
+ # only the first object is wanted. That's because |
+ # we aren't interested in superclasses, as in the following |
+ # example: |
+ # |
+ # class SomeSuperClass(object): pass |
+ # class SomeClass(SomeSuperClass): pass |
+ # class Test(SomeClass): pass |
+ # |
+ # Inferring SomeClass from the Test's bases will give |
+ # us both SomeClass and SomeSuperClass, but we are interested |
+ # only in SomeClass. |
+ |
+ if context is None: |
+ context = InferenceContext() |
+ if sys.version_info[0] >= 3: |
+ if not self.bases and self.qname() != 'builtins.object': |
+ yield builtin_lookup("object")[1][0] |
+ return |
+ |
+ for stmt in self.bases: |
+ try: |
+ baseobj = next(stmt.infer(context=context)) |
+ except InferenceError: |
+ # XXX log error ? |
+ continue |
+ if isinstance(baseobj, Instance): |
+ baseobj = baseobj._proxied |
+ if not isinstance(baseobj, Class): |
+ continue |
+ if not baseobj.hide: |
+ yield baseobj |
+ |
+ def mro(self, context=None): |
+ """Get the method resolution order, using C3 linearization. |
+ |
+ It returns the list of ancestors sorted by the mro. |
+ This will raise `NotImplementedError` for old-style classes, since |
+ they don't have the concept of MRO. |
+ """ |
+ if not self.newstyle: |
+ raise NotImplementedError( |
+ "Could not obtain mro for old-style classes.") |
+ |
+ bases = list(self._inferred_bases(context=context)) |
+ unmerged_mro = [[self]] + [base.mro() for base in bases] + [bases] |
+ |
+ _verify_duplicates_mro(unmerged_mro) |
+ return _c3_merge(unmerged_mro) |