Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(243)

Side by Side Diff: mojo/edk/system/data_pipe_unittest.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: some review comments Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //#include "mojo/edk/system/data_pipe_impl.h"
6
7 #include <stdint.h>
8
9 #include "base/bind.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "mojo/edk/embedder/platform_channel_pair.h"
15 #include "mojo/edk/embedder/simple_platform_support.h"
16 #include "mojo/edk/system/test_utils.h"
17 #include "mojo/edk/system/waiter.h"
18 #include "mojo/public/c/system/data_pipe.h"
19 #include "mojo/public/c/system/functions.h"
20 #include "mojo/public/cpp/system/macros.h"
21 #include "testing/gtest/include/gtest/gtest.h"
22
23 namespace mojo {
24 namespace edk {
25 namespace {
26
27 const MojoHandleSignals kSignalAll = MOJO_HANDLE_SIGNAL_READABLE |
28 MOJO_HANDLE_SIGNAL_WRITABLE |
29 MOJO_HANDLE_SIGNAL_PEER_CLOSED;
30 const uint32_t kSizeOfOptions =
31 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
32
33 // In various places, we have to poll (since, e.g., we can't yet wait for a
34 // certain amount of data to be available). This is the maximum number of
35 // iterations (separated by a short sleep).
36 // TODO(vtl): Get rid of this.
37 const size_t kMaxPoll = 100;
38
39 class DataPipeTest : public test::MojoSystemTest {
40 public:
41 DataPipeTest() : producer_(MOJO_HANDLE_INVALID),
42 consumer_(MOJO_HANDLE_INVALID) {}
43
44 ~DataPipeTest() override {
45 if (producer_ != MOJO_HANDLE_INVALID)
46 CHECK_EQ(MOJO_RESULT_OK, MojoClose(producer_));
47 if (consumer_ != MOJO_HANDLE_INVALID)
48 CHECK_EQ(MOJO_RESULT_OK, MojoClose(consumer_));
49 }
50
51 MojoResult Create(const MojoCreateDataPipeOptions* options) {
52 return MojoCreateDataPipe(options, &producer_, &consumer_);
53 }
54
55 MojoResult WriteData(const void* elements,
56 uint32_t* num_bytes,
57 bool all_or_none = false) {
58 return MojoWriteData(producer_, elements, num_bytes,
59 all_or_none ? MOJO_READ_DATA_FLAG_ALL_OR_NONE :
60 MOJO_WRITE_DATA_FLAG_NONE);
61 }
62
63 MojoResult ReadData(void* elements,
64 uint32_t* num_bytes,
65 bool all_or_none = false,
66 bool peek = false) {
67 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
68 if (all_or_none)
69 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
70 if (peek)
71 flags |= MOJO_READ_DATA_FLAG_PEEK;
72 return MojoReadData(consumer_, elements, num_bytes, flags);
73 }
74
75 MojoResult QueryData(uint32_t* num_bytes) {
76 return MojoReadData(consumer_, nullptr, num_bytes,
77 MOJO_READ_DATA_FLAG_QUERY);
78 }
79
80 MojoResult DiscardData(uint32_t* num_bytes, bool all_or_none = false) {
81 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_DISCARD;
82 if (all_or_none)
83 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
84 return MojoReadData(consumer_, nullptr, num_bytes, flags);
85 }
86
87 MojoResult BeginReadData(const void** elements,
88 uint32_t* num_bytes,
89 bool all_or_none = false) {
90 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
91 if (all_or_none)
92 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
93 return MojoBeginReadData(consumer_, elements, num_bytes, flags);
94 }
95
96 MojoResult EndReadData(uint32_t num_bytes_read) {
97 return MojoEndReadData(consumer_, num_bytes_read);
98 }
99
100 MojoResult BeginWriteData(void** elements,
101 uint32_t* num_bytes,
102 bool all_or_none = false) {
103 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
104 if (all_or_none)
105 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
106 return MojoBeginWriteData(producer_, elements, num_bytes, flags);
107 }
108
109 MojoResult EndWriteData(uint32_t num_bytes_written) {
110 return MojoEndWriteData(producer_, num_bytes_written);
111 }
112
113 MojoResult CloseProducer() {
114 MojoResult rv = MojoClose(producer_);
115 producer_ = MOJO_HANDLE_INVALID;
116 return rv;
117 }
118
119 MojoResult CloseConsumer() {
120 MojoResult rv = MojoClose(consumer_);
121 consumer_ = MOJO_HANDLE_INVALID;
122 return rv;
123 }
124
125 MojoHandle producer_, consumer_;
126
127 private:
128 MOJO_DISALLOW_COPY_AND_ASSIGN(DataPipeTest);
129 };
130
131 TEST_F(DataPipeTest, Basic) {
132 const MojoCreateDataPipeOptions options = {
133 kSizeOfOptions, // |struct_size|.
134 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
135 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
136 1000 * sizeof(int32_t) // |capacity_num_bytes|.
137 };
138
139 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
140
141 // We can write to a data pipe handle immediately.
142 int32_t elements[10] = {};
143 uint32_t num_bytes = 0;
144
145 num_bytes =
146 static_cast<uint32_t>(MOJO_ARRAYSIZE(elements) * sizeof(elements[0]));
147
148 elements[0] = 123;
149 elements[1] = 456;
150 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
151 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&elements[0], &num_bytes));
152
153 // Now wait for the other side to become readable.
154 MojoHandleSignalsState state;
155 ASSERT_EQ(MOJO_RESULT_OK,
156 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
157 MOJO_DEADLINE_INDEFINITE, &state));
158 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, state.satisfied_signals);
159
160 elements[0] = -1;
161 elements[1] = -1;
162 ASSERT_EQ(MOJO_RESULT_OK, ReadData(&elements[0], &num_bytes));
163 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
164 ASSERT_EQ(elements[0], 123);
165 ASSERT_EQ(elements[1], 456);
166 }
167
168 // Tests creation of data pipes with various (valid) options.
169 TEST_F(DataPipeTest, CreateAndMaybeTransfer) {
170 MojoCreateDataPipeOptions test_options[] = {
171 // Default options.
172 {},
173 // Trivial element size, non-default capacity.
174 {kSizeOfOptions, // |struct_size|.
175 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
176 1, // |element_num_bytes|.
177 1000}, // |capacity_num_bytes|.
178 // Nontrivial element size, non-default capacity.
179 {kSizeOfOptions, // |struct_size|.
180 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
181 4, // |element_num_bytes|.
182 4000}, // |capacity_num_bytes|.
183 // Nontrivial element size, default capacity.
184 {kSizeOfOptions, // |struct_size|.
185 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
186 100, // |element_num_bytes|.
187 0} // |capacity_num_bytes|.
188 };
189 for (size_t i = 0; i < arraysize(test_options); i++) {
190 MojoHandle producer_handle, consumer_handle;
191 MojoCreateDataPipeOptions* options =
192 i ? &test_options[i] : nullptr;
193 ASSERT_EQ(MOJO_RESULT_OK,
194 MojoCreateDataPipe(options, &producer_handle, &consumer_handle));
195 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(producer_handle));
196 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(consumer_handle));
197 }
198 }
199
200
201 TEST_F(DataPipeTest, SimpleReadWrite) {
202 const MojoCreateDataPipeOptions options = {
203 kSizeOfOptions, // |struct_size|.
204 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
205 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
206 1000 * sizeof(int32_t) // |capacity_num_bytes|.
207 };
208
209 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
210 MojoHandleSignalsState hss;
211
212 int32_t elements[10] = {};
213 uint32_t num_bytes = 0;
214
215 // Try reading; nothing there yet.
216 num_bytes =
217 static_cast<uint32_t>(MOJO_ARRAYSIZE(elements) * sizeof(elements[0]));
218 ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, ReadData(elements, &num_bytes));
219
220 // Query; nothing there yet.
221 num_bytes = 0;
222 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
223 ASSERT_EQ(0u, num_bytes);
224
225 // Discard; nothing there yet.
226 num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
227 ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, DiscardData(&num_bytes));
228
229 // Read with invalid |num_bytes|.
230 num_bytes = sizeof(elements[0]) + 1;
231 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, ReadData(elements, &num_bytes));
232
233 // Write two elements.
234 elements[0] = 123;
235 elements[1] = 456;
236 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
237 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
238 // It should have written everything (even without "all or none").
239 ASSERT_EQ(2u * sizeof(elements[0]), num_bytes);
240
241 // Wait.
242 ASSERT_EQ(MOJO_RESULT_OK,
243 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
244 MOJO_DEADLINE_INDEFINITE, &hss));
245 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
246 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
247 hss.satisfiable_signals);
248
249 // Query.
250 // TODO(vtl): It's theoretically possible (though not with the current
251 // implementation/configured limits) that not all the data has arrived yet.
252 // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
253 // or |2 * ...|.)
254 num_bytes = 0;
255 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
256 ASSERT_EQ(2 * sizeof(elements[0]), num_bytes);
257
258 // Read one element.
259 elements[0] = -1;
260 elements[1] = -1;
261 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
262 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes));
263 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
264 ASSERT_EQ(123, elements[0]);
265 ASSERT_EQ(-1, elements[1]);
266
267 // Query.
268 // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we
269 // should get 1 here.)
270 num_bytes = 0;
271 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
272 ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
273
274 // Peek one element.
275 elements[0] = -1;
276 elements[1] = -1;
277 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
278 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, true));
279 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
280 ASSERT_EQ(456, elements[0]);
281 ASSERT_EQ(-1, elements[1]);
282
283 // Query. Still has 1 element remaining.
284 num_bytes = 0;
285 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
286 ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
287
288 // Try to read two elements, with "all or none".
289 elements[0] = -1;
290 elements[1] = -1;
291 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
292 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
293 ReadData(elements, &num_bytes, true, false));
294 ASSERT_EQ(-1, elements[0]);
295 ASSERT_EQ(-1, elements[1]);
296
297 // Try to read two elements, without "all or none".
298 elements[0] = -1;
299 elements[1] = -1;
300 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
301 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, false));
302 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
303 ASSERT_EQ(456, elements[0]);
304 ASSERT_EQ(-1, elements[1]);
305
306 // Query.
307 num_bytes = 0;
308 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
309 ASSERT_EQ(0u, num_bytes);
310 }
311
312 // Note: The "basic" waiting tests test that the "wait states" are correct in
313 // various situations; they don't test that waiters are properly awoken on state
314 // changes. (For that, we need to use multiple threads.)
315 TEST_F(DataPipeTest, BasicProducerWaiting) {
316 // Note: We take advantage of the fact that current for current
317 // implementations capacities are strict maximums. This is not guaranteed by
318 // the API.
319
320 const MojoCreateDataPipeOptions options = {
321 kSizeOfOptions, // |struct_size|.
322 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
323 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
324 2 * sizeof(int32_t) // |capacity_num_bytes|.
325 };
326 Create(&options);
327 MojoHandleSignalsState hss;
328
329 // Never readable.
330 hss = MojoHandleSignalsState();
331 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
332 MojoWait(producer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
333 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
334 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
335 hss.satisfiable_signals);
336
337 // Already writable.
338 hss = MojoHandleSignalsState();
339 ASSERT_EQ(MOJO_RESULT_OK,
340 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
341
342 // Write two elements.
343 int32_t elements[2] = {123, 456};
344 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
345 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
346 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
347
348 // Wait for data to become available to the consumer.
349 ASSERT_EQ(MOJO_RESULT_OK,
350 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
351 MOJO_DEADLINE_INDEFINITE, &hss));
352 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
353 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
354 hss.satisfiable_signals);
355
356 // Peek one element.
357 elements[0] = -1;
358 elements[1] = -1;
359 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
360 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
361 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
362 ASSERT_EQ(123, elements[0]);
363 ASSERT_EQ(-1, elements[1]);
364
365 // Read one element.
366 elements[0] = -1;
367 elements[1] = -1;
368 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
369 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, false));
370 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
371 ASSERT_EQ(123, elements[0]);
372 ASSERT_EQ(-1, elements[1]);
373
374 // Try writing, using a two-phase write.
375 void* buffer = nullptr;
376 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
377 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
378 EXPECT_TRUE(buffer);
379 ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
380
381 static_cast<int32_t*>(buffer)[0] = 789;
382 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(static_cast<uint32_t>(
383 1u * sizeof(elements[0]))));
384
385 // Read one element, using a two-phase read.
386 const void* read_buffer = nullptr;
387 num_bytes = 0u;
388 ASSERT_EQ(MOJO_RESULT_OK,
389 BeginReadData(&read_buffer, &num_bytes, false));
390 EXPECT_TRUE(read_buffer);
391 // Since we only read one element (after having written three in all), the
392 // two-phase read should only allow us to read one. This checks an
393 // implementation detail!
394 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
395 ASSERT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
396 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(static_cast<uint32_t>(
397 1u * sizeof(elements[0]))));
398
399 // Write one element.
400 elements[0] = 123;
401 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
402 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
403 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
404
405 // Close the consumer.
406 CloseConsumer();
407
408 // It should now be never-writable.
409 hss = MojoHandleSignalsState();
410 ASSERT_EQ(MOJO_RESULT_OK,
411 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
412 MOJO_DEADLINE_INDEFINITE, &hss));
413 hss = MojoHandleSignalsState();
414 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
415 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
416 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
417 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
418 }
419
420 TEST_F(DataPipeTest, PeerClosedProducerWaiting) {
421 const MojoCreateDataPipeOptions options = {
422 kSizeOfOptions, // |struct_size|.
423 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
424 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
425 2 * sizeof(int32_t) // |capacity_num_bytes|.
426 };
427 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
428 MojoHandleSignalsState hss;
429
430 // Close the consumer.
431 CloseConsumer();
432
433 // It should be signaled.
434 hss = MojoHandleSignalsState();
435 ASSERT_EQ(MOJO_RESULT_OK,
436 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
437 MOJO_DEADLINE_INDEFINITE, &hss));
438 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
439 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
440 }
441
442 TEST_F(DataPipeTest, PeerClosedConsumerWaiting) {
443 const MojoCreateDataPipeOptions options = {
444 kSizeOfOptions, // |struct_size|.
445 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
446 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
447 2 * sizeof(int32_t) // |capacity_num_bytes|.
448 };
449 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
450 MojoHandleSignalsState hss;
451
452 // Close the producer.
453 CloseProducer();
454
455 // It should be signaled.
456 hss = MojoHandleSignalsState();
457 ASSERT_EQ(MOJO_RESULT_OK,
458 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
459 MOJO_DEADLINE_INDEFINITE, &hss));
460 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
461 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
462 }
463
464 TEST_F(DataPipeTest, BasicConsumerWaiting) {
465 const MojoCreateDataPipeOptions options = {
466 kSizeOfOptions, // |struct_size|.
467 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
468 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
469 1000 * sizeof(int32_t) // |capacity_num_bytes|.
470 };
471 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
472 MojoHandleSignalsState hss;
473
474 // Never writable.
475 hss = MojoHandleSignalsState();
476 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
477 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_WRITABLE,
478 MOJO_DEADLINE_INDEFINITE, &hss));
479 ASSERT_EQ(0u, hss.satisfied_signals);
480 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
481 hss.satisfiable_signals);
482
483 // Write two elements.
484 int32_t elements[2] = {123, 456};
485 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
486 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
487
488 // Wait for readability.
489 hss = MojoHandleSignalsState();
490 ASSERT_EQ(MOJO_RESULT_OK,
491 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
492 MOJO_DEADLINE_INDEFINITE, &hss));
493 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
494 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
495 hss.satisfiable_signals);
496
497 // Discard one element.
498 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
499 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
500 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
501
502 // Should still be readable.
503 hss = MojoHandleSignalsState();
504 ASSERT_EQ(MOJO_RESULT_OK,
505 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
506 MOJO_DEADLINE_INDEFINITE, &hss));
507 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
508 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
509 hss.satisfiable_signals);
510
511 // Peek one element.
512 elements[0] = -1;
513 elements[1] = -1;
514 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
515 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
516 ASSERT_EQ(456, elements[0]);
517 ASSERT_EQ(-1, elements[1]);
518
519 // Should still be readable.
520 hss = MojoHandleSignalsState();
521 ASSERT_EQ(MOJO_RESULT_OK,
522 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
523 MOJO_DEADLINE_INDEFINITE, &hss));
524 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
525 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
526 hss.satisfiable_signals);
527
528 // Read one element.
529 elements[0] = -1;
530 elements[1] = -1;
531 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
532 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
533 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
534 ASSERT_EQ(456, elements[0]);
535 ASSERT_EQ(-1, elements[1]);
536
537 // Write one element.
538 elements[0] = 789;
539 elements[1] = -1;
540 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
541 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
542
543 // Waiting should now succeed.
544 hss = MojoHandleSignalsState();
545 ASSERT_EQ(MOJO_RESULT_OK,
546 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
547 MOJO_DEADLINE_INDEFINITE, &hss));
548 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
549 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
550 hss.satisfiable_signals);
551
552 // Close the producer.
553 CloseProducer();
554
555 // Should still be readable.
556 hss = MojoHandleSignalsState();
557 ASSERT_EQ(MOJO_RESULT_OK,
558 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
559 MOJO_DEADLINE_INDEFINITE, &hss));
560 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
561 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
562 hss.satisfiable_signals);
563
564 // Wait for the peer closed signal.
565 hss = MojoHandleSignalsState();
566 ASSERT_EQ(MOJO_RESULT_OK,
567 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
568 MOJO_DEADLINE_INDEFINITE, &hss));
569 ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) != 0);
570 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
571 hss.satisfiable_signals);
572
573 // Read one element.
574 elements[0] = -1;
575 elements[1] = -1;
576 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
577 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
578 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
579 ASSERT_EQ(789, elements[0]);
580 ASSERT_EQ(-1, elements[1]);
581
582 // Should be never-readable.
583 hss = MojoHandleSignalsState();
584 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
585 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
586 MOJO_DEADLINE_INDEFINITE, &hss));
587 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
588 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
589 }
590
591 // Test with two-phase APIs and also closing the producer with an active
592 // consumer waiter.
593 TEST_F(DataPipeTest, ConsumerWaitingTwoPhase) {
594 const MojoCreateDataPipeOptions options = {
595 kSizeOfOptions, // |struct_size|.
596 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
597 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
598 1000 * sizeof(int32_t) // |capacity_num_bytes|.
599 };
600 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
601 MojoHandleSignalsState hss;
602
603 // Write two elements.
604 int32_t* elements = nullptr;
605 void* buffer = nullptr;
606 // Request room for three (but we'll only write two).
607 uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
608 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes, true));
609 EXPECT_TRUE(buffer);
610 EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
611 elements = static_cast<int32_t*>(buffer);
612 elements[0] = 123;
613 elements[1] = 456;
614 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(2u * sizeof(elements[0])));
615
616 // Wait for readability.
617 hss = MojoHandleSignalsState();
618 ASSERT_EQ(MOJO_RESULT_OK,
619 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
620 MOJO_DEADLINE_INDEFINITE, &hss));
621 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
622 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
623 hss.satisfiable_signals);
624
625 // Read one element.
626 // Request two in all-or-none mode, but only read one.
627 const void* read_buffer = nullptr;
628 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
629 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes, true));
630 EXPECT_TRUE(read_buffer);
631 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
632 const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
633 ASSERT_EQ(123, read_elements[0]);
634 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
635
636 // Should still be readable.
637 hss = MojoHandleSignalsState();
638 ASSERT_EQ(MOJO_RESULT_OK,
639 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
640 MOJO_DEADLINE_INDEFINITE, &hss));
641 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
642 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
643 hss.satisfiable_signals);
644
645 // Read one element.
646 // Request three, but not in all-or-none mode.
647 read_buffer = nullptr;
648 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
649 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
650 EXPECT_TRUE(read_buffer);
651 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
652 read_elements = static_cast<const int32_t*>(read_buffer);
653 ASSERT_EQ(456, read_elements[0]);
654 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
655
656 // Close the producer.
657 CloseProducer();
658
659 // Should be never-readable.
660 hss = MojoHandleSignalsState();
661 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
662 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
663 MOJO_DEADLINE_INDEFINITE, &hss));
664 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
665 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
666 }
667
668 // Tests that data pipes aren't writable/readable during two-phase writes/reads.
669 TEST_F(DataPipeTest, BasicTwoPhaseWaiting) {
670 const MojoCreateDataPipeOptions options = {
671 kSizeOfOptions, // |struct_size|.
672 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
673 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
674 1000 * sizeof(int32_t) // |capacity_num_bytes|.
675 };
676 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
677 MojoHandleSignalsState hss;
678
679 // It should be writable.
680 hss = MojoHandleSignalsState();
681 ASSERT_EQ(MOJO_RESULT_OK,
682 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
683 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
684 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
685 hss.satisfiable_signals);
686
687 uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
688 void* write_ptr = nullptr;
689 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
690 EXPECT_TRUE(write_ptr);
691 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
692
693 // At this point, it shouldn't be writable.
694 hss = MojoHandleSignalsState();
695 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
696 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
697 ASSERT_EQ(0u, hss.satisfied_signals);
698 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
699 hss.satisfiable_signals);
700
701 // It shouldn't be readable yet either (we'll wait later).
702 hss = MojoHandleSignalsState();
703 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
704 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
705 ASSERT_EQ(0u, hss.satisfied_signals);
706 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
707 hss.satisfiable_signals);
708
709 static_cast<int32_t*>(write_ptr)[0] = 123;
710 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(1u * sizeof(int32_t)));
711
712 // It should immediately be writable again.
713 hss = MojoHandleSignalsState();
714 ASSERT_EQ(MOJO_RESULT_OK,
715 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
716 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
717 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
718 hss.satisfiable_signals);
719
720 // It should become readable.
721 hss = MojoHandleSignalsState();
722 ASSERT_EQ(MOJO_RESULT_OK,
723 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
724 MOJO_DEADLINE_INDEFINITE, &hss));
725 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
726 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
727 hss.satisfiable_signals);
728
729 // Start another two-phase write and check that it's readable even in the
730 // middle of it.
731 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
732 write_ptr = nullptr;
733 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
734 EXPECT_TRUE(write_ptr);
735 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
736
737 // It should be readable.
738 hss = MojoHandleSignalsState();
739 ASSERT_EQ(MOJO_RESULT_OK,
740 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
741 MOJO_DEADLINE_INDEFINITE, &hss));
742 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
743 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
744 hss.satisfiable_signals);
745
746 // End the two-phase write without writing anything.
747 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0u));
748
749 // Start a two-phase read.
750 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
751 const void* read_ptr = nullptr;
752 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
753 EXPECT_TRUE(read_ptr);
754 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
755
756 // At this point, it should still be writable.
757 hss = MojoHandleSignalsState();
758 ASSERT_EQ(MOJO_RESULT_OK,
759 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
760 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
761 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
762 hss.satisfiable_signals);
763
764 // But not readable.
765 hss = MojoHandleSignalsState();
766 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
767 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
768 ASSERT_EQ(0u, hss.satisfied_signals);
769 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
770 hss.satisfiable_signals);
771
772 // End the two-phase read without reading anything.
773 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0u));
774
775 // It should be readable again.
776 hss = MojoHandleSignalsState();
777 ASSERT_EQ(MOJO_RESULT_OK,
778 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
779 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
780 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
781 hss.satisfiable_signals);
782 }
783
784 void Seq(int32_t start, size_t count, int32_t* out) {
785 for (size_t i = 0; i < count; i++)
786 out[i] = start + static_cast<int32_t>(i);
787 }
788
789 TEST_F(DataPipeTest, AllOrNone) {
790 const MojoCreateDataPipeOptions options = {
791 kSizeOfOptions, // |struct_size|.
792 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
793 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
794 10 * sizeof(int32_t) // |capacity_num_bytes|.
795 };
796 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
797 MojoHandleSignalsState hss;
798
799 // Try writing way too much.
800 uint32_t num_bytes = 20u * sizeof(int32_t);
801 int32_t buffer[100];
802 Seq(0, MOJO_ARRAYSIZE(buffer), buffer);
803 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
804
805 // Should still be empty.
806 num_bytes = ~0u;
807 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
808 ASSERT_EQ(0u, num_bytes);
809
810 // Write some data.
811 num_bytes = 5u * sizeof(int32_t);
812 Seq(100, MOJO_ARRAYSIZE(buffer), buffer);
813 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
814 ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
815
816 // Wait for data.
817 // TODO(vtl): There's no real guarantee that all the data will become
818 // available at once (except that in current implementations, with reasonable
819 // limits, it will). Eventually, we'll be able to wait for a specified amount
820 // of data to become available.
821 hss = MojoHandleSignalsState();
822 ASSERT_EQ(MOJO_RESULT_OK,
823 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
824 MOJO_DEADLINE_INDEFINITE, &hss));
825 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
826 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
827 hss.satisfiable_signals);
828
829 // Half full.
830 num_bytes = 0u;
831 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
832 ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
833
834 /* TODO(jam): enable if we end up observing max capacity
835 // Too much.
836 num_bytes = 6u * sizeof(int32_t);
837 Seq(200, MOJO_ARRAYSIZE(buffer), buffer);
838 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
839 */
840
841 // Try reading too much.
842 num_bytes = 11u * sizeof(int32_t);
843 memset(buffer, 0xab, sizeof(buffer));
844 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
845 int32_t expected_buffer[100];
846 memset(expected_buffer, 0xab, sizeof(expected_buffer));
847 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
848
849 // Try discarding too much.
850 num_bytes = 11u * sizeof(int32_t);
851 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
852
853 // Just a little.
854 num_bytes = 2u * sizeof(int32_t);
855 Seq(300, MOJO_ARRAYSIZE(buffer), buffer);
856 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
857 ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
858
859 // Just right.
860 num_bytes = 3u * sizeof(int32_t);
861 Seq(400, MOJO_ARRAYSIZE(buffer), buffer);
862 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
863 ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
864
865 // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
866 // specified amount of data to be available, so poll.
867 for (size_t i = 0; i < kMaxPoll; i++) {
868 num_bytes = 0u;
869 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
870 if (num_bytes >= 10u * sizeof(int32_t))
871 break;
872
873 test::Sleep(test::EpsilonDeadline());
874 }
875 ASSERT_EQ(10u * sizeof(int32_t), num_bytes);
876
877 // Read half.
878 num_bytes = 5u * sizeof(int32_t);
879 memset(buffer, 0xab, sizeof(buffer));
880 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
881 ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
882 memset(expected_buffer, 0xab, sizeof(expected_buffer));
883 Seq(100, 5, expected_buffer);
884 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
885
886 // Try reading too much again.
887 num_bytes = 6u * sizeof(int32_t);
888 memset(buffer, 0xab, sizeof(buffer));
889 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
890 memset(expected_buffer, 0xab, sizeof(expected_buffer));
891 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
892
893 // Try discarding too much again.
894 num_bytes = 6u * sizeof(int32_t);
895 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
896
897 // Discard a little.
898 num_bytes = 2u * sizeof(int32_t);
899 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
900 ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
901
902 // Three left.
903 num_bytes = 0u;
904 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
905 ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
906
907 // Close the producer, then test producer-closed cases.
908 CloseProducer();
909
910 // Wait.
911 hss = MojoHandleSignalsState();
912 ASSERT_EQ(MOJO_RESULT_OK,
913 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
914 MOJO_DEADLINE_INDEFINITE, &hss));
915 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
916 hss.satisfied_signals);
917 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
918 hss.satisfiable_signals);
919
920 // Try reading too much; "failed precondition" since the producer is closed.
921 num_bytes = 4u * sizeof(int32_t);
922 memset(buffer, 0xab, sizeof(buffer));
923 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
924 ReadData(buffer, &num_bytes, true));
925 memset(expected_buffer, 0xab, sizeof(expected_buffer));
926 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
927
928 // Try discarding too much; "failed precondition" again.
929 num_bytes = 4u * sizeof(int32_t);
930 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes, true));
931
932 // Read a little.
933 num_bytes = 2u * sizeof(int32_t);
934 memset(buffer, 0xab, sizeof(buffer));
935 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
936 ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
937 memset(expected_buffer, 0xab, sizeof(expected_buffer));
938 Seq(400, 2, expected_buffer);
939 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
940
941 // Discard the remaining element.
942 num_bytes = 1u * sizeof(int32_t);
943 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
944 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
945
946 // Empty again.
947 num_bytes = ~0u;
948 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
949 ASSERT_EQ(0u, num_bytes);
950 }
951
952 TEST_F(DataPipeTest, DISABLED_TwoPhaseAllOrNone) {
953 const MojoCreateDataPipeOptions options = {
954 kSizeOfOptions, // |struct_size|.
955 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
956 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
957 10 * sizeof(int32_t) // |capacity_num_bytes|.
958 };
959 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
960 MojoHandleSignalsState hss;
961
962 // Try writing way too much (two-phase).
963 uint32_t num_bytes = 20u * sizeof(int32_t);
964 void* write_ptr = nullptr;
965 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
966 BeginWriteData(&write_ptr, &num_bytes, true));
967
968 // Try writing an amount which isn't a multiple of the element size
969 // (two-phase).
970 static_assert(sizeof(int32_t) > 1u, "Wow! int32_t's have size 1");
971 num_bytes = 1u;
972 write_ptr = nullptr;
973 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
974 BeginWriteData(&write_ptr, &num_bytes, true));
975
976 // Try reading way too much (two-phase).
977 num_bytes = 20u * sizeof(int32_t);
978 const void* read_ptr = nullptr;
979 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
980 BeginReadData(&read_ptr, &num_bytes, true));
981
982 // Write half (two-phase).
983 num_bytes = 5u * sizeof(int32_t);
984 write_ptr = nullptr;
985 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes, true));
986 // May provide more space than requested.
987 EXPECT_GE(num_bytes, 5u * sizeof(int32_t));
988 EXPECT_TRUE(write_ptr);
989 Seq(0, 5, static_cast<int32_t*>(write_ptr));
990 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(5u * sizeof(int32_t)));
991
992 // Wait for data.
993 // TODO(vtl): (See corresponding TODO in AllOrNone.)
994 hss = MojoHandleSignalsState();
995 ASSERT_EQ(MOJO_RESULT_OK,
996 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
997 MOJO_DEADLINE_INDEFINITE, &hss));
998 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
999 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1000 hss.satisfiable_signals);
1001
1002 // Try reading an amount which isn't a multiple of the element size
1003 // (two-phase).
1004 num_bytes = 1u;
1005 read_ptr = nullptr;
1006 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1007 BeginReadData(&read_ptr, &num_bytes, true));
1008
1009 // Read one (two-phase).
1010 num_bytes = 1u * sizeof(int32_t);
1011 read_ptr = nullptr;
1012 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes, true));
1013 EXPECT_GE(num_bytes, 1u * sizeof(int32_t));
1014 ASSERT_EQ(0, static_cast<const int32_t*>(read_ptr)[0]);
1015 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(int32_t)));
1016
1017 // We should have four left, leaving room for six.
1018 num_bytes = 0u;
1019 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1020 ASSERT_EQ(4u * sizeof(int32_t), num_bytes);
1021
1022 // Assuming a tight circular buffer of the specified capacity, we can't do a
1023 // two-phase write of six now.
1024 num_bytes = 6u * sizeof(int32_t);
1025 write_ptr = nullptr;
1026 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
1027 BeginWriteData(&write_ptr, &num_bytes, true));
1028
1029 // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
1030 // specified amount of space to be available, so poll.
1031 for (size_t i = 0; i < kMaxPoll; i++) {
1032 // Write six elements (simple), filling the buffer.
1033 num_bytes = 6u * sizeof(int32_t);
1034 int32_t buffer[100];
1035 Seq(100, 6, buffer);
1036 MojoResult result = WriteData(buffer, &num_bytes, true);
1037 if (result == MOJO_RESULT_OK)
1038 break;
1039 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, result);
1040
1041 test::Sleep(test::EpsilonDeadline());
1042 }
1043 ASSERT_EQ(6u * sizeof(int32_t), num_bytes);
1044
1045 // TODO(vtl): Hack: poll again.
1046 for (size_t i = 0; i < kMaxPoll; i++) {
1047 // We have ten.
1048 num_bytes = 0u;
1049 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1050 if (num_bytes >= 10u * sizeof(int32_t))
1051 break;
1052
1053 test::Sleep(test::EpsilonDeadline());
1054 }
1055 ASSERT_EQ(10u * sizeof(int32_t), num_bytes);
1056
1057 // Note: Whether a two-phase read of ten would fail here or not is
1058 // implementation-dependent.
1059
1060 // Close the producer.
1061 CloseProducer();
1062
1063 // A two-phase read of nine should work.
1064 num_bytes = 9u * sizeof(int32_t);
1065 read_ptr = nullptr;
1066 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes, true));
1067 EXPECT_GE(num_bytes, 9u * sizeof(int32_t));
1068 ASSERT_EQ(1, static_cast<const int32_t*>(read_ptr)[0]);
1069 ASSERT_EQ(2, static_cast<const int32_t*>(read_ptr)[1]);
1070 ASSERT_EQ(3, static_cast<const int32_t*>(read_ptr)[2]);
1071 ASSERT_EQ(4, static_cast<const int32_t*>(read_ptr)[3]);
1072 ASSERT_EQ(100, static_cast<const int32_t*>(read_ptr)[4]);
1073 ASSERT_EQ(101, static_cast<const int32_t*>(read_ptr)[5]);
1074 ASSERT_EQ(102, static_cast<const int32_t*>(read_ptr)[6]);
1075 ASSERT_EQ(103, static_cast<const int32_t*>(read_ptr)[7]);
1076 ASSERT_EQ(104, static_cast<const int32_t*>(read_ptr)[8]);
1077 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(9u * sizeof(int32_t)));
1078
1079 // Wait for peer closed.
1080 hss = MojoHandleSignalsState();
1081 ASSERT_EQ(MOJO_RESULT_OK,
1082 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1083 MOJO_DEADLINE_INDEFINITE, &hss));
1084 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1085 hss.satisfied_signals);
1086 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1087 hss.satisfiable_signals);
1088
1089 // A two-phase read of two should fail, with "failed precondition".
1090 num_bytes = 2u * sizeof(int32_t);
1091 read_ptr = nullptr;
1092 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1093 BeginReadData(&read_ptr, &num_bytes, true));
1094 }
1095
1096 /*
1097 jam: this is testing that the implementation uses a circular buffer, which we
1098 don't use currently.
1099 // Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads,
1100 // respectively, as much as possible, even if it may have to "wrap around" the
1101 // internal circular buffer. (Note that the two-phase write and read need not do
1102 // this.)
1103 TYPED_TEST(DataPipeImplTest, WrapAround) {
1104 unsigned char test_data[1000];
1105 for (size_t i = 0; i < MOJO_ARRAYSIZE(test_data); i++)
1106 test_data[i] = static_cast<unsigned char>(i);
1107
1108 const MojoCreateDataPipeOptions options = {
1109 kSizeOfOptions, // |struct_size|.
1110 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1111 1u, // |element_num_bytes|.
1112 100u // |capacity_num_bytes|.
1113 };
1114 MojoCreateDataPipeOptions validated_options = {};
1115 // This test won't be valid if |ValidateCreateOptions()| decides to give the
1116 // pipe more space.
1117 ASSERT_EQ(MOJO_RESULT_OK, DataPipe::ValidateCreateOptions(
1118 &options, &validated_options));
1119 ASSERT_EQ(100u, validated_options.capacity_num_bytes);
1120 this->Create(options);
1121 this->DoTransfer();
1122
1123 Waiter waiter;
1124 HandleSignalsState hss;
1125
1126 // Add waiter.
1127 waiter.Init();
1128 ASSERT_EQ(MOJO_RESULT_OK,
1129 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1,
1130 nullptr));
1131
1132 // Write 20 bytes.
1133 uint32_t num_bytes = 20u;
1134 ASSERT_EQ(MOJO_RESULT_OK,
1135 this->ProducerWriteData(&test_data[0], &num_bytes, false));
1136 ASSERT_EQ(20u, num_bytes);
1137
1138 // Wait for data.
1139 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1140 ASSERT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
1141 hss = HandleSignalsState();
1142 this->ConsumerRemoveAwakable(&waiter, &hss);
1143 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1144 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1145 hss.satisfiable_signals);
1146
1147 // Read 10 bytes.
1148 unsigned char read_buffer[1000] = {0};
1149 num_bytes = 10u;
1150 ASSERT_EQ(MOJO_RESULT_OK,
1151 this->ConsumerReadData(read_buffer, &num_bytes, false, false));
1152 ASSERT_EQ(10u, num_bytes);
1153 ASSERT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
1154
1155 if (this->IsStrictCircularBuffer()) {
1156 // Check that a two-phase write can now only write (at most) 80 bytes. (This
1157 // checks an implementation detail; this behavior is not guaranteed.)
1158 void* write_buffer_ptr = nullptr;
1159 num_bytes = 0u;
1160 ASSERT_EQ(MOJO_RESULT_OK,
1161 this->ProducerBeginWriteData(&write_buffer_ptr, &num_bytes,
1162 false));
1163 EXPECT_TRUE(write_buffer_ptr);
1164 ASSERT_EQ(80u, num_bytes);
1165 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u));
1166 }
1167
1168 // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
1169 size_t total_num_bytes = 0;
1170 for (size_t i = 0; i < kMaxPoll; i++) {
1171 // Write as much data as we can (using |ProducerWriteData()|). We should
1172 // write 90 bytes (eventually).
1173 num_bytes = 200u;
1174 MojoResult result = this->ProducerWriteData(
1175 &test_data[20 + total_num_bytes], &num_bytes, false);
1176 if (result == MOJO_RESULT_OK) {
1177 total_num_bytes += num_bytes;
1178 if (total_num_bytes >= 90u)
1179 break;
1180 } else {
1181 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, result);
1182 }
1183
1184 test::Sleep(test::EpsilonDeadline());
1185 }
1186 ASSERT_EQ(90u, total_num_bytes);
1187
1188 // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
1189 for (size_t i = 0; i < kMaxPoll; i++) {
1190 // We have 100.
1191 num_bytes = 0u;
1192 ASSERT_EQ(MOJO_RESULT_OK,
1193 this->ConsumerQueryData(&num_bytes));
1194 if (num_bytes >= 100u)
1195 break;
1196
1197 test::Sleep(test::EpsilonDeadline());
1198 }
1199 ASSERT_EQ(100u, num_bytes);
1200
1201 if (this->IsStrictCircularBuffer()) {
1202 // Check that a two-phase read can now only read (at most) 90 bytes. (This
1203 // checks an implementation detail; this behavior is not guaranteed.)
1204 const void* read_buffer_ptr = nullptr;
1205 num_bytes = 0u;
1206 ASSERT_EQ(MOJO_RESULT_OK,
1207 this->ConsumerBeginReadData(&read_buffer_ptr, &num_bytes, false));
1208 EXPECT_TRUE(read_buffer_ptr);
1209 ASSERT_EQ(90u, num_bytes);
1210 ASSERT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(0u));
1211 }
1212
1213 // Read as much as possible (using |ConsumerReadData()|). We should read 100
1214 // bytes.
1215 num_bytes = static_cast<uint32_t>(MOJO_ARRAYSIZE(read_buffer) *
1216 sizeof(read_buffer[0]));
1217 memset(read_buffer, 0, num_bytes);
1218 ASSERT_EQ(MOJO_RESULT_OK,
1219 this->ConsumerReadData(read_buffer, &num_bytes, false, false));
1220 ASSERT_EQ(100u, num_bytes);
1221 ASSERT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
1222
1223 this->ProducerClose();
1224 this->ConsumerClose();
1225 }
1226 */
1227
1228 // Tests the behavior of writing (simple and two-phase), closing the producer,
1229 // then reading (simple and two-phase).
1230 TEST_F(DataPipeTest, WriteCloseProducerRead) {
1231 const char kTestData[] = "hello world";
1232 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1233
1234 const MojoCreateDataPipeOptions options = {
1235 kSizeOfOptions, // |struct_size|.
1236 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1237 1u, // |element_num_bytes|.
1238 1000u // |capacity_num_bytes|.
1239 };
1240 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1241
1242 // Write some data, so we'll have something to read.
1243 uint32_t num_bytes = kTestDataSize;
1244 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1245 ASSERT_EQ(kTestDataSize, num_bytes);
1246
1247 // Write it again, so we'll have something left over.
1248 num_bytes = kTestDataSize;
1249 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1250 ASSERT_EQ(kTestDataSize, num_bytes);
1251
1252 // Start two-phase write.
1253 void* write_buffer_ptr = nullptr;
1254 num_bytes = 0u;
1255 ASSERT_EQ(MOJO_RESULT_OK,
1256 BeginWriteData(&write_buffer_ptr, &num_bytes, false));
1257 EXPECT_TRUE(write_buffer_ptr);
1258 EXPECT_GT(num_bytes, 0u);
1259
1260 // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
1261 for (size_t i = 0; i < kMaxPoll; i++) {
1262 num_bytes = 0u;
1263 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1264 if (num_bytes >= 2u * kTestDataSize)
1265 break;
1266
1267 test::Sleep(test::EpsilonDeadline());
1268 }
1269 ASSERT_EQ(2u * kTestDataSize, num_bytes);
1270
1271 // Start two-phase read.
1272 const void* read_buffer_ptr = nullptr;
1273 num_bytes = 0u;
1274 ASSERT_EQ(MOJO_RESULT_OK,
1275 BeginReadData(&read_buffer_ptr, &num_bytes));
1276 EXPECT_TRUE(read_buffer_ptr);
1277 ASSERT_EQ(2u * kTestDataSize, num_bytes);
1278
1279 // Close the producer.
1280 CloseProducer();
1281
1282 // The consumer can finish its two-phase read.
1283 ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
1284 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(kTestDataSize));
1285
1286 // And start another.
1287 read_buffer_ptr = nullptr;
1288 num_bytes = 0u;
1289 ASSERT_EQ(MOJO_RESULT_OK,
1290 BeginReadData(&read_buffer_ptr, &num_bytes));
1291 EXPECT_TRUE(read_buffer_ptr);
1292 ASSERT_EQ(kTestDataSize, num_bytes);
1293 }
1294
1295
1296 // Tests the behavior of interrupting a two-phase read and write by closing the
1297 // consumer.
1298 TEST_F(DataPipeTest, TwoPhaseWriteReadCloseConsumer) {
1299 const char kTestData[] = "hello world";
1300 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1301
1302 const MojoCreateDataPipeOptions options = {
1303 kSizeOfOptions, // |struct_size|.
1304 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1305 1u, // |element_num_bytes|.
1306 1000u // |capacity_num_bytes|.
1307 };
1308 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1309 MojoHandleSignalsState hss;
1310
1311 // Write some data, so we'll have something to read.
1312 uint32_t num_bytes = kTestDataSize;
1313 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1314 ASSERT_EQ(kTestDataSize, num_bytes);
1315
1316 // Start two-phase write.
1317 void* write_buffer_ptr = nullptr;
1318 num_bytes = 0u;
1319 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1320 EXPECT_TRUE(write_buffer_ptr);
1321 ASSERT_GT(num_bytes, kTestDataSize);
1322
1323 // Wait for data.
1324 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1325 hss = MojoHandleSignalsState();
1326 ASSERT_EQ(MOJO_RESULT_OK,
1327 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1328 MOJO_DEADLINE_INDEFINITE, &hss));
1329 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1330 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1331 hss.satisfiable_signals);
1332
1333 // Start two-phase read.
1334 const void* read_buffer_ptr = nullptr;
1335 num_bytes = 0u;
1336 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
1337 EXPECT_TRUE(read_buffer_ptr);
1338 ASSERT_EQ(kTestDataSize, num_bytes);
1339
1340 // Close the consumer.
1341 CloseConsumer();
1342
1343 // Wait for producer to know that the consumer is closed.
1344 hss = MojoHandleSignalsState();
1345 ASSERT_EQ(MOJO_RESULT_OK,
1346 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1347 MOJO_DEADLINE_INDEFINITE, &hss));
1348 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1349 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
1350
1351 // Actually write some data. (Note: Premature freeing of the buffer would
1352 // probably only be detected under ASAN or similar.)
1353 memcpy(write_buffer_ptr, kTestData, kTestDataSize);
1354 // Note: Even though the consumer has been closed, ending the two-phase
1355 // write will report success.
1356 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(kTestDataSize));
1357
1358 // But trying to write should result in failure.
1359 num_bytes = kTestDataSize;
1360 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, WriteData(kTestData, &num_bytes));
1361
1362 // As will trying to start another two-phase write.
1363 write_buffer_ptr = nullptr;
1364 num_bytes = 0u;
1365 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1366 BeginWriteData(&write_buffer_ptr, &num_bytes));
1367 }
1368
1369 // Tests the behavior of "interrupting" a two-phase write by closing both the
1370 // producer and the consumer.
1371 TEST_F(DataPipeTest, TwoPhaseWriteCloseBoth) {
1372 const uint32_t kTestDataSize = 15u;
1373
1374 const MojoCreateDataPipeOptions options = {
1375 kSizeOfOptions, // |struct_size|.
1376 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1377 1u, // |element_num_bytes|.
1378 1000u // |capacity_num_bytes|.
1379 };
1380 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1381
1382 // Start two-phase write.
1383 void* write_buffer_ptr = nullptr;
1384 uint32_t num_bytes = 0u;
1385 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1386 EXPECT_TRUE(write_buffer_ptr);
1387 ASSERT_GT(num_bytes, kTestDataSize);
1388 }
1389
1390 // Tests the behavior of writing, closing the producer, and then reading (with
1391 // and without data remaining).
1392 TEST_F(DataPipeTest, WriteCloseProducerReadNoData) {
1393 const char kTestData[] = "hello world";
1394 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1395
1396 const MojoCreateDataPipeOptions options = {
1397 kSizeOfOptions, // |struct_size|.
1398 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1399 1u, // |element_num_bytes|.
1400 1000u // |capacity_num_bytes|.
1401 };
1402 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1403 MojoHandleSignalsState hss;
1404
1405 // Write some data, so we'll have something to read.
1406 uint32_t num_bytes = kTestDataSize;
1407 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1408 ASSERT_EQ(kTestDataSize, num_bytes);
1409
1410 // Close the producer.
1411 CloseProducer();
1412
1413 // Wait. (Note that once the consumer knows that the producer is closed, it
1414 // must also know about all the data that was sent.)
1415 hss = MojoHandleSignalsState();
1416 ASSERT_EQ(MOJO_RESULT_OK,
1417 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1418 MOJO_DEADLINE_INDEFINITE, &hss));
1419 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1420 hss.satisfied_signals);
1421 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1422 hss.satisfiable_signals);
1423
1424 // Peek that data.
1425 char buffer[1000];
1426 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1427 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, false, true));
1428 ASSERT_EQ(kTestDataSize, num_bytes);
1429 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1430
1431 // Read that data.
1432 memset(buffer, 0, 1000);
1433 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1434 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes));
1435 ASSERT_EQ(kTestDataSize, num_bytes);
1436 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1437
1438 // A second read should fail.
1439 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1440 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ReadData(buffer, &num_bytes));
1441
1442 // A two-phase read should also fail.
1443 const void* read_buffer_ptr = nullptr;
1444 num_bytes = 0u;
1445 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1446 ReadData(&read_buffer_ptr, &num_bytes));
1447
1448 // Ditto for discard.
1449 num_bytes = 10u;
1450 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes));
1451 }
1452
1453 // Test that two-phase reads/writes behave correctly when given invalid
1454 // arguments.
1455 TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) {
1456 const MojoCreateDataPipeOptions options = {
1457 kSizeOfOptions, // |struct_size|.
1458 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1459 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
1460 10 * sizeof(int32_t) // |capacity_num_bytes|.
1461 };
1462 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1463 MojoHandleSignalsState hss;
1464
1465 // No data.
1466 uint32_t num_bytes = 1000u;
1467 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1468 ASSERT_EQ(0u, num_bytes);
1469
1470 // Try "ending" a two-phase write when one isn't active.
1471 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1472 EndWriteData(1u * sizeof(int32_t)));
1473
1474 // Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd
1475 // have time to propagate.
1476 test::Sleep(test::EpsilonDeadline());
1477
1478 // Still no data.
1479 num_bytes = 1000u;
1480 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1481 ASSERT_EQ(0u, num_bytes);
1482
1483 // Try ending a two-phase write with an invalid amount (too much).
1484 num_bytes = 0u;
1485 void* write_ptr = nullptr;
1486 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1487 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1488 EndWriteData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1489
1490 // But the two-phase write still ended.
1491 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1492
1493 // Wait a bit (as above).
1494 test::Sleep(test::EpsilonDeadline());
1495
1496 // Still no data.
1497 num_bytes = 1000u;
1498 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1499 ASSERT_EQ(0u, num_bytes);
1500
1501 // Try ending a two-phase write with an invalid amount (not a multiple of the
1502 // element size).
1503 num_bytes = 0u;
1504 write_ptr = nullptr;
1505 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1506 EXPECT_GE(num_bytes, 1u);
1507 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndWriteData(1u));
1508
1509 // But the two-phase write still ended.
1510 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1511
1512 // Wait a bit (as above).
1513 test::Sleep(test::EpsilonDeadline());
1514
1515 // Still no data.
1516 num_bytes = 1000u;
1517 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1518 ASSERT_EQ(0u, num_bytes);
1519
1520 // Now write some data, so we'll be able to try reading.
1521 int32_t element = 123;
1522 num_bytes = 1u * sizeof(int32_t);
1523 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&element, &num_bytes));
1524
1525 // Wait for data.
1526 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1527 hss = MojoHandleSignalsState();
1528 ASSERT_EQ(MOJO_RESULT_OK,
1529 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1530 MOJO_DEADLINE_INDEFINITE, &hss));
1531 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1532 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1533 hss.satisfiable_signals);
1534
1535 // One element available.
1536 num_bytes = 0u;
1537 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1538 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1539
1540 // Try "ending" a two-phase read when one isn't active.
1541 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndReadData(1u * sizeof(int32_t)));
1542
1543 // Still one element available.
1544 num_bytes = 0u;
1545 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1546 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1547
1548 // Try ending a two-phase read with an invalid amount (too much).
1549 num_bytes = 0u;
1550 const void* read_ptr = nullptr;
1551 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1552 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1553 EndReadData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1554
1555 // Still one element available.
1556 num_bytes = 0u;
1557 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1558 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1559
1560 // Try ending a two-phase read with an invalid amount (not a multiple of the
1561 // element size).
1562 num_bytes = 0u;
1563 read_ptr = nullptr;
1564 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1565 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1566 ASSERT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
1567 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndReadData(1u));
1568
1569 // Still one element available.
1570 num_bytes = 0u;
1571 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1572 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1573 }
1574
1575 } // namespace
1576 } // namespace edk
1577 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698