Index: mojo/python/tests/data_pipe_utils_unittest.py |
diff --git a/mojo/python/tests/data_pipe_utils_unittest.py b/mojo/python/tests/data_pipe_utils_unittest.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..b6bc9fa142bad75e752c2348df26a046dbdb5f0e |
--- /dev/null |
+++ b/mojo/python/tests/data_pipe_utils_unittest.py |
@@ -0,0 +1,96 @@ |
+# Copyright 2015 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+import random |
+ |
+import mojo_unittest |
+from mojo_bindings import promise |
+ |
+# pylint: disable=F0401 |
+import mojo_system as system |
+ |
+# pylint: disable=F0401 |
+from mojo_utils import data_pipe_utils |
+ |
+ |
+def _GetRandomBuffer(size): |
+ random.seed(size) |
+ return bytearray(''.join(chr(random.randint(0, 255)) for i in xrange(size))) |
+ |
+ |
+class DataPipeCopyTest(mojo_unittest.MojoTestCase): |
+ def setUp(self): |
+ super(DataPipeCopyTest, self).setUp() |
+ self.handles = system.DataPipe() |
+ self.error = None |
+ |
+ def tearDown(self): |
+ self.handles = None |
+ super(DataPipeCopyTest, self).tearDown() |
+ |
+ def _writeDataAndClose(self, handle, data): |
+ status, num_bytes_written = handle.WriteData(data) |
+ handle.Close() |
+ self.assertEquals(system.RESULT_OK, status) |
+ self.assertEquals(len(data), num_bytes_written) |
+ |
+ def _copyDataFromPipe(self, handle, expected_data, |
+ deadline=system.DEADLINE_INDEFINITE): |
+ self._VerifyDataCopied(data_pipe_utils.CopyFromDataPipe( |
+ handle, deadline), expected_data).Catch(self._CatchError) |
+ |
+ def _CatchError(self, error): |
+ if self.loop: |
+ self.loop.Quit() |
+ self.error = error |
+ |
+ @promise.async |
+ def _VerifyDataCopied(self, data, expected_data): |
+ self.assertEquals(expected_data, data) |
+ self.loop.Quit() |
+ |
+ def _runAndCheckError(self): |
+ self.loop.Run() |
+ if self.error: |
+ # pylint: disable=E0702 |
+ raise self.error |
+ |
+ def _testEagerWrite(self, data): |
+ self._writeDataAndClose(self.handles.producer_handle, data) |
+ self._copyDataFromPipe(self.handles.consumer_handle, data) |
+ self._runAndCheckError() |
+ |
+ def _testDelayedWrite(self, data): |
+ self._copyDataFromPipe(self.handles.consumer_handle, data) |
+ self._writeDataAndClose(self.handles.producer_handle, data) |
+ self._runAndCheckError() |
+ |
+ def testTimeout(self): |
+ self._copyDataFromPipe(self.handles.consumer_handle, bytearray(), |
+ deadline=100) |
+ with self.assertRaises(data_pipe_utils.DataPipeCopyException): |
+ self._runAndCheckError() |
+ |
+ def testCloseProducerWithoutWriting(self): |
+ self._copyDataFromPipe(self.handles.consumer_handle, bytearray()) |
+ self.handles.producer_handle.Close() |
+ self._runAndCheckError() |
+ |
+ def testEagerWriteOfEmptyData(self): |
+ self._testEagerWrite(bytearray()) |
+ |
+ def testDelayedWriteOfEmptyData(self): |
+ self._testDelayedWrite(bytearray()) |
+ |
+ def testEagerWriteOfNonEmptyData(self): |
+ self._testEagerWrite(_GetRandomBuffer(1024)) |
+ |
+ def testDelayedWriteOfNonEmptyData(self): |
+ self._testDelayedWrite(_GetRandomBuffer(1024)) |
+ |
+ def testEagerWriteOfLargeBuffer(self): |
+ self._testEagerWrite(_GetRandomBuffer(32 * 1024)) |
+ |
+ def testDelayedWriteOfLargeBuffer(self): |
+ self._testDelayedWrite(_GetRandomBuffer(32 * 1024)) |