Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Unified Diff: third_party/typ/typ/host.py

Issue 627763002: Add new 'typ' python testing framework to third_party/. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: upload to typ v0.8.1, update README.chromium Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/typ/typ/host.py
diff --git a/third_party/typ/typ/host.py b/third_party/typ/typ/host.py
new file mode 100644
index 0000000000000000000000000000000000000000..5738e3a5883496ac0900ae235acb717563841a0b
--- /dev/null
+++ b/third_party/typ/typ/host.py
@@ -0,0 +1,282 @@
+# Copyright 2014 Dirk Pranke. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import io
+import logging
+import multiprocessing
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+
+
+if sys.version_info.major == 2: # pragma: python2
+ from urllib2 import urlopen, Request
+else: # pragma: python3
+ # pylint: disable=E0611
+ assert sys.version_info.major == 3
+ from urllib.request import urlopen, Request # pylint: disable=F0401,E0611
+
+
+is_debugging = False
+
+
+class Host(object):
+ python_interpreter = sys.executable
+ is_python3 = bool(sys.version_info.major == 3)
+
+ sep = os.sep
+ env = os.environ
+
+ _orig_stdout = sys.stdout
+ _orig_stderr = sys.stderr
+
+ def __init__(self):
+ self.logger = logging.getLogger()
+ self._orig_logging_handlers = None
+ self.stdout = sys.stdout
+ self.stderr = sys.stderr
+ self.stdin = sys.stdin
+ self.env = os.environ
+
+ def set_debugging(self, flag): # pragma: untested
+ # TODO: We currently use this to work around typ's brokenness
+ # when running -d under python3. We may or may not actually need
+ # this hook.
+ # pylint: disable=W0603
+ global is_debugging
+ is_debugging = flag
+
+ def abspath(self, *comps):
+ return os.path.abspath(self.join(*comps))
+
+ def add_to_path(self, *comps):
+ absolute_path = self.abspath(*comps)
+ if absolute_path not in sys.path:
+ sys.path.append(absolute_path)
+
+ def basename(self, path):
+ return os.path.basename(path)
+
+ def call(self, argv, stdin=None, env=None):
+ if stdin:
+ stdin_pipe = subprocess.PIPE
+ else:
+ stdin_pipe = None
+ proc = subprocess.Popen(argv, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, stdin=stdin_pipe,
+ env=env)
+ if stdin_pipe:
+ proc.stdin.write(stdin.encode('utf-8'))
+ stdout, stderr = proc.communicate()
+
+ # pylint type checking bug - pylint: disable=E1103
+ return proc.returncode, stdout.decode('utf-8'), stderr.decode('utf-8')
+
+ def chdir(self, *comps):
+ return os.chdir(self.join(*comps))
+
+ def cpu_count(self):
+ return multiprocessing.cpu_count()
+
+ def dirname(self, *comps):
+ return os.path.dirname(self.join(*comps))
+
+ def exists(self, *comps):
+ return os.path.exists(self.join(*comps))
+
+ def files_under(self, top):
+ all_files = []
+ for root, _, files in os.walk(top):
+ for f in files:
+ relpath = self.relpath(os.path.join(root, f), top)
+ all_files.append(relpath)
+ return all_files
+
+ def getcwd(self):
+ return os.getcwd()
+
+ def getenv(self, key, default=None):
+ return self.env.get(key, default)
+
+ def getpid(self):
+ return os.getpid()
+
+ def for_mp(self):
+ return None
+
+ def isdir(self, *comps):
+ return os.path.isdir(os.path.join(*comps))
+
+ def isfile(self, *comps):
+ return os.path.isfile(os.path.join(*comps))
+
+ def join(self, *comps):
+ return os.path.join(*comps)
+
+ def maybe_mkdir(self, *comps):
+ path = self.abspath(self.join(*comps))
+ if not self.exists(path):
+ os.mkdir(path)
+
+ def mkdtemp(self, **kwargs):
+ return tempfile.mkdtemp(**kwargs)
+
+ def mtime(self, *comps):
+ return os.stat(self.join(*comps)).st_mtime
+
+ def print_(self, msg='', end='\n', stream=None):
+ stream = stream or self.stdout
+ stream.write(str(msg) + end)
+ stream.flush()
+
+ def read_text_file(self, *comps):
+ return self._read(comps, 'r')
+
+ def read_binary_file(self, *comps):
+ return self._read(comps, 'rb')
+
+ def _read(self, comps, mode):
+ path = self.join(*comps)
+ with open(path, mode) as f:
+ return f.read()
+
+ def relpath(self, path, start):
+ return os.path.relpath(path, start)
+
+ def remove(self, *comps):
+ os.remove(self.join(*comps))
+
+ def rmtree(self, path):
+ shutil.rmtree(path, ignore_errors=True)
+
+ def splitext(self, path):
+ return os.path.splitext(path)
+
+ def time(self):
+ return time.time()
+
+ def write_text_file(self, path, contents):
+ return self._write(path, contents, mode='w')
+
+ def write_binary_file(self, path, contents):
+ return self._write(path, contents, mode='wb')
+
+ def _write(self, path, contents, mode):
+ with open(path, mode) as f:
+ f.write(contents)
+
+ def fetch(self, url, data=None, headers=None):
+ headers = headers or {}
+ return urlopen(Request(url, data.encode('utf8'), headers))
+
+ def terminal_width(self):
+ """Returns 0 if the width cannot be determined."""
+ try:
+ if sys.platform == 'win32': # pragma: win32
+ # From http://code.activestate.com/recipes/ \
+ # 440694-determine-size-of-console-window-on-windows/
+ from ctypes import windll, create_string_buffer
+
+ STDERR_HANDLE = -12
+ handle = windll.kernel32.GetStdHandle(STDERR_HANDLE)
+
+ SCREEN_BUFFER_INFO_SZ = 22
+ buf = create_string_buffer(SCREEN_BUFFER_INFO_SZ)
+
+ if windll.kernel32.GetConsoleScreenBufferInfo(handle, buf):
+ import struct
+ fields = struct.unpack("hhhhHhhhhhh", buf.raw)
+ left = fields[5]
+ right = fields[7]
+
+ # Note that we return 1 less than the width since writing
+ # into the rightmost column automatically performs a
+ # line feed.
+ return right - left
+ return 0
+ else: # pragma: no win32
+ import fcntl
+ import struct
+ import termios
+ packed = fcntl.ioctl(self.stderr.fileno(),
+ termios.TIOCGWINSZ, '\0' * 8)
+ _, columns, _, _ = struct.unpack('HHHH', packed)
+ return columns
+ except Exception: # pragma: untested
+ return 0
+
+ def _tap_output(self):
+ self.stdout = sys.stdout = _TeedStream(self.stdout)
+ self.stderr = sys.stderr = _TeedStream(self.stderr)
+
+ def _untap_output(self):
+ assert isinstance(self.stdout, _TeedStream)
+ self.stdout = sys.stdout = self.stdout.stream
+ self.stderr = sys.stderr = self.stderr.stream
+
+ def capture_output(self, divert=True):
+ self._tap_output()
+
+ # TODO: Make log capture more robust.
+ self._orig_logging_handlers = self.logger.handlers
+ if self._orig_logging_handlers: # pragma: untested
+ self.logger.handlers = [logging.StreamHandler(self.stderr)]
+
+ self.stdout.capture(divert)
+ self.stderr.capture(divert)
+
+ def restore_output(self):
+ assert isinstance(self.stdout, _TeedStream)
+ out, err = (self.stdout.restore(), self.stderr.restore())
+ self.logger.handlers = self._orig_logging_handlers
+ self._untap_output()
+ return out, err
+
+
+class _TeedStream(io.StringIO):
+
+ def __init__(self, stream):
+ super(_TeedStream, self).__init__()
+ self.stream = stream
+ self.capturing = False
+ self.diverting = False
+
+ def write(self, msg, *args, **kwargs): # pragma: untested
+ if self.capturing:
+ if sys.version_info.major == 2 and isinstance(msg, str):
+ msg = unicode(msg)
+ super(_TeedStream, self).write(msg, *args, **kwargs)
+ if not self.diverting:
+ self.stream.write(msg, *args, **kwargs)
+
+ def flush(self): # pragma: untested
+ if self.capturing:
+ super(_TeedStream, self).flush()
+ if not self.diverting:
+ self.stream.flush()
+
+ def capture(self, divert=True):
+ self.truncate(0)
+ self.capturing = True
+ self.diverting = divert
+
+ def restore(self):
+ msg = self.getvalue()
+ self.truncate(0)
+ self.capturing = False
+ self.diverting = False
+ return msg

Powered by Google App Engine
This is Rietveld 408576698