Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(82)

Side by Side Diff: mojo/python/tests/system_unittest.py

Issue 675563002: Remove mojo/python and gyp targets (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/python/tests/runloop_unittest.py ('k') | mojo/tools/mojob.sh » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2014 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import random
6 import time
7
8 import mojo_unittest
9
10 # pylint: disable=E0611
11 from mojo import system
12
13 DATA_SIZE = 1024
14
15
16 def _GetRandomBuffer(size):
17 random.seed(size)
18 return bytearray(''.join(chr(random.randint(0, 255)) for i in xrange(size)))
19
20
21 class CoreTest(mojo_unittest.MojoTestCase):
22
23 def testResults(self):
24 self.assertEquals(system.RESULT_OK, 0)
25 self.assertLess(system.RESULT_CANCELLED, 0)
26 self.assertLess(system.RESULT_UNKNOWN, 0)
27 self.assertLess(system.RESULT_INVALID_ARGUMENT, 0)
28 self.assertLess(system.RESULT_DEADLINE_EXCEEDED, 0)
29 self.assertLess(system.RESULT_NOT_FOUND, 0)
30 self.assertLess(system.RESULT_ALREADY_EXISTS, 0)
31 self.assertLess(system.RESULT_PERMISSION_DENIED, 0)
32 self.assertLess(system.RESULT_RESOURCE_EXHAUSTED, 0)
33 self.assertLess(system.RESULT_FAILED_PRECONDITION, 0)
34 self.assertLess(system.RESULT_ABORTED, 0)
35 self.assertLess(system.RESULT_OUT_OF_RANGE, 0)
36 self.assertLess(system.RESULT_UNIMPLEMENTED, 0)
37 self.assertLess(system.RESULT_INTERNAL, 0)
38 self.assertLess(system.RESULT_UNAVAILABLE, 0)
39 self.assertLess(system.RESULT_DATA_LOSS, 0)
40 self.assertLess(system.RESULT_BUSY, 0)
41 self.assertLess(system.RESULT_SHOULD_WAIT, 0)
42
43 def testConstants(self):
44 self.assertGreaterEqual(system.DEADLINE_INDEFINITE, 0)
45 self.assertGreaterEqual(system.HANDLE_SIGNAL_NONE, 0)
46 self.assertGreaterEqual(system.HANDLE_SIGNAL_READABLE, 0)
47 self.assertGreaterEqual(system.HANDLE_SIGNAL_WRITABLE, 0)
48 self.assertGreaterEqual(system.WRITE_MESSAGE_FLAG_NONE, 0)
49 self.assertGreaterEqual(system.READ_MESSAGE_FLAG_NONE, 0)
50 self.assertGreaterEqual(system.READ_MESSAGE_FLAG_MAY_DISCARD, 0)
51 self.assertGreaterEqual(system.WRITE_DATA_FLAG_NONE, 0)
52 self.assertGreaterEqual(system.WRITE_DATA_FLAG_ALL_OR_NONE, 0)
53 self.assertGreaterEqual(system.READ_DATA_FLAG_NONE, 0)
54 self.assertGreaterEqual(system.READ_DATA_FLAG_ALL_OR_NONE, 0)
55 self.assertGreaterEqual(system.READ_DATA_FLAG_DISCARD, 0)
56 self.assertGreaterEqual(system.READ_DATA_FLAG_QUERY, 0)
57 self.assertGreaterEqual(system.MAP_BUFFER_FLAG_NONE, 0)
58
59 def testGetTimeTicksNow(self):
60 v1 = system.GetTimeTicksNow()
61 time.sleep(1e-3)
62 v2 = system.GetTimeTicksNow()
63 self.assertGreater(v1, 0)
64 self.assertGreater(v2, v1 + 1e2)
65 self.assertLess(v2, v1 + 1e5)
66
67 def _testHandlesCreation(self, *args):
68 for handle in args:
69 self.assertTrue(handle.IsValid())
70 handle.Close()
71 self.assertFalse(handle.IsValid())
72
73 def _TestMessageHandleCreation(self, handles):
74 self._testHandlesCreation(handles.handle0, handles.handle1)
75
76 def testCreateMessagePipe(self):
77 self._TestMessageHandleCreation(system.MessagePipe())
78
79 def testCreateMessagePipeWithNoneOptions(self):
80 self._TestMessageHandleCreation(system.MessagePipe(None))
81
82 def testCreateMessagePipeWithOptions(self):
83 self._TestMessageHandleCreation(
84 system.MessagePipe(system.CreateMessagePipeOptions()))
85
86 def testWaitOverMessagePipe(self):
87 handles = system.MessagePipe()
88 handle = handles.handle0
89
90 self.assertEquals(system.RESULT_OK, handle.Wait(
91 system.HANDLE_SIGNAL_WRITABLE, system.DEADLINE_INDEFINITE))
92 self.assertEquals(system.RESULT_DEADLINE_EXCEEDED,
93 handle.Wait(system.HANDLE_SIGNAL_READABLE, 0))
94
95 handles.handle1.WriteMessage()
96
97 self.assertEquals(
98 system.RESULT_OK,
99 handle.Wait(
100 system.HANDLE_SIGNAL_READABLE,
101 system.DEADLINE_INDEFINITE))
102
103 def testWaitOverManyMessagePipe(self):
104 handles = system.MessagePipe()
105 handle0 = handles.handle0
106 handle1 = handles.handle1
107
108 self.assertEquals(
109 0,
110 system.WaitMany(
111 [(handle0, system.HANDLE_SIGNAL_WRITABLE),
112 (handle1, system.HANDLE_SIGNAL_WRITABLE)],
113 system.DEADLINE_INDEFINITE))
114 self.assertEquals(
115 system.RESULT_DEADLINE_EXCEEDED,
116 system.WaitMany(
117 [(handle0, system.HANDLE_SIGNAL_READABLE),
118 (handle1, system.HANDLE_SIGNAL_READABLE)], 0))
119
120 handle0.WriteMessage()
121
122 self.assertEquals(
123 1,
124 system.WaitMany(
125 [(handle0, system.HANDLE_SIGNAL_READABLE),
126 (handle1, system.HANDLE_SIGNAL_READABLE)],
127 system.DEADLINE_INDEFINITE))
128
129 def testSendBytesOverMessagePipe(self):
130 handles = system.MessagePipe()
131 data = _GetRandomBuffer(DATA_SIZE)
132 handles.handle0.WriteMessage(data)
133 (res, buffers, next_message) = handles.handle1.ReadMessage()
134 self.assertEquals(system.RESULT_RESOURCE_EXHAUSTED, res)
135 self.assertEquals(None, buffers)
136 self.assertEquals((DATA_SIZE, 0), next_message)
137 result = bytearray(DATA_SIZE)
138 (res, buffers, next_message) = handles.handle1.ReadMessage(result)
139 self.assertEquals(system.RESULT_OK, res)
140 self.assertEquals(None, next_message)
141 self.assertEquals((data, []), buffers)
142
143 def testSendEmptyDataOverMessagePipe(self):
144 handles = system.MessagePipe()
145 handles.handle0.WriteMessage(None)
146 (res, buffers, next_message) = handles.handle1.ReadMessage()
147
148 self.assertEquals(system.RESULT_OK, res)
149 self.assertEquals(None, next_message)
150 self.assertEquals((None, []), buffers)
151
152 def testSendHandleOverMessagePipe(self):
153 handles = system.MessagePipe()
154 handles_to_send = system.MessagePipe()
155 handles.handle0.WriteMessage(handles=[handles_to_send.handle0,
156 handles_to_send.handle1])
157 (res, buffers, next_message) = handles.handle1.ReadMessage(
158 max_number_of_handles=2)
159
160 self.assertFalse(handles_to_send.handle0.IsValid())
161 self.assertFalse(handles_to_send.handle1.IsValid())
162 self.assertEquals(system.RESULT_OK, res)
163 self.assertEquals(None, next_message)
164 self.assertEquals(None, buffers[0])
165 self.assertEquals(2, len(buffers[1]))
166
167 handles = buffers[1]
168 for handle in handles:
169 self.assertTrue(handle.IsValid())
170 (res, buffers, next_message) = handle.ReadMessage()
171 self.assertEquals(system.RESULT_SHOULD_WAIT, res)
172
173 for handle in handles:
174 handle.WriteMessage()
175
176 for handle in handles:
177 (res, buffers, next_message) = handle.ReadMessage()
178 self.assertEquals(system.RESULT_OK, res)
179
180 def _TestDataHandleCreation(self, handles):
181 self._testHandlesCreation(
182 handles.producer_handle, handles.consumer_handle)
183
184 def testCreateDataPipe(self):
185 self._TestDataHandleCreation(system.DataPipe())
186
187 def testCreateDataPipeWithNoneOptions(self):
188 self._TestDataHandleCreation(system.DataPipe(None))
189
190 def testCreateDataPipeWithDefaultOptions(self):
191 self._TestDataHandleCreation(
192 system.DataPipe(system.CreateDataPipeOptions()))
193
194 def testCreateDataPipeWithDiscardFlag(self):
195 options = system.CreateDataPipeOptions()
196 options.flags = system.CreateDataPipeOptions.FLAG_MAY_DISCARD
197 self._TestDataHandleCreation(system.DataPipe(options))
198
199 def testCreateDataPipeWithElementSize(self):
200 options = system.CreateDataPipeOptions()
201 options.element_num_bytes = 5
202 self._TestDataHandleCreation(system.DataPipe(options))
203
204 def testCreateDataPipeWithCapacity(self):
205 options = system.CreateDataPipeOptions()
206 options.element_capacity_num_bytes = DATA_SIZE
207 self._TestDataHandleCreation(system.DataPipe(options))
208
209 def testCreateDataPipeWithIncorrectParameters(self):
210 options = system.CreateDataPipeOptions()
211 options.element_num_bytes = 5
212 options.capacity_num_bytes = DATA_SIZE
213 with self.assertRaises(system.MojoException) as cm:
214 self._TestDataHandleCreation(system.DataPipe(options))
215 self.assertEquals(system.RESULT_INVALID_ARGUMENT, cm.exception.mojo_result)
216
217 def testSendEmptyDataOverDataPipe(self):
218 pipes = system.DataPipe()
219 self.assertEquals((system.RESULT_OK, 0), pipes.producer_handle.WriteData())
220 self.assertEquals(
221 (system.RESULT_OK, None), pipes.consumer_handle.ReadData())
222
223 def testSendDataOverDataPipe(self):
224 pipes = system.DataPipe()
225 data = _GetRandomBuffer(DATA_SIZE)
226 self.assertEquals((system.RESULT_OK, DATA_SIZE),
227 pipes.producer_handle.WriteData(data))
228 self.assertEquals((system.RESULT_OK, data),
229 pipes.consumer_handle.ReadData(bytearray(DATA_SIZE)))
230
231 def testTwoPhaseWriteOnDataPipe(self):
232 pipes = system.DataPipe()
233 (res, buf) = pipes.producer_handle.BeginWriteData(DATA_SIZE)
234 self.assertEquals(system.RESULT_OK, res)
235 self.assertGreaterEqual(len(buf.buffer), DATA_SIZE)
236 data = _GetRandomBuffer(DATA_SIZE)
237 buf.buffer[0:DATA_SIZE] = data
238 self.assertEquals(system.RESULT_OK, buf.End(DATA_SIZE))
239 self.assertEquals((system.RESULT_OK, data),
240 pipes.consumer_handle.ReadData(bytearray(DATA_SIZE)))
241
242 def testTwoPhaseReadOnDataPipe(self):
243 pipes = system.DataPipe()
244 data = _GetRandomBuffer(DATA_SIZE)
245 self.assertEquals((system.RESULT_OK, DATA_SIZE),
246 pipes.producer_handle.WriteData(data))
247 (res, buf) = pipes.consumer_handle.BeginReadData()
248 self.assertEquals(system.RESULT_OK, res)
249 self.assertEquals(DATA_SIZE, len(buf.buffer))
250 self.assertEquals(data, buf.buffer)
251 self.assertEquals(system.RESULT_OK, buf.End(DATA_SIZE))
252
253 def testCreateSharedBuffer(self):
254 self._testHandlesCreation(system.CreateSharedBuffer(DATA_SIZE))
255
256 def testCreateSharedBufferWithNoneOptions(self):
257 self._testHandlesCreation(system.CreateSharedBuffer(DATA_SIZE, None))
258
259 def testCreateSharedBufferWithDefaultOptions(self):
260 self._testHandlesCreation(
261 system.CreateSharedBuffer(
262 DATA_SIZE,
263 system.CreateSharedBufferOptions()))
264
265 def testDuplicateSharedBuffer(self):
266 handle = system.CreateSharedBuffer(DATA_SIZE)
267 self._testHandlesCreation(handle.Duplicate())
268
269 def testDuplicateSharedBufferWithNoneOptions(self):
270 handle = system.CreateSharedBuffer(DATA_SIZE)
271 self._testHandlesCreation(handle.Duplicate(None))
272
273 def testDuplicateSharedBufferWithDefaultOptions(self):
274 handle = system.CreateSharedBuffer(DATA_SIZE)
275 self._testHandlesCreation(
276 handle.Duplicate(system.DuplicateSharedBufferOptions()))
277
278 def testSendBytesOverSharedBuffer(self):
279 handle = system.CreateSharedBuffer(DATA_SIZE)
280 duplicated = handle.Duplicate()
281 data = _GetRandomBuffer(DATA_SIZE)
282 (res1, buf1) = handle.Map(0, DATA_SIZE)
283 (res2, buf2) = duplicated.Map(0, DATA_SIZE)
284 self.assertEquals(system.RESULT_OK, res1)
285 self.assertEquals(system.RESULT_OK, res2)
286 self.assertEquals(DATA_SIZE, len(buf1.buffer))
287 self.assertEquals(DATA_SIZE, len(buf2.buffer))
288 self.assertEquals(buf1.buffer, buf2.buffer)
289
290 buf1.buffer[:] = data
291 self.assertEquals(data, buf1.buffer)
292 self.assertEquals(data, buf2.buffer)
293 self.assertEquals(buf1.buffer, buf2.buffer)
OLDNEW
« no previous file with comments | « mojo/python/tests/runloop_unittest.py ('k') | mojo/tools/mojob.sh » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698