| Index: tools/telemetry/third_party/typ/typ/fakes/host_fake.py
|
| diff --git a/tools/telemetry/third_party/typ/typ/fakes/host_fake.py b/tools/telemetry/third_party/typ/typ/fakes/host_fake.py
|
| deleted file mode 100644
|
| index 89c007506fe6cad2e7cb38808728d727ba8e0a7d..0000000000000000000000000000000000000000
|
| --- a/tools/telemetry/third_party/typ/typ/fakes/host_fake.py
|
| +++ /dev/null
|
| @@ -1,292 +0,0 @@
|
| -# Copyright 2014 Dirk Pranke. All rights reserved.
|
| -#
|
| -# Licensed under the Apache License, Version 2.0 (the "License");
|
| -# you may not use this file except in compliance with the License.
|
| -# You may obtain a copy of the License at
|
| -#
|
| -# http://www.apache.org/licenses/LICENSE-2.0
|
| -#
|
| -# Unless required by applicable law or agreed to in writing, software
|
| -# distributed under the License is distributed on an "AS IS" BASIS,
|
| -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| -# See the License for the specific language governing permissions and
|
| -# limitations under the License.
|
| -
|
| -import copy
|
| -import io
|
| -import logging
|
| -import sys
|
| -
|
| -from typ.host import _TeedStream
|
| -
|
| -
|
| -is_python3 = bool(sys.version_info.major == 3)
|
| -
|
| -if is_python3: # pragma: python3
|
| - # pylint: disable=redefined-builtin,invalid-name
|
| - unicode = str
|
| -
|
| -
|
| -class FakeHost(object):
|
| - # "too many instance attributes" pylint: disable=R0902
|
| - # "redefining built-in" pylint: disable=W0622
|
| - # "unused arg" pylint: disable=W0613
|
| -
|
| - python_interpreter = 'python'
|
| - is_python3 = bool(sys.version_info.major == 3)
|
| -
|
| - def __init__(self):
|
| - self.logger = logging.getLogger()
|
| - self.stdin = io.StringIO()
|
| - self.stdout = io.StringIO()
|
| - self.stderr = io.StringIO()
|
| - self.platform = 'linux2'
|
| - self.env = {}
|
| - self.sep = '/'
|
| - self.dirs = set([])
|
| - self.files = {}
|
| - self.fetches = []
|
| - self.fetch_responses = {}
|
| - self.written_files = {}
|
| - self.last_tmpdir = None
|
| - self.current_tmpno = 0
|
| - self.mtimes = {}
|
| - self.cmds = []
|
| - self.cwd = '/tmp'
|
| - self._orig_logging_handlers = []
|
| -
|
| - def __getstate__(self):
|
| - d = copy.copy(self.__dict__)
|
| - del d['stderr']
|
| - del d['stdout']
|
| - del d['stdin']
|
| - del d['logger']
|
| - del d['_orig_logging_handlers']
|
| - return d
|
| -
|
| - def __setstate__(self, d):
|
| - for k, v in d.items():
|
| - setattr(self, k, v)
|
| - self.logger = logging.getLogger()
|
| - self.stdin = io.StringIO()
|
| - self.stdout = io.StringIO()
|
| - self.stderr = io.StringIO()
|
| -
|
| - def abspath(self, *comps):
|
| - relpath = self.join(*comps)
|
| - if relpath.startswith('/'):
|
| - return relpath
|
| - return self.join(self.cwd, relpath)
|
| -
|
| - def add_to_path(self, *comps):
|
| - absolute_path = self.abspath(*comps)
|
| - if absolute_path not in sys.path:
|
| - sys.path.append(absolute_path)
|
| -
|
| - def basename(self, path):
|
| - return path.split(self.sep)[-1]
|
| -
|
| - def call(self, argv, stdin=None, env=None):
|
| - self.cmds.append(argv)
|
| - return 0, '', ''
|
| -
|
| - def call_inline(self, argv):
|
| - return self.call(argv)[0]
|
| -
|
| - def chdir(self, *comps):
|
| - path = self.join(*comps)
|
| - if not path.startswith('/'):
|
| - path = self.join(self.cwd, path)
|
| - self.cwd = path
|
| -
|
| - def cpu_count(self):
|
| - return 1
|
| -
|
| - def dirname(self, path):
|
| - return '/'.join(path.split('/')[:-1])
|
| -
|
| - def exists(self, *comps):
|
| - path = self.abspath(*comps)
|
| - return ((path in self.files and self.files[path] is not None) or
|
| - path in self.dirs)
|
| -
|
| - def files_under(self, top):
|
| - files = []
|
| - top = self.abspath(top)
|
| - for f in self.files:
|
| - if self.files[f] is not None and f.startswith(top):
|
| - files.append(self.relpath(f, top))
|
| - return files
|
| -
|
| - def for_mp(self):
|
| - return self
|
| -
|
| - def getcwd(self):
|
| - return self.cwd
|
| -
|
| - def getenv(self, key, default=None):
|
| - return self.env.get(key, default)
|
| -
|
| - def getpid(self):
|
| - return 1
|
| -
|
| - def isdir(self, *comps):
|
| - path = self.abspath(*comps)
|
| - return path in self.dirs
|
| -
|
| - def isfile(self, *comps):
|
| - path = self.abspath(*comps)
|
| - return path in self.files and self.files[path] is not None
|
| -
|
| - def join(self, *comps):
|
| - p = ''
|
| - for c in comps:
|
| - if c in ('', '.'):
|
| - continue
|
| - elif c.startswith('/'):
|
| - p = c
|
| - elif p:
|
| - p += '/' + c
|
| - else:
|
| - p = c
|
| -
|
| - # Handle ./
|
| - p = p.replace('/./', '/')
|
| -
|
| - # Handle ../
|
| - while '/..' in p:
|
| - comps = p.split('/')
|
| - idx = comps.index('..')
|
| - comps = comps[:idx-1] + comps[idx+1:]
|
| - p = '/'.join(comps)
|
| - return p
|
| -
|
| - def maybe_mkdir(self, *comps):
|
| - path = self.abspath(self.join(*comps))
|
| - if path not in self.dirs:
|
| - self.dirs.add(path)
|
| -
|
| - def mktempfile(self, delete=True):
|
| - curno = self.current_tmpno
|
| - self.current_tmpno += 1
|
| - f = io.StringIO()
|
| - f.name = '__im_tmp/tmpfile_%u' % curno
|
| - return f
|
| -
|
| - def mkdtemp(self, suffix='', prefix='tmp', dir=None, **_kwargs):
|
| - if dir is None:
|
| - dir = self.sep + '__im_tmp'
|
| - curno = self.current_tmpno
|
| - self.current_tmpno += 1
|
| - self.last_tmpdir = self.join(dir, '%s_%u_%s' % (prefix, curno, suffix))
|
| - self.dirs.add(self.last_tmpdir)
|
| - return self.last_tmpdir
|
| -
|
| - def mtime(self, *comps):
|
| - return self.mtimes.get(self.join(*comps), 0)
|
| -
|
| - def print_(self, msg='', end='\n', stream=None):
|
| - stream = stream or self.stdout
|
| - stream.write(msg + end)
|
| - stream.flush()
|
| -
|
| - def read_binary_file(self, *comps):
|
| - return self._read(comps)
|
| -
|
| - def read_text_file(self, *comps):
|
| - return self._read(comps)
|
| -
|
| - def _read(self, comps):
|
| - return self.files[self.abspath(*comps)]
|
| -
|
| - def realpath(self, *comps):
|
| - return self.abspath(*comps)
|
| -
|
| - def relpath(self, path, start):
|
| - return path.replace(start + '/', '')
|
| -
|
| - def remove(self, *comps):
|
| - path = self.abspath(*comps)
|
| - self.files[path] = None
|
| - self.written_files[path] = None
|
| -
|
| - def rmtree(self, *comps):
|
| - path = self.abspath(*comps)
|
| - for f in self.files:
|
| - if f.startswith(path):
|
| - self.files[f] = None
|
| - self.written_files[f] = None
|
| - self.dirs.remove(path)
|
| -
|
| - def terminal_width(self):
|
| - return 80
|
| -
|
| - def splitext(self, path):
|
| - idx = path.rfind('.')
|
| - if idx == -1:
|
| - return (path, '')
|
| - return (path[:idx], path[idx:])
|
| -
|
| - def time(self):
|
| - return 0
|
| -
|
| - def write_binary_file(self, path, contents):
|
| - self._write(path, contents)
|
| -
|
| - def write_text_file(self, path, contents):
|
| - self._write(path, contents)
|
| -
|
| - def _write(self, path, contents):
|
| - full_path = self.abspath(path)
|
| - self.maybe_mkdir(self.dirname(full_path))
|
| - self.files[full_path] = contents
|
| - self.written_files[full_path] = contents
|
| -
|
| - def fetch(self, url, data=None, headers=None):
|
| - resp = self.fetch_responses.get(url, FakeResponse(unicode(''), url))
|
| - self.fetches.append((url, data, headers, resp))
|
| - return resp
|
| -
|
| - def _tap_output(self):
|
| - self.stdout = _TeedStream(self.stdout)
|
| - self.stderr = _TeedStream(self.stderr)
|
| - if True:
|
| - sys.stdout = self.stdout
|
| - sys.stderr = self.stderr
|
| -
|
| - def _untap_output(self):
|
| - assert isinstance(self.stdout, _TeedStream)
|
| - self.stdout = self.stdout.stream
|
| - self.stderr = self.stderr.stream
|
| - if True:
|
| - sys.stdout = self.stdout
|
| - sys.stderr = self.stderr
|
| -
|
| - def capture_output(self, divert=True):
|
| - self._tap_output()
|
| - self._orig_logging_handlers = self.logger.handlers
|
| - if self._orig_logging_handlers:
|
| - self.logger.handlers = [logging.StreamHandler(self.stderr)]
|
| - self.stdout.capture(divert=divert)
|
| - self.stderr.capture(divert=divert)
|
| -
|
| - def restore_output(self):
|
| - assert isinstance(self.stdout, _TeedStream)
|
| - out, err = (self.stdout.restore(), self.stderr.restore())
|
| - self.logger.handlers = self._orig_logging_handlers
|
| - self._untap_output()
|
| - return out, err
|
| -
|
| -
|
| -class FakeResponse(io.StringIO):
|
| -
|
| - def __init__(self, response, url, code=200):
|
| - io.StringIO.__init__(self, response)
|
| - self._url = url
|
| - self.code = code
|
| -
|
| - def geturl(self):
|
| - return self._url
|
| -
|
| - def getcode(self):
|
| - return self.code
|
|
|