| Index: tools/telemetry/third_party/rope/rope/refactor/importutils/module_imports.py
|
| diff --git a/tools/telemetry/third_party/rope/rope/refactor/importutils/module_imports.py b/tools/telemetry/third_party/rope/rope/refactor/importutils/module_imports.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..5e5a357f2c83382079ffdf80b18b83f144996bb3
|
| --- /dev/null
|
| +++ b/tools/telemetry/third_party/rope/rope/refactor/importutils/module_imports.py
|
| @@ -0,0 +1,493 @@
|
| +from rope.base import ast
|
| +from rope.base import pynames
|
| +from rope.base import utils
|
| +from rope.refactor.importutils import actions
|
| +from rope.refactor.importutils import importinfo
|
| +
|
| +
|
| +class ModuleImports(object):
|
| +
|
| + def __init__(self, project, pymodule, import_filter=None):
|
| + self.project = project
|
| + self.pymodule = pymodule
|
| + self.separating_lines = 0
|
| + self.filter = import_filter
|
| +
|
| + @property
|
| + @utils.saveit
|
| + def imports(self):
|
| + finder = _GlobalImportFinder(self.pymodule)
|
| + result = finder.find_import_statements()
|
| + self.separating_lines = finder.get_separating_line_count()
|
| + if self.filter is not None:
|
| + for import_stmt in result:
|
| + if not self.filter(import_stmt):
|
| + import_stmt.readonly = True
|
| + return result
|
| +
|
| + def _get_unbound_names(self, defined_pyobject):
|
| + visitor = _GlobalUnboundNameFinder(self.pymodule, defined_pyobject)
|
| + ast.walk(self.pymodule.get_ast(), visitor)
|
| + return visitor.unbound
|
| +
|
| + def remove_unused_imports(self):
|
| + can_select = _OneTimeSelector(self._get_unbound_names(self.pymodule))
|
| + visitor = actions.RemovingVisitor(
|
| + self.project, self._current_folder(), can_select)
|
| + for import_statement in self.imports:
|
| + import_statement.accept(visitor)
|
| +
|
| + def get_used_imports(self, defined_pyobject):
|
| + result = []
|
| + can_select = _OneTimeSelector(
|
| + self._get_unbound_names(defined_pyobject))
|
| + visitor = actions.FilteringVisitor(
|
| + self.project, self._current_folder(), can_select)
|
| + for import_statement in self.imports:
|
| + new_import = import_statement.accept(visitor)
|
| + if new_import is not None and not new_import.is_empty():
|
| + result.append(new_import)
|
| + return result
|
| +
|
| + def get_changed_source(self):
|
| + # Make sure we forward a removed import's preceding blank
|
| + # lines count to the following import statement.
|
| + prev_stmt = None
|
| + for stmt in self.imports:
|
| + if prev_stmt is not None and prev_stmt.import_info.is_empty():
|
| + stmt.blank_lines = max(prev_stmt.blank_lines, stmt.blank_lines)
|
| + prev_stmt = stmt
|
| + # The new list of imports.
|
| + imports = [stmt for stmt in self.imports
|
| + if not stmt.import_info.is_empty()]
|
| +
|
| + after_removing = self._remove_imports(self.imports)
|
| + first_non_blank = self._first_non_blank_line(after_removing, 0)
|
| + first_import = self._first_import_line() - 1
|
| + result = []
|
| + # Writing module docs
|
| + result.extend(after_removing[first_non_blank:first_import])
|
| + # Writing imports
|
| + sorted_imports = sorted(imports, key=self._get_location)
|
| + for stmt in sorted_imports:
|
| + if stmt != sorted_imports[0]:
|
| + result.append('\n' * stmt.blank_lines)
|
| + result.append(stmt.get_import_statement() + '\n')
|
| + if sorted_imports and first_non_blank < len(after_removing):
|
| + result.append('\n' * self.separating_lines)
|
| +
|
| + # Writing the body
|
| + first_after_imports = self._first_non_blank_line(after_removing,
|
| + first_import)
|
| + result.extend(after_removing[first_after_imports:])
|
| + return ''.join(result)
|
| +
|
| + def _get_import_location(self, stmt):
|
| + start = stmt.get_new_start()
|
| + if start is None:
|
| + start = stmt.get_old_location()[0]
|
| + return start
|
| +
|
| + def _get_location(self, stmt):
|
| + if stmt.get_new_start() is not None:
|
| + return stmt.get_new_start()
|
| + else:
|
| + return stmt.get_old_location()[0]
|
| +
|
| + def _remove_imports(self, imports):
|
| + lines = self.pymodule.source_code.splitlines(True)
|
| + after_removing = []
|
| + last_index = 0
|
| + for stmt in imports:
|
| + start, end = stmt.get_old_location()
|
| + after_removing.extend(lines[last_index:start - 1])
|
| + last_index = end - 1
|
| + for i in range(start, end):
|
| + after_removing.append('')
|
| + after_removing.extend(lines[last_index:])
|
| + return after_removing
|
| +
|
| + def _first_non_blank_line(self, lines, lineno):
|
| + result = lineno
|
| + for line in lines[lineno:]:
|
| + if line.strip() == '':
|
| + result += 1
|
| + else:
|
| + break
|
| + return result
|
| +
|
| + def add_import(self, import_info):
|
| + visitor = actions.AddingVisitor(self.project, [import_info])
|
| + for import_statement in self.imports:
|
| + if import_statement.accept(visitor):
|
| + break
|
| + else:
|
| + lineno = self._get_new_import_lineno()
|
| + blanks = self._get_new_import_blanks()
|
| + self.imports.append(importinfo.ImportStatement(
|
| + import_info, lineno, lineno,
|
| + blank_lines=blanks))
|
| +
|
| + def _get_new_import_blanks(self):
|
| + return 0
|
| +
|
| + def _get_new_import_lineno(self):
|
| + if self.imports:
|
| + return self.imports[-1].end_line
|
| + return 1
|
| +
|
| + def filter_names(self, can_select):
|
| + visitor = actions.RemovingVisitor(
|
| + self.project, self._current_folder(), can_select)
|
| + for import_statement in self.imports:
|
| + import_statement.accept(visitor)
|
| +
|
| + def expand_stars(self):
|
| + can_select = _OneTimeSelector(self._get_unbound_names(self.pymodule))
|
| + visitor = actions.ExpandStarsVisitor(
|
| + self.project, self._current_folder(), can_select)
|
| + for import_statement in self.imports:
|
| + import_statement.accept(visitor)
|
| +
|
| + def remove_duplicates(self):
|
| + added_imports = []
|
| + for import_stmt in self.imports:
|
| + visitor = actions.AddingVisitor(self.project,
|
| + [import_stmt.import_info])
|
| + for added_import in added_imports:
|
| + if added_import.accept(visitor):
|
| + import_stmt.empty_import()
|
| + else:
|
| + added_imports.append(import_stmt)
|
| +
|
| + def force_single_imports(self):
|
| + """force a single import per statement"""
|
| + for import_stmt in self.imports[:]:
|
| + import_info = import_stmt.import_info
|
| + if import_info.is_empty():
|
| + continue
|
| + if len(import_info.names_and_aliases) > 1:
|
| + for name_and_alias in import_info.names_and_aliases:
|
| + if hasattr(import_info, "module_name"):
|
| + new_import = importinfo.FromImport(
|
| + import_info.module_name, import_info.level,
|
| + [name_and_alias])
|
| + else:
|
| + new_import = importinfo.NormalImport([name_and_alias])
|
| + self.add_import(new_import)
|
| + import_stmt.empty_import()
|
| +
|
| + def get_relative_to_absolute_list(self):
|
| + visitor = actions.RelativeToAbsoluteVisitor(
|
| + self.project, self._current_folder())
|
| + for import_stmt in self.imports:
|
| + if not import_stmt.readonly:
|
| + import_stmt.accept(visitor)
|
| + return visitor.to_be_absolute
|
| +
|
| + def get_self_import_fix_and_rename_list(self):
|
| + visitor = actions.SelfImportVisitor(
|
| + self.project, self._current_folder(), self.pymodule.get_resource())
|
| + for import_stmt in self.imports:
|
| + if not import_stmt.readonly:
|
| + import_stmt.accept(visitor)
|
| + return visitor.to_be_fixed, visitor.to_be_renamed
|
| +
|
| + def _current_folder(self):
|
| + return self.pymodule.get_resource().parent
|
| +
|
| + def sort_imports(self):
|
| + if self.project.prefs.get("sort_imports_alphabetically"):
|
| + sort_kwargs = dict(key=self._get_import_name)
|
| + else:
|
| + sort_kwargs = dict(key=self._key_imports)
|
| +
|
| + # IDEA: Sort from import list
|
| + visitor = actions.SortingVisitor(self.project, self._current_folder())
|
| + for import_statement in self.imports:
|
| + import_statement.accept(visitor)
|
| + in_projects = sorted(visitor.in_project, **sort_kwargs)
|
| + third_party = sorted(visitor.third_party, **sort_kwargs)
|
| + standards = sorted(visitor.standard, **sort_kwargs)
|
| + future = sorted(visitor.future, **sort_kwargs)
|
| + last_index = self._first_import_line()
|
| + last_index = self._move_imports(future, last_index, 0)
|
| + last_index = self._move_imports(standards, last_index, 1)
|
| + last_index = self._move_imports(third_party, last_index, 1)
|
| + last_index = self._move_imports(in_projects, last_index, 1)
|
| + self.separating_lines = 2
|
| +
|
| + def _first_import_line(self):
|
| + nodes = self.pymodule.get_ast().body
|
| + lineno = 0
|
| + if self.pymodule.get_doc() is not None:
|
| + lineno = 1
|
| + if len(nodes) > lineno:
|
| + lineno = self.pymodule.logical_lines.logical_line_in(
|
| + nodes[lineno].lineno)[0]
|
| + else:
|
| + lineno = self.pymodule.lines.length()
|
| + while lineno > 1:
|
| + line = self.pymodule.lines.get_line(lineno - 1)
|
| + if line.strip() == '':
|
| + lineno -= 1
|
| + else:
|
| + break
|
| + return lineno
|
| +
|
| + def _get_import_name(self, import_stmt):
|
| + import_info = import_stmt.import_info
|
| + if hasattr(import_info, "module_name"):
|
| + return "%s.%s" % (import_info.module_name,
|
| + import_info.names_and_aliases[0][0])
|
| + else:
|
| + return import_info.names_and_aliases[0][0]
|
| +
|
| + def _key_imports(self, stm1):
|
| + str1 = stm1.get_import_statement()
|
| + return str1.startswith("from "), str1
|
| +
|
| + #str1 = stmt1.get_import_statement()
|
| + #str2 = stmt2.get_import_statement()
|
| + #if str1.startswith('from ') and not str2.startswith('from '):
|
| + # return 1
|
| + #if not str1.startswith('from ') and str2.startswith('from '):
|
| + # return -1
|
| + #return cmp(str1, str2)
|
| +
|
| + def _move_imports(self, imports, index, blank_lines):
|
| + if imports:
|
| + imports[0].move(index, blank_lines)
|
| + index += 1
|
| + if len(imports) > 1:
|
| + for stmt in imports[1:]:
|
| + stmt.move(index)
|
| + index += 1
|
| + return index
|
| +
|
| + def handle_long_imports(self, maxdots, maxlength):
|
| + visitor = actions.LongImportVisitor(
|
| + self._current_folder(), self.project, maxdots, maxlength)
|
| + for import_statement in self.imports:
|
| + if not import_statement.readonly:
|
| + import_statement.accept(visitor)
|
| + for import_info in visitor.new_imports:
|
| + self.add_import(import_info)
|
| + return visitor.to_be_renamed
|
| +
|
| + def remove_pyname(self, pyname):
|
| + """Removes pyname when imported in ``from mod import x``"""
|
| + visitor = actions.RemovePyNameVisitor(self.project, self.pymodule,
|
| + pyname, self._current_folder())
|
| + for import_stmt in self.imports:
|
| + import_stmt.accept(visitor)
|
| +
|
| +
|
| +class _OneTimeSelector(object):
|
| +
|
| + def __init__(self, names):
|
| + self.names = names
|
| + self.selected_names = set()
|
| +
|
| + def __call__(self, imported_primary):
|
| + if self._can_name_be_added(imported_primary):
|
| + for name in self._get_dotted_tokens(imported_primary):
|
| + self.selected_names.add(name)
|
| + return True
|
| + return False
|
| +
|
| + def _get_dotted_tokens(self, imported_primary):
|
| + tokens = imported_primary.split('.')
|
| + for i in range(len(tokens)):
|
| + yield '.'.join(tokens[:i + 1])
|
| +
|
| + def _can_name_be_added(self, imported_primary):
|
| + for name in self._get_dotted_tokens(imported_primary):
|
| + if name in self.names and name not in self.selected_names:
|
| + return True
|
| + return False
|
| +
|
| +
|
| +class _UnboundNameFinder(object):
|
| +
|
| + def __init__(self, pyobject):
|
| + self.pyobject = pyobject
|
| +
|
| + def _visit_child_scope(self, node):
|
| + pyobject = self.pyobject.get_module().get_scope().\
|
| + get_inner_scope_for_line(node.lineno).pyobject
|
| + visitor = _LocalUnboundNameFinder(pyobject, self)
|
| + for child in ast.get_child_nodes(node):
|
| + ast.walk(child, visitor)
|
| +
|
| + def _FunctionDef(self, node):
|
| + self._visit_child_scope(node)
|
| +
|
| + def _ClassDef(self, node):
|
| + self._visit_child_scope(node)
|
| +
|
| + def _Name(self, node):
|
| + if self._get_root()._is_node_interesting(node) and \
|
| + not self.is_bound(node.id):
|
| + self.add_unbound(node.id)
|
| +
|
| + def _Attribute(self, node):
|
| + result = []
|
| + while isinstance(node, ast.Attribute):
|
| + result.append(node.attr)
|
| + node = node.value
|
| + if isinstance(node, ast.Name):
|
| + result.append(node.id)
|
| + primary = '.'.join(reversed(result))
|
| + if self._get_root()._is_node_interesting(node) and \
|
| + not self.is_bound(primary):
|
| + self.add_unbound(primary)
|
| + else:
|
| + ast.walk(node, self)
|
| +
|
| + def _get_root(self):
|
| + pass
|
| +
|
| + def is_bound(self, name, propagated=False):
|
| + pass
|
| +
|
| + def add_unbound(self, name):
|
| + pass
|
| +
|
| +
|
| +class _GlobalUnboundNameFinder(_UnboundNameFinder):
|
| +
|
| + def __init__(self, pymodule, wanted_pyobject):
|
| + super(_GlobalUnboundNameFinder, self).__init__(pymodule)
|
| + self.unbound = set()
|
| + self.names = set()
|
| + for name, pyname in pymodule._get_structural_attributes().items():
|
| + if not isinstance(pyname, (pynames.ImportedName,
|
| + pynames.ImportedModule)):
|
| + self.names.add(name)
|
| + wanted_scope = wanted_pyobject.get_scope()
|
| + self.start = wanted_scope.get_start()
|
| + self.end = wanted_scope.get_end() + 1
|
| +
|
| + def _get_root(self):
|
| + return self
|
| +
|
| + def is_bound(self, primary, propagated=False):
|
| + name = primary.split('.')[0]
|
| + if name in self.names:
|
| + return True
|
| + return False
|
| +
|
| + def add_unbound(self, name):
|
| + names = name.split('.')
|
| + for i in range(len(names)):
|
| + self.unbound.add('.'.join(names[:i + 1]))
|
| +
|
| + def _is_node_interesting(self, node):
|
| + return self.start <= node.lineno < self.end
|
| +
|
| +
|
| +class _LocalUnboundNameFinder(_UnboundNameFinder):
|
| +
|
| + def __init__(self, pyobject, parent):
|
| + super(_LocalUnboundNameFinder, self).__init__(pyobject)
|
| + self.parent = parent
|
| +
|
| + def _get_root(self):
|
| + return self.parent._get_root()
|
| +
|
| + def is_bound(self, primary, propagated=False):
|
| + name = primary.split('.')[0]
|
| + if propagated:
|
| + names = self.pyobject.get_scope().get_propagated_names()
|
| + else:
|
| + names = self.pyobject.get_scope().get_names()
|
| + if name in names or self.parent.is_bound(name, propagated=True):
|
| + return True
|
| + return False
|
| +
|
| + def add_unbound(self, name):
|
| + self.parent.add_unbound(name)
|
| +
|
| +
|
| +class _GlobalImportFinder(object):
|
| +
|
| + def __init__(self, pymodule):
|
| + self.current_folder = None
|
| + if pymodule.get_resource():
|
| + self.current_folder = pymodule.get_resource().parent
|
| + self.pymodule = pymodule
|
| + self.imports = []
|
| + self.pymodule = pymodule
|
| + self.lines = self.pymodule.lines
|
| +
|
| + def visit_import(self, node, end_line):
|
| + start_line = node.lineno
|
| + import_statement = importinfo.ImportStatement(
|
| + importinfo.NormalImport(self._get_names(node.names)),
|
| + start_line, end_line, self._get_text(start_line, end_line),
|
| + blank_lines=self._count_empty_lines_before(start_line))
|
| + self.imports.append(import_statement)
|
| +
|
| + def _count_empty_lines_before(self, lineno):
|
| + result = 0
|
| + for current in range(lineno - 1, 0, -1):
|
| + line = self.lines.get_line(current)
|
| + if line.strip() == '':
|
| + result += 1
|
| + else:
|
| + break
|
| + return result
|
| +
|
| + def _count_empty_lines_after(self, lineno):
|
| + result = 0
|
| + for current in range(lineno + 1, self.lines.length()):
|
| + line = self.lines.get_line(current)
|
| + if line.strip() == '':
|
| + result += 1
|
| + else:
|
| + break
|
| + return result
|
| +
|
| + def get_separating_line_count(self):
|
| + if not self.imports:
|
| + return 0
|
| + return self._count_empty_lines_after(self.imports[-1].end_line - 1)
|
| +
|
| + def _get_text(self, start_line, end_line):
|
| + result = []
|
| + for index in range(start_line, end_line):
|
| + result.append(self.lines.get_line(index))
|
| + return '\n'.join(result)
|
| +
|
| + def visit_from(self, node, end_line):
|
| + level = 0
|
| + if node.level:
|
| + level = node.level
|
| + import_info = importinfo.FromImport(
|
| + node.module or '', # see comment at rope.base.ast.walk
|
| + level, self._get_names(node.names))
|
| + start_line = node.lineno
|
| + self.imports.append(importinfo.ImportStatement(
|
| + import_info, node.lineno, end_line,
|
| + self._get_text(start_line, end_line),
|
| + blank_lines=
|
| + self._count_empty_lines_before(start_line)))
|
| +
|
| + def _get_names(self, alias_names):
|
| + result = []
|
| + for alias in alias_names:
|
| + result.append((alias.name, alias.asname))
|
| + return result
|
| +
|
| + def find_import_statements(self):
|
| + nodes = self.pymodule.get_ast().body
|
| + for index, node in enumerate(nodes):
|
| + if isinstance(node, (ast.Import, ast.ImportFrom)):
|
| + lines = self.pymodule.logical_lines
|
| + end_line = lines.logical_line_in(node.lineno)[1] + 1
|
| + if isinstance(node, ast.Import):
|
| + self.visit_import(node, end_line)
|
| + if isinstance(node, ast.ImportFrom):
|
| + self.visit_from(node, end_line)
|
| + return self.imports
|
|
|