| Index: third_party/typ/typ/fakes/host_fake.py
|
| diff --git a/third_party/typ/typ/fakes/host_fake.py b/third_party/typ/typ/fakes/host_fake.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..e75814489b53d4c32aa5b515122c4a5212882bc6
|
| --- /dev/null
|
| +++ b/third_party/typ/typ/fakes/host_fake.py
|
| @@ -0,0 +1,298 @@
|
| +# Copyright 2014 Dirk Pranke. All rights reserved.
|
| +#
|
| +# Licensed under the Apache License, Version 2.0 (the "License");
|
| +# you may not use this file except in compliance with the License.
|
| +# You may obtain a copy of the License at
|
| +#
|
| +# http://www.apache.org/licenses/LICENSE-2.0
|
| +#
|
| +# Unless required by applicable law or agreed to in writing, software
|
| +# distributed under the License is distributed on an "AS IS" BASIS,
|
| +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +# See the License for the specific language governing permissions and
|
| +# limitations under the License.
|
| +
|
| +import copy
|
| +import io
|
| +import sys
|
| +
|
| +
|
| +class FakeHost(object):
|
| + # "too many instance attributes" pylint: disable=R0902
|
| + # "redefining built-in" pylint: disable=W0622
|
| + # "unused arg" pylint: disable=W0613
|
| +
|
| + python_interpreter = 'python'
|
| + is_python3 = bool(sys.version_info.major == 3)
|
| +
|
| + def __init__(self):
|
| + self.stdin = io.StringIO()
|
| + self.stdout = io.StringIO()
|
| + self.stderr = io.StringIO()
|
| + self.env = {}
|
| + self.sep = '/'
|
| + self.dirs = set([])
|
| + self.files = {}
|
| + self.fetches = []
|
| + self.fetch_responses = {}
|
| + self.written_files = {}
|
| + self.last_tmpdir = None
|
| + self.current_tmpno = 0
|
| + self.mtimes = {}
|
| + self.cmds = []
|
| + self.cwd = '/tmp'
|
| +
|
| + def __getstate__(self): # pragma: untested
|
| + d = copy.copy(self.__dict__)
|
| + del d['stderr']
|
| + del d['stdout']
|
| + del d['stdin']
|
| + return d
|
| +
|
| + def __setstate__(self, d): # pragma: untested
|
| + for k, v in d.items():
|
| + setattr(self, k, v)
|
| + self.stdin = io.StringIO()
|
| + self.stdout = io.StringIO()
|
| + self.stderr = io.StringIO()
|
| +
|
| + def abspath(self, *comps):
|
| + relpath = self.join(*comps)
|
| + if relpath.startswith('/'):
|
| + return relpath
|
| + return self.join(self.cwd, relpath)
|
| +
|
| + def add_to_path(self, *comps):
|
| + absolute_path = self.abspath(*comps)
|
| + if absolute_path not in sys.path:
|
| + sys.path.append(absolute_path)
|
| +
|
| + def basename(self, path):
|
| + return path.split(self.sep)[-1]
|
| +
|
| + def call(self, argv, stdin=None, env=None):
|
| + self.cmds.append(argv)
|
| + return 0, '', ''
|
| +
|
| + def chdir(self, *comps):
|
| + path = self.join(*comps)
|
| + if not path.startswith('/'): # pragma: untested
|
| + path = self.join(self.cwd, path)
|
| + self.cwd = path
|
| +
|
| + def cpu_count(self):
|
| + return 1
|
| +
|
| + def dirname(self, path):
|
| + return '/'.join(path.split('/')[:-1])
|
| +
|
| + def exists(self, *comps):
|
| + path = self.abspath(*comps)
|
| + return ((path in self.files and self.files[path] is not None) or
|
| + path in self.dirs)
|
| +
|
| + def files_under(self, top):
|
| + files = []
|
| + top = self.abspath(top)
|
| + for f in self.files:
|
| + if self.files[f] is not None and f.startswith(top):
|
| + files.append(self.relpath(f, top))
|
| + return files
|
| +
|
| + def for_mp(self):
|
| + return self
|
| +
|
| + def getcwd(self):
|
| + return self.cwd
|
| +
|
| + def getenv(self, key, default=None):
|
| + return self.env.get(key, default)
|
| +
|
| + def getpid(self): # pragma: untested
|
| + return 1
|
| +
|
| + def isdir(self, *comps):
|
| + path = self.abspath(*comps)
|
| + return path in self.dirs
|
| +
|
| + def isfile(self, *comps):
|
| + path = self.abspath(*comps)
|
| + return path in self.files and self.files[path] is not None
|
| +
|
| + def join(self, *comps):
|
| + p = ''
|
| + for c in comps:
|
| + if c in ('', '.'): # pragma: untested
|
| + continue
|
| + elif c.startswith('/'):
|
| + p = c
|
| + elif p:
|
| + p += '/' + c
|
| + else:
|
| + p = c
|
| +
|
| + # Handle ./
|
| + p = p.replace('/./', '/')
|
| +
|
| + # Handle ../
|
| + while '/..' in p: # pragma: untested
|
| + comps = p.split('/')
|
| + idx = comps.index('..')
|
| + comps = comps[:idx-1] + comps[idx+1:]
|
| + p = '/'.join(comps)
|
| + return p
|
| +
|
| + def maybe_mkdir(self, *comps):
|
| + path = self.abspath(self.join(*comps))
|
| + if path not in self.dirs:
|
| + self.dirs.add(path)
|
| +
|
| + def mkdtemp(self, suffix='', prefix='tmp', dir=None, **_kwargs):
|
| + if dir is None:
|
| + dir = self.sep + '__im_tmp'
|
| + curno = self.current_tmpno
|
| + self.current_tmpno += 1
|
| + self.last_tmpdir = self.join(dir, '%s_%u_%s' % (prefix, curno, suffix))
|
| + self.dirs.add(self.last_tmpdir)
|
| + return self.last_tmpdir
|
| +
|
| + def mtime(self, *comps):
|
| + return self.mtimes.get(self.join(*comps), 0)
|
| +
|
| + def print_(self, msg='', end='\n', stream=None):
|
| + stream = stream or self.stdout
|
| + if not self.is_python3 and isinstance(msg, str): # pragma: untested
|
| + msg = unicode(msg)
|
| + stream.write(msg + end)
|
| + stream.flush()
|
| +
|
| + def read_binary_file(self, *comps):
|
| + return self._read(comps)
|
| +
|
| + def read_text_file(self, *comps):
|
| + return self._read(comps)
|
| +
|
| + def _read(self, comps):
|
| + return self.files[self.abspath(*comps)]
|
| +
|
| + def relpath(self, path, start):
|
| + return path.replace(start + '/', '')
|
| +
|
| + def remove(self, *comps):
|
| + path = self.abspath(*comps)
|
| + self.files[path] = None
|
| + self.written_files[path] = None
|
| +
|
| + def rmtree(self, *comps):
|
| + path = self.abspath(*comps)
|
| + for f in self.files:
|
| + if f.startswith(path):
|
| + self.files[f] = None
|
| + self.written_files[f] = None
|
| + self.dirs.remove(path)
|
| +
|
| + def terminal_width(self):
|
| + return 80
|
| +
|
| + def splitext(self, path):
|
| + idx = path.rfind('.')
|
| + if idx == -1:
|
| + return (path, '')
|
| + return (path[:idx], path[idx:])
|
| +
|
| + def time(self):
|
| + return 0
|
| +
|
| + def write_binary_file(self, path, contents):
|
| + self._write(path, contents)
|
| +
|
| + def write_text_file(self, path, contents):
|
| + self._write(path, contents)
|
| +
|
| + def _write(self, path, contents):
|
| + full_path = self.abspath(path)
|
| + self.maybe_mkdir(self.dirname(full_path))
|
| + self.files[full_path] = contents
|
| + self.written_files[full_path] = contents
|
| +
|
| + def fetch(self, url, data=None, headers=None): # pragma: untested
|
| + resp = self.fetch_responses.get(url, FakeResponse('', url))
|
| + self.fetches.append((url, data, headers, resp))
|
| + return resp
|
| +
|
| + def _tap_output(self): # pragma: untested
|
| + # TODO: assigning to sys.stdout/sys.stderr confuses the debugger
|
| + # with some sort of str/unicode problem.
|
| + self.stdout = _TeedStream(self.stdout)
|
| + self.stderr = _TeedStream(self.stderr)
|
| + if True:
|
| + sys.stdout = self.stdout
|
| + sys.stderr = self.stderr
|
| +
|
| + def _untap_output(self): # pragma: untested
|
| + assert isinstance(self.stdout, _TeedStream)
|
| + self.stdout = self.stdout.stream
|
| + self.stderr = self.stderr.stream
|
| + if True:
|
| + sys.stdout = self.stdout
|
| + sys.stderr = self.stderr
|
| +
|
| + def capture_output(self, divert=True): # pragma: untested
|
| + self._tap_output()
|
| + self.stdout.capture(divert)
|
| + self.stderr.capture(divert)
|
| +
|
| + def restore_output(self): # pragma: untested
|
| + assert isinstance(self.stdout, _TeedStream)
|
| + out, err = (self.stdout.restore(), self.stderr.restore())
|
| + self._untap_output()
|
| + return out, err
|
| +
|
| +
|
| +class _TeedStream(io.StringIO): # pragma: untested
|
| +
|
| + def __init__(self, stream):
|
| + super(_TeedStream, self).__init__()
|
| + self.stream = stream
|
| + self.capturing = False
|
| + self.diverting = False
|
| +
|
| + def write(self, msg, *args, **kwargs):
|
| + if self.capturing:
|
| + if sys.version_info.major == 2 and isinstance(msg, str):
|
| + msg = unicode(msg)
|
| + super(_TeedStream, self).write(msg, *args, **kwargs)
|
| + if not self.diverting:
|
| + self.stream.write(msg, *args, **kwargs)
|
| +
|
| + def flush(self):
|
| + if self.capturing:
|
| + super(_TeedStream, self).flush()
|
| + if not self.diverting:
|
| + self.stream.flush()
|
| +
|
| + def capture(self, divert=True):
|
| + self.truncate(0)
|
| + self.capturing = True
|
| + self.diverting = divert
|
| +
|
| + def restore(self):
|
| + msg = self.getvalue()
|
| + self.truncate(0)
|
| + self.capturing = False
|
| + self.diverting = False
|
| + return msg
|
| +
|
| +
|
| +class FakeResponse(io.StringIO): # pragma: untested
|
| +
|
| + def __init__(self, response, url, code=200):
|
| + io.StringIO.__init__(self, response)
|
| + self._url = url
|
| + self.code = code
|
| +
|
| + def geturl(self):
|
| + return self._url
|
| +
|
| + def getcode(self):
|
| + return self.code
|
|
|