| Index: mojo/python/tests/data_pipe_utils_unittest.py
|
| diff --git a/mojo/python/tests/data_pipe_utils_unittest.py b/mojo/python/tests/data_pipe_utils_unittest.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..b6bc9fa142bad75e752c2348df26a046dbdb5f0e
|
| --- /dev/null
|
| +++ b/mojo/python/tests/data_pipe_utils_unittest.py
|
| @@ -0,0 +1,96 @@
|
| +# Copyright 2015 The Chromium Authors. All rights reserved.
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +import random
|
| +
|
| +import mojo_unittest
|
| +from mojo_bindings import promise
|
| +
|
| +# pylint: disable=F0401
|
| +import mojo_system as system
|
| +
|
| +# pylint: disable=F0401
|
| +from mojo_utils import data_pipe_utils
|
| +
|
| +
|
| +def _GetRandomBuffer(size):
|
| + random.seed(size)
|
| + return bytearray(''.join(chr(random.randint(0, 255)) for i in xrange(size)))
|
| +
|
| +
|
| +class DataPipeCopyTest(mojo_unittest.MojoTestCase):
|
| + def setUp(self):
|
| + super(DataPipeCopyTest, self).setUp()
|
| + self.handles = system.DataPipe()
|
| + self.error = None
|
| +
|
| + def tearDown(self):
|
| + self.handles = None
|
| + super(DataPipeCopyTest, self).tearDown()
|
| +
|
| + def _writeDataAndClose(self, handle, data):
|
| + status, num_bytes_written = handle.WriteData(data)
|
| + handle.Close()
|
| + self.assertEquals(system.RESULT_OK, status)
|
| + self.assertEquals(len(data), num_bytes_written)
|
| +
|
| + def _copyDataFromPipe(self, handle, expected_data,
|
| + deadline=system.DEADLINE_INDEFINITE):
|
| + self._VerifyDataCopied(data_pipe_utils.CopyFromDataPipe(
|
| + handle, deadline), expected_data).Catch(self._CatchError)
|
| +
|
| + def _CatchError(self, error):
|
| + if self.loop:
|
| + self.loop.Quit()
|
| + self.error = error
|
| +
|
| + @promise.async
|
| + def _VerifyDataCopied(self, data, expected_data):
|
| + self.assertEquals(expected_data, data)
|
| + self.loop.Quit()
|
| +
|
| + def _runAndCheckError(self):
|
| + self.loop.Run()
|
| + if self.error:
|
| + # pylint: disable=E0702
|
| + raise self.error
|
| +
|
| + def _testEagerWrite(self, data):
|
| + self._writeDataAndClose(self.handles.producer_handle, data)
|
| + self._copyDataFromPipe(self.handles.consumer_handle, data)
|
| + self._runAndCheckError()
|
| +
|
| + def _testDelayedWrite(self, data):
|
| + self._copyDataFromPipe(self.handles.consumer_handle, data)
|
| + self._writeDataAndClose(self.handles.producer_handle, data)
|
| + self._runAndCheckError()
|
| +
|
| + def testTimeout(self):
|
| + self._copyDataFromPipe(self.handles.consumer_handle, bytearray(),
|
| + deadline=100)
|
| + with self.assertRaises(data_pipe_utils.DataPipeCopyException):
|
| + self._runAndCheckError()
|
| +
|
| + def testCloseProducerWithoutWriting(self):
|
| + self._copyDataFromPipe(self.handles.consumer_handle, bytearray())
|
| + self.handles.producer_handle.Close()
|
| + self._runAndCheckError()
|
| +
|
| + def testEagerWriteOfEmptyData(self):
|
| + self._testEagerWrite(bytearray())
|
| +
|
| + def testDelayedWriteOfEmptyData(self):
|
| + self._testDelayedWrite(bytearray())
|
| +
|
| + def testEagerWriteOfNonEmptyData(self):
|
| + self._testEagerWrite(_GetRandomBuffer(1024))
|
| +
|
| + def testDelayedWriteOfNonEmptyData(self):
|
| + self._testDelayedWrite(_GetRandomBuffer(1024))
|
| +
|
| + def testEagerWriteOfLargeBuffer(self):
|
| + self._testEagerWrite(_GetRandomBuffer(32 * 1024))
|
| +
|
| + def testDelayedWriteOfLargeBuffer(self):
|
| + self._testDelayedWrite(_GetRandomBuffer(32 * 1024))
|
|
|