Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(250)

Unified Diff: third_party/typ/typ/fakes/host_fake.py

Issue 627763002: Add new 'typ' python testing framework to third_party/. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: upload to typ v0.8.1, update README.chromium Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/typ/typ/fakes/host_fake.py
diff --git a/third_party/typ/typ/fakes/host_fake.py b/third_party/typ/typ/fakes/host_fake.py
new file mode 100644
index 0000000000000000000000000000000000000000..e75814489b53d4c32aa5b515122c4a5212882bc6
--- /dev/null
+++ b/third_party/typ/typ/fakes/host_fake.py
@@ -0,0 +1,298 @@
+# Copyright 2014 Dirk Pranke. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import io
+import sys
+
+
+class FakeHost(object):
+ # "too many instance attributes" pylint: disable=R0902
+ # "redefining built-in" pylint: disable=W0622
+ # "unused arg" pylint: disable=W0613
+
+ python_interpreter = 'python'
+ is_python3 = bool(sys.version_info.major == 3)
+
+ def __init__(self):
+ self.stdin = io.StringIO()
+ self.stdout = io.StringIO()
+ self.stderr = io.StringIO()
+ self.env = {}
+ self.sep = '/'
+ self.dirs = set([])
+ self.files = {}
+ self.fetches = []
+ self.fetch_responses = {}
+ self.written_files = {}
+ self.last_tmpdir = None
+ self.current_tmpno = 0
+ self.mtimes = {}
+ self.cmds = []
+ self.cwd = '/tmp'
+
+ def __getstate__(self): # pragma: untested
+ d = copy.copy(self.__dict__)
+ del d['stderr']
+ del d['stdout']
+ del d['stdin']
+ return d
+
+ def __setstate__(self, d): # pragma: untested
+ for k, v in d.items():
+ setattr(self, k, v)
+ self.stdin = io.StringIO()
+ self.stdout = io.StringIO()
+ self.stderr = io.StringIO()
+
+ def abspath(self, *comps):
+ relpath = self.join(*comps)
+ if relpath.startswith('/'):
+ return relpath
+ return self.join(self.cwd, relpath)
+
+ def add_to_path(self, *comps):
+ absolute_path = self.abspath(*comps)
+ if absolute_path not in sys.path:
+ sys.path.append(absolute_path)
+
+ def basename(self, path):
+ return path.split(self.sep)[-1]
+
+ def call(self, argv, stdin=None, env=None):
+ self.cmds.append(argv)
+ return 0, '', ''
+
+ def chdir(self, *comps):
+ path = self.join(*comps)
+ if not path.startswith('/'): # pragma: untested
+ path = self.join(self.cwd, path)
+ self.cwd = path
+
+ def cpu_count(self):
+ return 1
+
+ def dirname(self, path):
+ return '/'.join(path.split('/')[:-1])
+
+ def exists(self, *comps):
+ path = self.abspath(*comps)
+ return ((path in self.files and self.files[path] is not None) or
+ path in self.dirs)
+
+ def files_under(self, top):
+ files = []
+ top = self.abspath(top)
+ for f in self.files:
+ if self.files[f] is not None and f.startswith(top):
+ files.append(self.relpath(f, top))
+ return files
+
+ def for_mp(self):
+ return self
+
+ def getcwd(self):
+ return self.cwd
+
+ def getenv(self, key, default=None):
+ return self.env.get(key, default)
+
+ def getpid(self): # pragma: untested
+ return 1
+
+ def isdir(self, *comps):
+ path = self.abspath(*comps)
+ return path in self.dirs
+
+ def isfile(self, *comps):
+ path = self.abspath(*comps)
+ return path in self.files and self.files[path] is not None
+
+ def join(self, *comps):
+ p = ''
+ for c in comps:
+ if c in ('', '.'): # pragma: untested
+ continue
+ elif c.startswith('/'):
+ p = c
+ elif p:
+ p += '/' + c
+ else:
+ p = c
+
+ # Handle ./
+ p = p.replace('/./', '/')
+
+ # Handle ../
+ while '/..' in p: # pragma: untested
+ comps = p.split('/')
+ idx = comps.index('..')
+ comps = comps[:idx-1] + comps[idx+1:]
+ p = '/'.join(comps)
+ return p
+
+ def maybe_mkdir(self, *comps):
+ path = self.abspath(self.join(*comps))
+ if path not in self.dirs:
+ self.dirs.add(path)
+
+ def mkdtemp(self, suffix='', prefix='tmp', dir=None, **_kwargs):
+ if dir is None:
+ dir = self.sep + '__im_tmp'
+ curno = self.current_tmpno
+ self.current_tmpno += 1
+ self.last_tmpdir = self.join(dir, '%s_%u_%s' % (prefix, curno, suffix))
+ self.dirs.add(self.last_tmpdir)
+ return self.last_tmpdir
+
+ def mtime(self, *comps):
+ return self.mtimes.get(self.join(*comps), 0)
+
+ def print_(self, msg='', end='\n', stream=None):
+ stream = stream or self.stdout
+ if not self.is_python3 and isinstance(msg, str): # pragma: untested
+ msg = unicode(msg)
+ stream.write(msg + end)
+ stream.flush()
+
+ def read_binary_file(self, *comps):
+ return self._read(comps)
+
+ def read_text_file(self, *comps):
+ return self._read(comps)
+
+ def _read(self, comps):
+ return self.files[self.abspath(*comps)]
+
+ def relpath(self, path, start):
+ return path.replace(start + '/', '')
+
+ def remove(self, *comps):
+ path = self.abspath(*comps)
+ self.files[path] = None
+ self.written_files[path] = None
+
+ def rmtree(self, *comps):
+ path = self.abspath(*comps)
+ for f in self.files:
+ if f.startswith(path):
+ self.files[f] = None
+ self.written_files[f] = None
+ self.dirs.remove(path)
+
+ def terminal_width(self):
+ return 80
+
+ def splitext(self, path):
+ idx = path.rfind('.')
+ if idx == -1:
+ return (path, '')
+ return (path[:idx], path[idx:])
+
+ def time(self):
+ return 0
+
+ def write_binary_file(self, path, contents):
+ self._write(path, contents)
+
+ def write_text_file(self, path, contents):
+ self._write(path, contents)
+
+ def _write(self, path, contents):
+ full_path = self.abspath(path)
+ self.maybe_mkdir(self.dirname(full_path))
+ self.files[full_path] = contents
+ self.written_files[full_path] = contents
+
+ def fetch(self, url, data=None, headers=None): # pragma: untested
+ resp = self.fetch_responses.get(url, FakeResponse('', url))
+ self.fetches.append((url, data, headers, resp))
+ return resp
+
+ def _tap_output(self): # pragma: untested
+ # TODO: assigning to sys.stdout/sys.stderr confuses the debugger
+ # with some sort of str/unicode problem.
+ self.stdout = _TeedStream(self.stdout)
+ self.stderr = _TeedStream(self.stderr)
+ if True:
+ sys.stdout = self.stdout
+ sys.stderr = self.stderr
+
+ def _untap_output(self): # pragma: untested
+ assert isinstance(self.stdout, _TeedStream)
+ self.stdout = self.stdout.stream
+ self.stderr = self.stderr.stream
+ if True:
+ sys.stdout = self.stdout
+ sys.stderr = self.stderr
+
+ def capture_output(self, divert=True): # pragma: untested
+ self._tap_output()
+ self.stdout.capture(divert)
+ self.stderr.capture(divert)
+
+ def restore_output(self): # pragma: untested
+ assert isinstance(self.stdout, _TeedStream)
+ out, err = (self.stdout.restore(), self.stderr.restore())
+ self._untap_output()
+ return out, err
+
+
+class _TeedStream(io.StringIO): # pragma: untested
+
+ def __init__(self, stream):
+ super(_TeedStream, self).__init__()
+ self.stream = stream
+ self.capturing = False
+ self.diverting = False
+
+ def write(self, msg, *args, **kwargs):
+ if self.capturing:
+ if sys.version_info.major == 2 and isinstance(msg, str):
+ msg = unicode(msg)
+ super(_TeedStream, self).write(msg, *args, **kwargs)
+ if not self.diverting:
+ self.stream.write(msg, *args, **kwargs)
+
+ def flush(self):
+ if self.capturing:
+ super(_TeedStream, self).flush()
+ if not self.diverting:
+ self.stream.flush()
+
+ def capture(self, divert=True):
+ self.truncate(0)
+ self.capturing = True
+ self.diverting = divert
+
+ def restore(self):
+ msg = self.getvalue()
+ self.truncate(0)
+ self.capturing = False
+ self.diverting = False
+ return msg
+
+
+class FakeResponse(io.StringIO): # pragma: untested
+
+ def __init__(self, response, url, code=200):
+ io.StringIO.__init__(self, response)
+ self._url = url
+ self.code = code
+
+ def geturl(self):
+ return self._url
+
+ def getcode(self):
+ return self.code

Powered by Google App Engine
This is Rietveld 408576698