Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(97)

Side by Side Diff: mojo/python/tests/data_pipe_utils_unittest.py

Issue 952223002: Add Python util to read from a data pipe into a bytearray (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Response to review Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/python/mojo_utils/data_pipe_utils.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import random
6
7 import mojo_unittest
8 from mojo_bindings import promise
9
10 # pylint: disable=F0401
11 import mojo_system as system
12
13 # pylint: disable=F0401
14 from mojo_utils import data_pipe_utils
15
16
17 def _GetRandomBuffer(size):
18 random.seed(size)
19 return bytearray(''.join(chr(random.randint(0, 255)) for i in xrange(size)))
20
21
22 class DataPipeCopyTest(mojo_unittest.MojoTestCase):
23 def setUp(self):
24 super(DataPipeCopyTest, self).setUp()
25 self.handles = system.DataPipe()
26 self.error = None
27
28 def tearDown(self):
29 self.handles = None
30 super(DataPipeCopyTest, self).tearDown()
31
32 def _writeDataAndClose(self, handle, data):
33 status, num_bytes_written = handle.WriteData(data)
34 handle.Close()
35 self.assertEquals(system.RESULT_OK, status)
36 self.assertEquals(len(data), num_bytes_written)
37
38 def _copyDataFromPipe(self, handle, expected_data,
39 deadline=system.DEADLINE_INDEFINITE):
40 self._VerifyDataCopied(data_pipe_utils.CopyFromDataPipe(
41 handle, deadline), expected_data).Catch(self._CatchError)
42
43 def _CatchError(self, error):
44 if self.loop:
45 self.loop.Quit()
46 self.error = error
47
48 @promise.async
49 def _VerifyDataCopied(self, data, expected_data):
50 self.assertEquals(expected_data, data)
51 self.loop.Quit()
52
53 def _runAndCheckError(self):
54 self.loop.Run()
55 if self.error:
56 # pylint: disable=E0702
57 raise self.error
58
59 def _testEagerWrite(self, data):
60 self._writeDataAndClose(self.handles.producer_handle, data)
61 self._copyDataFromPipe(self.handles.consumer_handle, data)
62 self._runAndCheckError()
63
64 def _testDelayedWrite(self, data):
65 self._copyDataFromPipe(self.handles.consumer_handle, data)
66 self._writeDataAndClose(self.handles.producer_handle, data)
67 self._runAndCheckError()
68
69 def testTimeout(self):
70 self._copyDataFromPipe(self.handles.consumer_handle, bytearray(),
71 deadline=100)
72 with self.assertRaises(data_pipe_utils.DataPipeCopyException):
73 self._runAndCheckError()
74
75 def testCloseProducerWithoutWriting(self):
76 self._copyDataFromPipe(self.handles.consumer_handle, bytearray())
77 self.handles.producer_handle.Close()
78 self._runAndCheckError()
79
80 def testEagerWriteOfEmptyData(self):
81 self._testEagerWrite(bytearray())
82
83 def testDelayedWriteOfEmptyData(self):
84 self._testDelayedWrite(bytearray())
85
86 def testEagerWriteOfNonEmptyData(self):
87 self._testEagerWrite(_GetRandomBuffer(1024))
88
89 def testDelayedWriteOfNonEmptyData(self):
90 self._testDelayedWrite(_GetRandomBuffer(1024))
91
92 def testEagerWriteOfLargeBuffer(self):
93 self._testEagerWrite(_GetRandomBuffer(32 * 1024))
94
95 def testDelayedWriteOfLargeBuffer(self):
96 self._testDelayedWrite(_GetRandomBuffer(32 * 1024))
OLDNEW
« no previous file with comments | « mojo/python/mojo_utils/data_pipe_utils.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698