Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(43)

Side by Side Diff: mojo/edk/system/data_pipe_impl_unittest.cc

Issue 1856113002: EDK: Add implementation of data pipe consumer read threshold stuff. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/data_pipe_impl.h ('k') | mojo/edk/system/dispatcher.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // This file contains tests that are shared between different implementations of 5 // This file contains tests that are shared between different implementations of
6 // |DataPipeImpl|. 6 // |DataPipeImpl|.
7 7
8 #include "mojo/edk/system/data_pipe_impl.h" 8 #include "mojo/edk/system/data_pipe_impl.h"
9 9
10 #include <stdint.h> 10 #include <stdint.h>
(...skipping 22 matching lines...) Expand all
33 33
34 using mojo::platform::PlatformPipe; 34 using mojo::platform::PlatformPipe;
35 using mojo::platform::ThreadSleep; 35 using mojo::platform::ThreadSleep;
36 using mojo::util::MakeRefCounted; 36 using mojo::util::MakeRefCounted;
37 using mojo::util::RefPtr; 37 using mojo::util::RefPtr;
38 38
39 namespace mojo { 39 namespace mojo {
40 namespace system { 40 namespace system {
41 namespace { 41 namespace {
42 42
43 const MojoHandleSignals kAllSignals = MOJO_HANDLE_SIGNAL_READABLE |
44 MOJO_HANDLE_SIGNAL_WRITABLE |
45 MOJO_HANDLE_SIGNAL_PEER_CLOSED;
46 const uint32_t kSizeOfOptions = 43 const uint32_t kSizeOfOptions =
47 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)); 44 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
48 45
49 // In various places, we have to poll (since, e.g., we can't yet wait for a 46 // In various places, we have to poll (since, e.g., we can't yet wait for a
50 // certain amount of data to be available). This is the maximum number of 47 // certain amount of data to be available). This is the maximum number of
51 // iterations (separated by a short sleep). 48 // iterations (separated by a short sleep).
52 // TODO(vtl): Get rid of this. 49 // TODO(vtl): Get rid of this.
53 const size_t kMaxPoll = 100; 50 const size_t kMaxPoll = 100;
54 51
55 // DataPipeImplTestHelper ------------------------------------------------------ 52 // DataPipeImplTestHelper ------------------------------------------------------
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
141 HandleSignalsState* signals_state) { 138 HandleSignalsState* signals_state) {
142 return dpp()->ProducerAddAwakable(awakable, signals, context, 139 return dpp()->ProducerAddAwakable(awakable, signals, context,
143 signals_state); 140 signals_state);
144 } 141 }
145 void ProducerRemoveAwakable(Awakable* awakable, 142 void ProducerRemoveAwakable(Awakable* awakable,
146 HandleSignalsState* signals_state) { 143 HandleSignalsState* signals_state) {
147 return dpp()->ProducerRemoveAwakable(awakable, signals_state); 144 return dpp()->ProducerRemoveAwakable(awakable, signals_state);
148 } 145 }
149 146
150 void ConsumerClose() { helper_->ConsumerClose(); } 147 void ConsumerClose() { helper_->ConsumerClose(); }
148 MojoResult ConsumerSetOptions(uint32_t read_threshold_num_bytes) {
149 return dpc()->ConsumerSetOptions(read_threshold_num_bytes);
150 }
151 void ConsumerGetOptions(uint32_t* read_threshold_num_bytes) {
152 dpc()->ConsumerGetOptions(read_threshold_num_bytes);
153 }
151 MojoResult ConsumerReadData(UserPointer<void> elements, 154 MojoResult ConsumerReadData(UserPointer<void> elements,
152 UserPointer<uint32_t> num_bytes, 155 UserPointer<uint32_t> num_bytes,
153 bool all_or_none, 156 bool all_or_none,
154 bool peek) { 157 bool peek) {
155 return dpc()->ConsumerReadData(elements, num_bytes, all_or_none, peek); 158 return dpc()->ConsumerReadData(elements, num_bytes, all_or_none, peek);
156 } 159 }
157 MojoResult ConsumerDiscardData(UserPointer<uint32_t> num_bytes, 160 MojoResult ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
158 bool all_or_none) { 161 bool all_or_none) {
159 return dpc()->ConsumerDiscardData(num_bytes, all_or_none); 162 return dpc()->ConsumerDiscardData(num_bytes, all_or_none);
160 } 163 }
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after
281 MOJO_WRITE_MESSAGE_FLAG_NONE)); 284 MOJO_WRITE_MESSAGE_FLAG_NONE));
282 transport.End(); 285 transport.End();
283 } 286 }
284 uint32_t context = 0; 287 uint32_t context = 0;
285 ASSERT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionTimeout(), &context)); 288 ASSERT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionTimeout(), &context));
286 EXPECT_EQ(987u, context); 289 EXPECT_EQ(987u, context);
287 HandleSignalsState hss = HandleSignalsState(); 290 HandleSignalsState hss = HandleSignalsState();
288 message_pipe(dest_i)->RemoveAwakable(0, &waiter, &hss); 291 message_pipe(dest_i)->RemoveAwakable(0, &waiter, &hss);
289 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 292 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
290 hss.satisfied_signals); 293 hss.satisfied_signals);
291 EXPECT_EQ(kAllSignals, hss.satisfiable_signals); 294 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
295 MOJO_HANDLE_SIGNAL_PEER_CLOSED,
296 hss.satisfiable_signals);
292 char read_buffer[100] = {}; 297 char read_buffer[100] = {};
293 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 298 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
294 DispatcherVector read_dispatchers; 299 DispatcherVector read_dispatchers;
295 uint32_t read_num_dispatchers = 10; // Maximum to get. 300 uint32_t read_num_dispatchers = 10; // Maximum to get.
296 ASSERT_EQ(MOJO_RESULT_OK, 301 ASSERT_EQ(MOJO_RESULT_OK,
297 message_pipe(dest_i)->ReadMessage( 302 message_pipe(dest_i)->ReadMessage(
298 0, UserPointer<void>(read_buffer), 303 0, UserPointer<void>(read_buffer),
299 MakeUserPointer(&read_buffer_size), &read_dispatchers, 304 MakeUserPointer(&read_buffer_size), &read_dispatchers,
300 &read_num_dispatchers, MOJO_READ_MESSAGE_FLAG_NONE)); 305 &read_num_dispatchers, MOJO_READ_MESSAGE_FLAG_NONE));
301 EXPECT_EQ(0u, static_cast<size_t>(read_buffer_size)); 306 EXPECT_EQ(0u, static_cast<size_t>(read_buffer_size));
(...skipping 352 matching lines...) Expand 10 before | Expand all | Expand 10 after
654 MakeUserPointer(&num_bytes), false)); 659 MakeUserPointer(&num_bytes), false));
655 // It should have written everything (even without "all or none"). 660 // It should have written everything (even without "all or none").
656 EXPECT_EQ(2u * sizeof(elements[0]), num_bytes); 661 EXPECT_EQ(2u * sizeof(elements[0]), num_bytes);
657 662
658 // Wait. 663 // Wait.
659 context = 0; 664 context = 0;
660 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionTimeout(), &context)); 665 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionTimeout(), &context));
661 EXPECT_EQ(123u, context); 666 EXPECT_EQ(123u, context);
662 hss = HandleSignalsState(); 667 hss = HandleSignalsState();
663 this->ConsumerRemoveAwakable(&waiter, &hss); 668 this->ConsumerRemoveAwakable(&waiter, &hss);
664 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 669 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
665 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 670 hss.satisfied_signals);
671 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
672 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
666 hss.satisfiable_signals); 673 hss.satisfiable_signals);
667 674
668 // Query. 675 // Query.
669 // TODO(vtl): It's theoretically possible (though not with the current 676 // TODO(vtl): It's theoretically possible (though not with the current
670 // implementation/configured limits) that not all the data has arrived yet. 677 // implementation/configured limits) that not all the data has arrived yet.
671 // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...| 678 // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
672 // or |2 * ...|.) 679 // or |2 * ...|.)
673 num_bytes = 0u; 680 num_bytes = 0u;
674 EXPECT_EQ(MOJO_RESULT_OK, 681 EXPECT_EQ(MOJO_RESULT_OK,
675 this->ConsumerQueryData(MakeUserPointer(&num_bytes))); 682 this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after
807 EXPECT_EQ(0u, hss.satisfied_signals); 814 EXPECT_EQ(0u, hss.satisfied_signals);
808 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 815 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
809 hss.satisfiable_signals); 816 hss.satisfiable_signals);
810 817
811 // Wait for data to become available to the consumer. 818 // Wait for data to become available to the consumer.
812 context = 0; 819 context = 0;
813 EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyTimeout(), &context)); 820 EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyTimeout(), &context));
814 EXPECT_EQ(1234u, context); 821 EXPECT_EQ(1234u, context);
815 hss = HandleSignalsState(); 822 hss = HandleSignalsState();
816 this->ConsumerRemoveAwakable(&cwaiter, &hss); 823 this->ConsumerRemoveAwakable(&cwaiter, &hss);
817 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 824 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
818 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 825 hss.satisfied_signals);
826 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
827 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
819 hss.satisfiable_signals); 828 hss.satisfiable_signals);
820 829
821 // Peek one element. 830 // Peek one element.
822 elements[0] = -1; 831 elements[0] = -1;
823 elements[1] = -1; 832 elements[1] = -1;
824 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 833 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
825 EXPECT_EQ(MOJO_RESULT_OK, 834 EXPECT_EQ(MOJO_RESULT_OK,
826 this->ConsumerReadData(UserPointer<void>(elements), 835 this->ConsumerReadData(UserPointer<void>(elements),
827 MakeUserPointer(&num_bytes), true, true)); 836 MakeUserPointer(&num_bytes), true, true));
828 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 837 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
(...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after
1028 HandleSignalsState hss; 1037 HandleSignalsState hss;
1029 uint32_t context; 1038 uint32_t context;
1030 1039
1031 // Never writable. 1040 // Never writable.
1032 waiter.Init(); 1041 waiter.Init();
1033 hss = HandleSignalsState(); 1042 hss = HandleSignalsState();
1034 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1043 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1035 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12, 1044 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12,
1036 &hss)); 1045 &hss));
1037 EXPECT_EQ(0u, hss.satisfied_signals); 1046 EXPECT_EQ(0u, hss.satisfied_signals);
1038 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1047 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1048 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1039 hss.satisfiable_signals); 1049 hss.satisfiable_signals);
1040 1050
1041 // Add waiter: not yet readable. 1051 // Add waiter: not yet readable.
1042 waiter.Init(); 1052 waiter.Init();
1043 ASSERT_EQ(MOJO_RESULT_OK, 1053 ASSERT_EQ(MOJO_RESULT_OK,
1044 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 34, 1054 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 34,
1045 nullptr)); 1055 nullptr));
1046 1056
1047 // Write two elements. 1057 // Write two elements.
1048 int32_t elements[2] = {123, 456}; 1058 int32_t elements[2] = {123, 456};
1049 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 1059 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
1050 EXPECT_EQ(MOJO_RESULT_OK, 1060 EXPECT_EQ(MOJO_RESULT_OK,
1051 this->ProducerWriteData(UserPointer<const void>(elements), 1061 this->ProducerWriteData(UserPointer<const void>(elements),
1052 MakeUserPointer(&num_bytes), true)); 1062 MakeUserPointer(&num_bytes), true));
1053 1063
1054 // Wait for readability (needed for remote cases). 1064 // Wait for readability (needed for remote cases).
1055 context = 0; 1065 context = 0;
1056 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context)); 1066 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context));
1057 EXPECT_EQ(34u, context); 1067 EXPECT_EQ(34u, context);
1058 hss = HandleSignalsState(); 1068 hss = HandleSignalsState();
1059 this->ConsumerRemoveAwakable(&waiter, &hss); 1069 this->ConsumerRemoveAwakable(&waiter, &hss);
1060 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1070 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1061 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1071 hss.satisfied_signals);
1072 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1073 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1062 hss.satisfiable_signals); 1074 hss.satisfiable_signals);
1063 1075
1064 // Discard one element. 1076 // Discard one element.
1065 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 1077 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
1066 EXPECT_EQ(MOJO_RESULT_OK, 1078 EXPECT_EQ(MOJO_RESULT_OK,
1067 this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true)); 1079 this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
1068 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1080 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
1069 1081
1070 // Should still be readable. 1082 // Should still be readable.
1071 waiter.Init(); 1083 waiter.Init();
1072 hss = HandleSignalsState(); 1084 hss = HandleSignalsState();
1073 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1085 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1074 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78, 1086 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78,
1075 &hss)); 1087 &hss));
1076 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1088 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1077 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1089 hss.satisfied_signals);
1090 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1091 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1078 hss.satisfiable_signals); 1092 hss.satisfiable_signals);
1079 1093
1080 // Peek one element. 1094 // Peek one element.
1081 elements[0] = -1; 1095 elements[0] = -1;
1082 elements[1] = -1; 1096 elements[1] = -1;
1083 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 1097 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
1084 EXPECT_EQ(MOJO_RESULT_OK, 1098 EXPECT_EQ(MOJO_RESULT_OK,
1085 this->ConsumerReadData(UserPointer<void>(elements), 1099 this->ConsumerReadData(UserPointer<void>(elements),
1086 MakeUserPointer(&num_bytes), true, true)); 1100 MakeUserPointer(&num_bytes), true, true));
1087 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1101 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
1088 EXPECT_EQ(456, elements[0]); 1102 EXPECT_EQ(456, elements[0]);
1089 EXPECT_EQ(-1, elements[1]); 1103 EXPECT_EQ(-1, elements[1]);
1090 1104
1091 // Should still be readable. 1105 // Should still be readable.
1092 waiter.Init(); 1106 waiter.Init();
1093 hss = HandleSignalsState(); 1107 hss = HandleSignalsState();
1094 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1108 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1095 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78, 1109 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78,
1096 &hss)); 1110 &hss));
1097 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1111 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1098 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1112 hss.satisfied_signals);
1113 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1114 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1099 hss.satisfiable_signals); 1115 hss.satisfiable_signals);
1100 1116
1101 // Read one element. 1117 // Read one element.
1102 elements[0] = -1; 1118 elements[0] = -1;
1103 elements[1] = -1; 1119 elements[1] = -1;
1104 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 1120 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
1105 EXPECT_EQ(MOJO_RESULT_OK, 1121 EXPECT_EQ(MOJO_RESULT_OK,
1106 this->ConsumerReadData(UserPointer<void>(elements), 1122 this->ConsumerReadData(UserPointer<void>(elements),
1107 MakeUserPointer(&num_bytes), true, false)); 1123 MakeUserPointer(&num_bytes), true, false));
1108 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1124 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
(...skipping 13 matching lines...) Expand all
1122 EXPECT_EQ(MOJO_RESULT_OK, 1138 EXPECT_EQ(MOJO_RESULT_OK,
1123 this->ProducerWriteData(UserPointer<const void>(elements), 1139 this->ProducerWriteData(UserPointer<const void>(elements),
1124 MakeUserPointer(&num_bytes), true)); 1140 MakeUserPointer(&num_bytes), true));
1125 1141
1126 // Waiting should now succeed. 1142 // Waiting should now succeed.
1127 context = 0; 1143 context = 0;
1128 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context)); 1144 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context));
1129 EXPECT_EQ(90u, context); 1145 EXPECT_EQ(90u, context);
1130 hss = HandleSignalsState(); 1146 hss = HandleSignalsState();
1131 this->ConsumerRemoveAwakable(&waiter, &hss); 1147 this->ConsumerRemoveAwakable(&waiter, &hss);
1132 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1148 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1133 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1149 hss.satisfied_signals);
1150 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1151 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1134 hss.satisfiable_signals); 1152 hss.satisfiable_signals);
1135 1153
1136 // We'll want to wait for the peer closed signal to propagate. 1154 // We'll want to wait for the peer closed signal to propagate.
1137 waiter.Init(); 1155 waiter.Init();
1138 EXPECT_EQ(MOJO_RESULT_OK, 1156 EXPECT_EQ(MOJO_RESULT_OK,
1139 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1157 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1140 12, nullptr)); 1158 12, nullptr));
1141 1159
1142 // Close the producer. 1160 // Close the producer.
1143 this->ProducerClose(); 1161 this->ProducerClose();
1144 1162
1145 // Should still be readable, even if the peer closed signal hasn't propagated 1163 // Should still be readable, even if the peer closed signal hasn't propagated
1146 // yet. 1164 // yet.
1147 waiter2.Init(); 1165 waiter2.Init();
1148 hss = HandleSignalsState(); 1166 hss = HandleSignalsState();
1149 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1167 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1150 this->ConsumerAddAwakable(&waiter2, MOJO_HANDLE_SIGNAL_READABLE, 34, 1168 this->ConsumerAddAwakable(&waiter2, MOJO_HANDLE_SIGNAL_READABLE, 34,
1151 &hss)); 1169 &hss));
1152 // We don't know if the peer closed signal has propagated yet (for the remote 1170 // We don't know if the peer closed signal has propagated yet (for the remote
1153 // cases). 1171 // cases).
1154 EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE)); 1172 EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
1155 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1173 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1174 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1156 hss.satisfiable_signals); 1175 hss.satisfiable_signals);
1157 1176
1158 // Wait for the peer closed signal. 1177 // Wait for the peer closed signal.
1159 context = 0; 1178 context = 0;
1160 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context)); 1179 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context));
1161 EXPECT_EQ(12u, context); 1180 EXPECT_EQ(12u, context);
1162 hss = HandleSignalsState(); 1181 hss = HandleSignalsState();
1163 this->ConsumerRemoveAwakable(&waiter, &hss); 1182 this->ConsumerRemoveAwakable(&waiter, &hss);
1164 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1183 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1184 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1165 hss.satisfied_signals); 1185 hss.satisfied_signals);
1166 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1186 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1187 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1167 hss.satisfiable_signals); 1188 hss.satisfiable_signals);
1168 1189
1169 // Read one element. 1190 // Read one element.
1170 elements[0] = -1; 1191 elements[0] = -1;
1171 elements[1] = -1; 1192 elements[1] = -1;
1172 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 1193 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
1173 EXPECT_EQ(MOJO_RESULT_OK, 1194 EXPECT_EQ(MOJO_RESULT_OK,
1174 this->ConsumerReadData(UserPointer<void>(elements), 1195 this->ConsumerReadData(UserPointer<void>(elements),
1175 MakeUserPointer(&num_bytes), true, false)); 1196 MakeUserPointer(&num_bytes), true, false));
1176 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1197 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
1225 elements[1] = 456; 1246 elements[1] = 456;
1226 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(static_cast<uint32_t>( 1247 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(static_cast<uint32_t>(
1227 2u * sizeof(elements[0])))); 1248 2u * sizeof(elements[0]))));
1228 1249
1229 // Wait for readability (needed for remote cases). 1250 // Wait for readability (needed for remote cases).
1230 context = 0; 1251 context = 0;
1231 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context)); 1252 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context));
1232 EXPECT_EQ(12u, context); 1253 EXPECT_EQ(12u, context);
1233 hss = HandleSignalsState(); 1254 hss = HandleSignalsState();
1234 this->ConsumerRemoveAwakable(&waiter, &hss); 1255 this->ConsumerRemoveAwakable(&waiter, &hss);
1235 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1256 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1236 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1257 hss.satisfied_signals);
1258 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1259 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1237 hss.satisfiable_signals); 1260 hss.satisfiable_signals);
1238 1261
1239 // Read one element. 1262 // Read one element.
1240 const void* read_buffer = nullptr; 1263 const void* read_buffer = nullptr;
1241 num_bytes = 0u; 1264 num_bytes = 0u;
1242 EXPECT_EQ(MOJO_RESULT_OK, 1265 EXPECT_EQ(MOJO_RESULT_OK,
1243 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer), 1266 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer),
1244 MakeUserPointer(&num_bytes))); 1267 MakeUserPointer(&num_bytes)));
1245 EXPECT_TRUE(read_buffer); 1268 EXPECT_TRUE(read_buffer);
1246 EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes); 1269 EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
1247 const int32_t* read_elements = static_cast<const int32_t*>(read_buffer); 1270 const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
1248 EXPECT_EQ(123, read_elements[0]); 1271 EXPECT_EQ(123, read_elements[0]);
1249 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>( 1272 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>(
1250 1u * sizeof(elements[0])))); 1273 1u * sizeof(elements[0]))));
1251 1274
1252 // Should still be readable. 1275 // Should still be readable.
1253 waiter.Init(); 1276 waiter.Init();
1254 hss = HandleSignalsState(); 1277 hss = HandleSignalsState();
1255 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1278 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1256 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 34, 1279 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 34,
1257 &hss)); 1280 &hss));
1258 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1281 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1259 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1282 hss.satisfied_signals);
1283 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1284 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1260 hss.satisfiable_signals); 1285 hss.satisfiable_signals);
1261 1286
1262 // Read one element. 1287 // Read one element.
1263 // Request three, but not in all-or-none mode. 1288 // Request three, but not in all-or-none mode.
1264 read_buffer = nullptr; 1289 read_buffer = nullptr;
1265 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0])); 1290 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
1266 EXPECT_EQ(MOJO_RESULT_OK, 1291 EXPECT_EQ(MOJO_RESULT_OK,
1267 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer), 1292 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer),
1268 MakeUserPointer(&num_bytes))); 1293 MakeUserPointer(&num_bytes)));
1269 EXPECT_TRUE(read_buffer); 1294 EXPECT_TRUE(read_buffer);
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after
1357 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 3, 1382 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 3,
1358 &hss)); 1383 &hss));
1359 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 1384 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
1360 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1385 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1361 hss.satisfiable_signals); 1386 hss.satisfiable_signals);
1362 1387
1363 // It should become readable. 1388 // It should become readable.
1364 EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyTimeout(), nullptr)); 1389 EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyTimeout(), nullptr));
1365 hss = HandleSignalsState(); 1390 hss = HandleSignalsState();
1366 this->ConsumerRemoveAwakable(&cwaiter, &hss); 1391 this->ConsumerRemoveAwakable(&cwaiter, &hss);
1367 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1392 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1368 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1393 hss.satisfied_signals);
1394 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1395 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1369 hss.satisfiable_signals); 1396 hss.satisfiable_signals);
1370 1397
1371 // Start another two-phase write and check that it's readable even in the 1398 // Start another two-phase write and check that it's readable even in the
1372 // middle of it. 1399 // middle of it.
1373 write_ptr = nullptr; 1400 write_ptr = nullptr;
1374 num_bytes = 0u; 1401 num_bytes = 0u;
1375 EXPECT_EQ(MOJO_RESULT_OK, 1402 EXPECT_EQ(MOJO_RESULT_OK,
1376 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr), 1403 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
1377 MakeUserPointer(&num_bytes))); 1404 MakeUserPointer(&num_bytes)));
1378 EXPECT_TRUE(write_ptr); 1405 EXPECT_TRUE(write_ptr);
1379 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t))); 1406 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
1380 1407
1381 // It should be readable. 1408 // It should be readable.
1382 cwaiter.Init(); 1409 cwaiter.Init();
1383 hss = HandleSignalsState(); 1410 hss = HandleSignalsState();
1384 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1411 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1385 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 5, 1412 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 5,
1386 &hss)); 1413 &hss));
1387 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1414 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1388 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1415 hss.satisfied_signals);
1416 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1417 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1389 hss.satisfiable_signals); 1418 hss.satisfiable_signals);
1390 1419
1391 // End the two-phase write without writing anything. 1420 // End the two-phase write without writing anything.
1392 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u)); 1421 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u));
1393 1422
1394 // Start a two-phase read. 1423 // Start a two-phase read.
1395 const void* read_ptr = nullptr; 1424 const void* read_ptr = nullptr;
1396 num_bytes = 0u; 1425 num_bytes = 0u;
1397 EXPECT_EQ(MOJO_RESULT_OK, 1426 EXPECT_EQ(MOJO_RESULT_OK,
1398 this->ConsumerBeginReadData(MakeUserPointer(&read_ptr), 1427 this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
(...skipping 13 matching lines...) Expand all
1412 1441
1413 // But not readable. 1442 // But not readable.
1414 cwaiter.Init(); 1443 cwaiter.Init();
1415 ASSERT_EQ(MOJO_RESULT_OK, 1444 ASSERT_EQ(MOJO_RESULT_OK,
1416 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 7, 1445 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 7,
1417 nullptr)); 1446 nullptr));
1418 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, cwaiter.Wait(0, nullptr)); 1447 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, cwaiter.Wait(0, nullptr));
1419 hss = HandleSignalsState(); 1448 hss = HandleSignalsState();
1420 this->ConsumerRemoveAwakable(&cwaiter, &hss); 1449 this->ConsumerRemoveAwakable(&cwaiter, &hss);
1421 EXPECT_EQ(0u, hss.satisfied_signals); 1450 EXPECT_EQ(0u, hss.satisfied_signals);
1422 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1451 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1452 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1423 hss.satisfiable_signals); 1453 hss.satisfiable_signals);
1424 1454
1425 // End the two-phase read without reading anything. 1455 // End the two-phase read without reading anything.
1426 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(0u)); 1456 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(0u));
1427 1457
1428 // It should be readable again. 1458 // It should be readable again.
1429 cwaiter.Init(); 1459 cwaiter.Init();
1430 hss = HandleSignalsState(); 1460 hss = HandleSignalsState();
1431 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1461 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1432 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 8, 1462 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 8,
1433 &hss)); 1463 &hss));
1434 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1464 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1435 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1465 hss.satisfied_signals);
1466 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1467 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1436 hss.satisfiable_signals); 1468 hss.satisfiable_signals);
1437 1469
1438 this->ProducerClose(); 1470 this->ProducerClose();
1439 this->ConsumerClose(); 1471 this->ConsumerClose();
1440 } 1472 }
1441 1473
1442 void Seq(int32_t start, size_t count, int32_t* out) { 1474 void Seq(int32_t start, size_t count, int32_t* out) {
1443 for (size_t i = 0; i < count; i++) 1475 for (size_t i = 0; i < count; i++)
1444 out[i] = start + static_cast<int32_t>(i); 1476 out[i] = start + static_cast<int32_t>(i);
1445 } 1477 }
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
1486 EXPECT_EQ(5u * sizeof(int32_t), num_bytes); 1518 EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
1487 1519
1488 // Wait for data. 1520 // Wait for data.
1489 // TODO(vtl): There's no real guarantee that all the data will become 1521 // TODO(vtl): There's no real guarantee that all the data will become
1490 // available at once (except that in current implementations, with reasonable 1522 // available at once (except that in current implementations, with reasonable
1491 // limits, it will). Eventually, we'll be able to wait for a specified amount 1523 // limits, it will). Eventually, we'll be able to wait for a specified amount
1492 // of data to become available. 1524 // of data to become available.
1493 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 1525 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
1494 hss = HandleSignalsState(); 1526 hss = HandleSignalsState();
1495 this->ConsumerRemoveAwakable(&waiter, &hss); 1527 this->ConsumerRemoveAwakable(&waiter, &hss);
1496 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1528 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1497 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1529 hss.satisfied_signals);
1530 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1531 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1498 hss.satisfiable_signals); 1532 hss.satisfiable_signals);
1499 1533
1500 // Half full. 1534 // Half full.
1501 num_bytes = 0u; 1535 num_bytes = 0u;
1502 EXPECT_EQ(MOJO_RESULT_OK, 1536 EXPECT_EQ(MOJO_RESULT_OK,
1503 this->ConsumerQueryData(MakeUserPointer(&num_bytes))); 1537 this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1504 EXPECT_EQ(5u * sizeof(int32_t), num_bytes); 1538 EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
1505 1539
1506 // Too much. 1540 // Too much.
1507 num_bytes = 6u * sizeof(int32_t); 1541 num_bytes = 6u * sizeof(int32_t);
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after
1597 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1631 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1598 2, nullptr)); 1632 2, nullptr));
1599 1633
1600 // Close the producer, then test producer-closed cases. 1634 // Close the producer, then test producer-closed cases.
1601 this->ProducerClose(); 1635 this->ProducerClose();
1602 1636
1603 // Wait. 1637 // Wait.
1604 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 1638 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
1605 hss = HandleSignalsState(); 1639 hss = HandleSignalsState();
1606 this->ConsumerRemoveAwakable(&waiter, &hss); 1640 this->ConsumerRemoveAwakable(&waiter, &hss);
1607 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1641 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1642 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1608 hss.satisfied_signals); 1643 hss.satisfied_signals);
1609 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1644 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1645 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1610 hss.satisfiable_signals); 1646 hss.satisfiable_signals);
1611 1647
1612 // Try reading too much; "failed precondition" since the producer is closed. 1648 // Try reading too much; "failed precondition" since the producer is closed.
1613 num_bytes = 4u * sizeof(int32_t); 1649 num_bytes = 4u * sizeof(int32_t);
1614 memset(buffer, 0xab, sizeof(buffer)); 1650 memset(buffer, 0xab, sizeof(buffer));
1615 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1651 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1616 this->ConsumerReadData(UserPointer<void>(buffer), 1652 this->ConsumerReadData(UserPointer<void>(buffer),
1617 MakeUserPointer(&num_bytes), true, false)); 1653 MakeUserPointer(&num_bytes), true, false));
1618 memset(expected_buffer, 0xab, sizeof(expected_buffer)); 1654 memset(expected_buffer, 0xab, sizeof(expected_buffer));
1619 EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); 1655 EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
1687 EXPECT_EQ(MOJO_RESULT_OK, 1723 EXPECT_EQ(MOJO_RESULT_OK,
1688 this->ProducerWriteData(UserPointer<const void>(&test_data[0]), 1724 this->ProducerWriteData(UserPointer<const void>(&test_data[0]),
1689 MakeUserPointer(&num_bytes), false)); 1725 MakeUserPointer(&num_bytes), false));
1690 EXPECT_EQ(20u, num_bytes); 1726 EXPECT_EQ(20u, num_bytes);
1691 1727
1692 // Wait for data. 1728 // Wait for data.
1693 // TODO(vtl): (See corresponding TODO in AllOrNone.) 1729 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1694 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 1730 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
1695 hss = HandleSignalsState(); 1731 hss = HandleSignalsState();
1696 this->ConsumerRemoveAwakable(&waiter, &hss); 1732 this->ConsumerRemoveAwakable(&waiter, &hss);
1697 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1733 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1698 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1734 hss.satisfied_signals);
1735 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1736 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1699 hss.satisfiable_signals); 1737 hss.satisfiable_signals);
1700 1738
1701 // Read 10 bytes. 1739 // Read 10 bytes.
1702 unsigned char read_buffer[1000] = {0}; 1740 unsigned char read_buffer[1000] = {0};
1703 num_bytes = 10u; 1741 num_bytes = 10u;
1704 EXPECT_EQ(MOJO_RESULT_OK, 1742 EXPECT_EQ(MOJO_RESULT_OK,
1705 this->ConsumerReadData(UserPointer<void>(read_buffer), 1743 this->ConsumerReadData(UserPointer<void>(read_buffer),
1706 MakeUserPointer(&num_bytes), false, false)); 1744 MakeUserPointer(&num_bytes), false, false));
1707 EXPECT_EQ(10u, num_bytes); 1745 EXPECT_EQ(10u, num_bytes);
1708 EXPECT_EQ(0, memcmp(read_buffer, &test_data[0], 10u)); 1746 EXPECT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
(...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after
1899 this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr), 1937 this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
1900 MakeUserPointer(&num_bytes))); 1938 MakeUserPointer(&num_bytes)));
1901 EXPECT_TRUE(write_buffer_ptr); 1939 EXPECT_TRUE(write_buffer_ptr);
1902 ASSERT_GT(num_bytes, kTestDataSize); 1940 ASSERT_GT(num_bytes, kTestDataSize);
1903 1941
1904 // Wait for data. 1942 // Wait for data.
1905 // TODO(vtl): (See corresponding TODO in AllOrNone.) 1943 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1906 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 1944 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
1907 hss = HandleSignalsState(); 1945 hss = HandleSignalsState();
1908 this->ConsumerRemoveAwakable(&waiter, &hss); 1946 this->ConsumerRemoveAwakable(&waiter, &hss);
1909 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1947 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1910 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1948 hss.satisfied_signals);
1949 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1950 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1911 hss.satisfiable_signals); 1951 hss.satisfiable_signals);
1912 1952
1913 // Start two-phase read. 1953 // Start two-phase read.
1914 const void* read_buffer_ptr = nullptr; 1954 const void* read_buffer_ptr = nullptr;
1915 num_bytes = 0u; 1955 num_bytes = 0u;
1916 EXPECT_EQ(MOJO_RESULT_OK, 1956 EXPECT_EQ(MOJO_RESULT_OK,
1917 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr), 1957 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
1918 MakeUserPointer(&num_bytes))); 1958 MakeUserPointer(&num_bytes)));
1919 EXPECT_TRUE(read_buffer_ptr); 1959 EXPECT_TRUE(read_buffer_ptr);
1920 EXPECT_EQ(kTestDataSize, num_bytes); 1960 EXPECT_EQ(kTestDataSize, num_bytes);
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after
2017 1, nullptr)); 2057 1, nullptr));
2018 2058
2019 // Close the producer. 2059 // Close the producer.
2020 this->ProducerClose(); 2060 this->ProducerClose();
2021 2061
2022 // Wait. (Note that once the consumer knows that the producer is closed, it 2062 // Wait. (Note that once the consumer knows that the producer is closed, it
2023 // must also know about all the data that was sent.) 2063 // must also know about all the data that was sent.)
2024 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 2064 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
2025 hss = HandleSignalsState(); 2065 hss = HandleSignalsState();
2026 this->ConsumerRemoveAwakable(&waiter, &hss); 2066 this->ConsumerRemoveAwakable(&waiter, &hss);
2027 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2067 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2068 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2028 hss.satisfied_signals); 2069 hss.satisfied_signals);
2029 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2070 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2071 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2030 hss.satisfiable_signals); 2072 hss.satisfiable_signals);
2031 2073
2032 // Peek that data. 2074 // Peek that data.
2033 char buffer[1000]; 2075 char buffer[1000];
2034 num_bytes = static_cast<uint32_t>(sizeof(buffer)); 2076 num_bytes = static_cast<uint32_t>(sizeof(buffer));
2035 EXPECT_EQ(MOJO_RESULT_OK, 2077 EXPECT_EQ(MOJO_RESULT_OK,
2036 this->ConsumerReadData(UserPointer<void>(buffer), 2078 this->ConsumerReadData(UserPointer<void>(buffer),
2037 MakeUserPointer(&num_bytes), false, true)); 2079 MakeUserPointer(&num_bytes), false, true));
2038 EXPECT_EQ(kTestDataSize, num_bytes); 2080 EXPECT_EQ(kTestDataSize, num_bytes);
2039 EXPECT_EQ(0, memcmp(buffer, kTestData, kTestDataSize)); 2081 EXPECT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
2096 uint32_t num_bytes = kTestDataSize; 2138 uint32_t num_bytes = kTestDataSize;
2097 EXPECT_EQ(MOJO_RESULT_OK, 2139 EXPECT_EQ(MOJO_RESULT_OK,
2098 this->ProducerWriteData(UserPointer<const void>(&kTestData), 2140 this->ProducerWriteData(UserPointer<const void>(&kTestData),
2099 MakeUserPointer(&num_bytes), false)); 2141 MakeUserPointer(&num_bytes), false));
2100 EXPECT_EQ(kTestDataSize, num_bytes); 2142 EXPECT_EQ(kTestDataSize, num_bytes);
2101 2143
2102 // Wait. 2144 // Wait.
2103 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 2145 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
2104 hss = HandleSignalsState(); 2146 hss = HandleSignalsState();
2105 this->ConsumerRemoveAwakable(&waiter, &hss); 2147 this->ConsumerRemoveAwakable(&waiter, &hss);
2106 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 2148 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2107 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2149 hss.satisfied_signals);
2150 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2151 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2108 hss.satisfiable_signals); 2152 hss.satisfiable_signals);
2109 2153
2110 // Read that data. 2154 // Read that data.
2111 int64_t data[10] = {}; 2155 int64_t data[10] = {};
2112 num_bytes = static_cast<uint32_t>(sizeof(data)); 2156 num_bytes = static_cast<uint32_t>(sizeof(data));
2113 EXPECT_EQ(MOJO_RESULT_OK, 2157 EXPECT_EQ(MOJO_RESULT_OK,
2114 this->ConsumerReadData(UserPointer<void>(data), 2158 this->ConsumerReadData(UserPointer<void>(data),
2115 MakeUserPointer(&num_bytes), false, false)); 2159 MakeUserPointer(&num_bytes), false, false));
2116 EXPECT_EQ(kTestDataSize, num_bytes); 2160 EXPECT_EQ(kTestDataSize, num_bytes);
2117 EXPECT_EQ(kTestData, data[0]); 2161 EXPECT_EQ(kTestData, data[0]);
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
2165 uint32_t num_bytes = kTestDataSize; 2209 uint32_t num_bytes = kTestDataSize;
2166 EXPECT_EQ(MOJO_RESULT_OK, 2210 EXPECT_EQ(MOJO_RESULT_OK,
2167 this->ProducerWriteData(UserPointer<const void>(&kTestData), 2211 this->ProducerWriteData(UserPointer<const void>(&kTestData),
2168 MakeUserPointer(&num_bytes), false)); 2212 MakeUserPointer(&num_bytes), false));
2169 EXPECT_EQ(kTestDataSize, num_bytes); 2213 EXPECT_EQ(kTestDataSize, num_bytes);
2170 2214
2171 // Wait. 2215 // Wait.
2172 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 2216 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
2173 hss = HandleSignalsState(); 2217 hss = HandleSignalsState();
2174 this->ConsumerRemoveAwakable(&waiter, &hss); 2218 this->ConsumerRemoveAwakable(&waiter, &hss);
2175 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 2219 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2176 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2220 hss.satisfied_signals);
2221 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2222 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2177 hss.satisfiable_signals); 2223 hss.satisfiable_signals);
2178 2224
2179 // Start a two-phase read. 2225 // Start a two-phase read.
2180 num_bytes = 0u; 2226 num_bytes = 0u;
2181 const void* read_ptr = nullptr; 2227 const void* read_ptr = nullptr;
2182 EXPECT_EQ(MOJO_RESULT_OK, 2228 EXPECT_EQ(MOJO_RESULT_OK,
2183 this->ConsumerBeginReadData(MakeUserPointer(&read_ptr), 2229 this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
2184 MakeUserPointer(&num_bytes))); 2230 MakeUserPointer(&num_bytes)));
2185 EXPECT_EQ(kTestDataSize, num_bytes); 2231 EXPECT_EQ(kTestDataSize, num_bytes);
2186 EXPECT_EQ(kTestData, static_cast<const int64_t*>(read_ptr)[0]); 2232 EXPECT_EQ(kTestData, static_cast<const int64_t*>(read_ptr)[0]);
2187 2233
2188 // Add waiter (for the producer to be closed). 2234 // Add waiter (for the producer to be closed).
2189 waiter.Init(); 2235 waiter.Init();
2190 ASSERT_EQ(MOJO_RESULT_OK, 2236 ASSERT_EQ(MOJO_RESULT_OK,
2191 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2237 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
2192 0, nullptr)); 2238 0, nullptr));
2193 2239
2194 // Close the producer. 2240 // Close the producer.
2195 this->ProducerClose(); 2241 this->ProducerClose();
2196 2242
2197 // Wait for producer close to be detected. 2243 // Wait for producer close to be detected.
2198 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 2244 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
2199 hss = HandleSignalsState(); 2245 hss = HandleSignalsState();
2200 this->ConsumerRemoveAwakable(&waiter, &hss); 2246 this->ConsumerRemoveAwakable(&waiter, &hss);
2201 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2247 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
2202 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2248 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2249 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2203 hss.satisfiable_signals); 2250 hss.satisfiable_signals);
2204 2251
2205 // Add waiter (for the consumer to become readable). 2252 // Add waiter (for the consumer to become readable).
2206 waiter.Init(); 2253 waiter.Init();
2207 ASSERT_EQ(MOJO_RESULT_OK, 2254 ASSERT_EQ(MOJO_RESULT_OK,
2208 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 0, 2255 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 0,
2209 nullptr)); 2256 nullptr));
2210 2257
2211 // Complete the two-phase read. 2258 // Complete the two-phase read.
2212 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(kTestDataSize)); 2259 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(kTestDataSize));
(...skipping 162 matching lines...) Expand 10 before | Expand all | Expand 10 after
2375 num_bytes = 1u * sizeof(int32_t); 2422 num_bytes = 1u * sizeof(int32_t);
2376 EXPECT_EQ(MOJO_RESULT_OK, 2423 EXPECT_EQ(MOJO_RESULT_OK,
2377 this->ProducerWriteData(UserPointer<const void>(&element), 2424 this->ProducerWriteData(UserPointer<const void>(&element),
2378 MakeUserPointer(&num_bytes), false)); 2425 MakeUserPointer(&num_bytes), false));
2379 2426
2380 // Wait for data. 2427 // Wait for data.
2381 // TODO(vtl): (See corresponding TODO in AllOrNone.) 2428 // TODO(vtl): (See corresponding TODO in AllOrNone.)
2382 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 2429 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
2383 hss = HandleSignalsState(); 2430 hss = HandleSignalsState();
2384 this->ConsumerRemoveAwakable(&waiter, &hss); 2431 this->ConsumerRemoveAwakable(&waiter, &hss);
2385 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 2432 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2386 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2433 hss.satisfied_signals);
2434 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2435 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2387 hss.satisfiable_signals); 2436 hss.satisfiable_signals);
2388 2437
2389 // One element available. 2438 // One element available.
2390 num_bytes = 0u; 2439 num_bytes = 0u;
2391 EXPECT_EQ(MOJO_RESULT_OK, 2440 EXPECT_EQ(MOJO_RESULT_OK,
2392 this->ConsumerQueryData(MakeUserPointer(&num_bytes))); 2441 this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
2393 EXPECT_EQ(1u * sizeof(int32_t), num_bytes); 2442 EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
2394 2443
2395 // Try "ending" a two-phase read when one isn't active. 2444 // Try "ending" a two-phase read when one isn't active.
2396 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 2445 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after
2493 // |ConsumerEndReadData()| below) we want |producer_open()| to be false but 2542 // |ConsumerEndReadData()| below) we want |producer_open()| to be false but
2494 // the call to |channel_endpoint_->EnqueueMessage()| to fail. (This race can 2543 // the call to |channel_endpoint_->EnqueueMessage()| to fail. (This race can
2495 // occur without the sleep, but is much less likely.) 2544 // occur without the sleep, but is much less likely.)
2496 ThreadSleep(10u); 2545 ThreadSleep(10u);
2497 2546
2498 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(num_bytes)); 2547 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(num_bytes));
2499 2548
2500 this->ConsumerClose(); 2549 this->ConsumerClose();
2501 } 2550 }
2502 2551
2552 TYPED_TEST(DataPipeImplTest, ReadThreshold) {
2553 const MojoCreateDataPipeOptions options = {
2554 kSizeOfOptions, // |struct_size|.
2555 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
2556 1u, // |element_num_bytes|.
2557 1000u // |capacity_num_bytes|.
2558 };
2559 this->Create(options);
2560 this->DoTransfer();
2561
2562 // The default read threshold should be 0 (which means "default", i.e., one
2563 // element).
2564 uint32_t read_threshold_num_bytes = 123u;
2565 this->ConsumerGetOptions(&read_threshold_num_bytes);
2566 EXPECT_EQ(0u, read_threshold_num_bytes);
2567
2568 Waiter waiter;
2569 HandleSignalsState hss;
2570
2571 // Add a waiter.
2572 waiter.Init();
2573 ASSERT_EQ(MOJO_RESULT_OK,
2574 this->ConsumerAddAwakable(
2575 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr));
2576 // Trivial wait: it shouldn't have the read threshold signal.
2577 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
2578 hss = HandleSignalsState();
2579 this->ConsumerRemoveAwakable(&waiter, &hss);
2580 EXPECT_EQ(0u, hss.satisfied_signals);
2581 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2582 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2583 hss.satisfiable_signals);
2584
2585 // Add a waiter.
2586 waiter.Init();
2587 ASSERT_EQ(MOJO_RESULT_OK,
2588 this->ConsumerAddAwakable(
2589 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr));
2590
2591 // Write a byte.
2592 const char kTestData[] = {'x'};
2593 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
2594 uint32_t num_bytes = kTestDataSize;
2595 EXPECT_EQ(MOJO_RESULT_OK,
2596 this->ProducerWriteData(UserPointer<const void>(kTestData),
2597 MakeUserPointer(&num_bytes), false));
2598 EXPECT_EQ(kTestDataSize, num_bytes);
2599
2600 // Wait for the read threshold signal.
2601 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
2602 hss = HandleSignalsState();
2603 this->ConsumerRemoveAwakable(&waiter, &hss);
2604 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2605 hss.satisfied_signals);
2606 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2607 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2608 hss.satisfiable_signals);
2609
2610 // Set the read threshold to 1.
2611 this->ConsumerSetOptions(1);
2612 read_threshold_num_bytes = 123u;
2613 this->ConsumerGetOptions(&read_threshold_num_bytes);
2614 EXPECT_EQ(1u, read_threshold_num_bytes);
2615
2616 // Try to add a waiter: it should (still) already have the read threshold
2617 // signal.
2618 waiter.Init();
2619 hss = HandleSignalsState();
2620 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
2621 this->ConsumerAddAwakable(
2622 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, &hss));
2623 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2624 hss.satisfied_signals);
2625 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2626 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2627 hss.satisfiable_signals);
2628
2629 // Set the read threshold to 2.
2630 this->ConsumerSetOptions(2);
2631 read_threshold_num_bytes = 123u;
2632 this->ConsumerGetOptions(&read_threshold_num_bytes);
2633 EXPECT_EQ(2u, read_threshold_num_bytes);
2634
2635 // Add a waiter.
2636 waiter.Init();
2637 ASSERT_EQ(MOJO_RESULT_OK,
2638 this->ConsumerAddAwakable(
2639 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr));
2640
2641 // Write another byte.
2642 num_bytes = kTestDataSize;
2643 EXPECT_EQ(MOJO_RESULT_OK,
2644 this->ProducerWriteData(UserPointer<const void>(kTestData),
2645 MakeUserPointer(&num_bytes), false));
2646 EXPECT_EQ(kTestDataSize, num_bytes);
2647
2648 // Wait for the read threshold signal.
2649 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
2650 hss = HandleSignalsState();
2651 this->ConsumerRemoveAwakable(&waiter, &hss);
2652 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2653 hss.satisfied_signals);
2654 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2655 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2656 hss.satisfiable_signals);
2657
2658 // Read one byte.
2659 char read_byte = 'a';
2660 num_bytes = sizeof(read_byte);
2661 EXPECT_EQ(MOJO_RESULT_OK,
2662 this->ConsumerReadData(UserPointer<void>(&read_byte),
2663 MakeUserPointer(&num_bytes), true, false));
2664 EXPECT_EQ(1u, num_bytes);
2665 EXPECT_EQ(kTestData[0], read_byte);
2666
2667 // Add a waiter.
2668 waiter.Init();
2669 ASSERT_EQ(MOJO_RESULT_OK,
2670 this->ConsumerAddAwakable(
2671 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr));
2672 // Trivial wait: it shouldn't have the read threshold signal.
2673 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
2674 hss = HandleSignalsState();
2675 this->ConsumerRemoveAwakable(&waiter, &hss);
2676 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
2677 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2678 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2679 hss.satisfiable_signals);
2680
2681 // Add a waiter.
2682 waiter.Init();
2683 ASSERT_EQ(MOJO_RESULT_OK,
2684 this->ConsumerAddAwakable(
2685 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr));
2686 // Trivial wait: it shouldn't have the read threshold signal.
2687 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
2688 hss = HandleSignalsState();
2689 this->ConsumerRemoveAwakable(&waiter, &hss);
2690 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
2691 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2692 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2693 hss.satisfiable_signals);
2694
2695 // Add a waiter.
2696 waiter.Init();
2697 ASSERT_EQ(MOJO_RESULT_OK,
2698 this->ConsumerAddAwakable(
2699 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr));
2700
2701 // Close the producer.
2702 this->ProducerClose();
2703
2704 // Wait; the current read threshold becomes never satisfiable.
2705 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
2706 waiter.Wait(test::TinyTimeout(), nullptr));
2707 hss = HandleSignalsState();
2708 this->ConsumerRemoveAwakable(&waiter, &hss);
2709 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
2710 hss.satisfied_signals);
2711 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
2712 hss.satisfiable_signals);
2713
2714 // Set the read threshold back to zero to 0.
2715 this->ConsumerSetOptions(0);
2716 read_threshold_num_bytes = 123u;
2717 this->ConsumerGetOptions(&read_threshold_num_bytes);
2718 // "Get options" should preserve 0 (and not set it to the element size).
2719 EXPECT_EQ(0u, read_threshold_num_bytes);
2720
2721 // Try to add a waiter: it should have the read threshold signal.
2722 waiter.Init();
2723 hss = HandleSignalsState();
2724 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
2725 this->ConsumerAddAwakable(
2726 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, &hss));
2727 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2728 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2729 hss.satisfied_signals);
2730 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2731 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2732 hss.satisfiable_signals);
2733
2734 // Read the other byte.
2735 read_byte = 'a';
2736 num_bytes = sizeof(read_byte);
2737 EXPECT_EQ(MOJO_RESULT_OK,
2738 this->ConsumerReadData(UserPointer<void>(&read_byte),
2739 MakeUserPointer(&num_bytes), true, false));
2740 EXPECT_EQ(1u, num_bytes);
2741 EXPECT_EQ(kTestData[0], read_byte);
2742
2743 // Try to add a waiter: the read threshold signal should be unsatisfiable.
2744 waiter.Init();
2745 hss = HandleSignalsState();
2746 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
2747 this->ConsumerAddAwakable(
2748 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, &hss));
2749 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
2750 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
2751
2752 this->ConsumerClose();
2753 }
2754
2503 } // namespace 2755 } // namespace
2504 } // namespace system 2756 } // namespace system
2505 } // namespace mojo 2757 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/data_pipe_impl.h ('k') | mojo/edk/system/dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698