Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(371)

Side by Side Diff: mojo/edk/system/data_pipe_unittest.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: more cleanup Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stdint.h>
6
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/logging.h"
10 #include "base/memory/scoped_ptr.h"
11 #include "base/message_loop/message_loop.h"
12 #include "mojo/edk/embedder/platform_channel_pair.h"
13 #include "mojo/edk/embedder/simple_platform_support.h"
14 #include "mojo/edk/system/test_utils.h"
15 #include "mojo/edk/system/waiter.h"
16 #include "mojo/public/c/system/data_pipe.h"
17 #include "mojo/public/c/system/functions.h"
18 #include "mojo/public/cpp/system/macros.h"
19 #include "testing/gtest/include/gtest/gtest.h"
20
21 namespace mojo {
22 namespace edk {
23 namespace {
24
25 const MojoHandleSignals kSignalAll = MOJO_HANDLE_SIGNAL_READABLE |
26 MOJO_HANDLE_SIGNAL_WRITABLE |
27 MOJO_HANDLE_SIGNAL_PEER_CLOSED;
28 const uint32_t kSizeOfOptions =
29 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
30
31 // In various places, we have to poll (since, e.g., we can't yet wait for a
32 // certain amount of data to be available). This is the maximum number of
33 // iterations (separated by a short sleep).
34 // TODO(vtl): Get rid of this.
35 const size_t kMaxPoll = 100;
36
37 class DataPipeTest : public test::MojoSystemTest {
38 public:
39 DataPipeTest() : producer_(MOJO_HANDLE_INVALID),
40 consumer_(MOJO_HANDLE_INVALID) {}
41
42 ~DataPipeTest() override {
43 if (producer_ != MOJO_HANDLE_INVALID)
44 CHECK_EQ(MOJO_RESULT_OK, MojoClose(producer_));
45 if (consumer_ != MOJO_HANDLE_INVALID)
46 CHECK_EQ(MOJO_RESULT_OK, MojoClose(consumer_));
47 }
48
49 MojoResult Create(const MojoCreateDataPipeOptions* options) {
50 return MojoCreateDataPipe(options, &producer_, &consumer_);
51 }
52
53 MojoResult WriteData(const void* elements,
54 uint32_t* num_bytes,
55 bool all_or_none = false) {
56 return MojoWriteData(producer_, elements, num_bytes,
57 all_or_none ? MOJO_READ_DATA_FLAG_ALL_OR_NONE :
58 MOJO_WRITE_DATA_FLAG_NONE);
59 }
60
61 MojoResult ReadData(void* elements,
62 uint32_t* num_bytes,
63 bool all_or_none = false,
64 bool peek = false) {
65 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
66 if (all_or_none)
67 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
68 if (peek)
69 flags |= MOJO_READ_DATA_FLAG_PEEK;
70 return MojoReadData(consumer_, elements, num_bytes, flags);
71 }
72
73 MojoResult QueryData(uint32_t* num_bytes) {
74 return MojoReadData(consumer_, nullptr, num_bytes,
75 MOJO_READ_DATA_FLAG_QUERY);
76 }
77
78 MojoResult DiscardData(uint32_t* num_bytes, bool all_or_none = false) {
79 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_DISCARD;
80 if (all_or_none)
81 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
82 return MojoReadData(consumer_, nullptr, num_bytes, flags);
83 }
84
85 MojoResult BeginReadData(const void** elements,
86 uint32_t* num_bytes,
87 bool all_or_none = false) {
88 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
89 if (all_or_none)
90 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
91 return MojoBeginReadData(consumer_, elements, num_bytes, flags);
92 }
93
94 MojoResult EndReadData(uint32_t num_bytes_read) {
95 return MojoEndReadData(consumer_, num_bytes_read);
96 }
97
98 MojoResult BeginWriteData(void** elements,
99 uint32_t* num_bytes,
100 bool all_or_none = false) {
101 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
102 if (all_or_none)
103 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
104 return MojoBeginWriteData(producer_, elements, num_bytes, flags);
105 }
106
107 MojoResult EndWriteData(uint32_t num_bytes_written) {
108 return MojoEndWriteData(producer_, num_bytes_written);
109 }
110
111 MojoResult CloseProducer() {
112 MojoResult rv = MojoClose(producer_);
113 producer_ = MOJO_HANDLE_INVALID;
114 return rv;
115 }
116
117 MojoResult CloseConsumer() {
118 MojoResult rv = MojoClose(consumer_);
119 consumer_ = MOJO_HANDLE_INVALID;
120 return rv;
121 }
122
123 MojoHandle producer_, consumer_;
124
125 private:
126 MOJO_DISALLOW_COPY_AND_ASSIGN(DataPipeTest);
127 };
128
129 TEST_F(DataPipeTest, Basic) {
130 const MojoCreateDataPipeOptions options = {
131 kSizeOfOptions, // |struct_size|.
132 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
133 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
134 1000 * sizeof(int32_t) // |capacity_num_bytes|.
135 };
136
137 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
138
139 // We can write to a data pipe handle immediately.
140 int32_t elements[10] = {};
141 uint32_t num_bytes = 0;
142
143 num_bytes =
144 static_cast<uint32_t>(MOJO_ARRAYSIZE(elements) * sizeof(elements[0]));
145
146 elements[0] = 123;
147 elements[1] = 456;
148 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
149 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&elements[0], &num_bytes));
150
151 // Now wait for the other side to become readable.
152 MojoHandleSignalsState state;
153 ASSERT_EQ(MOJO_RESULT_OK,
154 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
155 MOJO_DEADLINE_INDEFINITE, &state));
156 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, state.satisfied_signals);
157
158 elements[0] = -1;
159 elements[1] = -1;
160 ASSERT_EQ(MOJO_RESULT_OK, ReadData(&elements[0], &num_bytes));
161 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
162 ASSERT_EQ(elements[0], 123);
163 ASSERT_EQ(elements[1], 456);
164 }
165
166 // Tests creation of data pipes with various (valid) options.
167 TEST_F(DataPipeTest, CreateAndMaybeTransfer) {
168 MojoCreateDataPipeOptions test_options[] = {
169 // Default options.
170 {},
171 // Trivial element size, non-default capacity.
172 {kSizeOfOptions, // |struct_size|.
173 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
174 1, // |element_num_bytes|.
175 1000}, // |capacity_num_bytes|.
176 // Nontrivial element size, non-default capacity.
177 {kSizeOfOptions, // |struct_size|.
178 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
179 4, // |element_num_bytes|.
180 4000}, // |capacity_num_bytes|.
181 // Nontrivial element size, default capacity.
182 {kSizeOfOptions, // |struct_size|.
183 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
184 100, // |element_num_bytes|.
185 0} // |capacity_num_bytes|.
186 };
187 for (size_t i = 0; i < arraysize(test_options); i++) {
188 MojoHandle producer_handle, consumer_handle;
189 MojoCreateDataPipeOptions* options =
190 i ? &test_options[i] : nullptr;
191 ASSERT_EQ(MOJO_RESULT_OK,
192 MojoCreateDataPipe(options, &producer_handle, &consumer_handle));
193 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(producer_handle));
194 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(consumer_handle));
195 }
196 }
197
198 TEST_F(DataPipeTest, SimpleReadWrite) {
199 const MojoCreateDataPipeOptions options = {
200 kSizeOfOptions, // |struct_size|.
201 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
202 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
203 1000 * sizeof(int32_t) // |capacity_num_bytes|.
204 };
205
206 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
207 MojoHandleSignalsState hss;
208
209 int32_t elements[10] = {};
210 uint32_t num_bytes = 0;
211
212 // Try reading; nothing there yet.
213 num_bytes =
214 static_cast<uint32_t>(MOJO_ARRAYSIZE(elements) * sizeof(elements[0]));
215 ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, ReadData(elements, &num_bytes));
216
217 // Query; nothing there yet.
218 num_bytes = 0;
219 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
220 ASSERT_EQ(0u, num_bytes);
221
222 // Discard; nothing there yet.
223 num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
224 ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, DiscardData(&num_bytes));
225
226 // Read with invalid |num_bytes|.
227 num_bytes = sizeof(elements[0]) + 1;
228 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, ReadData(elements, &num_bytes));
229
230 // Write two elements.
231 elements[0] = 123;
232 elements[1] = 456;
233 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
234 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
235 // It should have written everything (even without "all or none").
236 ASSERT_EQ(2u * sizeof(elements[0]), num_bytes);
237
238 // Wait.
239 ASSERT_EQ(MOJO_RESULT_OK,
240 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
241 MOJO_DEADLINE_INDEFINITE, &hss));
242 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
243 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
244 hss.satisfiable_signals);
245
246 // Query.
247 // TODO(vtl): It's theoretically possible (though not with the current
248 // implementation/configured limits) that not all the data has arrived yet.
249 // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
250 // or |2 * ...|.)
251 num_bytes = 0;
252 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
253 ASSERT_EQ(2 * sizeof(elements[0]), num_bytes);
254
255 // Read one element.
256 elements[0] = -1;
257 elements[1] = -1;
258 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
259 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes));
260 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
261 ASSERT_EQ(123, elements[0]);
262 ASSERT_EQ(-1, elements[1]);
263
264 // Query.
265 // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we
266 // should get 1 here.)
267 num_bytes = 0;
268 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
269 ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
270
271 // Peek one element.
272 elements[0] = -1;
273 elements[1] = -1;
274 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
275 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, true));
276 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
277 ASSERT_EQ(456, elements[0]);
278 ASSERT_EQ(-1, elements[1]);
279
280 // Query. Still has 1 element remaining.
281 num_bytes = 0;
282 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
283 ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
284
285 // Try to read two elements, with "all or none".
286 elements[0] = -1;
287 elements[1] = -1;
288 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
289 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
290 ReadData(elements, &num_bytes, true, false));
291 ASSERT_EQ(-1, elements[0]);
292 ASSERT_EQ(-1, elements[1]);
293
294 // Try to read two elements, without "all or none".
295 elements[0] = -1;
296 elements[1] = -1;
297 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
298 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, false));
299 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
300 ASSERT_EQ(456, elements[0]);
301 ASSERT_EQ(-1, elements[1]);
302
303 // Query.
304 num_bytes = 0;
305 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
306 ASSERT_EQ(0u, num_bytes);
307 }
308
309 // Note: The "basic" waiting tests test that the "wait states" are correct in
310 // various situations; they don't test that waiters are properly awoken on state
311 // changes. (For that, we need to use multiple threads.)
312 TEST_F(DataPipeTest, BasicProducerWaiting) {
313 // Note: We take advantage of the fact that current for current
314 // implementations capacities are strict maximums. This is not guaranteed by
315 // the API.
316
317 const MojoCreateDataPipeOptions options = {
318 kSizeOfOptions, // |struct_size|.
319 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
320 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
321 2 * sizeof(int32_t) // |capacity_num_bytes|.
322 };
323 Create(&options);
324 MojoHandleSignalsState hss;
325
326 // Never readable.
327 hss = MojoHandleSignalsState();
328 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
329 MojoWait(producer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
330 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
331 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
332 hss.satisfiable_signals);
333
334 // Already writable.
335 hss = MojoHandleSignalsState();
336 ASSERT_EQ(MOJO_RESULT_OK,
337 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
338
339 // Write two elements.
340 int32_t elements[2] = {123, 456};
341 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
342 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
343 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
344
345 // Wait for data to become available to the consumer.
346 ASSERT_EQ(MOJO_RESULT_OK,
347 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
348 MOJO_DEADLINE_INDEFINITE, &hss));
349 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
350 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
351 hss.satisfiable_signals);
352
353 // Peek one element.
354 elements[0] = -1;
355 elements[1] = -1;
356 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
357 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
358 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
359 ASSERT_EQ(123, elements[0]);
360 ASSERT_EQ(-1, elements[1]);
361
362 // Read one element.
363 elements[0] = -1;
364 elements[1] = -1;
365 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
366 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, false));
367 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
368 ASSERT_EQ(123, elements[0]);
369 ASSERT_EQ(-1, elements[1]);
370
371 // Try writing, using a two-phase write.
372 void* buffer = nullptr;
373 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
374 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
375 EXPECT_TRUE(buffer);
376 ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
377
378 static_cast<int32_t*>(buffer)[0] = 789;
379 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(static_cast<uint32_t>(
380 1u * sizeof(elements[0]))));
381
382 // Read one element, using a two-phase read.
383 const void* read_buffer = nullptr;
384 num_bytes = 0u;
385 ASSERT_EQ(MOJO_RESULT_OK,
386 BeginReadData(&read_buffer, &num_bytes, false));
387 EXPECT_TRUE(read_buffer);
388 // Since we only read one element (after having written three in all), the
389 // two-phase read should only allow us to read one. This checks an
390 // implementation detail!
391 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
392 ASSERT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
393 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(static_cast<uint32_t>(
394 1u * sizeof(elements[0]))));
395
396 // Write one element.
397 elements[0] = 123;
398 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
399 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
400 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
401
402 // Close the consumer.
403 CloseConsumer();
404
405 // It should now be never-writable.
406 hss = MojoHandleSignalsState();
407 ASSERT_EQ(MOJO_RESULT_OK,
408 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
409 MOJO_DEADLINE_INDEFINITE, &hss));
410 hss = MojoHandleSignalsState();
411 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
412 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
413 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
414 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
415 }
416
417 TEST_F(DataPipeTest, PeerClosedProducerWaiting) {
418 const MojoCreateDataPipeOptions options = {
419 kSizeOfOptions, // |struct_size|.
420 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
421 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
422 2 * sizeof(int32_t) // |capacity_num_bytes|.
423 };
424 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
425 MojoHandleSignalsState hss;
426
427 // Close the consumer.
428 CloseConsumer();
429
430 // It should be signaled.
431 hss = MojoHandleSignalsState();
432 ASSERT_EQ(MOJO_RESULT_OK,
433 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
434 MOJO_DEADLINE_INDEFINITE, &hss));
435 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
436 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
437 }
438
439 TEST_F(DataPipeTest, PeerClosedConsumerWaiting) {
440 const MojoCreateDataPipeOptions options = {
441 kSizeOfOptions, // |struct_size|.
442 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
443 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
444 2 * sizeof(int32_t) // |capacity_num_bytes|.
445 };
446 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
447 MojoHandleSignalsState hss;
448
449 // Close the producer.
450 CloseProducer();
451
452 // It should be signaled.
453 hss = MojoHandleSignalsState();
454 ASSERT_EQ(MOJO_RESULT_OK,
455 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
456 MOJO_DEADLINE_INDEFINITE, &hss));
457 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
458 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
459 }
460
461 TEST_F(DataPipeTest, BasicConsumerWaiting) {
462 const MojoCreateDataPipeOptions options = {
463 kSizeOfOptions, // |struct_size|.
464 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
465 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
466 1000 * sizeof(int32_t) // |capacity_num_bytes|.
467 };
468 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
469 MojoHandleSignalsState hss;
470
471 // Never writable.
472 hss = MojoHandleSignalsState();
473 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
474 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_WRITABLE,
475 MOJO_DEADLINE_INDEFINITE, &hss));
476 ASSERT_EQ(0u, hss.satisfied_signals);
477 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
478 hss.satisfiable_signals);
479
480 // Write two elements.
481 int32_t elements[2] = {123, 456};
482 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
483 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
484
485 // Wait for readability.
486 hss = MojoHandleSignalsState();
487 ASSERT_EQ(MOJO_RESULT_OK,
488 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
489 MOJO_DEADLINE_INDEFINITE, &hss));
490 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
491 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
492 hss.satisfiable_signals);
493
494 // Discard one element.
495 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
496 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
497 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
498
499 // Should still be readable.
500 hss = MojoHandleSignalsState();
501 ASSERT_EQ(MOJO_RESULT_OK,
502 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
503 MOJO_DEADLINE_INDEFINITE, &hss));
504 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
505 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
506 hss.satisfiable_signals);
507
508 // Peek one element.
509 elements[0] = -1;
510 elements[1] = -1;
511 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
512 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
513 ASSERT_EQ(456, elements[0]);
514 ASSERT_EQ(-1, elements[1]);
515
516 // Should still be readable.
517 hss = MojoHandleSignalsState();
518 ASSERT_EQ(MOJO_RESULT_OK,
519 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
520 MOJO_DEADLINE_INDEFINITE, &hss));
521 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
522 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
523 hss.satisfiable_signals);
524
525 // Read one element.
526 elements[0] = -1;
527 elements[1] = -1;
528 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
529 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
530 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
531 ASSERT_EQ(456, elements[0]);
532 ASSERT_EQ(-1, elements[1]);
533
534 // Write one element.
535 elements[0] = 789;
536 elements[1] = -1;
537 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
538 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
539
540 // Waiting should now succeed.
541 hss = MojoHandleSignalsState();
542 ASSERT_EQ(MOJO_RESULT_OK,
543 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
544 MOJO_DEADLINE_INDEFINITE, &hss));
545 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
546 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
547 hss.satisfiable_signals);
548
549 // Close the producer.
550 CloseProducer();
551
552 // Should still be readable.
553 hss = MojoHandleSignalsState();
554 ASSERT_EQ(MOJO_RESULT_OK,
555 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
556 MOJO_DEADLINE_INDEFINITE, &hss));
557 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
558 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
559 hss.satisfiable_signals);
560
561 // Wait for the peer closed signal.
562 hss = MojoHandleSignalsState();
563 ASSERT_EQ(MOJO_RESULT_OK,
564 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
565 MOJO_DEADLINE_INDEFINITE, &hss));
566 ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) != 0);
567 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
568 hss.satisfiable_signals);
569
570 // Read one element.
571 elements[0] = -1;
572 elements[1] = -1;
573 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
574 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
575 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
576 ASSERT_EQ(789, elements[0]);
577 ASSERT_EQ(-1, elements[1]);
578
579 // Should be never-readable.
580 hss = MojoHandleSignalsState();
581 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
582 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
583 MOJO_DEADLINE_INDEFINITE, &hss));
584 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
585 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
586 }
587
588 // Test with two-phase APIs and also closing the producer with an active
589 // consumer waiter.
590 TEST_F(DataPipeTest, ConsumerWaitingTwoPhase) {
591 const MojoCreateDataPipeOptions options = {
592 kSizeOfOptions, // |struct_size|.
593 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
594 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
595 1000 * sizeof(int32_t) // |capacity_num_bytes|.
596 };
597 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
598 MojoHandleSignalsState hss;
599
600 // Write two elements.
601 int32_t* elements = nullptr;
602 void* buffer = nullptr;
603 // Request room for three (but we'll only write two).
604 uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
605 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes, true));
606 EXPECT_TRUE(buffer);
607 EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
608 elements = static_cast<int32_t*>(buffer);
609 elements[0] = 123;
610 elements[1] = 456;
611 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(2u * sizeof(elements[0])));
612
613 // Wait for readability.
614 hss = MojoHandleSignalsState();
615 ASSERT_EQ(MOJO_RESULT_OK,
616 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
617 MOJO_DEADLINE_INDEFINITE, &hss));
618 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
619 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
620 hss.satisfiable_signals);
621
622 // Read one element.
623 // Request two in all-or-none mode, but only read one.
624 const void* read_buffer = nullptr;
625 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
626 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes, true));
627 EXPECT_TRUE(read_buffer);
628 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
629 const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
630 ASSERT_EQ(123, read_elements[0]);
631 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
632
633 // Should still be readable.
634 hss = MojoHandleSignalsState();
635 ASSERT_EQ(MOJO_RESULT_OK,
636 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
637 MOJO_DEADLINE_INDEFINITE, &hss));
638 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
639 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
640 hss.satisfiable_signals);
641
642 // Read one element.
643 // Request three, but not in all-or-none mode.
644 read_buffer = nullptr;
645 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
646 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
647 EXPECT_TRUE(read_buffer);
648 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
649 read_elements = static_cast<const int32_t*>(read_buffer);
650 ASSERT_EQ(456, read_elements[0]);
651 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
652
653 // Close the producer.
654 CloseProducer();
655
656 // Should be never-readable.
657 hss = MojoHandleSignalsState();
658 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
659 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
660 MOJO_DEADLINE_INDEFINITE, &hss));
661 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
662 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
663 }
664
665 // Tests that data pipes aren't writable/readable during two-phase writes/reads.
666 TEST_F(DataPipeTest, BasicTwoPhaseWaiting) {
667 const MojoCreateDataPipeOptions options = {
668 kSizeOfOptions, // |struct_size|.
669 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
670 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
671 1000 * sizeof(int32_t) // |capacity_num_bytes|.
672 };
673 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
674 MojoHandleSignalsState hss;
675
676 // It should be writable.
677 hss = MojoHandleSignalsState();
678 ASSERT_EQ(MOJO_RESULT_OK,
679 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
680 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
681 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
682 hss.satisfiable_signals);
683
684 uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
685 void* write_ptr = nullptr;
686 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
687 EXPECT_TRUE(write_ptr);
688 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
689
690 // At this point, it shouldn't be writable.
691 hss = MojoHandleSignalsState();
692 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
693 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
694 ASSERT_EQ(0u, hss.satisfied_signals);
695 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
696 hss.satisfiable_signals);
697
698 // It shouldn't be readable yet either (we'll wait later).
699 hss = MojoHandleSignalsState();
700 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
701 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
702 ASSERT_EQ(0u, hss.satisfied_signals);
703 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
704 hss.satisfiable_signals);
705
706 static_cast<int32_t*>(write_ptr)[0] = 123;
707 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(1u * sizeof(int32_t)));
708
709 // It should immediately be writable again.
710 hss = MojoHandleSignalsState();
711 ASSERT_EQ(MOJO_RESULT_OK,
712 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
713 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
714 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
715 hss.satisfiable_signals);
716
717 // It should become readable.
718 hss = MojoHandleSignalsState();
719 ASSERT_EQ(MOJO_RESULT_OK,
720 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
721 MOJO_DEADLINE_INDEFINITE, &hss));
722 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
723 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
724 hss.satisfiable_signals);
725
726 // Start another two-phase write and check that it's readable even in the
727 // middle of it.
728 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
729 write_ptr = nullptr;
730 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
731 EXPECT_TRUE(write_ptr);
732 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
733
734 // It should be readable.
735 hss = MojoHandleSignalsState();
736 ASSERT_EQ(MOJO_RESULT_OK,
737 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
738 MOJO_DEADLINE_INDEFINITE, &hss));
739 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
740 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
741 hss.satisfiable_signals);
742
743 // End the two-phase write without writing anything.
744 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0u));
745
746 // Start a two-phase read.
747 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
748 const void* read_ptr = nullptr;
749 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
750 EXPECT_TRUE(read_ptr);
751 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
752
753 // At this point, it should still be writable.
754 hss = MojoHandleSignalsState();
755 ASSERT_EQ(MOJO_RESULT_OK,
756 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
757 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
758 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
759 hss.satisfiable_signals);
760
761 // But not readable.
762 hss = MojoHandleSignalsState();
763 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
764 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
765 ASSERT_EQ(0u, hss.satisfied_signals);
766 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
767 hss.satisfiable_signals);
768
769 // End the two-phase read without reading anything.
770 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0u));
771
772 // It should be readable again.
773 hss = MojoHandleSignalsState();
774 ASSERT_EQ(MOJO_RESULT_OK,
775 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
776 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
777 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
778 hss.satisfiable_signals);
779 }
780
781 void Seq(int32_t start, size_t count, int32_t* out) {
782 for (size_t i = 0; i < count; i++)
783 out[i] = start + static_cast<int32_t>(i);
784 }
785
786 TEST_F(DataPipeTest, AllOrNone) {
787 const MojoCreateDataPipeOptions options = {
788 kSizeOfOptions, // |struct_size|.
789 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
790 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
791 10 * sizeof(int32_t) // |capacity_num_bytes|.
792 };
793 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
794 MojoHandleSignalsState hss;
795
796 // Try writing way too much.
797 uint32_t num_bytes = 20u * sizeof(int32_t);
798 int32_t buffer[100];
799 Seq(0, MOJO_ARRAYSIZE(buffer), buffer);
800 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
801
802 // Should still be empty.
803 num_bytes = ~0u;
804 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
805 ASSERT_EQ(0u, num_bytes);
806
807 // Write some data.
808 num_bytes = 5u * sizeof(int32_t);
809 Seq(100, MOJO_ARRAYSIZE(buffer), buffer);
810 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
811 ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
812
813 // Wait for data.
814 // TODO(vtl): There's no real guarantee that all the data will become
815 // available at once (except that in current implementations, with reasonable
816 // limits, it will). Eventually, we'll be able to wait for a specified amount
817 // of data to become available.
818 hss = MojoHandleSignalsState();
819 ASSERT_EQ(MOJO_RESULT_OK,
820 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
821 MOJO_DEADLINE_INDEFINITE, &hss));
822 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
823 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
824 hss.satisfiable_signals);
825
826 // Half full.
827 num_bytes = 0u;
828 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
829 ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
830
831 /* TODO(jam): enable if we end up observing max capacity
832 // Too much.
833 num_bytes = 6u * sizeof(int32_t);
834 Seq(200, MOJO_ARRAYSIZE(buffer), buffer);
835 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
836 */
837
838 // Try reading too much.
839 num_bytes = 11u * sizeof(int32_t);
840 memset(buffer, 0xab, sizeof(buffer));
841 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
842 int32_t expected_buffer[100];
843 memset(expected_buffer, 0xab, sizeof(expected_buffer));
844 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
845
846 // Try discarding too much.
847 num_bytes = 11u * sizeof(int32_t);
848 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
849
850 // Just a little.
851 num_bytes = 2u * sizeof(int32_t);
852 Seq(300, MOJO_ARRAYSIZE(buffer), buffer);
853 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
854 ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
855
856 // Just right.
857 num_bytes = 3u * sizeof(int32_t);
858 Seq(400, MOJO_ARRAYSIZE(buffer), buffer);
859 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
860 ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
861
862 // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
863 // specified amount of data to be available, so poll.
864 for (size_t i = 0; i < kMaxPoll; i++) {
865 num_bytes = 0u;
866 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
867 if (num_bytes >= 10u * sizeof(int32_t))
868 break;
869
870 test::Sleep(test::EpsilonDeadline());
871 }
872 ASSERT_EQ(10u * sizeof(int32_t), num_bytes);
873
874 // Read half.
875 num_bytes = 5u * sizeof(int32_t);
876 memset(buffer, 0xab, sizeof(buffer));
877 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
878 ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
879 memset(expected_buffer, 0xab, sizeof(expected_buffer));
880 Seq(100, 5, expected_buffer);
881 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
882
883 // Try reading too much again.
884 num_bytes = 6u * sizeof(int32_t);
885 memset(buffer, 0xab, sizeof(buffer));
886 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
887 memset(expected_buffer, 0xab, sizeof(expected_buffer));
888 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
889
890 // Try discarding too much again.
891 num_bytes = 6u * sizeof(int32_t);
892 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
893
894 // Discard a little.
895 num_bytes = 2u * sizeof(int32_t);
896 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
897 ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
898
899 // Three left.
900 num_bytes = 0u;
901 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
902 ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
903
904 // Close the producer, then test producer-closed cases.
905 CloseProducer();
906
907 // Wait.
908 hss = MojoHandleSignalsState();
909 ASSERT_EQ(MOJO_RESULT_OK,
910 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
911 MOJO_DEADLINE_INDEFINITE, &hss));
912 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
913 hss.satisfied_signals);
914 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
915 hss.satisfiable_signals);
916
917 // Try reading too much; "failed precondition" since the producer is closed.
918 num_bytes = 4u * sizeof(int32_t);
919 memset(buffer, 0xab, sizeof(buffer));
920 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
921 ReadData(buffer, &num_bytes, true));
922 memset(expected_buffer, 0xab, sizeof(expected_buffer));
923 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
924
925 // Try discarding too much; "failed precondition" again.
926 num_bytes = 4u * sizeof(int32_t);
927 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes, true));
928
929 // Read a little.
930 num_bytes = 2u * sizeof(int32_t);
931 memset(buffer, 0xab, sizeof(buffer));
932 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
933 ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
934 memset(expected_buffer, 0xab, sizeof(expected_buffer));
935 Seq(400, 2, expected_buffer);
936 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
937
938 // Discard the remaining element.
939 num_bytes = 1u * sizeof(int32_t);
940 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
941 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
942
943 // Empty again.
944 num_bytes = ~0u;
945 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
946 ASSERT_EQ(0u, num_bytes);
947 }
948
949 TEST_F(DataPipeTest, DISABLED_TwoPhaseAllOrNone) {
950 const MojoCreateDataPipeOptions options = {
951 kSizeOfOptions, // |struct_size|.
952 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
953 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
954 10 * sizeof(int32_t) // |capacity_num_bytes|.
955 };
956 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
957 MojoHandleSignalsState hss;
958
959 // Try writing way too much (two-phase).
960 uint32_t num_bytes = 20u * sizeof(int32_t);
961 void* write_ptr = nullptr;
962 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
963 BeginWriteData(&write_ptr, &num_bytes, true));
964
965 // Try writing an amount which isn't a multiple of the element size
966 // (two-phase).
967 static_assert(sizeof(int32_t) > 1u, "Wow! int32_t's have size 1");
968 num_bytes = 1u;
969 write_ptr = nullptr;
970 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
971 BeginWriteData(&write_ptr, &num_bytes, true));
972
973 // Try reading way too much (two-phase).
974 num_bytes = 20u * sizeof(int32_t);
975 const void* read_ptr = nullptr;
976 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
977 BeginReadData(&read_ptr, &num_bytes, true));
978
979 // Write half (two-phase).
980 num_bytes = 5u * sizeof(int32_t);
981 write_ptr = nullptr;
982 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes, true));
983 // May provide more space than requested.
984 EXPECT_GE(num_bytes, 5u * sizeof(int32_t));
985 EXPECT_TRUE(write_ptr);
986 Seq(0, 5, static_cast<int32_t*>(write_ptr));
987 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(5u * sizeof(int32_t)));
988
989 // Wait for data.
990 // TODO(vtl): (See corresponding TODO in AllOrNone.)
991 hss = MojoHandleSignalsState();
992 ASSERT_EQ(MOJO_RESULT_OK,
993 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
994 MOJO_DEADLINE_INDEFINITE, &hss));
995 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
996 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
997 hss.satisfiable_signals);
998
999 // Try reading an amount which isn't a multiple of the element size
1000 // (two-phase).
1001 num_bytes = 1u;
1002 read_ptr = nullptr;
1003 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1004 BeginReadData(&read_ptr, &num_bytes, true));
1005
1006 // Read one (two-phase).
1007 num_bytes = 1u * sizeof(int32_t);
1008 read_ptr = nullptr;
1009 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes, true));
1010 EXPECT_GE(num_bytes, 1u * sizeof(int32_t));
1011 ASSERT_EQ(0, static_cast<const int32_t*>(read_ptr)[0]);
1012 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(int32_t)));
1013
1014 // We should have four left, leaving room for six.
1015 num_bytes = 0u;
1016 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1017 ASSERT_EQ(4u * sizeof(int32_t), num_bytes);
1018
1019 // Assuming a tight circular buffer of the specified capacity, we can't do a
1020 // two-phase write of six now.
1021 num_bytes = 6u * sizeof(int32_t);
1022 write_ptr = nullptr;
1023 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
1024 BeginWriteData(&write_ptr, &num_bytes, true));
1025
1026 // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
1027 // specified amount of space to be available, so poll.
1028 for (size_t i = 0; i < kMaxPoll; i++) {
1029 // Write six elements (simple), filling the buffer.
1030 num_bytes = 6u * sizeof(int32_t);
1031 int32_t buffer[100];
1032 Seq(100, 6, buffer);
1033 MojoResult result = WriteData(buffer, &num_bytes, true);
1034 if (result == MOJO_RESULT_OK)
1035 break;
1036 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, result);
1037
1038 test::Sleep(test::EpsilonDeadline());
1039 }
1040 ASSERT_EQ(6u * sizeof(int32_t), num_bytes);
1041
1042 // TODO(vtl): Hack: poll again.
1043 for (size_t i = 0; i < kMaxPoll; i++) {
1044 // We have ten.
1045 num_bytes = 0u;
1046 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1047 if (num_bytes >= 10u * sizeof(int32_t))
1048 break;
1049
1050 test::Sleep(test::EpsilonDeadline());
1051 }
1052 ASSERT_EQ(10u * sizeof(int32_t), num_bytes);
1053
1054 // Note: Whether a two-phase read of ten would fail here or not is
1055 // implementation-dependent.
1056
1057 // Close the producer.
1058 CloseProducer();
1059
1060 // A two-phase read of nine should work.
1061 num_bytes = 9u * sizeof(int32_t);
1062 read_ptr = nullptr;
1063 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes, true));
1064 EXPECT_GE(num_bytes, 9u * sizeof(int32_t));
1065 ASSERT_EQ(1, static_cast<const int32_t*>(read_ptr)[0]);
1066 ASSERT_EQ(2, static_cast<const int32_t*>(read_ptr)[1]);
1067 ASSERT_EQ(3, static_cast<const int32_t*>(read_ptr)[2]);
1068 ASSERT_EQ(4, static_cast<const int32_t*>(read_ptr)[3]);
1069 ASSERT_EQ(100, static_cast<const int32_t*>(read_ptr)[4]);
1070 ASSERT_EQ(101, static_cast<const int32_t*>(read_ptr)[5]);
1071 ASSERT_EQ(102, static_cast<const int32_t*>(read_ptr)[6]);
1072 ASSERT_EQ(103, static_cast<const int32_t*>(read_ptr)[7]);
1073 ASSERT_EQ(104, static_cast<const int32_t*>(read_ptr)[8]);
1074 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(9u * sizeof(int32_t)));
1075
1076 // Wait for peer closed.
1077 hss = MojoHandleSignalsState();
1078 ASSERT_EQ(MOJO_RESULT_OK,
1079 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1080 MOJO_DEADLINE_INDEFINITE, &hss));
1081 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1082 hss.satisfied_signals);
1083 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1084 hss.satisfiable_signals);
1085
1086 // A two-phase read of two should fail, with "failed precondition".
1087 num_bytes = 2u * sizeof(int32_t);
1088 read_ptr = nullptr;
1089 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1090 BeginReadData(&read_ptr, &num_bytes, true));
1091 }
1092
1093 /*
1094 jam: this is testing that the implementation uses a circular buffer, which we
1095 don't use currently.
1096 // Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads,
1097 // respectively, as much as possible, even if it may have to "wrap around" the
1098 // internal circular buffer. (Note that the two-phase write and read need not do
1099 // this.)
1100 TYPED_TEST(DataPipeImplTest, WrapAround) {
1101 unsigned char test_data[1000];
1102 for (size_t i = 0; i < MOJO_ARRAYSIZE(test_data); i++)
1103 test_data[i] = static_cast<unsigned char>(i);
1104
1105 const MojoCreateDataPipeOptions options = {
1106 kSizeOfOptions, // |struct_size|.
1107 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1108 1u, // |element_num_bytes|.
1109 100u // |capacity_num_bytes|.
1110 };
1111 MojoCreateDataPipeOptions validated_options = {};
1112 // This test won't be valid if |ValidateCreateOptions()| decides to give the
1113 // pipe more space.
1114 ASSERT_EQ(MOJO_RESULT_OK, DataPipe::ValidateCreateOptions(
1115 &options, &validated_options));
1116 ASSERT_EQ(100u, validated_options.capacity_num_bytes);
1117 this->Create(options);
1118 this->DoTransfer();
1119
1120 Waiter waiter;
1121 HandleSignalsState hss;
1122
1123 // Add waiter.
1124 waiter.Init();
1125 ASSERT_EQ(MOJO_RESULT_OK,
1126 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1,
1127 nullptr));
1128
1129 // Write 20 bytes.
1130 uint32_t num_bytes = 20u;
1131 ASSERT_EQ(MOJO_RESULT_OK,
1132 this->ProducerWriteData(&test_data[0], &num_bytes, false));
1133 ASSERT_EQ(20u, num_bytes);
1134
1135 // Wait for data.
1136 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1137 ASSERT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
1138 hss = HandleSignalsState();
1139 this->ConsumerRemoveAwakable(&waiter, &hss);
1140 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1141 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1142 hss.satisfiable_signals);
1143
1144 // Read 10 bytes.
1145 unsigned char read_buffer[1000] = {0};
1146 num_bytes = 10u;
1147 ASSERT_EQ(MOJO_RESULT_OK,
1148 this->ConsumerReadData(read_buffer, &num_bytes, false, false));
1149 ASSERT_EQ(10u, num_bytes);
1150 ASSERT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
1151
1152 if (this->IsStrictCircularBuffer()) {
1153 // Check that a two-phase write can now only write (at most) 80 bytes. (This
1154 // checks an implementation detail; this behavior is not guaranteed.)
1155 void* write_buffer_ptr = nullptr;
1156 num_bytes = 0u;
1157 ASSERT_EQ(MOJO_RESULT_OK,
1158 this->ProducerBeginWriteData(&write_buffer_ptr, &num_bytes,
1159 false));
1160 EXPECT_TRUE(write_buffer_ptr);
1161 ASSERT_EQ(80u, num_bytes);
1162 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u));
1163 }
1164
1165 // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
1166 size_t total_num_bytes = 0;
1167 for (size_t i = 0; i < kMaxPoll; i++) {
1168 // Write as much data as we can (using |ProducerWriteData()|). We should
1169 // write 90 bytes (eventually).
1170 num_bytes = 200u;
1171 MojoResult result = this->ProducerWriteData(
1172 &test_data[20 + total_num_bytes], &num_bytes, false);
1173 if (result == MOJO_RESULT_OK) {
1174 total_num_bytes += num_bytes;
1175 if (total_num_bytes >= 90u)
1176 break;
1177 } else {
1178 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, result);
1179 }
1180
1181 test::Sleep(test::EpsilonDeadline());
1182 }
1183 ASSERT_EQ(90u, total_num_bytes);
1184
1185 // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
1186 for (size_t i = 0; i < kMaxPoll; i++) {
1187 // We have 100.
1188 num_bytes = 0u;
1189 ASSERT_EQ(MOJO_RESULT_OK,
1190 this->ConsumerQueryData(&num_bytes));
1191 if (num_bytes >= 100u)
1192 break;
1193
1194 test::Sleep(test::EpsilonDeadline());
1195 }
1196 ASSERT_EQ(100u, num_bytes);
1197
1198 if (this->IsStrictCircularBuffer()) {
1199 // Check that a two-phase read can now only read (at most) 90 bytes. (This
1200 // checks an implementation detail; this behavior is not guaranteed.)
1201 const void* read_buffer_ptr = nullptr;
1202 num_bytes = 0u;
1203 ASSERT_EQ(MOJO_RESULT_OK,
1204 this->ConsumerBeginReadData(&read_buffer_ptr, &num_bytes, false));
1205 EXPECT_TRUE(read_buffer_ptr);
1206 ASSERT_EQ(90u, num_bytes);
1207 ASSERT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(0u));
1208 }
1209
1210 // Read as much as possible (using |ConsumerReadData()|). We should read 100
1211 // bytes.
1212 num_bytes = static_cast<uint32_t>(MOJO_ARRAYSIZE(read_buffer) *
1213 sizeof(read_buffer[0]));
1214 memset(read_buffer, 0, num_bytes);
1215 ASSERT_EQ(MOJO_RESULT_OK,
1216 this->ConsumerReadData(read_buffer, &num_bytes, false, false));
1217 ASSERT_EQ(100u, num_bytes);
1218 ASSERT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
1219
1220 this->ProducerClose();
1221 this->ConsumerClose();
1222 }
1223 */
1224
1225 // Tests the behavior of writing (simple and two-phase), closing the producer,
1226 // then reading (simple and two-phase).
1227 TEST_F(DataPipeTest, WriteCloseProducerRead) {
1228 const char kTestData[] = "hello world";
1229 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1230
1231 const MojoCreateDataPipeOptions options = {
1232 kSizeOfOptions, // |struct_size|.
1233 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1234 1u, // |element_num_bytes|.
1235 1000u // |capacity_num_bytes|.
1236 };
1237 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1238
1239 // Write some data, so we'll have something to read.
1240 uint32_t num_bytes = kTestDataSize;
1241 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1242 ASSERT_EQ(kTestDataSize, num_bytes);
1243
1244 // Write it again, so we'll have something left over.
1245 num_bytes = kTestDataSize;
1246 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1247 ASSERT_EQ(kTestDataSize, num_bytes);
1248
1249 // Start two-phase write.
1250 void* write_buffer_ptr = nullptr;
1251 num_bytes = 0u;
1252 ASSERT_EQ(MOJO_RESULT_OK,
1253 BeginWriteData(&write_buffer_ptr, &num_bytes, false));
1254 EXPECT_TRUE(write_buffer_ptr);
1255 EXPECT_GT(num_bytes, 0u);
1256
1257 // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
1258 for (size_t i = 0; i < kMaxPoll; i++) {
1259 num_bytes = 0u;
1260 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1261 if (num_bytes >= 2u * kTestDataSize)
1262 break;
1263
1264 test::Sleep(test::EpsilonDeadline());
1265 }
1266 ASSERT_EQ(2u * kTestDataSize, num_bytes);
1267
1268 // Start two-phase read.
1269 const void* read_buffer_ptr = nullptr;
1270 num_bytes = 0u;
1271 ASSERT_EQ(MOJO_RESULT_OK,
1272 BeginReadData(&read_buffer_ptr, &num_bytes));
1273 EXPECT_TRUE(read_buffer_ptr);
1274 ASSERT_EQ(2u * kTestDataSize, num_bytes);
1275
1276 // Close the producer.
1277 CloseProducer();
1278
1279 // The consumer can finish its two-phase read.
1280 ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
1281 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(kTestDataSize));
1282
1283 // And start another.
1284 read_buffer_ptr = nullptr;
1285 num_bytes = 0u;
1286 ASSERT_EQ(MOJO_RESULT_OK,
1287 BeginReadData(&read_buffer_ptr, &num_bytes));
1288 EXPECT_TRUE(read_buffer_ptr);
1289 ASSERT_EQ(kTestDataSize, num_bytes);
1290 }
1291
1292
1293 // Tests the behavior of interrupting a two-phase read and write by closing the
1294 // consumer.
1295 TEST_F(DataPipeTest, TwoPhaseWriteReadCloseConsumer) {
1296 const char kTestData[] = "hello world";
1297 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1298
1299 const MojoCreateDataPipeOptions options = {
1300 kSizeOfOptions, // |struct_size|.
1301 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1302 1u, // |element_num_bytes|.
1303 1000u // |capacity_num_bytes|.
1304 };
1305 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1306 MojoHandleSignalsState hss;
1307
1308 // Write some data, so we'll have something to read.
1309 uint32_t num_bytes = kTestDataSize;
1310 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1311 ASSERT_EQ(kTestDataSize, num_bytes);
1312
1313 // Start two-phase write.
1314 void* write_buffer_ptr = nullptr;
1315 num_bytes = 0u;
1316 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1317 EXPECT_TRUE(write_buffer_ptr);
1318 ASSERT_GT(num_bytes, kTestDataSize);
1319
1320 // Wait for data.
1321 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1322 hss = MojoHandleSignalsState();
1323 ASSERT_EQ(MOJO_RESULT_OK,
1324 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1325 MOJO_DEADLINE_INDEFINITE, &hss));
1326 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1327 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1328 hss.satisfiable_signals);
1329
1330 // Start two-phase read.
1331 const void* read_buffer_ptr = nullptr;
1332 num_bytes = 0u;
1333 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
1334 EXPECT_TRUE(read_buffer_ptr);
1335 ASSERT_EQ(kTestDataSize, num_bytes);
1336
1337 // Close the consumer.
1338 CloseConsumer();
1339
1340 // Wait for producer to know that the consumer is closed.
1341 hss = MojoHandleSignalsState();
1342 ASSERT_EQ(MOJO_RESULT_OK,
1343 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1344 MOJO_DEADLINE_INDEFINITE, &hss));
1345 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1346 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
1347
1348 // Actually write some data. (Note: Premature freeing of the buffer would
1349 // probably only be detected under ASAN or similar.)
1350 memcpy(write_buffer_ptr, kTestData, kTestDataSize);
1351 // Note: Even though the consumer has been closed, ending the two-phase
1352 // write will report success.
1353 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(kTestDataSize));
1354
1355 // But trying to write should result in failure.
1356 num_bytes = kTestDataSize;
1357 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, WriteData(kTestData, &num_bytes));
1358
1359 // As will trying to start another two-phase write.
1360 write_buffer_ptr = nullptr;
1361 num_bytes = 0u;
1362 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1363 BeginWriteData(&write_buffer_ptr, &num_bytes));
1364 }
1365
1366 // Tests the behavior of "interrupting" a two-phase write by closing both the
1367 // producer and the consumer.
1368 TEST_F(DataPipeTest, TwoPhaseWriteCloseBoth) {
1369 const uint32_t kTestDataSize = 15u;
1370
1371 const MojoCreateDataPipeOptions options = {
1372 kSizeOfOptions, // |struct_size|.
1373 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1374 1u, // |element_num_bytes|.
1375 1000u // |capacity_num_bytes|.
1376 };
1377 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1378
1379 // Start two-phase write.
1380 void* write_buffer_ptr = nullptr;
1381 uint32_t num_bytes = 0u;
1382 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1383 EXPECT_TRUE(write_buffer_ptr);
1384 ASSERT_GT(num_bytes, kTestDataSize);
1385 }
1386
1387 // Tests the behavior of writing, closing the producer, and then reading (with
1388 // and without data remaining).
1389 TEST_F(DataPipeTest, WriteCloseProducerReadNoData) {
1390 const char kTestData[] = "hello world";
1391 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1392
1393 const MojoCreateDataPipeOptions options = {
1394 kSizeOfOptions, // |struct_size|.
1395 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1396 1u, // |element_num_bytes|.
1397 1000u // |capacity_num_bytes|.
1398 };
1399 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1400 MojoHandleSignalsState hss;
1401
1402 // Write some data, so we'll have something to read.
1403 uint32_t num_bytes = kTestDataSize;
1404 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1405 ASSERT_EQ(kTestDataSize, num_bytes);
1406
1407 // Close the producer.
1408 CloseProducer();
1409
1410 // Wait. (Note that once the consumer knows that the producer is closed, it
1411 // must also know about all the data that was sent.)
1412 hss = MojoHandleSignalsState();
1413 ASSERT_EQ(MOJO_RESULT_OK,
1414 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1415 MOJO_DEADLINE_INDEFINITE, &hss));
1416 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1417 hss.satisfied_signals);
1418 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1419 hss.satisfiable_signals);
1420
1421 // Peek that data.
1422 char buffer[1000];
1423 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1424 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, false, true));
1425 ASSERT_EQ(kTestDataSize, num_bytes);
1426 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1427
1428 // Read that data.
1429 memset(buffer, 0, 1000);
1430 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1431 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes));
1432 ASSERT_EQ(kTestDataSize, num_bytes);
1433 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1434
1435 // A second read should fail.
1436 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1437 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ReadData(buffer, &num_bytes));
1438
1439 // A two-phase read should also fail.
1440 const void* read_buffer_ptr = nullptr;
1441 num_bytes = 0u;
1442 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1443 ReadData(&read_buffer_ptr, &num_bytes));
1444
1445 // Ditto for discard.
1446 num_bytes = 10u;
1447 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes));
1448 }
1449
1450 // Test that two-phase reads/writes behave correctly when given invalid
1451 // arguments.
1452 TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) {
1453 const MojoCreateDataPipeOptions options = {
1454 kSizeOfOptions, // |struct_size|.
1455 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1456 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
1457 10 * sizeof(int32_t) // |capacity_num_bytes|.
1458 };
1459 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1460 MojoHandleSignalsState hss;
1461
1462 // No data.
1463 uint32_t num_bytes = 1000u;
1464 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1465 ASSERT_EQ(0u, num_bytes);
1466
1467 // Try "ending" a two-phase write when one isn't active.
1468 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1469 EndWriteData(1u * sizeof(int32_t)));
1470
1471 // Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd
1472 // have time to propagate.
1473 test::Sleep(test::EpsilonDeadline());
1474
1475 // Still no data.
1476 num_bytes = 1000u;
1477 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1478 ASSERT_EQ(0u, num_bytes);
1479
1480 // Try ending a two-phase write with an invalid amount (too much).
1481 num_bytes = 0u;
1482 void* write_ptr = nullptr;
1483 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1484 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1485 EndWriteData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1486
1487 // But the two-phase write still ended.
1488 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1489
1490 // Wait a bit (as above).
1491 test::Sleep(test::EpsilonDeadline());
1492
1493 // Still no data.
1494 num_bytes = 1000u;
1495 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1496 ASSERT_EQ(0u, num_bytes);
1497
1498 // Try ending a two-phase write with an invalid amount (not a multiple of the
1499 // element size).
1500 num_bytes = 0u;
1501 write_ptr = nullptr;
1502 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1503 EXPECT_GE(num_bytes, 1u);
1504 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndWriteData(1u));
1505
1506 // But the two-phase write still ended.
1507 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1508
1509 // Wait a bit (as above).
1510 test::Sleep(test::EpsilonDeadline());
1511
1512 // Still no data.
1513 num_bytes = 1000u;
1514 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1515 ASSERT_EQ(0u, num_bytes);
1516
1517 // Now write some data, so we'll be able to try reading.
1518 int32_t element = 123;
1519 num_bytes = 1u * sizeof(int32_t);
1520 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&element, &num_bytes));
1521
1522 // Wait for data.
1523 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1524 hss = MojoHandleSignalsState();
1525 ASSERT_EQ(MOJO_RESULT_OK,
1526 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1527 MOJO_DEADLINE_INDEFINITE, &hss));
1528 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1529 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1530 hss.satisfiable_signals);
1531
1532 // One element available.
1533 num_bytes = 0u;
1534 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1535 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1536
1537 // Try "ending" a two-phase read when one isn't active.
1538 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndReadData(1u * sizeof(int32_t)));
1539
1540 // Still one element available.
1541 num_bytes = 0u;
1542 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1543 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1544
1545 // Try ending a two-phase read with an invalid amount (too much).
1546 num_bytes = 0u;
1547 const void* read_ptr = nullptr;
1548 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1549 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1550 EndReadData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1551
1552 // Still one element available.
1553 num_bytes = 0u;
1554 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1555 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1556
1557 // Try ending a two-phase read with an invalid amount (not a multiple of the
1558 // element size).
1559 num_bytes = 0u;
1560 read_ptr = nullptr;
1561 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1562 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1563 ASSERT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
1564 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndReadData(1u));
1565
1566 // Still one element available.
1567 num_bytes = 0u;
1568 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1569 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1570 }
1571
1572 } // namespace
1573 } // namespace edk
1574 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698