Index: tests/trace_inputs_test.py |
diff --git a/tests/trace_inputs_test.py b/tests/trace_inputs_test.py |
new file mode 100755 |
index 0000000000000000000000000000000000000000..2c82f21f158b7b7e46350d9812a9f01e2b3ce330 |
--- /dev/null |
+++ b/tests/trace_inputs_test.py |
@@ -0,0 +1,448 @@ |
+#!/usr/bin/env python |
+# Copyright (c) 2012 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+import logging |
+import os |
+import unittest |
+import sys |
+ |
+BASE_DIR = unicode(os.path.dirname(os.path.abspath(__file__))) |
+ROOT_DIR = os.path.dirname(BASE_DIR) |
+sys.path.insert(0, ROOT_DIR) |
+ |
+FILE_PATH = unicode(os.path.abspath(__file__)) |
+ |
+import trace_inputs |
+ |
+ |
+def join_norm(*args): |
+ """Joins and normalizes path in a single step.""" |
+ return unicode(os.path.normpath(os.path.join(*args))) |
+ |
+ |
+class TraceInputs(unittest.TestCase): |
+ def test_process_quoted_arguments(self): |
+ test_cases = ( |
+ ('"foo"', ['foo']), |
+ ('"foo", "bar"', ['foo', 'bar']), |
+ ('"foo"..., "bar"', ['foo', 'bar']), |
+ ('"foo", "bar"...', ['foo', 'bar']), |
+ ( |
+ '"/browser_tests", "--type=use,comma"', |
+ ['/browser_tests', '--type=use,comma'] |
+ ), |
+ ( |
+ '"/browser_tests", "--ignored=\\" --type=renderer \\""', |
+ ['/browser_tests', '--ignored=" --type=renderer "'] |
+ ), |
+ ) |
+ for actual, expected in test_cases: |
+ self.assertEquals( |
+ expected, trace_inputs.strace_process_quoted_arguments(actual)) |
+ |
+ def test_process_escaped_arguments(self): |
+ test_cases = ( |
+ ('foo\\0', ['foo']), |
+ ('foo\\001bar\\0', ['foo', 'bar']), |
+ ('\\"foo\\"\\0', ['"foo"']), |
+ ) |
+ for actual, expected in test_cases: |
+ self.assertEquals( |
+ expected, |
+ trace_inputs.Dtrace.Context.process_escaped_arguments(actual)) |
+ |
+ def test_variable_abs(self): |
+ value = trace_inputs.Results.File(None, '/foo/bar', False, False) |
+ actual = value.replace_variables({'$FOO': '/foo'}) |
+ self.assertEquals('$FOO/bar', actual.path) |
+ self.assertEquals('$FOO/bar', actual.full_path) |
+ self.assertEquals(True, actual.tainted) |
+ |
+ def test_variable_rel(self): |
+ value = trace_inputs.Results.File('/usr', 'foo/bar', False, False) |
+ actual = value.replace_variables({'$FOO': 'foo'}) |
+ self.assertEquals('$FOO/bar', actual.path) |
+ self.assertEquals(os.path.join('/usr', '$FOO/bar'), actual.full_path) |
+ self.assertEquals(True, actual.tainted) |
+ |
+ def test_native_case_end_with_os_path_sep(self): |
+ # Make sure the trailing os.path.sep is kept. |
+ path = trace_inputs.get_native_path_case(ROOT_DIR) + os.path.sep |
+ self.assertEquals(trace_inputs.get_native_path_case(path), path) |
+ |
+ def test_native_case_non_existing(self): |
+ # Make sure it doesn't throw on non-existing files. |
+ non_existing = 'trace_input_test_this_file_should_not_exist' |
+ path = os.path.expanduser('~/' + non_existing) |
+ self.assertFalse(os.path.exists(path)) |
+ path = trace_inputs.get_native_path_case(ROOT_DIR) + os.path.sep |
+ self.assertEquals(trace_inputs.get_native_path_case(path), path) |
+ |
+ if sys.platform in ('darwin', 'win32'): |
+ def test_native_case_not_sensitive(self): |
+ # The home directory is almost guaranteed to have mixed upper/lower case |
+ # letters on both Windows and OSX. |
+ # This test also ensures that the output is independent on the input |
+ # string case. |
+ path = os.path.expanduser('~') |
+ self.assertTrue(os.path.isdir(path)) |
+ # This test assumes the variable is in the native path case on disk, this |
+ # should be the case. Verify this assumption: |
+ self.assertEquals(path, trace_inputs.get_native_path_case(path)) |
+ self.assertEquals( |
+ trace_inputs.get_native_path_case(path.lower()), |
+ trace_inputs.get_native_path_case(path.upper())) |
+ |
+ def test_native_case_not_sensitive_non_existent(self): |
+ # This test also ensures that the output is independent on the input |
+ # string case. |
+ non_existing = os.path.join( |
+ 'trace_input_test_this_dir_should_not_exist', 'really not', '') |
+ path = os.path.expanduser(os.path.join('~', non_existing)) |
+ self.assertFalse(os.path.exists(path)) |
+ lower = trace_inputs.get_native_path_case(path.lower()) |
+ upper = trace_inputs.get_native_path_case(path.upper()) |
+ # Make sure non-existing element is not modified: |
+ self.assertTrue(lower.endswith(non_existing.lower())) |
+ self.assertTrue(upper.endswith(non_existing.upper())) |
+ self.assertEquals(lower[:-len(non_existing)], upper[:-len(non_existing)]) |
+ |
+ if sys.platform != 'win32': |
+ def test_symlink(self): |
+ # This test will fail if the checkout is in a symlink. |
+ actual = trace_inputs.split_at_symlink(None, ROOT_DIR) |
+ expected = (ROOT_DIR, None, None) |
+ self.assertEquals(expected, actual) |
+ |
+ actual = trace_inputs.split_at_symlink( |
+ None, os.path.join(BASE_DIR, 'trace_inputs')) |
+ expected = ( |
+ os.path.join(BASE_DIR, 'trace_inputs'), None, None) |
+ self.assertEquals(expected, actual) |
+ |
+ actual = trace_inputs.split_at_symlink( |
+ None, os.path.join(BASE_DIR, 'trace_inputs', 'files2')) |
+ expected = ( |
+ os.path.join(BASE_DIR, 'trace_inputs'), 'files2', '') |
+ self.assertEquals(expected, actual) |
+ |
+ actual = trace_inputs.split_at_symlink( |
+ ROOT_DIR, os.path.join('tests', 'trace_inputs', 'files2')) |
+ expected = ( |
+ os.path.join('tests', 'trace_inputs'), 'files2', '') |
+ self.assertEquals(expected, actual) |
+ actual = trace_inputs.split_at_symlink( |
+ ROOT_DIR, os.path.join('tests', 'trace_inputs', 'files2', 'bar')) |
+ expected = ( |
+ os.path.join('tests', 'trace_inputs'), 'files2', '/bar') |
+ self.assertEquals(expected, actual) |
+ |
+ def test_native_case_symlink_right_case(self): |
+ actual = trace_inputs.get_native_path_case( |
+ os.path.join(BASE_DIR, 'trace_inputs')) |
+ self.assertEquals('trace_inputs', os.path.basename(actual)) |
+ |
+ # Make sure the symlink is not resolved. |
+ actual = trace_inputs.get_native_path_case( |
+ os.path.join(BASE_DIR, 'trace_inputs', 'files2')) |
+ self.assertEquals('files2', os.path.basename(actual)) |
+ |
+ if sys.platform == 'darwin': |
+ def test_native_case_symlink_wrong_case(self): |
+ actual = trace_inputs.get_native_path_case( |
+ os.path.join(BASE_DIR, 'trace_inputs')) |
+ self.assertEquals('trace_inputs', os.path.basename(actual)) |
+ |
+ # Make sure the symlink is not resolved. |
+ actual = trace_inputs.get_native_path_case( |
+ os.path.join(BASE_DIR, 'trace_inputs', 'Files2')) |
+ self.assertEquals('files2', os.path.basename(actual)) |
+ |
+ |
+if sys.platform != 'win32': |
+ class StraceInputs(unittest.TestCase): |
+ # Represents the root process pid (an arbitrary number). |
+ _ROOT_PID = 27 |
+ _CHILD_PID = 14 |
+ _GRAND_CHILD_PID = 70 |
+ |
+ @staticmethod |
+ def _load_context(lines, initial_cwd): |
+ context = trace_inputs.Strace.Context(lambda _: False, initial_cwd) |
+ for line in lines: |
+ context.on_line(*line) |
+ return context.to_results().flatten() |
+ |
+ def _test_lines(self, lines, initial_cwd, files, command=None): |
+ filepath = join_norm(initial_cwd, '../out/unittests') |
+ command = command or ['../out/unittests'] |
+ expected = { |
+ 'root': { |
+ 'children': [], |
+ 'command': command, |
+ 'executable': filepath, |
+ 'files': files, |
+ 'initial_cwd': initial_cwd, |
+ 'pid': self._ROOT_PID, |
+ } |
+ } |
+ if not files: |
+ expected['root']['command'] = None |
+ expected['root']['executable'] = None |
+ self.assertEquals(expected, self._load_context(lines, initial_cwd)) |
+ |
+ def test_execve(self): |
+ lines = [ |
+ (self._ROOT_PID, |
+ 'execve("/home/foo_bar_user/out/unittests", ' |
+ '["/home/foo_bar_user/out/unittests", ' |
+ '"--gtest_filter=AtExitTest.Basic"], [/* 44 vars */]) = 0'), |
+ (self._ROOT_PID, |
+ 'open("out/unittests.log", O_WRONLY|O_CREAT|O_APPEND, 0666) = 8'), |
+ ] |
+ files = [ |
+ { |
+ 'path': u'/home/foo_bar_user/out/unittests', |
+ 'size': -1, |
+ }, |
+ { |
+ 'path': u'/home/foo_bar_user/src/out/unittests.log', |
+ 'size': -1, |
+ }, |
+ ] |
+ command = [ |
+ '/home/foo_bar_user/out/unittests', '--gtest_filter=AtExitTest.Basic', |
+ ] |
+ self._test_lines(lines, '/home/foo_bar_user/src', files, command) |
+ |
+ def test_empty(self): |
+ try: |
+ self._load_context([], None) |
+ self.fail() |
+ except trace_inputs.TracingFailure, e: |
+ expected = ( |
+ 'Found internal inconsitency in process lifetime detection ' |
+ 'while finding the root process', |
+ None, |
+ None, |
+ None, |
+ []) |
+ self.assertEquals(expected, e.args) |
+ |
+ def test_chmod(self): |
+ lines = [ |
+ (self._ROOT_PID, 'chmod("temp/file", 0100644) = 0'), |
+ ] |
+ self._test_lines(lines, '/home/foo_bar_user/src', []) |
+ |
+ def test_close(self): |
+ lines = [ |
+ (self._ROOT_PID, 'close(7) = 0'), |
+ ] |
+ self._test_lines(lines, '/home/foo_bar_user/src', []) |
+ |
+ def test_clone(self): |
+ # Grand-child with relative directory. |
+ lines = [ |
+ (self._ROOT_PID, |
+ 'clone(child_stack=0, flags=CLONE_CHILD_CLEARTID' |
+ '|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f5350f829d0) = %d' % |
+ self._CHILD_PID), |
+ (self._CHILD_PID, |
+ 'clone(child_stack=0, flags=CLONE_CHILD_CLEARTID' |
+ '|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f5350f829d0) = %d' % |
+ self._GRAND_CHILD_PID), |
+ (self._GRAND_CHILD_PID, |
+ 'open("%s", O_RDONLY) = 76' % os.path.basename(FILE_PATH)), |
+ ] |
+ size = os.stat(FILE_PATH).st_size |
+ expected = { |
+ 'root': { |
+ 'children': [ |
+ { |
+ 'children': [ |
+ { |
+ 'children': [], |
+ 'command': None, |
+ 'executable': None, |
+ 'files': [ |
+ { |
+ 'path': FILE_PATH, |
+ 'size': size, |
+ }, |
+ ], |
+ 'initial_cwd': BASE_DIR, |
+ 'pid': self._GRAND_CHILD_PID, |
+ }, |
+ ], |
+ 'command': None, |
+ 'executable': None, |
+ 'files': [], |
+ 'initial_cwd': BASE_DIR, |
+ 'pid': self._CHILD_PID, |
+ }, |
+ ], |
+ 'command': None, |
+ 'executable': None, |
+ 'files': [], |
+ 'initial_cwd': BASE_DIR, |
+ 'pid': self._ROOT_PID, |
+ }, |
+ } |
+ self.assertEquals(expected, self._load_context(lines, BASE_DIR)) |
+ |
+ def test_clone_chdir(self): |
+ # Grand-child with relative directory. |
+ lines = [ |
+ (self._ROOT_PID, |
+ 'execve("../out/unittests", ' |
+ '["../out/unittests"...], [/* 44 vars */]) = 0'), |
+ (self._ROOT_PID, |
+ 'clone(child_stack=0, flags=CLONE_CHILD_CLEARTID' |
+ '|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f5350f829d0) = %d' % |
+ self._CHILD_PID), |
+ (self._CHILD_PID, |
+ 'chdir("/home_foo_bar_user/path1") = 0'), |
+ (self._CHILD_PID, |
+ 'clone(child_stack=0, flags=CLONE_CHILD_CLEARTID' |
+ '|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f5350f829d0) = %d' % |
+ self._GRAND_CHILD_PID), |
+ (self._GRAND_CHILD_PID, |
+ 'execve("../out/unittests", ' |
+ '["../out/unittests"...], [/* 44 vars */]) = 0'), |
+ (self._ROOT_PID, 'chdir("/home_foo_bar_user/path2") = 0'), |
+ (self._GRAND_CHILD_PID, |
+ 'open("random.txt", O_RDONLY) = 76'), |
+ ] |
+ expected = { |
+ 'root': { |
+ 'children': [ |
+ { |
+ 'children': [ |
+ { |
+ 'children': [], |
+ 'command': ['../out/unittests'], |
+ 'executable': '/home_foo_bar_user/out/unittests', |
+ 'files': [ |
+ { |
+ 'path': u'/home_foo_bar_user/out/unittests', |
+ 'size': -1, |
+ }, |
+ { |
+ 'path': u'/home_foo_bar_user/path1/random.txt', |
+ 'size': -1, |
+ }, |
+ ], |
+ 'initial_cwd': u'/home_foo_bar_user/path1', |
+ 'pid': self._GRAND_CHILD_PID, |
+ }, |
+ ], |
+ # clone does not carry over the command and executable so it is |
+ # clear if an execve() call was done or not. |
+ 'command': None, |
+ 'executable': None, |
+ # This is important, since no execve call was done, it didn't |
+ # touch the executable file. |
+ 'files': [], |
+ 'initial_cwd': unicode(ROOT_DIR), |
+ 'pid': self._CHILD_PID, |
+ }, |
+ ], |
+ 'command': ['../out/unittests'], |
+ 'executable': join_norm(ROOT_DIR, '../out/unittests'), |
+ 'files': [ |
+ { |
+ 'path': join_norm(ROOT_DIR, '../out/unittests'), |
+ 'size': -1, |
+ }, |
+ ], |
+ 'initial_cwd': unicode(ROOT_DIR), |
+ 'pid': self._ROOT_PID, |
+ }, |
+ } |
+ self.assertEquals(expected, self._load_context(lines, ROOT_DIR)) |
+ |
+ def test_open(self): |
+ lines = [ |
+ (self._ROOT_PID, |
+ 'execve("../out/unittests", ' |
+ '["../out/unittests"...], [/* 44 vars */]) = 0'), |
+ (self._ROOT_PID, |
+ 'open("out/unittests.log", O_WRONLY|O_CREAT|O_APPEND, 0666) = 8'), |
+ ] |
+ files = [ |
+ { |
+ 'path': u'/home/foo_bar_user/out/unittests', |
+ 'size': -1, |
+ }, |
+ { |
+ 'path': u'/home/foo_bar_user/src/out/unittests.log', |
+ 'size': -1, |
+ }, |
+ ] |
+ self._test_lines(lines, '/home/foo_bar_user/src', files) |
+ |
+ def test_open_resumed(self): |
+ lines = [ |
+ (self._ROOT_PID, |
+ 'execve("../out/unittests", ' |
+ '["../out/unittests"...], [/* 44 vars */]) = 0'), |
+ (self._ROOT_PID, |
+ 'open("out/unittests.log", O_WRONLY|O_CREAT|O_APPEND ' |
+ '<unfinished ...>'), |
+ (self._ROOT_PID, '<... open resumed> ) = 3'), |
+ ] |
+ files = [ |
+ { |
+ 'path': u'/home/foo_bar_user/out/unittests', |
+ 'size': -1, |
+ }, |
+ { |
+ 'path': u'/home/foo_bar_user/src/out/unittests.log', |
+ 'size': -1, |
+ }, |
+ ] |
+ self._test_lines(lines, '/home/foo_bar_user/src', files) |
+ |
+ def test_rmdir(self): |
+ lines = [ |
+ (self._ROOT_PID, 'rmdir("directory/to/delete") = 0'), |
+ ] |
+ self._test_lines(lines, '/home/foo_bar_user/src', []) |
+ |
+ def test_setxattr(self): |
+ lines = [ |
+ (self._ROOT_PID, |
+ 'setxattr("file.exe", "attribute", "value", 0, 0) = 0'), |
+ ] |
+ self._test_lines(lines, '/home/foo_bar_user/src', []) |
+ |
+ def test_sig_unexpected(self): |
+ lines = [ |
+ (self._ROOT_PID, 'exit_group(0) = ?'), |
+ ] |
+ self._test_lines(lines, '/home/foo_bar_user/src', []) |
+ |
+ def test_stray(self): |
+ lines = [ |
+ (self._ROOT_PID, |
+ 'execve("../out/unittests", ' |
+ '["../out/unittests"...], [/* 44 vars */]) = 0'), |
+ (self._ROOT_PID, |
+ ') = ? <unavailable>'), |
+ ] |
+ files = [ |
+ { |
+ 'path': u'/home/foo_bar_user/out/unittests', |
+ 'size': -1, |
+ }, |
+ ] |
+ self._test_lines(lines, '/home/foo_bar_user/src', files) |
+ |
+ |
+if __name__ == '__main__': |
+ VERBOSE = '-v' in sys.argv |
+ logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR) |
+ unittest.main() |