Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(68)

Side by Side Diff: mojo/edk/system/data_pipe_unittest.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: move to mojo::edk namespace in preparation for runtim flag Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //#include "mojo/edk/system/data_pipe_impl.h"
6
7 #include <stdint.h>
8
9 #include "base/bind.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "mojo/edk/embedder/platform_channel_pair.h"
15 #include "mojo/edk/embedder/simple_platform_support.h"
16 #include "mojo/edk/system/memory.h"
17 #include "mojo/edk/system/test_utils.h"
18 #include "mojo/edk/system/waiter.h"
19 #include "mojo/public/c/system/data_pipe.h"
20 #include "mojo/public/c/system/functions.h"
21 #include "mojo/public/cpp/system/macros.h"
22 #include "testing/gtest/include/gtest/gtest.h"
23
24 namespace mojo {
25 namespace edk {
26 namespace {
27
28 const MojoHandleSignals kSignalAll = MOJO_HANDLE_SIGNAL_READABLE |
Ken Rockot(use gerrit already) 2015/09/23 22:32:17 unused
29 MOJO_HANDLE_SIGNAL_WRITABLE |
30 MOJO_HANDLE_SIGNAL_PEER_CLOSED;
31 const uint32_t kSizeOfOptions =
32 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
33
34 // In various places, we have to poll (since, e.g., we can't yet wait for a
35 // certain amount of data to be available). This is the maximum number of
36 // iterations (separated by a short sleep).
37 // TODO(vtl): Get rid of this.
38 const size_t kMaxPoll = 100;
39
40 class DataPipeTest : public test::MojoSystemTest {
41 public:
42 DataPipeTest() : producer_(MOJO_HANDLE_INVALID),
43 consumer_(MOJO_HANDLE_INVALID) {}
44
45 ~DataPipeTest() override {
46 if (producer_ != MOJO_HANDLE_INVALID)
47 CHECK_EQ(MOJO_RESULT_OK, MojoClose(producer_));
48 if (consumer_ != MOJO_HANDLE_INVALID)
49 CHECK_EQ(MOJO_RESULT_OK, MojoClose(consumer_));
50 }
51
52 MojoResult Create(const MojoCreateDataPipeOptions* options) {
53 return MojoCreateDataPipe(options, &producer_, &consumer_);
54 }
55
56 MojoResult WriteData(const void* elements,
57 uint32_t* num_bytes,
58 bool all_or_none = false) {
59 return MojoWriteData(producer_, elements, num_bytes,
60 all_or_none ? MOJO_READ_DATA_FLAG_ALL_OR_NONE :
61 MOJO_WRITE_DATA_FLAG_NONE);
62 }
63
64 MojoResult ReadData(void* elements,
65 uint32_t* num_bytes,
66 bool all_or_none = false,
67 bool peek = false) {
68 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
69 if (all_or_none)
70 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
71 if (peek)
72 flags |= MOJO_READ_DATA_FLAG_PEEK;
73 return MojoReadData(consumer_, elements, num_bytes, flags);
74 }
75
76 MojoResult QueryData(uint32_t* num_bytes) {
77 return MojoReadData(consumer_, nullptr, num_bytes,
78 MOJO_READ_DATA_FLAG_QUERY);
79 }
80
81 MojoResult DiscardData(uint32_t* num_bytes, bool all_or_none = false) {
82 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_DISCARD;
83 if (all_or_none)
84 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
85 return MojoReadData(consumer_, nullptr, num_bytes, flags);
86 }
87
88 MojoResult BeginReadData(const void** elements,
89 uint32_t* num_bytes,
90 bool all_or_none = false) {
91 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
92 if (all_or_none)
93 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
94 return MojoBeginReadData(consumer_, elements, num_bytes, flags);
95 }
96
97 MojoResult EndReadData(uint32_t num_bytes_read) {
98 return MojoEndReadData(consumer_, num_bytes_read);
99 }
100
101 MojoResult BeginWriteData(void** elements,
102 uint32_t* num_bytes,
103 bool all_or_none = false) {
104 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
105 if (all_or_none)
106 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
107 return MojoBeginWriteData(producer_, elements, num_bytes, flags);
108 }
109
110 MojoResult EndWriteData(uint32_t num_bytes_written) {
111 return MojoEndWriteData(producer_, num_bytes_written);
112 }
113
114 MojoResult CloseProducer() {
115 MojoResult rv = MojoClose(producer_);
116 producer_ = MOJO_HANDLE_INVALID;
117 return rv;
118 }
119
120 MojoResult CloseConsumer() {
121 MojoResult rv = MojoClose(consumer_);
122 consumer_ = MOJO_HANDLE_INVALID;
123 return rv;
124 }
125
126 MojoHandle producer_, consumer_;
127
128 private:
129 MOJO_DISALLOW_COPY_AND_ASSIGN(DataPipeTest);
130 };
131
132 TEST_F(DataPipeTest, Basic) {
133 const MojoCreateDataPipeOptions options = {
134 kSizeOfOptions, // |struct_size|.
135 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
136 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
137 1000 * sizeof(int32_t) // |capacity_num_bytes|.
138 };
139
140 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
141
142 // We can write to a data pipe handle immediately.
143 int32_t elements[10] = {};
144 uint32_t num_bytes = 0;
145
146 num_bytes =
147 static_cast<uint32_t>(MOJO_ARRAYSIZE(elements) * sizeof(elements[0]));
148
149 elements[0] = 123;
150 elements[1] = 456;
151 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
152 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&elements[0], &num_bytes));
153
154 // Now wait for the other side to become readable.
155 MojoHandleSignalsState state;
156 ASSERT_EQ(MOJO_RESULT_OK,
157 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
158 MOJO_DEADLINE_INDEFINITE, &state));
159 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, state.satisfied_signals);
160
161 elements[0] = -1;
162 elements[1] = -1;
163 ASSERT_EQ(MOJO_RESULT_OK, ReadData(&elements[0], &num_bytes));
164 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
165 ASSERT_EQ(elements[0], 123);
166 ASSERT_EQ(elements[1], 456);
167 }
168
169 // Tests creation of data pipes with various (valid) options.
170 TEST_F(DataPipeTest, CreateAndMaybeTransfer) {
171 MojoCreateDataPipeOptions test_options[] = {
172 // Default options.
173 {},
174 // Trivial element size, non-default capacity.
175 {kSizeOfOptions, // |struct_size|.
176 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
177 1, // |element_num_bytes|.
178 1000}, // |capacity_num_bytes|.
179 // Nontrivial element size, non-default capacity.
180 {kSizeOfOptions, // |struct_size|.
181 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
182 4, // |element_num_bytes|.
183 4000}, // |capacity_num_bytes|.
184 // Nontrivial element size, default capacity.
185 {kSizeOfOptions, // |struct_size|.
186 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
187 100, // |element_num_bytes|.
188 0} // |capacity_num_bytes|.
189 };
190 for (size_t i = 0; i < arraysize(test_options); i++) {
191 MojoHandle producer_handle, consumer_handle;
192 MojoCreateDataPipeOptions* options =
193 i ? &test_options[i] : nullptr;
194 ASSERT_EQ(MOJO_RESULT_OK,
195 MojoCreateDataPipe(options, &producer_handle, &consumer_handle));
196 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(producer_handle));
197 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(consumer_handle));
198 }
199 }
200
201
202 TEST_F(DataPipeTest, SimpleReadWrite) {
203 const MojoCreateDataPipeOptions options = {
204 kSizeOfOptions, // |struct_size|.
205 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
206 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
207 1000 * sizeof(int32_t) // |capacity_num_bytes|.
208 };
209
210 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
211 MojoHandleSignalsState hss;
212
213 int32_t elements[10] = {};
214 uint32_t num_bytes = 0;
215
216 // Try reading; nothing there yet.
217 num_bytes =
218 static_cast<uint32_t>(MOJO_ARRAYSIZE(elements) * sizeof(elements[0]));
219 ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, ReadData(elements, &num_bytes));
220
221 // Query; nothing there yet.
222 num_bytes = 0;
223 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
224 ASSERT_EQ(0u, num_bytes);
225
226 // Discard; nothing there yet.
227 num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
228 ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, DiscardData(&num_bytes));
229
230 // Read with invalid |num_bytes|.
231 num_bytes = sizeof(elements[0]) + 1;
232 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, ReadData(elements, &num_bytes));
233
234 // Write two elements.
235 elements[0] = 123;
236 elements[1] = 456;
237 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
238 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
239 // It should have written everything (even without "all or none").
240 ASSERT_EQ(2u * sizeof(elements[0]), num_bytes);
241
242 // Wait.
243 ASSERT_EQ(MOJO_RESULT_OK,
244 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
245 MOJO_DEADLINE_INDEFINITE, &hss));
246 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
247 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
248 hss.satisfiable_signals);
249
250 // Query.
251 // TODO(vtl): It's theoretically possible (though not with the current
252 // implementation/configured limits) that not all the data has arrived yet.
253 // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
254 // or |2 * ...|.)
255 num_bytes = 0;
256 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
257 ASSERT_EQ(2 * sizeof(elements[0]), num_bytes);
258
259 // Read one element.
260 elements[0] = -1;
261 elements[1] = -1;
262 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
263 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes));
264 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
265 ASSERT_EQ(123, elements[0]);
266 ASSERT_EQ(-1, elements[1]);
267
268 // Query.
269 // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we
270 // should get 1 here.)
271 num_bytes = 0;
272 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
273 ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
274
275 // Peek one element.
276 elements[0] = -1;
277 elements[1] = -1;
278 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
279 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, true));
280 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
281 ASSERT_EQ(456, elements[0]);
282 ASSERT_EQ(-1, elements[1]);
283
284 // Query. Still has 1 element remaining.
285 num_bytes = 0;
286 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
287 ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
288
289 // Try to read two elements, with "all or none".
290 elements[0] = -1;
291 elements[1] = -1;
292 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
293 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
294 ReadData(elements, &num_bytes, true, false));
295 ASSERT_EQ(-1, elements[0]);
296 ASSERT_EQ(-1, elements[1]);
297
298 // Try to read two elements, without "all or none".
299 elements[0] = -1;
300 elements[1] = -1;
301 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
302 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, false));
303 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
304 ASSERT_EQ(456, elements[0]);
305 ASSERT_EQ(-1, elements[1]);
306
307 // Query.
308 num_bytes = 0;
309 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
310 ASSERT_EQ(0u, num_bytes);
311 }
312
313 // Note: The "basic" waiting tests test that the "wait states" are correct in
314 // various situations; they don't test that waiters are properly awoken on state
315 // changes. (For that, we need to use multiple threads.)
316 TEST_F(DataPipeTest, BasicProducerWaiting) {
317 // Note: We take advantage of the fact that current for current
318 // implementations capacities are strict maximums. This is not guaranteed by
319 // the API.
320
321 const MojoCreateDataPipeOptions options = {
322 kSizeOfOptions, // |struct_size|.
323 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
324 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
325 2 * sizeof(int32_t) // |capacity_num_bytes|.
326 };
327 Create(&options);
328 MojoHandleSignalsState hss;
329
330 // Never readable.
331 hss = MojoHandleSignalsState();
332 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
333 MojoWait(producer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
334 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
335 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
336 hss.satisfiable_signals);
337
338 // Already writable.
339 hss = MojoHandleSignalsState();
340 ASSERT_EQ(MOJO_RESULT_OK,
341 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
342
343 // Write two elements.
344 int32_t elements[2] = {123, 456};
345 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
346 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
347 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
348
349 // Wait for data to become available to the consumer.
350 ASSERT_EQ(MOJO_RESULT_OK,
351 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
352 MOJO_DEADLINE_INDEFINITE, &hss));
353 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
354 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
355 hss.satisfiable_signals);
356
357 // Peek one element.
358 elements[0] = -1;
359 elements[1] = -1;
360 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
361 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
362 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
363 ASSERT_EQ(123, elements[0]);
364 ASSERT_EQ(-1, elements[1]);
365
366 // Read one element.
367 elements[0] = -1;
368 elements[1] = -1;
369 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
370 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, false));
371 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
372 ASSERT_EQ(123, elements[0]);
373 ASSERT_EQ(-1, elements[1]);
374
375 // Try writing, using a two-phase write.
376 void* buffer = nullptr;
377 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
378 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
379 EXPECT_TRUE(buffer);
380 ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
381
382 static_cast<int32_t*>(buffer)[0] = 789;
383 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(static_cast<uint32_t>(
384 1u * sizeof(elements[0]))));
385
386 // Read one element, using a two-phase read.
387 const void* read_buffer = nullptr;
388 num_bytes = 0u;
389 ASSERT_EQ(MOJO_RESULT_OK,
390 BeginReadData(&read_buffer, &num_bytes, false));
391 EXPECT_TRUE(read_buffer);
392 // Since we only read one element (after having written three in all), the
393 // two-phase read should only allow us to read one. This checks an
394 // implementation detail!
395 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
396 ASSERT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
397 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(static_cast<uint32_t>(
398 1u * sizeof(elements[0]))));
399
400 // Write one element.
401 elements[0] = 123;
402 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
403 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
404 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
405
406 // Close the consumer.
407 CloseConsumer();
408
409 // It should now be never-writable.
410 hss = MojoHandleSignalsState();
411 ASSERT_EQ(MOJO_RESULT_OK,
412 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
413 MOJO_DEADLINE_INDEFINITE, &hss));
414 hss = MojoHandleSignalsState();
415 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
416 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
417 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
418 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
419 }
420
421 TEST_F(DataPipeTest, PeerClosedProducerWaiting) {
422 const MojoCreateDataPipeOptions options = {
423 kSizeOfOptions, // |struct_size|.
424 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
425 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
426 2 * sizeof(int32_t) // |capacity_num_bytes|.
427 };
428 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
429 MojoHandleSignalsState hss;
430
431 // Close the consumer.
432 CloseConsumer();
433
434 // It should be signaled.
435 hss = MojoHandleSignalsState();
436 ASSERT_EQ(MOJO_RESULT_OK,
437 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
438 MOJO_DEADLINE_INDEFINITE, &hss));
439 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
440 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
441 }
442
443 TEST_F(DataPipeTest, PeerClosedConsumerWaiting) {
444 const MojoCreateDataPipeOptions options = {
445 kSizeOfOptions, // |struct_size|.
446 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
447 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
448 2 * sizeof(int32_t) // |capacity_num_bytes|.
449 };
450 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
451 MojoHandleSignalsState hss;
452
453 // Close the producer.
454 CloseProducer();
455
456 // It should be signaled.
457 hss = MojoHandleSignalsState();
458 ASSERT_EQ(MOJO_RESULT_OK,
459 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
460 MOJO_DEADLINE_INDEFINITE, &hss));
461 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
462 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
463 }
464
465 TEST_F(DataPipeTest, BasicConsumerWaiting) {
466 const MojoCreateDataPipeOptions options = {
467 kSizeOfOptions, // |struct_size|.
468 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
469 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
470 1000 * sizeof(int32_t) // |capacity_num_bytes|.
471 };
472 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
473 MojoHandleSignalsState hss;
474
475 // Never writable.
476 hss = MojoHandleSignalsState();
477 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
478 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_WRITABLE,
479 MOJO_DEADLINE_INDEFINITE, &hss));
480 ASSERT_EQ(0u, hss.satisfied_signals);
481 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
482 hss.satisfiable_signals);
483
484 // Write two elements.
485 int32_t elements[2] = {123, 456};
486 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
487 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
488
489 // Wait for readability.
490 hss = MojoHandleSignalsState();
491 ASSERT_EQ(MOJO_RESULT_OK,
492 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
493 MOJO_DEADLINE_INDEFINITE, &hss));
494 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
495 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
496 hss.satisfiable_signals);
497
498 // Discard one element.
499 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
500 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
501 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
502
503 // Should still be readable.
504 hss = MojoHandleSignalsState();
505 ASSERT_EQ(MOJO_RESULT_OK,
506 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
507 MOJO_DEADLINE_INDEFINITE, &hss));
508 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
509 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
510 hss.satisfiable_signals);
511
512 // Peek one element.
513 elements[0] = -1;
514 elements[1] = -1;
515 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
516 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
517 ASSERT_EQ(456, elements[0]);
518 ASSERT_EQ(-1, elements[1]);
519
520 // Should still be readable.
521 hss = MojoHandleSignalsState();
522 ASSERT_EQ(MOJO_RESULT_OK,
523 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
524 MOJO_DEADLINE_INDEFINITE, &hss));
525 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
526 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
527 hss.satisfiable_signals);
528
529 // Read one element.
530 elements[0] = -1;
531 elements[1] = -1;
532 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
533 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
534 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
535 ASSERT_EQ(456, elements[0]);
536 ASSERT_EQ(-1, elements[1]);
537
538 // Write one element.
539 elements[0] = 789;
540 elements[1] = -1;
541 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
542 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
543
544 // Waiting should now succeed.
545 hss = MojoHandleSignalsState();
546 ASSERT_EQ(MOJO_RESULT_OK,
547 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
548 MOJO_DEADLINE_INDEFINITE, &hss));
549 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
550 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
551 hss.satisfiable_signals);
552
553 // Close the producer.
554 CloseProducer();
555
556 // Should still be readable.
557 hss = MojoHandleSignalsState();
558 ASSERT_EQ(MOJO_RESULT_OK,
559 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
560 MOJO_DEADLINE_INDEFINITE, &hss));
561 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
562 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
563 hss.satisfiable_signals);
564
565 // Wait for the peer closed signal.
566 hss = MojoHandleSignalsState();
567 ASSERT_EQ(MOJO_RESULT_OK,
568 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
569 MOJO_DEADLINE_INDEFINITE, &hss));
570 ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) != 0);
571 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
572 hss.satisfiable_signals);
573
574 // Read one element.
575 elements[0] = -1;
576 elements[1] = -1;
577 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
578 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
579 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
580 ASSERT_EQ(789, elements[0]);
581 ASSERT_EQ(-1, elements[1]);
582
583 // Should be never-readable.
584 hss = MojoHandleSignalsState();
585 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
586 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
587 MOJO_DEADLINE_INDEFINITE, &hss));
588 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
589 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
590 }
591
592 // Test with two-phase APIs and also closing the producer with an active
593 // consumer waiter.
594 TEST_F(DataPipeTest, ConsumerWaitingTwoPhase) {
595 const MojoCreateDataPipeOptions options = {
596 kSizeOfOptions, // |struct_size|.
597 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
598 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
599 1000 * sizeof(int32_t) // |capacity_num_bytes|.
600 };
601 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
602 MojoHandleSignalsState hss;
603
604 // Write two elements.
605 int32_t* elements = nullptr;
606 void* buffer = nullptr;
607 // Request room for three (but we'll only write two).
608 uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
609 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes, true));
610 EXPECT_TRUE(buffer);
611 EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
612 elements = static_cast<int32_t*>(buffer);
613 elements[0] = 123;
614 elements[1] = 456;
615 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(2u * sizeof(elements[0])));
616
617 // Wait for readability.
618 hss = MojoHandleSignalsState();
619 ASSERT_EQ(MOJO_RESULT_OK,
620 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
621 MOJO_DEADLINE_INDEFINITE, &hss));
622 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
623 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
624 hss.satisfiable_signals);
625
626 // Read one element.
627 // Request two in all-or-none mode, but only read one.
628 const void* read_buffer = nullptr;
629 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
630 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes, true));
631 EXPECT_TRUE(read_buffer);
632 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
633 const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
634 ASSERT_EQ(123, read_elements[0]);
635 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
636
637 // Should still be readable.
638 hss = MojoHandleSignalsState();
639 ASSERT_EQ(MOJO_RESULT_OK,
640 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
641 MOJO_DEADLINE_INDEFINITE, &hss));
642 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
643 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
644 hss.satisfiable_signals);
645
646 // Read one element.
647 // Request three, but not in all-or-none mode.
648 read_buffer = nullptr;
649 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
650 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
651 EXPECT_TRUE(read_buffer);
652 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
653 read_elements = static_cast<const int32_t*>(read_buffer);
654 ASSERT_EQ(456, read_elements[0]);
655 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
656
657 // Close the producer.
658 CloseProducer();
659
660 // Should be never-readable.
661 hss = MojoHandleSignalsState();
662 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
663 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
664 MOJO_DEADLINE_INDEFINITE, &hss));
665 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
666 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
667 }
668
669 // Tests that data pipes aren't writable/readable during two-phase writes/reads.
670 TEST_F(DataPipeTest, BasicTwoPhaseWaiting) {
671 const MojoCreateDataPipeOptions options = {
672 kSizeOfOptions, // |struct_size|.
673 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
674 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
675 1000 * sizeof(int32_t) // |capacity_num_bytes|.
676 };
677 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
678 MojoHandleSignalsState hss;
679
680 // It should be writable.
681 hss = MojoHandleSignalsState();
682 ASSERT_EQ(MOJO_RESULT_OK,
683 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
684 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
685 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
686 hss.satisfiable_signals);
687
688 uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
689 void* write_ptr = nullptr;
690 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
691 EXPECT_TRUE(write_ptr);
692 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
693
694 // At this point, it shouldn't be writable.
695 hss = MojoHandleSignalsState();
696 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
697 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
698 ASSERT_EQ(0u, hss.satisfied_signals);
699 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
700 hss.satisfiable_signals);
701
702 // It shouldn't be readable yet either (we'll wait later).
703 hss = MojoHandleSignalsState();
704 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
705 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
706 ASSERT_EQ(0u, hss.satisfied_signals);
707 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
708 hss.satisfiable_signals);
709
710 static_cast<int32_t*>(write_ptr)[0] = 123;
711 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(1u * sizeof(int32_t)));
712
713 // It should immediately be writable again.
714 hss = MojoHandleSignalsState();
715 ASSERT_EQ(MOJO_RESULT_OK,
716 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
717 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
718 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
719 hss.satisfiable_signals);
720
721 // It should become readable.
722 hss = MojoHandleSignalsState();
723 ASSERT_EQ(MOJO_RESULT_OK,
724 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
725 MOJO_DEADLINE_INDEFINITE, &hss));
726 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
727 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
728 hss.satisfiable_signals);
729
730 // Start another two-phase write and check that it's readable even in the
731 // middle of it.
732 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
733 write_ptr = nullptr;
734 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
735 EXPECT_TRUE(write_ptr);
736 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
737
738 // It should be readable.
739 hss = MojoHandleSignalsState();
740 ASSERT_EQ(MOJO_RESULT_OK,
741 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
742 MOJO_DEADLINE_INDEFINITE, &hss));
743 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
744 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
745 hss.satisfiable_signals);
746
747 // End the two-phase write without writing anything.
748 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0u));
749
750 // Start a two-phase read.
751 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
752 const void* read_ptr = nullptr;
753 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
754 EXPECT_TRUE(read_ptr);
755 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
756
757 // At this point, it should still be writable.
758 hss = MojoHandleSignalsState();
759 ASSERT_EQ(MOJO_RESULT_OK,
760 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
761 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
762 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
763 hss.satisfiable_signals);
764
765 // But not readable.
766 hss = MojoHandleSignalsState();
767 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
768 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
769 ASSERT_EQ(0u, hss.satisfied_signals);
770 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
771 hss.satisfiable_signals);
772
773 // End the two-phase read without reading anything.
774 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0u));
775
776 // It should be readable again.
777 hss = MojoHandleSignalsState();
778 ASSERT_EQ(MOJO_RESULT_OK,
779 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
780 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
781 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
782 hss.satisfiable_signals);
783 }
784
785 void Seq(int32_t start, size_t count, int32_t* out) {
786 for (size_t i = 0; i < count; i++)
787 out[i] = start + static_cast<int32_t>(i);
788 }
789
790 TEST_F(DataPipeTest, AllOrNone) {
791 const MojoCreateDataPipeOptions options = {
792 kSizeOfOptions, // |struct_size|.
793 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
794 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
795 10 * sizeof(int32_t) // |capacity_num_bytes|.
796 };
797 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
798 MojoHandleSignalsState hss;
799
800 // Try writing way too much.
801 uint32_t num_bytes = 20u * sizeof(int32_t);
802 int32_t buffer[100];
803 Seq(0, MOJO_ARRAYSIZE(buffer), buffer);
804 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
805
806 // Should still be empty.
807 num_bytes = ~0u;
808 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
809 ASSERT_EQ(0u, num_bytes);
810
811 // Write some data.
812 num_bytes = 5u * sizeof(int32_t);
813 Seq(100, MOJO_ARRAYSIZE(buffer), buffer);
814 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
815 ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
816
817 // Wait for data.
818 // TODO(vtl): There's no real guarantee that all the data will become
819 // available at once (except that in current implementations, with reasonable
820 // limits, it will). Eventually, we'll be able to wait for a specified amount
821 // of data to become available.
822 hss = MojoHandleSignalsState();
823 ASSERT_EQ(MOJO_RESULT_OK,
824 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
825 MOJO_DEADLINE_INDEFINITE, &hss));
826 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
827 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
828 hss.satisfiable_signals);
829
830 // Half full.
831 num_bytes = 0u;
832 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
833 ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
834
835 /* TODO(jam): enable if we end up observing max capacity
836 // Too much.
837 num_bytes = 6u * sizeof(int32_t);
838 Seq(200, MOJO_ARRAYSIZE(buffer), buffer);
839 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
840 */
841
842 // Try reading too much.
843 num_bytes = 11u * sizeof(int32_t);
844 memset(buffer, 0xab, sizeof(buffer));
845 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
846 int32_t expected_buffer[100];
847 memset(expected_buffer, 0xab, sizeof(expected_buffer));
848 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
849
850 // Try discarding too much.
851 num_bytes = 11u * sizeof(int32_t);
852 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
853
854 // Just a little.
855 num_bytes = 2u * sizeof(int32_t);
856 Seq(300, MOJO_ARRAYSIZE(buffer), buffer);
857 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
858 ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
859
860 // Just right.
861 num_bytes = 3u * sizeof(int32_t);
862 Seq(400, MOJO_ARRAYSIZE(buffer), buffer);
863 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
864 ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
865
866 // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
867 // specified amount of data to be available, so poll.
868 for (size_t i = 0; i < kMaxPoll; i++) {
869 num_bytes = 0u;
870 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
871 if (num_bytes >= 10u * sizeof(int32_t))
872 break;
873
874 test::Sleep(test::EpsilonDeadline());
875 }
876 ASSERT_EQ(10u * sizeof(int32_t), num_bytes);
877
878 // Read half.
879 num_bytes = 5u * sizeof(int32_t);
880 memset(buffer, 0xab, sizeof(buffer));
881 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
882 ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
883 memset(expected_buffer, 0xab, sizeof(expected_buffer));
884 Seq(100, 5, expected_buffer);
885 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
886
887 // Try reading too much again.
888 num_bytes = 6u * sizeof(int32_t);
889 memset(buffer, 0xab, sizeof(buffer));
890 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
891 memset(expected_buffer, 0xab, sizeof(expected_buffer));
892 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
893
894 // Try discarding too much again.
895 num_bytes = 6u * sizeof(int32_t);
896 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
897
898 // Discard a little.
899 num_bytes = 2u * sizeof(int32_t);
900 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
901 ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
902
903 // Three left.
904 num_bytes = 0u;
905 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
906 ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
907
908 // Close the producer, then test producer-closed cases.
909 CloseProducer();
910
911 // Wait.
912 hss = MojoHandleSignalsState();
913 ASSERT_EQ(MOJO_RESULT_OK,
914 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
915 MOJO_DEADLINE_INDEFINITE, &hss));
916 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
917 hss.satisfied_signals);
918 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
919 hss.satisfiable_signals);
920
921 // Try reading too much; "failed precondition" since the producer is closed.
922 num_bytes = 4u * sizeof(int32_t);
923 memset(buffer, 0xab, sizeof(buffer));
924 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
925 ReadData(buffer, &num_bytes, true));
926 memset(expected_buffer, 0xab, sizeof(expected_buffer));
927 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
928
929 // Try discarding too much; "failed precondition" again.
930 num_bytes = 4u * sizeof(int32_t);
931 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes, true));
932
933 // Read a little.
934 num_bytes = 2u * sizeof(int32_t);
935 memset(buffer, 0xab, sizeof(buffer));
936 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
937 ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
938 memset(expected_buffer, 0xab, sizeof(expected_buffer));
939 Seq(400, 2, expected_buffer);
940 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
941
942 // Discard the remaining element.
943 num_bytes = 1u * sizeof(int32_t);
944 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
945 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
946
947 // Empty again.
948 num_bytes = ~0u;
949 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
950 ASSERT_EQ(0u, num_bytes);
951 }
952
953 TEST_F(DataPipeTest, DISABLED_TwoPhaseAllOrNone) {
954 const MojoCreateDataPipeOptions options = {
955 kSizeOfOptions, // |struct_size|.
956 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
957 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
958 10 * sizeof(int32_t) // |capacity_num_bytes|.
959 };
960 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
961 MojoHandleSignalsState hss;
962
963 // Try writing way too much (two-phase).
964 uint32_t num_bytes = 20u * sizeof(int32_t);
965 void* write_ptr = nullptr;
966 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
967 BeginWriteData(&write_ptr, &num_bytes, true));
968
969 // Try writing an amount which isn't a multiple of the element size
970 // (two-phase).
971 static_assert(sizeof(int32_t) > 1u, "Wow! int32_t's have size 1");
972 num_bytes = 1u;
973 write_ptr = nullptr;
974 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
975 BeginWriteData(&write_ptr, &num_bytes, true));
976
977 // Try reading way too much (two-phase).
978 num_bytes = 20u * sizeof(int32_t);
979 const void* read_ptr = nullptr;
980 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
981 BeginReadData(&read_ptr, &num_bytes, true));
982
983 // Write half (two-phase).
984 num_bytes = 5u * sizeof(int32_t);
985 write_ptr = nullptr;
986 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes, true));
987 // May provide more space than requested.
988 EXPECT_GE(num_bytes, 5u * sizeof(int32_t));
989 EXPECT_TRUE(write_ptr);
990 Seq(0, 5, static_cast<int32_t*>(write_ptr));
991 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(5u * sizeof(int32_t)));
992
993 // Wait for data.
994 // TODO(vtl): (See corresponding TODO in AllOrNone.)
995 hss = MojoHandleSignalsState();
996 ASSERT_EQ(MOJO_RESULT_OK,
997 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
998 MOJO_DEADLINE_INDEFINITE, &hss));
999 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1000 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1001 hss.satisfiable_signals);
1002
1003 // Try reading an amount which isn't a multiple of the element size
1004 // (two-phase).
1005 num_bytes = 1u;
1006 read_ptr = nullptr;
1007 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1008 BeginReadData(&read_ptr, &num_bytes, true));
1009
1010 // Read one (two-phase).
1011 num_bytes = 1u * sizeof(int32_t);
1012 read_ptr = nullptr;
1013 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes, true));
1014 EXPECT_GE(num_bytes, 1u * sizeof(int32_t));
1015 ASSERT_EQ(0, static_cast<const int32_t*>(read_ptr)[0]);
1016 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(int32_t)));
1017
1018 // We should have four left, leaving room for six.
1019 num_bytes = 0u;
1020 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1021 ASSERT_EQ(4u * sizeof(int32_t), num_bytes);
1022
1023 // Assuming a tight circular buffer of the specified capacity, we can't do a
1024 // two-phase write of six now.
1025 num_bytes = 6u * sizeof(int32_t);
1026 write_ptr = nullptr;
1027 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
1028 BeginWriteData(&write_ptr, &num_bytes, true));
1029
1030 // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
1031 // specified amount of space to be available, so poll.
1032 for (size_t i = 0; i < kMaxPoll; i++) {
1033 // Write six elements (simple), filling the buffer.
1034 num_bytes = 6u * sizeof(int32_t);
1035 int32_t buffer[100];
1036 Seq(100, 6, buffer);
1037 MojoResult result = WriteData(buffer, &num_bytes, true);
1038 if (result == MOJO_RESULT_OK)
1039 break;
1040 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, result);
1041
1042 test::Sleep(test::EpsilonDeadline());
1043 }
1044 ASSERT_EQ(6u * sizeof(int32_t), num_bytes);
1045
1046 // TODO(vtl): Hack: poll again.
1047 for (size_t i = 0; i < kMaxPoll; i++) {
1048 // We have ten.
1049 num_bytes = 0u;
1050 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1051 if (num_bytes >= 10u * sizeof(int32_t))
1052 break;
1053
1054 test::Sleep(test::EpsilonDeadline());
1055 }
1056 ASSERT_EQ(10u * sizeof(int32_t), num_bytes);
1057
1058 // Note: Whether a two-phase read of ten would fail here or not is
1059 // implementation-dependent.
1060
1061 // Close the producer.
1062 CloseProducer();
1063
1064 // A two-phase read of nine should work.
1065 num_bytes = 9u * sizeof(int32_t);
1066 read_ptr = nullptr;
1067 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes, true));
1068 EXPECT_GE(num_bytes, 9u * sizeof(int32_t));
1069 ASSERT_EQ(1, static_cast<const int32_t*>(read_ptr)[0]);
1070 ASSERT_EQ(2, static_cast<const int32_t*>(read_ptr)[1]);
1071 ASSERT_EQ(3, static_cast<const int32_t*>(read_ptr)[2]);
1072 ASSERT_EQ(4, static_cast<const int32_t*>(read_ptr)[3]);
1073 ASSERT_EQ(100, static_cast<const int32_t*>(read_ptr)[4]);
1074 ASSERT_EQ(101, static_cast<const int32_t*>(read_ptr)[5]);
1075 ASSERT_EQ(102, static_cast<const int32_t*>(read_ptr)[6]);
1076 ASSERT_EQ(103, static_cast<const int32_t*>(read_ptr)[7]);
1077 ASSERT_EQ(104, static_cast<const int32_t*>(read_ptr)[8]);
1078 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(9u * sizeof(int32_t)));
1079
1080 // Wait for peer closed.
1081 hss = MojoHandleSignalsState();
1082 ASSERT_EQ(MOJO_RESULT_OK,
1083 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1084 MOJO_DEADLINE_INDEFINITE, &hss));
1085 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1086 hss.satisfied_signals);
1087 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1088 hss.satisfiable_signals);
1089
1090 // A two-phase read of two should fail, with "failed precondition".
1091 num_bytes = 2u * sizeof(int32_t);
1092 read_ptr = nullptr;
1093 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1094 BeginReadData(&read_ptr, &num_bytes, true));
1095 }
1096
1097 /*
1098 jam: this is testing that the implementation uses a circular buffer, which we
1099 don't use currently.
1100 // Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads,
1101 // respectively, as much as possible, even if it may have to "wrap around" the
1102 // internal circular buffer. (Note that the two-phase write and read need not do
1103 // this.)
1104 TYPED_TEST(DataPipeImplTest, WrapAround) {
1105 unsigned char test_data[1000];
1106 for (size_t i = 0; i < MOJO_ARRAYSIZE(test_data); i++)
1107 test_data[i] = static_cast<unsigned char>(i);
1108
1109 const MojoCreateDataPipeOptions options = {
1110 kSizeOfOptions, // |struct_size|.
1111 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1112 1u, // |element_num_bytes|.
1113 100u // |capacity_num_bytes|.
1114 };
1115 MojoCreateDataPipeOptions validated_options = {};
1116 // This test won't be valid if |ValidateCreateOptions()| decides to give the
1117 // pipe more space.
1118 ASSERT_EQ(MOJO_RESULT_OK, DataPipe::ValidateCreateOptions(
1119 MakeUserPointer(&options), &validated_options));
1120 ASSERT_EQ(100u, validated_options.capacity_num_bytes);
1121 this->Create(options);
1122 this->DoTransfer();
1123
1124 Waiter waiter;
1125 HandleSignalsState hss;
1126
1127 // Add waiter.
1128 waiter.Init();
1129 ASSERT_EQ(MOJO_RESULT_OK,
1130 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1,
1131 nullptr));
1132
1133 // Write 20 bytes.
1134 uint32_t num_bytes = 20u;
1135 ASSERT_EQ(MOJO_RESULT_OK,
1136 this->ProducerWriteData(UserPointer<const void>(&test_data[0]),
1137 MakeUserPointer(&num_bytes), false));
1138 ASSERT_EQ(20u, num_bytes);
1139
1140 // Wait for data.
1141 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1142 ASSERT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
1143 hss = HandleSignalsState();
1144 this->ConsumerRemoveAwakable(&waiter, &hss);
1145 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1146 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1147 hss.satisfiable_signals);
1148
1149 // Read 10 bytes.
1150 unsigned char read_buffer[1000] = {0};
1151 num_bytes = 10u;
1152 ASSERT_EQ(MOJO_RESULT_OK,
1153 this->ConsumerReadData(UserPointer<void>(read_buffer),
1154 MakeUserPointer(&num_bytes), false, false));
1155 ASSERT_EQ(10u, num_bytes);
1156 ASSERT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
1157
1158 if (this->IsStrictCircularBuffer()) {
1159 // Check that a two-phase write can now only write (at most) 80 bytes. (This
1160 // checks an implementation detail; this behavior is not guaranteed.)
1161 void* write_buffer_ptr = nullptr;
1162 num_bytes = 0u;
1163 ASSERT_EQ(MOJO_RESULT_OK,
1164 this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
1165 MakeUserPointer(&num_bytes), false));
1166 EXPECT_TRUE(write_buffer_ptr);
1167 ASSERT_EQ(80u, num_bytes);
1168 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u));
1169 }
1170
1171 // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
1172 size_t total_num_bytes = 0;
1173 for (size_t i = 0; i < kMaxPoll; i++) {
1174 // Write as much data as we can (using |ProducerWriteData()|). We should
1175 // write 90 bytes (eventually).
1176 num_bytes = 200u;
1177 MojoResult result = this->ProducerWriteData(
1178 UserPointer<const void>(&test_data[20 + total_num_bytes]),
1179 MakeUserPointer(&num_bytes), false);
1180 if (result == MOJO_RESULT_OK) {
1181 total_num_bytes += num_bytes;
1182 if (total_num_bytes >= 90u)
1183 break;
1184 } else {
1185 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, result);
1186 }
1187
1188 test::Sleep(test::EpsilonDeadline());
1189 }
1190 ASSERT_EQ(90u, total_num_bytes);
1191
1192 // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
1193 for (size_t i = 0; i < kMaxPoll; i++) {
1194 // We have 100.
1195 num_bytes = 0u;
1196 ASSERT_EQ(MOJO_RESULT_OK,
1197 this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1198 if (num_bytes >= 100u)
1199 break;
1200
1201 test::Sleep(test::EpsilonDeadline());
1202 }
1203 ASSERT_EQ(100u, num_bytes);
1204
1205 if (this->IsStrictCircularBuffer()) {
1206 // Check that a two-phase read can now only read (at most) 90 bytes. (This
1207 // checks an implementation detail; this behavior is not guaranteed.)
1208 const void* read_buffer_ptr = nullptr;
1209 num_bytes = 0u;
1210 ASSERT_EQ(MOJO_RESULT_OK,
1211 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
1212 MakeUserPointer(&num_bytes), false));
1213 EXPECT_TRUE(read_buffer_ptr);
1214 ASSERT_EQ(90u, num_bytes);
1215 ASSERT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(0u));
1216 }
1217
1218 // Read as much as possible (using |ConsumerReadData()|). We should read 100
1219 // bytes.
1220 num_bytes = static_cast<uint32_t>(MOJO_ARRAYSIZE(read_buffer) *
1221 sizeof(read_buffer[0]));
1222 memset(read_buffer, 0, num_bytes);
1223 ASSERT_EQ(MOJO_RESULT_OK,
1224 this->ConsumerReadData(UserPointer<void>(read_buffer),
1225 MakeUserPointer(&num_bytes), false, false));
1226 ASSERT_EQ(100u, num_bytes);
1227 ASSERT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
1228
1229 this->ProducerClose();
1230 this->ConsumerClose();
1231 }
1232 */
1233
1234 // Tests the behavior of writing (simple and two-phase), closing the producer,
1235 // then reading (simple and two-phase).
1236 TEST_F(DataPipeTest, WriteCloseProducerRead) {
1237 const char kTestData[] = "hello world";
1238 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1239
1240 const MojoCreateDataPipeOptions options = {
1241 kSizeOfOptions, // |struct_size|.
1242 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1243 1u, // |element_num_bytes|.
1244 1000u // |capacity_num_bytes|.
1245 };
1246 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1247
1248 // Write some data, so we'll have something to read.
1249 uint32_t num_bytes = kTestDataSize;
1250 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1251 ASSERT_EQ(kTestDataSize, num_bytes);
1252
1253 // Write it again, so we'll have something left over.
1254 num_bytes = kTestDataSize;
1255 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1256 ASSERT_EQ(kTestDataSize, num_bytes);
1257
1258 // Start two-phase write.
1259 void* write_buffer_ptr = nullptr;
1260 num_bytes = 0u;
1261 ASSERT_EQ(MOJO_RESULT_OK,
1262 BeginWriteData(&write_buffer_ptr, &num_bytes, false));
1263 EXPECT_TRUE(write_buffer_ptr);
1264 EXPECT_GT(num_bytes, 0u);
1265
1266 // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
1267 for (size_t i = 0; i < kMaxPoll; i++) {
1268 num_bytes = 0u;
1269 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1270 if (num_bytes >= 2u * kTestDataSize)
1271 break;
1272
1273 test::Sleep(test::EpsilonDeadline());
1274 }
1275 ASSERT_EQ(2u * kTestDataSize, num_bytes);
1276
1277 // Start two-phase read.
1278 const void* read_buffer_ptr = nullptr;
1279 num_bytes = 0u;
1280 ASSERT_EQ(MOJO_RESULT_OK,
1281 BeginReadData(&read_buffer_ptr, &num_bytes));
1282 EXPECT_TRUE(read_buffer_ptr);
1283 ASSERT_EQ(2u * kTestDataSize, num_bytes);
1284
1285 // Close the producer.
1286 CloseProducer();
1287
1288 // The consumer can finish its two-phase read.
1289 ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
1290 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(kTestDataSize));
1291
1292 // And start another.
1293 read_buffer_ptr = nullptr;
1294 num_bytes = 0u;
1295 ASSERT_EQ(MOJO_RESULT_OK,
1296 BeginReadData(&read_buffer_ptr, &num_bytes));
1297 EXPECT_TRUE(read_buffer_ptr);
1298 ASSERT_EQ(kTestDataSize, num_bytes);
1299 }
1300
1301
1302 // Tests the behavior of interrupting a two-phase read and write by closing the
1303 // consumer.
1304 TEST_F(DataPipeTest, TwoPhaseWriteReadCloseConsumer) {
1305 const char kTestData[] = "hello world";
1306 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1307
1308 const MojoCreateDataPipeOptions options = {
1309 kSizeOfOptions, // |struct_size|.
1310 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1311 1u, // |element_num_bytes|.
1312 1000u // |capacity_num_bytes|.
1313 };
1314 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1315 MojoHandleSignalsState hss;
1316
1317 // Write some data, so we'll have something to read.
1318 uint32_t num_bytes = kTestDataSize;
1319 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1320 ASSERT_EQ(kTestDataSize, num_bytes);
1321
1322 // Start two-phase write.
1323 void* write_buffer_ptr = nullptr;
1324 num_bytes = 0u;
1325 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1326 EXPECT_TRUE(write_buffer_ptr);
1327 ASSERT_GT(num_bytes, kTestDataSize);
1328
1329 // Wait for data.
1330 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1331 hss = MojoHandleSignalsState();
1332 ASSERT_EQ(MOJO_RESULT_OK,
1333 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1334 MOJO_DEADLINE_INDEFINITE, &hss));
1335 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1336 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1337 hss.satisfiable_signals);
1338
1339 // Start two-phase read.
1340 const void* read_buffer_ptr = nullptr;
1341 num_bytes = 0u;
1342 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
1343 EXPECT_TRUE(read_buffer_ptr);
1344 ASSERT_EQ(kTestDataSize, num_bytes);
1345
1346 // Close the consumer.
1347 CloseConsumer();
1348
1349 // Wait for producer to know that the consumer is closed.
1350 hss = MojoHandleSignalsState();
1351 ASSERT_EQ(MOJO_RESULT_OK,
1352 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1353 MOJO_DEADLINE_INDEFINITE, &hss));
1354 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1355 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
1356
1357 // Actually write some data. (Note: Premature freeing of the buffer would
1358 // probably only be detected under ASAN or similar.)
1359 memcpy(write_buffer_ptr, kTestData, kTestDataSize);
1360 // Note: Even though the consumer has been closed, ending the two-phase
1361 // write will report success.
1362 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(kTestDataSize));
1363
1364 // But trying to write should result in failure.
1365 num_bytes = kTestDataSize;
1366 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, WriteData(kTestData, &num_bytes));
1367
1368 // As will trying to start another two-phase write.
1369 write_buffer_ptr = nullptr;
1370 num_bytes = 0u;
1371 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1372 BeginWriteData(&write_buffer_ptr, &num_bytes));
1373 }
1374
1375 // Tests the behavior of "interrupting" a two-phase write by closing both the
1376 // producer and the consumer.
1377 TEST_F(DataPipeTest, TwoPhaseWriteCloseBoth) {
1378 const uint32_t kTestDataSize = 15u;
1379
1380 const MojoCreateDataPipeOptions options = {
1381 kSizeOfOptions, // |struct_size|.
1382 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1383 1u, // |element_num_bytes|.
1384 1000u // |capacity_num_bytes|.
1385 };
1386 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1387
1388 // Start two-phase write.
1389 void* write_buffer_ptr = nullptr;
1390 uint32_t num_bytes = 0u;
1391 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1392 EXPECT_TRUE(write_buffer_ptr);
1393 ASSERT_GT(num_bytes, kTestDataSize);
1394 }
1395
1396 // Tests the behavior of writing, closing the producer, and then reading (with
1397 // and without data remaining).
1398 TEST_F(DataPipeTest, WriteCloseProducerReadNoData) {
1399 const char kTestData[] = "hello world";
1400 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1401
1402 const MojoCreateDataPipeOptions options = {
1403 kSizeOfOptions, // |struct_size|.
1404 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1405 1u, // |element_num_bytes|.
1406 1000u // |capacity_num_bytes|.
1407 };
1408 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1409 MojoHandleSignalsState hss;
1410
1411 // Write some data, so we'll have something to read.
1412 uint32_t num_bytes = kTestDataSize;
1413 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1414 ASSERT_EQ(kTestDataSize, num_bytes);
1415
1416 // Close the producer.
1417 CloseProducer();
1418
1419 // Wait. (Note that once the consumer knows that the producer is closed, it
1420 // must also know about all the data that was sent.)
1421 hss = MojoHandleSignalsState();
1422 ASSERT_EQ(MOJO_RESULT_OK,
1423 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1424 MOJO_DEADLINE_INDEFINITE, &hss));
1425 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1426 hss.satisfied_signals);
1427 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1428 hss.satisfiable_signals);
1429
1430 // Peek that data.
1431 char buffer[1000];
1432 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1433 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, false, true));
1434 ASSERT_EQ(kTestDataSize, num_bytes);
1435 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1436
1437 // Read that data.
1438 memset(buffer, 0, 1000);
1439 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1440 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes));
1441 ASSERT_EQ(kTestDataSize, num_bytes);
1442 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1443
1444 // A second read should fail.
1445 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1446 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ReadData(buffer, &num_bytes));
1447
1448 // A two-phase read should also fail.
1449 const void* read_buffer_ptr = nullptr;
1450 num_bytes = 0u;
1451 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1452 ReadData(&read_buffer_ptr, &num_bytes));
1453
1454 // Ditto for discard.
1455 num_bytes = 10u;
1456 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes));
1457 }
1458
1459 // Test that two-phase reads/writes behave correctly when given invalid
1460 // arguments.
1461 TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) {
1462 const MojoCreateDataPipeOptions options = {
1463 kSizeOfOptions, // |struct_size|.
1464 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1465 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
1466 10 * sizeof(int32_t) // |capacity_num_bytes|.
1467 };
1468 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1469 MojoHandleSignalsState hss;
1470
1471 // No data.
1472 uint32_t num_bytes = 1000u;
1473 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1474 ASSERT_EQ(0u, num_bytes);
1475
1476 // Try "ending" a two-phase write when one isn't active.
1477 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1478 EndWriteData(1u * sizeof(int32_t)));
1479
1480 // Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd
1481 // have time to propagate.
1482 test::Sleep(test::EpsilonDeadline());
1483
1484 // Still no data.
1485 num_bytes = 1000u;
1486 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1487 ASSERT_EQ(0u, num_bytes);
1488
1489 // Try ending a two-phase write with an invalid amount (too much).
1490 num_bytes = 0u;
1491 void* write_ptr = nullptr;
1492 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1493 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1494 EndWriteData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1495
1496 // But the two-phase write still ended.
1497 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1498
1499 // Wait a bit (as above).
1500 test::Sleep(test::EpsilonDeadline());
1501
1502 // Still no data.
1503 num_bytes = 1000u;
1504 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1505 ASSERT_EQ(0u, num_bytes);
1506
1507 // Try ending a two-phase write with an invalid amount (not a multiple of the
1508 // element size).
1509 num_bytes = 0u;
1510 write_ptr = nullptr;
1511 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1512 EXPECT_GE(num_bytes, 1u);
1513 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndWriteData(1u));
1514
1515 // But the two-phase write still ended.
1516 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1517
1518 // Wait a bit (as above).
1519 test::Sleep(test::EpsilonDeadline());
1520
1521 // Still no data.
1522 num_bytes = 1000u;
1523 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1524 ASSERT_EQ(0u, num_bytes);
1525
1526 // Now write some data, so we'll be able to try reading.
1527 int32_t element = 123;
1528 num_bytes = 1u * sizeof(int32_t);
1529 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&element, &num_bytes));
1530
1531 // Wait for data.
1532 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1533 hss = MojoHandleSignalsState();
1534 ASSERT_EQ(MOJO_RESULT_OK,
1535 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1536 MOJO_DEADLINE_INDEFINITE, &hss));
1537 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1538 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1539 hss.satisfiable_signals);
1540
1541 // One element available.
1542 num_bytes = 0u;
1543 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1544 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1545
1546 // Try "ending" a two-phase read when one isn't active.
1547 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndReadData(1u * sizeof(int32_t)));
1548
1549 // Still one element available.
1550 num_bytes = 0u;
1551 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1552 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1553
1554 // Try ending a two-phase read with an invalid amount (too much).
1555 num_bytes = 0u;
1556 const void* read_ptr = nullptr;
1557 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1558 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1559 EndReadData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1560
1561 // Still one element available.
1562 num_bytes = 0u;
1563 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1564 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1565
1566 // Try ending a two-phase read with an invalid amount (not a multiple of the
1567 // element size).
1568 num_bytes = 0u;
1569 read_ptr = nullptr;
1570 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1571 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1572 ASSERT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
1573 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndReadData(1u));
1574
1575 // Still one element available.
1576 num_bytes = 0u;
1577 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1578 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1579 }
1580
1581 } // namespace
1582 } // namespace edk
1583 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698