Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(195)

Unified Diff: devil/devil/utils/cmd_helper_test.py

Issue 2707113002: [devil] Add an iteration timeout option to cmd_helper.IterCmdStdoutLines. (Closed)
Patch Set: mikecase comments Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « devil/devil/utils/cmd_helper.py ('k') | devil/docs/adb_wrapper.md » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: devil/devil/utils/cmd_helper_test.py
diff --git a/devil/devil/utils/cmd_helper_test.py b/devil/devil/utils/cmd_helper_test.py
index a04f1adf135070bee0e41a3df718852145fe60db..783c4137c8d0b15e87a81f76f2351a30b29af7ef 100755
--- a/devil/devil/utils/cmd_helper_test.py
+++ b/devil/devil/utils/cmd_helper_test.py
@@ -7,9 +7,14 @@
import unittest
import subprocess
+import time
+from devil import devil_env
from devil.utils import cmd_helper
+with devil_env.SysPath(devil_env.PYMOCK_PATH):
+ import mock # pylint: disable=import-error
+
class CmdHelperSingleQuoteTest(unittest.TestCase):
@@ -84,35 +89,173 @@ class CmdHelperShinkToSnippetTest(unittest.TestCase):
cmd_helper.ShrinkToSnippet(['foo', ' barbar '], 'a', 'bar'))
+_DEFAULT = 'DEFAULT'
+
+
+class _ProcessOutputEvent(object):
+
+ def __init__(self, select_fds=_DEFAULT, read_contents=None, ts=_DEFAULT):
+ self.select_fds = select_fds
+ self.read_contents = read_contents
+ self.ts = ts
+
+
+class _MockProcess(object):
+
+ def __init__(self, output_sequence=None, return_value=0):
+
+ # Arbitrary.
+ fake_stdout_fileno = 25
+
+ self.mock_proc = mock.MagicMock(spec=subprocess.Popen)
+ self.mock_proc.stdout = mock.MagicMock()
+ self.mock_proc.stdout.fileno = mock.MagicMock(
+ return_value=fake_stdout_fileno)
+ self.mock_proc.returncode = None
+
+ self._return_value = return_value
+
+ # This links the behavior of os.read, select.select, time.time, and
+ # <process>.poll. The output sequence can be thought of as a list of
+ # return values for select.select with corresponding return values for
+ # the other calls at any time between that select call and the following
+ # one. We iterate through the sequence only on calls to select.select.
+ #
+ # os.read is a special case, though, where we only return a given chunk
+ # of data *once* after a given call to select.
+
+ if not output_sequence:
+ output_sequence = []
+
+ # Use an leading element to make the iteration logic work.
+ initial_seq_element = _ProcessOutputEvent(
+ _DEFAULT, '',
+ output_sequence[0].ts if output_sequence else _DEFAULT)
+ output_sequence.insert(0, initial_seq_element)
+
+ for o in output_sequence:
+ if o.select_fds == _DEFAULT:
+ if o.read_contents is None:
+ o.select_fds = []
+ else:
+ o.select_fds = [fake_stdout_fileno]
+ if o.ts == _DEFAULT:
+ o.ts = time.time()
+ self._output_sequence = output_sequence
+
+ self._output_seq_index = 0
+ self._read_flags = [False] * len(output_sequence)
+
+ def read_side_effect(*_args, **_kwargs):
+ if self._read_flags[self._output_seq_index]:
+ return None
+ self._read_flags[self._output_seq_index] = True
+ return self._output_sequence[self._output_seq_index].read_contents
+
+ def select_side_effect(*_args, **_kwargs):
+ if self._output_seq_index is None:
+ self._output_seq_index = 0
+ else:
+ self._output_seq_index += 1
+ return (self._output_sequence[self._output_seq_index].select_fds,
+ None, None)
+
+ def time_side_effect(*_args, **_kwargs):
+ return self._output_sequence[self._output_seq_index].ts
+
+ def poll_side_effect(*_args, **_kwargs):
+ if self._output_seq_index >= len(self._output_sequence) - 1:
+ self.mock_proc.returncode = self._return_value
+ return self.mock_proc.returncode
+
+ mock_read = mock.MagicMock(side_effect=read_side_effect)
+ mock_select = mock.MagicMock(side_effect=select_side_effect)
+ mock_time = mock.MagicMock(side_effect=time_side_effect)
+ self.mock_proc.poll = mock.MagicMock(side_effect=poll_side_effect)
+
+ # Set up but *do not start* the mocks.
+ self._mocks = [
+ mock.patch('fcntl.fcntl'),
+ mock.patch('os.read', new=mock_read),
+ mock.patch('select.select', new=mock_select),
+ mock.patch('time.time', new=mock_time),
+ ]
+
+ def __enter__(self):
+ for m in self._mocks:
+ m.__enter__()
+ return self.mock_proc
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ for m in reversed(self._mocks):
+ m.__exit__(exc_type, exc_val, exc_tb)
+
+
class CmdHelperIterCmdOutputLinesTest(unittest.TestCase):
"""Test IterCmdOutputLines with some calls to the unix 'seq' command."""
+ # This calls _IterCmdOutputLines rather than IterCmdOutputLines s.t. it
+ # can mock the process.
+ # pylint: disable=protected-access
+
+ _SIMPLE_OUTPUT_SEQUENCE = [
+ _ProcessOutputEvent(read_contents='1\n2\n'),
+ ]
+
def testIterCmdOutputLines_success(self):
- for num, line in enumerate(
- cmd_helper.IterCmdOutputLines(['seq', '10']), 1):
- self.assertEquals(num, int(line))
+ with _MockProcess(
+ output_sequence=self._SIMPLE_OUTPUT_SEQUENCE) as mock_proc:
+ for num, line in enumerate(
+ cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
+ self.assertEquals(num, int(line))
def testIterCmdOutputLines_exitStatusFail(self):
with self.assertRaises(subprocess.CalledProcessError):
- for num, line in enumerate(
- cmd_helper.IterCmdOutputLines('seq 10 && false', shell=True), 1):
- self.assertEquals(num, int(line))
- # after reading all the output we get an exit status of 1
+ with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
+ return_value=1) as mock_proc:
+ for num, line in enumerate(
+ cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
+ self.assertEquals(num, int(line))
+ # after reading all the output we get an exit status of 1
def testIterCmdOutputLines_exitStatusIgnored(self):
- for num, line in enumerate(
- cmd_helper.IterCmdOutputLines('seq 10 && false', shell=True,
- check_status=False), 1):
- self.assertEquals(num, int(line))
+ with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
+ return_value=1) as mock_proc:
+ for num, line in enumerate(
+ cmd_helper._IterCmdOutputLines(
+ mock_proc, 'mock_proc', check_status=False),
+ 1):
+ self.assertEquals(num, int(line))
def testIterCmdOutputLines_exitStatusSkipped(self):
- for num, line in enumerate(
- cmd_helper.IterCmdOutputLines('seq 10 && false', shell=True), 1):
- self.assertEquals(num, int(line))
- # no exception will be raised because we don't attempt to read past
- # the end of the output and, thus, the status never gets checked
- if num == 10:
- break
+ with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
+ return_value=1) as mock_proc:
+ for num, line in enumerate(
+ cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
+ self.assertEquals(num, int(line))
+ # no exception will be raised because we don't attempt to read past
+ # the end of the output and, thus, the status never gets checked
+ if num == 2:
+ break
+
+ def testIterCmdOutputLines_delay(self):
+ output_sequence = [
+ _ProcessOutputEvent(read_contents='1\n2\n', ts=1),
+ _ProcessOutputEvent(read_contents=None, ts=2),
+ _ProcessOutputEvent(read_contents='Awake', ts=10),
+ ]
+ with _MockProcess(output_sequence=output_sequence) as mock_proc:
+ for num, line in enumerate(
+ cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc',
+ iter_timeout=5), 1):
+ if num <= 2:
+ self.assertEquals(num, int(line))
+ elif num == 3:
+ self.assertEquals(None, line)
+ elif num == 4:
+ self.assertEquals('Awake', line)
+ else:
+ self.fail()
if __name__ == '__main__':
« no previous file with comments | « devil/devil/utils/cmd_helper.py ('k') | devil/docs/adb_wrapper.md » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698