Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(39)

Side by Side Diff: mojo/edk/system/data_pipe_impl_unittest.cc

Issue 2068483002: Add an unconditional version of DispatcherAddAwakable(): AddAwakableUnconditional(). (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: oops Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // This file contains tests that are shared between different implementations of 5 // This file contains tests that are shared between different implementations of
6 // |DataPipeImpl|. 6 // |DataPipeImpl|.
7 7
8 #include "mojo/edk/system/data_pipe_impl.h" 8 #include "mojo/edk/system/data_pipe_impl.h"
9 9
10 #include <stdint.h> 10 #include <stdint.h>
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
134 } 134 }
135 MojoResult ProducerBeginWriteData(UserPointer<void*> buffer, 135 MojoResult ProducerBeginWriteData(UserPointer<void*> buffer,
136 UserPointer<uint32_t> buffer_num_bytes) { 136 UserPointer<uint32_t> buffer_num_bytes) {
137 return dpp()->ProducerBeginWriteData(buffer, buffer_num_bytes); 137 return dpp()->ProducerBeginWriteData(buffer, buffer_num_bytes);
138 } 138 }
139 MojoResult ProducerEndWriteData(uint32_t num_bytes_written) { 139 MojoResult ProducerEndWriteData(uint32_t num_bytes_written) {
140 return dpp()->ProducerEndWriteData(num_bytes_written); 140 return dpp()->ProducerEndWriteData(num_bytes_written);
141 } 141 }
142 MojoResult ProducerAddAwakable(Awakable* awakable, 142 MojoResult ProducerAddAwakable(Awakable* awakable,
143 MojoHandleSignals signals, 143 MojoHandleSignals signals,
144 bool force,
144 uint64_t context, 145 uint64_t context,
145 HandleSignalsState* signals_state) { 146 HandleSignalsState* signals_state) {
146 return dpp()->ProducerAddAwakable(awakable, signals, context, 147 return dpp()->ProducerAddAwakable(awakable, signals, force, context,
147 signals_state); 148 signals_state);
148 } 149 }
149 void ProducerRemoveAwakable(Awakable* awakable, 150 void ProducerRemoveAwakable(Awakable* awakable,
150 HandleSignalsState* signals_state) { 151 HandleSignalsState* signals_state) {
151 return dpp()->ProducerRemoveAwakable(awakable, signals_state); 152 return dpp()->ProducerRemoveAwakable(awakable, signals_state);
152 } 153 }
153 154
154 void ConsumerClose() { helper_->ConsumerClose(); } 155 void ConsumerClose() { helper_->ConsumerClose(); }
155 MojoResult ConsumerSetOptions(uint32_t read_threshold_num_bytes) { 156 MojoResult ConsumerSetOptions(uint32_t read_threshold_num_bytes) {
156 return dpc()->ConsumerSetOptions(read_threshold_num_bytes); 157 return dpc()->ConsumerSetOptions(read_threshold_num_bytes);
(...skipping 16 matching lines...) Expand all
173 } 174 }
174 MojoResult ConsumerBeginReadData(UserPointer<const void*> buffer, 175 MojoResult ConsumerBeginReadData(UserPointer<const void*> buffer,
175 UserPointer<uint32_t> buffer_num_bytes) { 176 UserPointer<uint32_t> buffer_num_bytes) {
176 return dpc()->ConsumerBeginReadData(buffer, buffer_num_bytes); 177 return dpc()->ConsumerBeginReadData(buffer, buffer_num_bytes);
177 } 178 }
178 MojoResult ConsumerEndReadData(uint32_t num_bytes_read) { 179 MojoResult ConsumerEndReadData(uint32_t num_bytes_read) {
179 return dpc()->ConsumerEndReadData(num_bytes_read); 180 return dpc()->ConsumerEndReadData(num_bytes_read);
180 } 181 }
181 MojoResult ConsumerAddAwakable(Awakable* awakable, 182 MojoResult ConsumerAddAwakable(Awakable* awakable,
182 MojoHandleSignals signals, 183 MojoHandleSignals signals,
184 bool force,
183 uint64_t context, 185 uint64_t context,
184 HandleSignalsState* signals_state) { 186 HandleSignalsState* signals_state) {
185 return dpc()->ConsumerAddAwakable(awakable, signals, context, 187 return dpc()->ConsumerAddAwakable(awakable, signals, force, context,
186 signals_state); 188 signals_state);
187 } 189 }
188 void ConsumerRemoveAwakable(Awakable* awakable, 190 void ConsumerRemoveAwakable(Awakable* awakable,
189 HandleSignalsState* signals_state) { 191 HandleSignalsState* signals_state) {
190 return dpc()->ConsumerRemoveAwakable(awakable, signals_state); 192 return dpc()->ConsumerRemoveAwakable(awakable, signals_state);
191 } 193 }
192 194
193 private: 195 private:
194 DataPipe* dpp() { return helper_->DataPipeForProducer(); } 196 DataPipe* dpp() { return helper_->DataPipeForProducer(); }
195 DataPipe* dpc() { return helper_->DataPipeForConsumer(); } 197 DataPipe* dpc() { return helper_->DataPipeForConsumer(); }
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
271 const Handle& handle_to_send, 273 const Handle& handle_to_send,
272 Handle* handle_to_receive) { 274 Handle* handle_to_receive) {
273 DCHECK(source_i == 0 || source_i == 1); 275 DCHECK(source_i == 0 || source_i == 1);
274 size_t dest_i = source_i ^ 1; 276 size_t dest_i = source_i ^ 1;
275 277
276 // Write the dispatcher to MP |source_i| (port 0). Wait and receive on MP 278 // Write the dispatcher to MP |source_i| (port 0). Wait and receive on MP
277 // |dest_i| (port 0). (Add the waiter first, to avoid any handling the case 279 // |dest_i| (port 0). (Add the waiter first, to avoid any handling the case
278 // where it's already readable.) 280 // where it's already readable.)
279 Waiter waiter; 281 Waiter waiter;
280 waiter.Init(); 282 waiter.Init();
281 ASSERT_EQ(MOJO_RESULT_OK, 283 ASSERT_EQ(MOJO_RESULT_OK, message_pipe(dest_i)->AddAwakable(
282 message_pipe(dest_i)->AddAwakable( 284 0, &waiter, MOJO_HANDLE_SIGNAL_READABLE,
283 0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 987, nullptr)); 285 false, 987, nullptr));
284 { 286 {
285 HandleTransport transport(test::HandleTryStartTransport(handle_to_send)); 287 HandleTransport transport(test::HandleTryStartTransport(handle_to_send));
286 ASSERT_TRUE(transport.is_valid()); 288 ASSERT_TRUE(transport.is_valid());
287 289
288 std::vector<HandleTransport> transports; 290 std::vector<HandleTransport> transports;
289 transports.push_back(transport); 291 transports.push_back(transport);
290 ASSERT_EQ(MOJO_RESULT_OK, message_pipe(source_i)->WriteMessage( 292 ASSERT_EQ(MOJO_RESULT_OK, message_pipe(source_i)->WriteMessage(
291 0, NullUserPointer(), 0, &transports, 293 0, NullUserPointer(), 0, &transports,
292 MOJO_WRITE_MESSAGE_FLAG_NONE)); 294 MOJO_WRITE_MESSAGE_FLAG_NONE));
293 transport.End(); 295 transport.End();
(...skipping 369 matching lines...) Expand 10 before | Expand all | Expand 10 after
663 665
664 // Read with invalid |num_bytes|. 666 // Read with invalid |num_bytes|.
665 num_bytes = sizeof(elements[0]) + 1; 667 num_bytes = sizeof(elements[0]) + 1;
666 EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, 668 EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
667 this->ConsumerReadData(UserPointer<void>(elements), 669 this->ConsumerReadData(UserPointer<void>(elements),
668 MakeUserPointer(&num_bytes), false, false)); 670 MakeUserPointer(&num_bytes), false, false));
669 671
670 // For remote data pipes, we'll have to wait; add the waiter before writing. 672 // For remote data pipes, we'll have to wait; add the waiter before writing.
671 waiter.Init(); 673 waiter.Init();
672 ASSERT_EQ(MOJO_RESULT_OK, 674 ASSERT_EQ(MOJO_RESULT_OK,
673 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, 675 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
674 nullptr)); 676 false, 123, nullptr));
675 677
676 // Write two elements. 678 // Write two elements.
677 elements[0] = 123; 679 elements[0] = 123;
678 elements[1] = 456; 680 elements[1] = 456;
679 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 681 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
680 EXPECT_EQ(MOJO_RESULT_OK, 682 EXPECT_EQ(MOJO_RESULT_OK,
681 this->ProducerWriteData(UserPointer<const void>(elements), 683 this->ProducerWriteData(UserPointer<const void>(elements),
682 MakeUserPointer(&num_bytes), false)); 684 MakeUserPointer(&num_bytes), false));
683 // It should have written everything (even without "all or none"). 685 // It should have written everything (even without "all or none").
684 EXPECT_EQ(2u * sizeof(elements[0]), num_bytes); 686 EXPECT_EQ(2u * sizeof(elements[0]), num_bytes);
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
791 793
792 Waiter pwaiter; // For producer. 794 Waiter pwaiter; // For producer.
793 Waiter cwaiter; // For consumer. 795 Waiter cwaiter; // For consumer.
794 HandleSignalsState hss; 796 HandleSignalsState hss;
795 uint64_t context; 797 uint64_t context;
796 798
797 // Never readable. 799 // Never readable.
798 pwaiter.Init(); 800 pwaiter.Init();
799 hss = HandleSignalsState(); 801 hss = HandleSignalsState();
800 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 802 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
801 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_READABLE, 12, 803 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_READABLE,
802 &hss)); 804 false, 12, &hss));
803 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 805 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
804 hss.satisfied_signals); 806 hss.satisfied_signals);
805 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 807 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
806 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 808 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
807 hss.satisfiable_signals); 809 hss.satisfiable_signals);
808 810
809 // Already writable. 811 // Already writable.
810 pwaiter.Init(); 812 pwaiter.Init();
811 hss = HandleSignalsState(); 813 hss = HandleSignalsState();
812 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 814 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
813 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 34, 815 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE,
814 &hss)); 816 false, 34, &hss));
815 817
816 // We'll need to wait for readability for the remote cases. 818 // We'll need to wait for readability for the remote cases.
817 cwaiter.Init(); 819 cwaiter.Init();
818 ASSERT_EQ(MOJO_RESULT_OK, 820 ASSERT_EQ(MOJO_RESULT_OK,
819 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 821 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE,
820 1234, nullptr)); 822 false, 1234, nullptr));
821 823
822 // Write two elements. 824 // Write two elements.
823 int32_t elements[2] = {123, 456}; 825 int32_t elements[2] = {123, 456};
824 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 826 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
825 EXPECT_EQ(MOJO_RESULT_OK, 827 EXPECT_EQ(MOJO_RESULT_OK,
826 this->ProducerWriteData(UserPointer<const void>(elements), 828 this->ProducerWriteData(UserPointer<const void>(elements),
827 MakeUserPointer(&num_bytes), true)); 829 MakeUserPointer(&num_bytes), true));
828 EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes); 830 EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
829 831
830 // Adding a waiter should now succeed. 832 // Adding a waiter should now succeed.
831 pwaiter.Init(); 833 pwaiter.Init();
832 ASSERT_EQ(MOJO_RESULT_OK, 834 ASSERT_EQ(MOJO_RESULT_OK,
833 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56, 835 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE,
834 nullptr)); 836 false, 56, nullptr));
835 // And it shouldn't be writable yet. 837 // And it shouldn't be writable yet.
836 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr)); 838 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr));
837 hss = HandleSignalsState(); 839 hss = HandleSignalsState();
838 this->ProducerRemoveAwakable(&pwaiter, &hss); 840 this->ProducerRemoveAwakable(&pwaiter, &hss);
839 EXPECT_EQ(0u, hss.satisfied_signals); 841 EXPECT_EQ(0u, hss.satisfied_signals);
840 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 842 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
841 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 843 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
842 hss.satisfiable_signals); 844 hss.satisfiable_signals);
843 845
844 // Wait for data to become available to the consumer. 846 // Wait for data to become available to the consumer.
(...skipping 15 matching lines...) Expand all
860 EXPECT_EQ(MOJO_RESULT_OK, 862 EXPECT_EQ(MOJO_RESULT_OK,
861 this->ConsumerReadData(UserPointer<void>(elements), 863 this->ConsumerReadData(UserPointer<void>(elements),
862 MakeUserPointer(&num_bytes), true, true)); 864 MakeUserPointer(&num_bytes), true, true));
863 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 865 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
864 EXPECT_EQ(123, elements[0]); 866 EXPECT_EQ(123, elements[0]);
865 EXPECT_EQ(-1, elements[1]); 867 EXPECT_EQ(-1, elements[1]);
866 868
867 // Add a waiter. 869 // Add a waiter.
868 pwaiter.Init(); 870 pwaiter.Init();
869 ASSERT_EQ(MOJO_RESULT_OK, 871 ASSERT_EQ(MOJO_RESULT_OK,
870 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56, 872 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE,
871 nullptr)); 873 false, 56, nullptr));
872 // And it still shouldn't be writable yet. 874 // And it still shouldn't be writable yet.
873 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr)); 875 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr));
874 hss = HandleSignalsState(); 876 hss = HandleSignalsState();
875 this->ProducerRemoveAwakable(&pwaiter, &hss); 877 this->ProducerRemoveAwakable(&pwaiter, &hss);
876 EXPECT_EQ(0u, hss.satisfied_signals); 878 EXPECT_EQ(0u, hss.satisfied_signals);
877 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 879 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
878 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 880 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
879 hss.satisfiable_signals); 881 hss.satisfiable_signals);
880 882
881 // Do it again. 883 // Do it again.
882 pwaiter.Init(); 884 pwaiter.Init();
883 ASSERT_EQ(MOJO_RESULT_OK, 885 ASSERT_EQ(MOJO_RESULT_OK,
884 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 78, 886 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE,
885 nullptr)); 887 false, 78, nullptr));
886 888
887 // Read one element. 889 // Read one element.
888 elements[0] = -1; 890 elements[0] = -1;
889 elements[1] = -1; 891 elements[1] = -1;
890 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 892 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
891 EXPECT_EQ(MOJO_RESULT_OK, 893 EXPECT_EQ(MOJO_RESULT_OK,
892 this->ConsumerReadData(UserPointer<void>(elements), 894 this->ConsumerReadData(UserPointer<void>(elements),
893 MakeUserPointer(&num_bytes), true, false)); 895 MakeUserPointer(&num_bytes), true, false));
894 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 896 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
895 EXPECT_EQ(123, elements[0]); 897 EXPECT_EQ(123, elements[0]);
(...skipping 20 matching lines...) Expand all
916 EXPECT_TRUE(buffer); 918 EXPECT_TRUE(buffer);
917 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 919 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
918 920
919 static_cast<int32_t*>(buffer)[0] = 789; 921 static_cast<int32_t*>(buffer)[0] = 789;
920 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(static_cast<uint32_t>( 922 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(static_cast<uint32_t>(
921 1u * sizeof(elements[0])))); 923 1u * sizeof(elements[0]))));
922 924
923 // Add a waiter. 925 // Add a waiter.
924 pwaiter.Init(); 926 pwaiter.Init();
925 ASSERT_EQ(MOJO_RESULT_OK, 927 ASSERT_EQ(MOJO_RESULT_OK,
926 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 90, 928 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE,
927 nullptr)); 929 false, 90, nullptr));
928 930
929 // Read one element, using a two-phase read. 931 // Read one element, using a two-phase read.
930 const void* read_buffer = nullptr; 932 const void* read_buffer = nullptr;
931 num_bytes = 0u; 933 num_bytes = 0u;
932 EXPECT_EQ(MOJO_RESULT_OK, 934 EXPECT_EQ(MOJO_RESULT_OK,
933 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer), 935 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer),
934 MakeUserPointer(&num_bytes))); 936 MakeUserPointer(&num_bytes)));
935 EXPECT_TRUE(read_buffer); 937 EXPECT_TRUE(read_buffer);
936 // Since we only read one element (after having written three in all), the 938 // Since we only read one element (after having written three in all), the
937 // two-phase read should only allow us to read one. This checks an 939 // two-phase read should only allow us to read one. This checks an
(...skipping 19 matching lines...) Expand all
957 elements[0] = 123; 959 elements[0] = 123;
958 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 960 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
959 EXPECT_EQ(MOJO_RESULT_OK, 961 EXPECT_EQ(MOJO_RESULT_OK,
960 this->ProducerWriteData(UserPointer<const void>(elements), 962 this->ProducerWriteData(UserPointer<const void>(elements),
961 MakeUserPointer(&num_bytes), false)); 963 MakeUserPointer(&num_bytes), false));
962 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 964 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
963 965
964 // Add a waiter. 966 // Add a waiter.
965 pwaiter.Init(); 967 pwaiter.Init();
966 ASSERT_EQ(MOJO_RESULT_OK, 968 ASSERT_EQ(MOJO_RESULT_OK,
967 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12, 969 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE,
968 nullptr)); 970 false, 12, nullptr));
969 971
970 // Close the consumer. 972 // Close the consumer.
971 this->ConsumerClose(); 973 this->ConsumerClose();
972 974
973 // It should now be never-writable. 975 // It should now be never-writable.
974 context = 0; 976 context = 0;
975 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 977 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
976 pwaiter.Wait(test::TinyTimeout(), &context)); 978 pwaiter.Wait(test::TinyTimeout(), &context));
977 EXPECT_EQ(12u, context); 979 EXPECT_EQ(12u, context);
978 hss = HandleSignalsState(); 980 hss = HandleSignalsState();
(...skipping 15 matching lines...) Expand all
994 this->DoTransfer(); 996 this->DoTransfer();
995 997
996 Waiter waiter; 998 Waiter waiter;
997 HandleSignalsState hss; 999 HandleSignalsState hss;
998 uint64_t context; 1000 uint64_t context;
999 1001
1000 // Add a waiter. 1002 // Add a waiter.
1001 waiter.Init(); 1003 waiter.Init();
1002 ASSERT_EQ(MOJO_RESULT_OK, 1004 ASSERT_EQ(MOJO_RESULT_OK,
1003 this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1005 this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1004 12, nullptr)); 1006 false, 12, nullptr));
1005 1007
1006 // Close the consumer. 1008 // Close the consumer.
1007 this->ConsumerClose(); 1009 this->ConsumerClose();
1008 1010
1009 // It should be signaled. 1011 // It should be signaled.
1010 context = 0; 1012 context = 0;
1011 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context)); 1013 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context));
1012 EXPECT_EQ(12u, context); 1014 EXPECT_EQ(12u, context);
1013 hss = HandleSignalsState(); 1015 hss = HandleSignalsState();
1014 this->ProducerRemoveAwakable(&waiter, &hss); 1016 this->ProducerRemoveAwakable(&waiter, &hss);
(...skipping 14 matching lines...) Expand all
1029 this->DoTransfer(); 1031 this->DoTransfer();
1030 1032
1031 Waiter waiter; 1033 Waiter waiter;
1032 HandleSignalsState hss; 1034 HandleSignalsState hss;
1033 uint64_t context; 1035 uint64_t context;
1034 1036
1035 // Add a waiter. 1037 // Add a waiter.
1036 waiter.Init(); 1038 waiter.Init();
1037 ASSERT_EQ(MOJO_RESULT_OK, 1039 ASSERT_EQ(MOJO_RESULT_OK,
1038 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1040 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1039 12, nullptr)); 1041 false, 12, nullptr));
1040 1042
1041 // Close the producer. 1043 // Close the producer.
1042 this->ProducerClose(); 1044 this->ProducerClose();
1043 1045
1044 // It should be signaled. 1046 // It should be signaled.
1045 context = 0; 1047 context = 0;
1046 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context)); 1048 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context));
1047 EXPECT_EQ(12u, context); 1049 EXPECT_EQ(12u, context);
1048 hss = HandleSignalsState(); 1050 hss = HandleSignalsState();
1049 this->ConsumerRemoveAwakable(&waiter, &hss); 1051 this->ConsumerRemoveAwakable(&waiter, &hss);
(...skipping 15 matching lines...) Expand all
1065 1067
1066 Waiter waiter; 1068 Waiter waiter;
1067 Waiter waiter2; 1069 Waiter waiter2;
1068 HandleSignalsState hss; 1070 HandleSignalsState hss;
1069 uint64_t context; 1071 uint64_t context;
1070 1072
1071 // Never writable. 1073 // Never writable.
1072 waiter.Init(); 1074 waiter.Init();
1073 hss = HandleSignalsState(); 1075 hss = HandleSignalsState();
1074 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1076 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1075 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12, 1077 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE,
1076 &hss)); 1078 false, 12, &hss));
1077 EXPECT_EQ(0u, hss.satisfied_signals); 1079 EXPECT_EQ(0u, hss.satisfied_signals);
1078 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1080 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1079 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1081 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1080 hss.satisfiable_signals); 1082 hss.satisfiable_signals);
1081 1083
1082 // Add waiter: not yet readable. 1084 // Add waiter: not yet readable.
1083 waiter.Init(); 1085 waiter.Init();
1084 ASSERT_EQ(MOJO_RESULT_OK, 1086 ASSERT_EQ(MOJO_RESULT_OK,
1085 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 34, 1087 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
1086 nullptr)); 1088 false, 34, nullptr));
1087 1089
1088 // Write two elements. 1090 // Write two elements.
1089 int32_t elements[2] = {123, 456}; 1091 int32_t elements[2] = {123, 456};
1090 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 1092 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
1091 EXPECT_EQ(MOJO_RESULT_OK, 1093 EXPECT_EQ(MOJO_RESULT_OK,
1092 this->ProducerWriteData(UserPointer<const void>(elements), 1094 this->ProducerWriteData(UserPointer<const void>(elements),
1093 MakeUserPointer(&num_bytes), true)); 1095 MakeUserPointer(&num_bytes), true));
1094 1096
1095 // Wait for readability (needed for remote cases). 1097 // Wait for readability (needed for remote cases).
1096 context = 0; 1098 context = 0;
(...skipping 10 matching lines...) Expand all
1107 // Discard one element. 1109 // Discard one element.
1108 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 1110 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
1109 EXPECT_EQ(MOJO_RESULT_OK, 1111 EXPECT_EQ(MOJO_RESULT_OK,
1110 this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true)); 1112 this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
1111 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1113 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
1112 1114
1113 // Should still be readable. 1115 // Should still be readable.
1114 waiter.Init(); 1116 waiter.Init();
1115 hss = HandleSignalsState(); 1117 hss = HandleSignalsState();
1116 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1118 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1117 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78, 1119 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
1118 &hss)); 1120 false, 78, &hss));
1119 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1121 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1120 hss.satisfied_signals); 1122 hss.satisfied_signals);
1121 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1123 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1122 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1124 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1123 hss.satisfiable_signals); 1125 hss.satisfiable_signals);
1124 1126
1125 // Peek one element. 1127 // Peek one element.
1126 elements[0] = -1; 1128 elements[0] = -1;
1127 elements[1] = -1; 1129 elements[1] = -1;
1128 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 1130 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
1129 EXPECT_EQ(MOJO_RESULT_OK, 1131 EXPECT_EQ(MOJO_RESULT_OK,
1130 this->ConsumerReadData(UserPointer<void>(elements), 1132 this->ConsumerReadData(UserPointer<void>(elements),
1131 MakeUserPointer(&num_bytes), true, true)); 1133 MakeUserPointer(&num_bytes), true, true));
1132 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1134 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
1133 EXPECT_EQ(456, elements[0]); 1135 EXPECT_EQ(456, elements[0]);
1134 EXPECT_EQ(-1, elements[1]); 1136 EXPECT_EQ(-1, elements[1]);
1135 1137
1136 // Should still be readable. 1138 // Should still be readable.
1137 waiter.Init(); 1139 waiter.Init();
1138 hss = HandleSignalsState(); 1140 hss = HandleSignalsState();
1139 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1141 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1140 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78, 1142 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
1141 &hss)); 1143 false, 78, &hss));
1142 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1144 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1143 hss.satisfied_signals); 1145 hss.satisfied_signals);
1144 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1146 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1145 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1147 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1146 hss.satisfiable_signals); 1148 hss.satisfiable_signals);
1147 1149
1148 // Read one element. 1150 // Read one element.
1149 elements[0] = -1; 1151 elements[0] = -1;
1150 elements[1] = -1; 1152 elements[1] = -1;
1151 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 1153 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
1152 EXPECT_EQ(MOJO_RESULT_OK, 1154 EXPECT_EQ(MOJO_RESULT_OK,
1153 this->ConsumerReadData(UserPointer<void>(elements), 1155 this->ConsumerReadData(UserPointer<void>(elements),
1154 MakeUserPointer(&num_bytes), true, false)); 1156 MakeUserPointer(&num_bytes), true, false));
1155 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1157 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
1156 EXPECT_EQ(456, elements[0]); 1158 EXPECT_EQ(456, elements[0]);
1157 EXPECT_EQ(-1, elements[1]); 1159 EXPECT_EQ(-1, elements[1]);
1158 1160
1159 // Adding a waiter should now succeed. 1161 // Adding a waiter should now succeed.
1160 waiter.Init(); 1162 waiter.Init();
1161 ASSERT_EQ(MOJO_RESULT_OK, 1163 ASSERT_EQ(MOJO_RESULT_OK,
1162 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 90, 1164 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
1163 nullptr)); 1165 false, 90, nullptr));
1164 1166
1165 // Write one element. 1167 // Write one element.
1166 elements[0] = 789; 1168 elements[0] = 789;
1167 elements[1] = -1; 1169 elements[1] = -1;
1168 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 1170 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
1169 EXPECT_EQ(MOJO_RESULT_OK, 1171 EXPECT_EQ(MOJO_RESULT_OK,
1170 this->ProducerWriteData(UserPointer<const void>(elements), 1172 this->ProducerWriteData(UserPointer<const void>(elements),
1171 MakeUserPointer(&num_bytes), true)); 1173 MakeUserPointer(&num_bytes), true));
1172 1174
1173 // Waiting should now succeed. 1175 // Waiting should now succeed.
1174 context = 0; 1176 context = 0;
1175 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context)); 1177 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context));
1176 EXPECT_EQ(90u, context); 1178 EXPECT_EQ(90u, context);
1177 hss = HandleSignalsState(); 1179 hss = HandleSignalsState();
1178 this->ConsumerRemoveAwakable(&waiter, &hss); 1180 this->ConsumerRemoveAwakable(&waiter, &hss);
1179 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1181 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1180 hss.satisfied_signals); 1182 hss.satisfied_signals);
1181 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1183 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1182 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1184 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1183 hss.satisfiable_signals); 1185 hss.satisfiable_signals);
1184 1186
1185 // We'll want to wait for the peer closed signal to propagate. 1187 // We'll want to wait for the peer closed signal to propagate.
1186 waiter.Init(); 1188 waiter.Init();
1187 EXPECT_EQ(MOJO_RESULT_OK, 1189 EXPECT_EQ(MOJO_RESULT_OK,
1188 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1190 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1189 12, nullptr)); 1191 false, 12, nullptr));
1190 1192
1191 // Close the producer. 1193 // Close the producer.
1192 this->ProducerClose(); 1194 this->ProducerClose();
1193 1195
1194 // Should still be readable, even if the peer closed signal hasn't propagated 1196 // Should still be readable, even if the peer closed signal hasn't propagated
1195 // yet. 1197 // yet.
1196 waiter2.Init(); 1198 waiter2.Init();
1197 hss = HandleSignalsState(); 1199 hss = HandleSignalsState();
1198 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1200 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1199 this->ConsumerAddAwakable(&waiter2, MOJO_HANDLE_SIGNAL_READABLE, 34, 1201 this->ConsumerAddAwakable(&waiter2, MOJO_HANDLE_SIGNAL_READABLE,
1200 &hss)); 1202 false, 34, &hss));
1201 // We don't know if the peer closed signal has propagated yet (for the remote 1203 // We don't know if the peer closed signal has propagated yet (for the remote
1202 // cases). 1204 // cases).
1203 EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE)); 1205 EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
1204 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1206 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1205 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1207 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1206 hss.satisfiable_signals); 1208 hss.satisfiable_signals);
1207 1209
1208 // Wait for the peer closed signal. 1210 // Wait for the peer closed signal.
1209 context = 0; 1211 context = 0;
1210 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context)); 1212 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), &context));
(...skipping 15 matching lines...) Expand all
1226 this->ConsumerReadData(UserPointer<void>(elements), 1228 this->ConsumerReadData(UserPointer<void>(elements),
1227 MakeUserPointer(&num_bytes), true, false)); 1229 MakeUserPointer(&num_bytes), true, false));
1228 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1230 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
1229 EXPECT_EQ(789, elements[0]); 1231 EXPECT_EQ(789, elements[0]);
1230 EXPECT_EQ(-1, elements[1]); 1232 EXPECT_EQ(-1, elements[1]);
1231 1233
1232 // Should be never-readable. 1234 // Should be never-readable.
1233 waiter.Init(); 1235 waiter.Init();
1234 hss = HandleSignalsState(); 1236 hss = HandleSignalsState();
1235 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1237 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1236 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 56, 1238 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
1237 &hss)); 1239 false, 56, &hss));
1238 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 1240 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1239 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 1241 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
1240 1242
1241 this->ConsumerClose(); 1243 this->ConsumerClose();
1242 } 1244 }
1243 1245
1244 // Test with two-phase APIs and also closing the producer with an active 1246 // Test with two-phase APIs and also closing the producer with an active
1245 // consumer waiter. 1247 // consumer waiter.
1246 TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) { 1248 TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) {
1247 const MojoCreateDataPipeOptions options = { 1249 const MojoCreateDataPipeOptions options = {
1248 kSizeOfOptions, // |struct_size|. 1250 kSizeOfOptions, // |struct_size|.
1249 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1251 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1250 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 1252 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
1251 1000 * sizeof(int32_t) // |capacity_num_bytes|. 1253 1000 * sizeof(int32_t) // |capacity_num_bytes|.
1252 }; 1254 };
1253 this->Create(options); 1255 this->Create(options);
1254 this->DoTransfer(); 1256 this->DoTransfer();
1255 1257
1256 Waiter waiter; 1258 Waiter waiter;
1257 HandleSignalsState hss; 1259 HandleSignalsState hss;
1258 uint64_t context; 1260 uint64_t context;
1259 1261
1260 // Add waiter: not yet readable. 1262 // Add waiter: not yet readable.
1261 waiter.Init(); 1263 waiter.Init();
1262 ASSERT_EQ(MOJO_RESULT_OK, 1264 ASSERT_EQ(MOJO_RESULT_OK,
1263 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 12, 1265 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
1264 nullptr)); 1266 false, 12, nullptr));
1265 1267
1266 // Write two elements. 1268 // Write two elements.
1267 int32_t* elements = nullptr; 1269 int32_t* elements = nullptr;
1268 void* buffer = nullptr; 1270 void* buffer = nullptr;
1269 uint32_t num_bytes = 0u; 1271 uint32_t num_bytes = 0u;
1270 EXPECT_EQ(MOJO_RESULT_OK, 1272 EXPECT_EQ(MOJO_RESULT_OK,
1271 this->ProducerBeginWriteData(MakeUserPointer(&buffer), 1273 this->ProducerBeginWriteData(MakeUserPointer(&buffer),
1272 MakeUserPointer(&num_bytes))); 1274 MakeUserPointer(&num_bytes)));
1273 EXPECT_TRUE(buffer); 1275 EXPECT_TRUE(buffer);
1274 EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0]))); 1276 EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
(...skipping 25 matching lines...) Expand all
1300 EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes); 1302 EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
1301 const int32_t* read_elements = static_cast<const int32_t*>(read_buffer); 1303 const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
1302 EXPECT_EQ(123, read_elements[0]); 1304 EXPECT_EQ(123, read_elements[0]);
1303 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>( 1305 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>(
1304 1u * sizeof(elements[0])))); 1306 1u * sizeof(elements[0]))));
1305 1307
1306 // Should still be readable. 1308 // Should still be readable.
1307 waiter.Init(); 1309 waiter.Init();
1308 hss = HandleSignalsState(); 1310 hss = HandleSignalsState();
1309 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1311 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1310 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 34, 1312 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
1311 &hss)); 1313 false, 34, &hss));
1312 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1314 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1313 hss.satisfied_signals); 1315 hss.satisfied_signals);
1314 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1316 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1315 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1317 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1316 hss.satisfiable_signals); 1318 hss.satisfiable_signals);
1317 1319
1318 // Read one element. 1320 // Read one element.
1319 // Request three, but not in all-or-none mode. 1321 // Request three, but not in all-or-none mode.
1320 read_buffer = nullptr; 1322 read_buffer = nullptr;
1321 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0])); 1323 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
1322 EXPECT_EQ(MOJO_RESULT_OK, 1324 EXPECT_EQ(MOJO_RESULT_OK,
1323 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer), 1325 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer),
1324 MakeUserPointer(&num_bytes))); 1326 MakeUserPointer(&num_bytes)));
1325 EXPECT_TRUE(read_buffer); 1327 EXPECT_TRUE(read_buffer);
1326 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1328 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
1327 read_elements = static_cast<const int32_t*>(read_buffer); 1329 read_elements = static_cast<const int32_t*>(read_buffer);
1328 EXPECT_EQ(456, read_elements[0]); 1330 EXPECT_EQ(456, read_elements[0]);
1329 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>( 1331 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>(
1330 1u * sizeof(elements[0])))); 1332 1u * sizeof(elements[0]))));
1331 1333
1332 // Adding a waiter should now succeed. 1334 // Adding a waiter should now succeed.
1333 waiter.Init(); 1335 waiter.Init();
1334 ASSERT_EQ(MOJO_RESULT_OK, 1336 ASSERT_EQ(MOJO_RESULT_OK,
1335 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 56, 1337 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
1336 nullptr)); 1338 false, 56, nullptr));
1337 1339
1338 // Close the producer. 1340 // Close the producer.
1339 this->ProducerClose(); 1341 this->ProducerClose();
1340 1342
1341 // Should be never-readable. 1343 // Should be never-readable.
1342 context = 0; 1344 context = 0;
1343 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1345 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1344 waiter.Wait(test::TinyTimeout(), &context)); 1346 waiter.Wait(test::TinyTimeout(), &context));
1345 EXPECT_EQ(56u, context); 1347 EXPECT_EQ(56u, context);
1346 hss = HandleSignalsState(); 1348 hss = HandleSignalsState();
(...skipping 16 matching lines...) Expand all
1363 this->DoTransfer(); 1365 this->DoTransfer();
1364 1366
1365 Waiter pwaiter; // For producer. 1367 Waiter pwaiter; // For producer.
1366 Waiter cwaiter; // For consumer. 1368 Waiter cwaiter; // For consumer.
1367 HandleSignalsState hss; 1369 HandleSignalsState hss;
1368 1370
1369 // It should be writable. 1371 // It should be writable.
1370 pwaiter.Init(); 1372 pwaiter.Init();
1371 hss = HandleSignalsState(); 1373 hss = HandleSignalsState();
1372 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1374 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1373 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 0, 1375 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE,
1374 &hss)); 1376 false, 0, &hss));
1375 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1377 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1376 hss.satisfied_signals); 1378 hss.satisfied_signals);
1377 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1379 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1378 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1380 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1379 hss.satisfiable_signals); 1381 hss.satisfiable_signals);
1380 1382
1381 void* write_ptr = nullptr; 1383 void* write_ptr = nullptr;
1382 uint32_t num_bytes = 0u; 1384 uint32_t num_bytes = 0u;
1383 EXPECT_EQ(MOJO_RESULT_OK, 1385 EXPECT_EQ(MOJO_RESULT_OK,
1384 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr), 1386 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
1385 MakeUserPointer(&num_bytes))); 1387 MakeUserPointer(&num_bytes)));
1386 EXPECT_TRUE(write_ptr); 1388 EXPECT_TRUE(write_ptr);
1387 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t))); 1389 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
1388 1390
1389 // At this point, it shouldn't be writable. 1391 // At this point, it shouldn't be writable.
1390 pwaiter.Init(); 1392 pwaiter.Init();
1391 ASSERT_EQ(MOJO_RESULT_OK, 1393 ASSERT_EQ(MOJO_RESULT_OK,
1392 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 1, 1394 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE,
1393 nullptr)); 1395 false, 1, nullptr));
1394 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr)); 1396 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr));
1395 hss = HandleSignalsState(); 1397 hss = HandleSignalsState();
1396 this->ProducerRemoveAwakable(&pwaiter, &hss); 1398 this->ProducerRemoveAwakable(&pwaiter, &hss);
1397 EXPECT_EQ(0u, hss.satisfied_signals); 1399 EXPECT_EQ(0u, hss.satisfied_signals);
1398 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1400 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1399 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1401 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1400 hss.satisfiable_signals); 1402 hss.satisfiable_signals);
1401 1403
1402 // It shouldn't be readable yet either (we'll wait later). 1404 // It shouldn't be readable yet either (we'll wait later).
1403 cwaiter.Init(); 1405 cwaiter.Init();
1404 ASSERT_EQ(MOJO_RESULT_OK, 1406 ASSERT_EQ(MOJO_RESULT_OK,
1405 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 2, 1407 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE,
1406 nullptr)); 1408 false, 2, nullptr));
1407 1409
1408 static_cast<int32_t*>(write_ptr)[0] = 123; 1410 static_cast<int32_t*>(write_ptr)[0] = 123;
1409 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData( 1411 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(
1410 static_cast<uint32_t>(1u * sizeof(int32_t)))); 1412 static_cast<uint32_t>(1u * sizeof(int32_t))));
1411 1413
1412 // It should immediately be writable again. 1414 // It should immediately be writable again.
1413 pwaiter.Init(); 1415 pwaiter.Init();
1414 hss = HandleSignalsState(); 1416 hss = HandleSignalsState();
1415 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1417 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1416 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 3, 1418 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE,
1417 &hss)); 1419 false, 3, &hss));
1418 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1420 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1419 hss.satisfied_signals); 1421 hss.satisfied_signals);
1420 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1422 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1421 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1423 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1422 hss.satisfiable_signals); 1424 hss.satisfiable_signals);
1423 1425
1424 // It should become readable. 1426 // It should become readable.
1425 EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyTimeout(), nullptr)); 1427 EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyTimeout(), nullptr));
1426 hss = HandleSignalsState(); 1428 hss = HandleSignalsState();
1427 this->ConsumerRemoveAwakable(&cwaiter, &hss); 1429 this->ConsumerRemoveAwakable(&cwaiter, &hss);
(...skipping 10 matching lines...) Expand all
1438 EXPECT_EQ(MOJO_RESULT_OK, 1440 EXPECT_EQ(MOJO_RESULT_OK,
1439 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr), 1441 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
1440 MakeUserPointer(&num_bytes))); 1442 MakeUserPointer(&num_bytes)));
1441 EXPECT_TRUE(write_ptr); 1443 EXPECT_TRUE(write_ptr);
1442 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t))); 1444 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
1443 1445
1444 // It should be readable. 1446 // It should be readable.
1445 cwaiter.Init(); 1447 cwaiter.Init();
1446 hss = HandleSignalsState(); 1448 hss = HandleSignalsState();
1447 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1449 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1448 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 5, 1450 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE,
1449 &hss)); 1451 false, 5, &hss));
1450 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1452 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1451 hss.satisfied_signals); 1453 hss.satisfied_signals);
1452 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1454 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1453 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1455 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1454 hss.satisfiable_signals); 1456 hss.satisfiable_signals);
1455 1457
1456 // End the two-phase write without writing anything. 1458 // End the two-phase write without writing anything.
1457 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u)); 1459 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u));
1458 1460
1459 // Start a two-phase read. 1461 // Start a two-phase read.
1460 const void* read_ptr = nullptr; 1462 const void* read_ptr = nullptr;
1461 num_bytes = 0u; 1463 num_bytes = 0u;
1462 EXPECT_EQ(MOJO_RESULT_OK, 1464 EXPECT_EQ(MOJO_RESULT_OK,
1463 this->ConsumerBeginReadData(MakeUserPointer(&read_ptr), 1465 this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
1464 MakeUserPointer(&num_bytes))); 1466 MakeUserPointer(&num_bytes)));
1465 EXPECT_TRUE(read_ptr); 1467 EXPECT_TRUE(read_ptr);
1466 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes); 1468 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
1467 1469
1468 // At this point, it should still be writable. 1470 // At this point, it should still be writable.
1469 pwaiter.Init(); 1471 pwaiter.Init();
1470 hss = HandleSignalsState(); 1472 hss = HandleSignalsState();
1471 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1473 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1472 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 6, 1474 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE,
1473 &hss)); 1475 false, 6, &hss));
1474 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1476 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1475 hss.satisfied_signals); 1477 hss.satisfied_signals);
1476 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1478 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1477 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1479 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1478 hss.satisfiable_signals); 1480 hss.satisfiable_signals);
1479 1481
1480 // But not readable. 1482 // But not readable.
1481 cwaiter.Init(); 1483 cwaiter.Init();
1482 ASSERT_EQ(MOJO_RESULT_OK, 1484 ASSERT_EQ(MOJO_RESULT_OK,
1483 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 7, 1485 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE,
1484 nullptr)); 1486 false, 7, nullptr));
1485 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, cwaiter.Wait(0, nullptr)); 1487 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, cwaiter.Wait(0, nullptr));
1486 hss = HandleSignalsState(); 1488 hss = HandleSignalsState();
1487 this->ConsumerRemoveAwakable(&cwaiter, &hss); 1489 this->ConsumerRemoveAwakable(&cwaiter, &hss);
1488 EXPECT_EQ(0u, hss.satisfied_signals); 1490 EXPECT_EQ(0u, hss.satisfied_signals);
1489 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1491 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1490 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1492 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1491 hss.satisfiable_signals); 1493 hss.satisfiable_signals);
1492 1494
1493 // End the two-phase read without reading anything. 1495 // End the two-phase read without reading anything.
1494 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(0u)); 1496 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(0u));
1495 1497
1496 // It should be readable again. 1498 // It should be readable again.
1497 cwaiter.Init(); 1499 cwaiter.Init();
1498 hss = HandleSignalsState(); 1500 hss = HandleSignalsState();
1499 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1501 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1500 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 8, 1502 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE,
1501 &hss)); 1503 false, 8, &hss));
1502 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1504 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1503 hss.satisfied_signals); 1505 hss.satisfied_signals);
1504 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1506 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1505 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1507 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1506 hss.satisfiable_signals); 1508 hss.satisfiable_signals);
1507 1509
1508 this->ProducerClose(); 1510 this->ProducerClose();
1509 this->ConsumerClose(); 1511 this->ConsumerClose();
1510 } 1512 }
1511 1513
(...skipping 25 matching lines...) Expand all
1537 1539
1538 // Should still be empty. 1540 // Should still be empty.
1539 num_bytes = ~0u; 1541 num_bytes = ~0u;
1540 EXPECT_EQ(MOJO_RESULT_OK, 1542 EXPECT_EQ(MOJO_RESULT_OK,
1541 this->ConsumerQueryData(MakeUserPointer(&num_bytes))); 1543 this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1542 EXPECT_EQ(0u, num_bytes); 1544 EXPECT_EQ(0u, num_bytes);
1543 1545
1544 // Add waiter. 1546 // Add waiter.
1545 waiter.Init(); 1547 waiter.Init();
1546 ASSERT_EQ(MOJO_RESULT_OK, 1548 ASSERT_EQ(MOJO_RESULT_OK,
1547 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1, 1549 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
1548 nullptr)); 1550 false, 1, nullptr));
1549 1551
1550 // Write some data. 1552 // Write some data.
1551 num_bytes = 5u * sizeof(int32_t); 1553 num_bytes = 5u * sizeof(int32_t);
1552 Seq(100, MOJO_ARRAYSIZE(buffer), buffer); 1554 Seq(100, MOJO_ARRAYSIZE(buffer), buffer);
1553 EXPECT_EQ(MOJO_RESULT_OK, 1555 EXPECT_EQ(MOJO_RESULT_OK,
1554 this->ProducerWriteData(UserPointer<const void>(buffer), 1556 this->ProducerWriteData(UserPointer<const void>(buffer),
1555 MakeUserPointer(&num_bytes), true)); 1557 MakeUserPointer(&num_bytes), true));
1556 EXPECT_EQ(5u * sizeof(int32_t), num_bytes); 1558 EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
1557 1559
1558 // Wait for data. 1560 // Wait for data.
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
1660 // Three left. 1662 // Three left.
1661 num_bytes = 0u; 1663 num_bytes = 0u;
1662 EXPECT_EQ(MOJO_RESULT_OK, 1664 EXPECT_EQ(MOJO_RESULT_OK,
1663 this->ConsumerQueryData(MakeUserPointer(&num_bytes))); 1665 this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1664 EXPECT_EQ(3u * sizeof(int32_t), num_bytes); 1666 EXPECT_EQ(3u * sizeof(int32_t), num_bytes);
1665 1667
1666 // We'll need to wait for the peer closed to propagate. 1668 // We'll need to wait for the peer closed to propagate.
1667 waiter.Init(); 1669 waiter.Init();
1668 ASSERT_EQ(MOJO_RESULT_OK, 1670 ASSERT_EQ(MOJO_RESULT_OK,
1669 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1671 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1670 2, nullptr)); 1672 false, 2, nullptr));
1671 1673
1672 // Close the producer, then test producer-closed cases. 1674 // Close the producer, then test producer-closed cases.
1673 this->ProducerClose(); 1675 this->ProducerClose();
1674 1676
1675 // Wait. 1677 // Wait.
1676 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 1678 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
1677 hss = HandleSignalsState(); 1679 hss = HandleSignalsState();
1678 this->ConsumerRemoveAwakable(&waiter, &hss); 1680 this->ConsumerRemoveAwakable(&waiter, &hss);
1679 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1681 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1680 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1682 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
1746 ASSERT_EQ(100u, validated_options.capacity_num_bytes); 1748 ASSERT_EQ(100u, validated_options.capacity_num_bytes);
1747 this->Create(options); 1749 this->Create(options);
1748 this->DoTransfer(); 1750 this->DoTransfer();
1749 1751
1750 Waiter waiter; 1752 Waiter waiter;
1751 HandleSignalsState hss; 1753 HandleSignalsState hss;
1752 1754
1753 // Add waiter. 1755 // Add waiter.
1754 waiter.Init(); 1756 waiter.Init();
1755 ASSERT_EQ(MOJO_RESULT_OK, 1757 ASSERT_EQ(MOJO_RESULT_OK,
1756 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1, 1758 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
1757 nullptr)); 1759 false, 1, nullptr));
1758 1760
1759 // Write 20 bytes. 1761 // Write 20 bytes.
1760 uint32_t num_bytes = 20u; 1762 uint32_t num_bytes = 20u;
1761 EXPECT_EQ(MOJO_RESULT_OK, 1763 EXPECT_EQ(MOJO_RESULT_OK,
1762 this->ProducerWriteData(UserPointer<const void>(&test_data[0]), 1764 this->ProducerWriteData(UserPointer<const void>(&test_data[0]),
1763 MakeUserPointer(&num_bytes), false)); 1765 MakeUserPointer(&num_bytes), false));
1764 EXPECT_EQ(20u, num_bytes); 1766 EXPECT_EQ(20u, num_bytes);
1765 1767
1766 // Wait for data. 1768 // Wait for data.
1767 // TODO(vtl): (See corresponding TODO in AllOrNone.) 1769 // TODO(vtl): (See corresponding TODO in AllOrNone.)
(...skipping 183 matching lines...) Expand 10 before | Expand all | Expand 10 after
1951 }; 1953 };
1952 this->Create(options); 1954 this->Create(options);
1953 this->DoTransfer(); 1955 this->DoTransfer();
1954 1956
1955 Waiter waiter; 1957 Waiter waiter;
1956 HandleSignalsState hss; 1958 HandleSignalsState hss;
1957 1959
1958 // Add waiter. 1960 // Add waiter.
1959 waiter.Init(); 1961 waiter.Init();
1960 ASSERT_EQ(MOJO_RESULT_OK, 1962 ASSERT_EQ(MOJO_RESULT_OK,
1961 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1, 1963 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
1962 nullptr)); 1964 false, 1, nullptr));
1963 1965
1964 // Write some data, so we'll have something to read. 1966 // Write some data, so we'll have something to read.
1965 uint32_t num_bytes = kTestDataSize; 1967 uint32_t num_bytes = kTestDataSize;
1966 EXPECT_EQ(MOJO_RESULT_OK, 1968 EXPECT_EQ(MOJO_RESULT_OK,
1967 this->ProducerWriteData(UserPointer<const void>(kTestData), 1969 this->ProducerWriteData(UserPointer<const void>(kTestData),
1968 MakeUserPointer(&num_bytes), false)); 1970 MakeUserPointer(&num_bytes), false));
1969 EXPECT_EQ(kTestDataSize, num_bytes); 1971 EXPECT_EQ(kTestDataSize, num_bytes);
1970 1972
1971 // Start two-phase write. 1973 // Start two-phase write.
1972 void* write_buffer_ptr = nullptr; 1974 void* write_buffer_ptr = nullptr;
(...skipping 21 matching lines...) Expand all
1994 EXPECT_EQ(MOJO_RESULT_OK, 1996 EXPECT_EQ(MOJO_RESULT_OK,
1995 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr), 1997 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
1996 MakeUserPointer(&num_bytes))); 1998 MakeUserPointer(&num_bytes)));
1997 EXPECT_TRUE(read_buffer_ptr); 1999 EXPECT_TRUE(read_buffer_ptr);
1998 EXPECT_EQ(kTestDataSize, num_bytes); 2000 EXPECT_EQ(kTestDataSize, num_bytes);
1999 2001
2000 // Add waiter. 2002 // Add waiter.
2001 waiter.Init(); 2003 waiter.Init();
2002 ASSERT_EQ(MOJO_RESULT_OK, 2004 ASSERT_EQ(MOJO_RESULT_OK,
2003 this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2005 this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
2004 1, nullptr)); 2006 false, 1, nullptr));
2005 2007
2006 // Close the consumer. 2008 // Close the consumer.
2007 this->ConsumerClose(); 2009 this->ConsumerClose();
2008 2010
2009 // Wait for producer to know that the consumer is closed. 2011 // Wait for producer to know that the consumer is closed.
2010 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 2012 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
2011 hss = HandleSignalsState(); 2013 hss = HandleSignalsState();
2012 this->ProducerRemoveAwakable(&waiter, &hss); 2014 this->ProducerRemoveAwakable(&waiter, &hss);
2013 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2015 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
2014 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 2016 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
2085 uint32_t num_bytes = kTestDataSize; 2087 uint32_t num_bytes = kTestDataSize;
2086 EXPECT_EQ(MOJO_RESULT_OK, 2088 EXPECT_EQ(MOJO_RESULT_OK,
2087 this->ProducerWriteData(UserPointer<const void>(kTestData), 2089 this->ProducerWriteData(UserPointer<const void>(kTestData),
2088 MakeUserPointer(&num_bytes), false)); 2090 MakeUserPointer(&num_bytes), false));
2089 EXPECT_EQ(kTestDataSize, num_bytes); 2091 EXPECT_EQ(kTestDataSize, num_bytes);
2090 2092
2091 // Add waiter. 2093 // Add waiter.
2092 waiter.Init(); 2094 waiter.Init();
2093 ASSERT_EQ(MOJO_RESULT_OK, 2095 ASSERT_EQ(MOJO_RESULT_OK,
2094 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2096 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
2095 1, nullptr)); 2097 false, 1, nullptr));
2096 2098
2097 // Close the producer. 2099 // Close the producer.
2098 this->ProducerClose(); 2100 this->ProducerClose();
2099 2101
2100 // Wait. (Note that once the consumer knows that the producer is closed, it 2102 // Wait. (Note that once the consumer knows that the producer is closed, it
2101 // must also know about all the data that was sent.) 2103 // must also know about all the data that was sent.)
2102 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 2104 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
2103 hss = HandleSignalsState(); 2105 hss = HandleSignalsState();
2104 this->ConsumerRemoveAwakable(&waiter, &hss); 2106 this->ConsumerRemoveAwakable(&waiter, &hss);
2105 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2107 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
2162 }; 2164 };
2163 this->Create(options); 2165 this->Create(options);
2164 this->DoTransfer(); 2166 this->DoTransfer();
2165 2167
2166 Waiter waiter; 2168 Waiter waiter;
2167 HandleSignalsState hss; 2169 HandleSignalsState hss;
2168 2170
2169 // Add waiter. 2171 // Add waiter.
2170 waiter.Init(); 2172 waiter.Init();
2171 ASSERT_EQ(MOJO_RESULT_OK, 2173 ASSERT_EQ(MOJO_RESULT_OK,
2172 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 0, 2174 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
2173 nullptr)); 2175 false, 0, nullptr));
2174 2176
2175 // Write some data, so we'll have something to read. 2177 // Write some data, so we'll have something to read.
2176 uint32_t num_bytes = kTestDataSize; 2178 uint32_t num_bytes = kTestDataSize;
2177 EXPECT_EQ(MOJO_RESULT_OK, 2179 EXPECT_EQ(MOJO_RESULT_OK,
2178 this->ProducerWriteData(UserPointer<const void>(&kTestData), 2180 this->ProducerWriteData(UserPointer<const void>(&kTestData),
2179 MakeUserPointer(&num_bytes), false)); 2181 MakeUserPointer(&num_bytes), false));
2180 EXPECT_EQ(kTestDataSize, num_bytes); 2182 EXPECT_EQ(kTestDataSize, num_bytes);
2181 2183
2182 // Wait. 2184 // Wait.
2183 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 2185 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
(...skipping 10 matching lines...) Expand all
2194 num_bytes = static_cast<uint32_t>(sizeof(data)); 2196 num_bytes = static_cast<uint32_t>(sizeof(data));
2195 EXPECT_EQ(MOJO_RESULT_OK, 2197 EXPECT_EQ(MOJO_RESULT_OK,
2196 this->ConsumerReadData(UserPointer<void>(data), 2198 this->ConsumerReadData(UserPointer<void>(data),
2197 MakeUserPointer(&num_bytes), false, false)); 2199 MakeUserPointer(&num_bytes), false, false));
2198 EXPECT_EQ(kTestDataSize, num_bytes); 2200 EXPECT_EQ(kTestDataSize, num_bytes);
2199 EXPECT_EQ(kTestData, data[0]); 2201 EXPECT_EQ(kTestData, data[0]);
2200 2202
2201 // Add waiter again. 2203 // Add waiter again.
2202 waiter.Init(); 2204 waiter.Init();
2203 ASSERT_EQ(MOJO_RESULT_OK, 2205 ASSERT_EQ(MOJO_RESULT_OK,
2204 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 0, 2206 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
2205 nullptr)); 2207 false, 0, nullptr));
2206 2208
2207 // Close the producer. 2209 // Close the producer.
2208 this->ProducerClose(); 2210 this->ProducerClose();
2209 2211
2210 // Wait. 2212 // Wait.
2211 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 2213 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
2212 waiter.Wait(test::TinyTimeout(), nullptr)); 2214 waiter.Wait(test::TinyTimeout(), nullptr));
2213 hss = HandleSignalsState(); 2215 hss = HandleSignalsState();
2214 this->ConsumerRemoveAwakable(&waiter, &hss); 2216 this->ConsumerRemoveAwakable(&waiter, &hss);
2215 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2217 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
(...skipping 17 matching lines...) Expand all
2233 }; 2235 };
2234 this->Create(options); 2236 this->Create(options);
2235 this->DoTransfer(); 2237 this->DoTransfer();
2236 2238
2237 Waiter waiter; 2239 Waiter waiter;
2238 HandleSignalsState hss; 2240 HandleSignalsState hss;
2239 2241
2240 // Add waiter (for the consumer to become readable). 2242 // Add waiter (for the consumer to become readable).
2241 waiter.Init(); 2243 waiter.Init();
2242 ASSERT_EQ(MOJO_RESULT_OK, 2244 ASSERT_EQ(MOJO_RESULT_OK,
2243 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 0, 2245 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
2244 nullptr)); 2246 false, 0, nullptr));
2245 2247
2246 // Write some data, so we'll have something to read. 2248 // Write some data, so we'll have something to read.
2247 uint32_t num_bytes = kTestDataSize; 2249 uint32_t num_bytes = kTestDataSize;
2248 EXPECT_EQ(MOJO_RESULT_OK, 2250 EXPECT_EQ(MOJO_RESULT_OK,
2249 this->ProducerWriteData(UserPointer<const void>(&kTestData), 2251 this->ProducerWriteData(UserPointer<const void>(&kTestData),
2250 MakeUserPointer(&num_bytes), false)); 2252 MakeUserPointer(&num_bytes), false));
2251 EXPECT_EQ(kTestDataSize, num_bytes); 2253 EXPECT_EQ(kTestDataSize, num_bytes);
2252 2254
2253 // Wait. 2255 // Wait.
2254 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 2256 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
(...skipping 11 matching lines...) Expand all
2266 EXPECT_EQ(MOJO_RESULT_OK, 2268 EXPECT_EQ(MOJO_RESULT_OK,
2267 this->ConsumerBeginReadData(MakeUserPointer(&read_ptr), 2269 this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
2268 MakeUserPointer(&num_bytes))); 2270 MakeUserPointer(&num_bytes)));
2269 EXPECT_EQ(kTestDataSize, num_bytes); 2271 EXPECT_EQ(kTestDataSize, num_bytes);
2270 EXPECT_EQ(kTestData, static_cast<const int64_t*>(read_ptr)[0]); 2272 EXPECT_EQ(kTestData, static_cast<const int64_t*>(read_ptr)[0]);
2271 2273
2272 // Add waiter (for the producer to be closed). 2274 // Add waiter (for the producer to be closed).
2273 waiter.Init(); 2275 waiter.Init();
2274 ASSERT_EQ(MOJO_RESULT_OK, 2276 ASSERT_EQ(MOJO_RESULT_OK,
2275 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2277 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
2276 0, nullptr)); 2278 false, 0, nullptr));
2277 2279
2278 // Close the producer. 2280 // Close the producer.
2279 this->ProducerClose(); 2281 this->ProducerClose();
2280 2282
2281 // Wait for producer close to be detected. 2283 // Wait for producer close to be detected.
2282 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 2284 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
2283 hss = HandleSignalsState(); 2285 hss = HandleSignalsState();
2284 this->ConsumerRemoveAwakable(&waiter, &hss); 2286 this->ConsumerRemoveAwakable(&waiter, &hss);
2285 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2287 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
2286 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2288 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2287 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2289 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2288 hss.satisfiable_signals); 2290 hss.satisfiable_signals);
2289 2291
2290 // Add waiter (for the consumer to become readable). 2292 // Add waiter (for the consumer to become readable).
2291 waiter.Init(); 2293 waiter.Init();
2292 ASSERT_EQ(MOJO_RESULT_OK, 2294 ASSERT_EQ(MOJO_RESULT_OK,
2293 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 0, 2295 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
2294 nullptr)); 2296 false, 0, nullptr));
2295 2297
2296 // Complete the two-phase read. 2298 // Complete the two-phase read.
2297 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(kTestDataSize)); 2299 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(kTestDataSize));
2298 2300
2299 // Wait. 2301 // Wait.
2300 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 2302 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
2301 waiter.Wait(test::TinyTimeout(), nullptr)); 2303 waiter.Wait(test::TinyTimeout(), nullptr));
2302 hss = HandleSignalsState(); 2304 hss = HandleSignalsState();
2303 this->ConsumerRemoveAwakable(&waiter, &hss); 2305 this->ConsumerRemoveAwakable(&waiter, &hss);
2304 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2306 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
(...skipping 23 matching lines...) Expand all
2328 void* write_ptr = nullptr; 2330 void* write_ptr = nullptr;
2329 uint32_t num_bytes = 0u; 2331 uint32_t num_bytes = 0u;
2330 EXPECT_EQ(MOJO_RESULT_OK, 2332 EXPECT_EQ(MOJO_RESULT_OK,
2331 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr), 2333 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
2332 MakeUserPointer(&num_bytes))); 2334 MakeUserPointer(&num_bytes)));
2333 2335
2334 // Add waiter (for the consumer to be closed). 2336 // Add waiter (for the consumer to be closed).
2335 waiter1.Init(); 2337 waiter1.Init();
2336 ASSERT_EQ(MOJO_RESULT_OK, 2338 ASSERT_EQ(MOJO_RESULT_OK,
2337 this->ProducerAddAwakable(&waiter1, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2339 this->ProducerAddAwakable(&waiter1, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
2338 0, nullptr)); 2340 false, 0, nullptr));
2339 2341
2340 // Add a separate waiter (for the producer to become writable). 2342 // Add a separate waiter (for the producer to become writable).
2341 waiter2.Init(); 2343 waiter2.Init();
2342 ASSERT_EQ(MOJO_RESULT_OK, 2344 ASSERT_EQ(MOJO_RESULT_OK,
2343 this->ProducerAddAwakable(&waiter2, MOJO_HANDLE_SIGNAL_WRITABLE, 0, 2345 this->ProducerAddAwakable(&waiter2, MOJO_HANDLE_SIGNAL_WRITABLE,
2344 nullptr)); 2346 false, 0, nullptr));
2345 2347
2346 // Close the consumer. 2348 // Close the consumer.
2347 this->ConsumerClose(); 2349 this->ConsumerClose();
2348 2350
2349 // Wait for the consumer close to be detected. 2351 // Wait for the consumer close to be detected.
2350 // Note: If we didn't wait for the consumer close to be detected before 2352 // Note: If we didn't wait for the consumer close to be detected before
2351 // completing the two-phase write, wait might succeed (in the remote cases). 2353 // completing the two-phase write, wait might succeed (in the remote cases).
2352 // This is because the first |Awake()| "wins". 2354 // This is because the first |Awake()| "wins".
2353 EXPECT_EQ(MOJO_RESULT_OK, waiter1.Wait(test::TinyTimeout(), nullptr)); 2355 EXPECT_EQ(MOJO_RESULT_OK, waiter1.Wait(test::TinyTimeout(), nullptr));
2354 hss = HandleSignalsState(); 2356 hss = HandleSignalsState();
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
2445 2447
2446 // Still no data. 2448 // Still no data.
2447 num_bytes = 1000u; 2449 num_bytes = 1000u;
2448 EXPECT_EQ(MOJO_RESULT_OK, 2450 EXPECT_EQ(MOJO_RESULT_OK,
2449 this->ConsumerQueryData(MakeUserPointer(&num_bytes))); 2451 this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
2450 EXPECT_EQ(0u, num_bytes); 2452 EXPECT_EQ(0u, num_bytes);
2451 2453
2452 // Add waiter. 2454 // Add waiter.
2453 waiter.Init(); 2455 waiter.Init();
2454 ASSERT_EQ(MOJO_RESULT_OK, 2456 ASSERT_EQ(MOJO_RESULT_OK,
2455 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1, 2457 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE,
2456 nullptr)); 2458 false, 1, nullptr));
2457 2459
2458 // Now write some data, so we'll be able to try reading. 2460 // Now write some data, so we'll be able to try reading.
2459 int32_t element = 123; 2461 int32_t element = 123;
2460 num_bytes = 1u * sizeof(int32_t); 2462 num_bytes = 1u * sizeof(int32_t);
2461 EXPECT_EQ(MOJO_RESULT_OK, 2463 EXPECT_EQ(MOJO_RESULT_OK,
2462 this->ProducerWriteData(UserPointer<const void>(&element), 2464 this->ProducerWriteData(UserPointer<const void>(&element),
2463 MakeUserPointer(&num_bytes), false)); 2465 MakeUserPointer(&num_bytes), false));
2464 2466
2465 // Wait for data. 2467 // Wait for data.
2466 // TODO(vtl): (See corresponding TODO in AllOrNone.) 2468 // TODO(vtl): (See corresponding TODO in AllOrNone.)
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after
2603 this->ProducerGetOptions(&write_threshold_num_bytes); 2605 this->ProducerGetOptions(&write_threshold_num_bytes);
2604 EXPECT_EQ(0u, write_threshold_num_bytes); 2606 EXPECT_EQ(0u, write_threshold_num_bytes);
2605 2607
2606 Waiter waiter; 2608 Waiter waiter;
2607 HandleSignalsState hss; 2609 HandleSignalsState hss;
2608 2610
2609 // Try to wait to the write threshold signal; it should already have it. 2611 // Try to wait to the write threshold signal; it should already have it.
2610 waiter.Init(); 2612 waiter.Init();
2611 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, 2613 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
2612 this->ProducerAddAwakable( 2614 this->ProducerAddAwakable(
2613 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, &hss)); 2615 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, false, 0, &hss));
2614 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2616 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2615 hss.satisfied_signals); 2617 hss.satisfied_signals);
2616 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2618 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2617 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2619 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2618 hss.satisfiable_signals); 2620 hss.satisfiable_signals);
2619 2621
2620 // Before writing, add a waiter on the consumer (since we'll need to know when 2622 // Before writing, add a waiter on the consumer (since we'll need to know when
2621 // the written bytes have propagated). 2623 // the written bytes have propagated).
2622 Waiter read_waiter; 2624 Waiter read_waiter;
2623 read_waiter.Init(); 2625 read_waiter.Init();
2624 ASSERT_EQ(MOJO_RESULT_OK, 2626 ASSERT_EQ(MOJO_RESULT_OK,
2625 this->ConsumerAddAwakable(&read_waiter, MOJO_HANDLE_SIGNAL_READABLE, 2627 this->ConsumerAddAwakable(&read_waiter, MOJO_HANDLE_SIGNAL_READABLE,
2626 0, nullptr)); 2628 false, 0, nullptr));
2627 2629
2628 // Write 5 bytes. 2630 // Write 5 bytes.
2629 static const char kTestData[5] = {'A', 'B', 'C', 'D', 'E'}; 2631 static const char kTestData[5] = {'A', 'B', 'C', 'D', 'E'};
2630 uint32_t num_bytes = 5; 2632 uint32_t num_bytes = 5;
2631 EXPECT_EQ(MOJO_RESULT_OK, 2633 EXPECT_EQ(MOJO_RESULT_OK,
2632 this->ProducerWriteData(UserPointer<const void>(kTestData), 2634 this->ProducerWriteData(UserPointer<const void>(kTestData),
2633 MakeUserPointer(&num_bytes), false)); 2635 MakeUserPointer(&num_bytes), false));
2634 EXPECT_EQ(5u, num_bytes); 2636 EXPECT_EQ(5u, num_bytes);
2635 2637
2636 // It should still have the write threshold signal. 2638 // It should still have the write threshold signal.
2637 waiter.Init(); 2639 waiter.Init();
2638 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, 2640 ASSERT_EQ(
2639 this->ProducerAddAwakable( 2641 MOJO_RESULT_ALREADY_EXISTS,
2640 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr)); 2642 this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2643 false, 0, nullptr));
2641 2644
2642 // Set the write threshold to 5. 2645 // Set the write threshold to 5.
2643 this->ProducerSetOptions(5); 2646 this->ProducerSetOptions(5);
2644 write_threshold_num_bytes = 123u; 2647 write_threshold_num_bytes = 123u;
2645 this->ProducerGetOptions(&write_threshold_num_bytes); 2648 this->ProducerGetOptions(&write_threshold_num_bytes);
2646 EXPECT_EQ(5u, write_threshold_num_bytes); 2649 EXPECT_EQ(5u, write_threshold_num_bytes);
2647 2650
2648 // Should still have the write threshold signal. 2651 // Should still have the write threshold signal.
2649 waiter.Init(); 2652 waiter.Init();
2650 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, 2653 ASSERT_EQ(
2651 this->ProducerAddAwakable( 2654 MOJO_RESULT_ALREADY_EXISTS,
2652 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr)); 2655 this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2656 false, 0, nullptr));
2653 2657
2654 // Set the write threshold to 6. 2658 // Set the write threshold to 6.
2655 this->ProducerSetOptions(6); 2659 this->ProducerSetOptions(6);
2656 2660
2657 // Should no longer have the write threshold signal. 2661 // Should no longer have the write threshold signal.
2658 waiter.Init(); 2662 waiter.Init();
2659 ASSERT_EQ(MOJO_RESULT_OK, 2663 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable(
2660 this->ProducerAddAwakable( 2664 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2661 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr)); 2665 false, 0, nullptr));
2662 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr)); 2666 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
2663 hss = HandleSignalsState(); 2667 hss = HandleSignalsState();
2664 this->ProducerRemoveAwakable(&waiter, &hss); 2668 this->ProducerRemoveAwakable(&waiter, &hss);
2665 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 2669 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
2666 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2670 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2667 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2671 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2668 hss.satisfiable_signals); 2672 hss.satisfiable_signals);
2669 2673
2670 // Add a waiter. 2674 // Add a waiter.
2671 waiter.Init(); 2675 waiter.Init();
2672 ASSERT_EQ(MOJO_RESULT_OK, 2676 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable(
2673 this->ProducerAddAwakable( 2677 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2674 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr)); 2678 false, 0, nullptr));
2675 2679
2676 // Wait for the consumer to be readable. 2680 // Wait for the consumer to be readable.
2677 EXPECT_EQ(MOJO_RESULT_OK, read_waiter.Wait(test::TinyTimeout(), nullptr)); 2681 EXPECT_EQ(MOJO_RESULT_OK, read_waiter.Wait(test::TinyTimeout(), nullptr));
2678 this->ConsumerRemoveAwakable(&read_waiter, nullptr); 2682 this->ConsumerRemoveAwakable(&read_waiter, nullptr);
2679 2683
2680 // Read a byte. 2684 // Read a byte.
2681 char read_byte = 'a'; 2685 char read_byte = 'a';
2682 num_bytes = sizeof(read_byte); 2686 num_bytes = sizeof(read_byte);
2683 EXPECT_EQ(MOJO_RESULT_OK, 2687 EXPECT_EQ(MOJO_RESULT_OK,
2684 this->ConsumerReadData(UserPointer<void>(&read_byte), 2688 this->ConsumerReadData(UserPointer<void>(&read_byte),
(...skipping 14 matching lines...) Expand all
2699 // Write 6 more bytes. 2703 // Write 6 more bytes.
2700 static const char kMoreTestData[6] = {'1', '2', '3', '4', '5', '6'}; 2704 static const char kMoreTestData[6] = {'1', '2', '3', '4', '5', '6'};
2701 num_bytes = 6; 2705 num_bytes = 6;
2702 EXPECT_EQ(MOJO_RESULT_OK, 2706 EXPECT_EQ(MOJO_RESULT_OK,
2703 this->ProducerWriteData(UserPointer<const void>(kMoreTestData), 2707 this->ProducerWriteData(UserPointer<const void>(kMoreTestData),
2704 MakeUserPointer(&num_bytes), false)); 2708 MakeUserPointer(&num_bytes), false));
2705 EXPECT_EQ(6u, num_bytes); 2709 EXPECT_EQ(6u, num_bytes);
2706 2710
2707 // Should no longer have the write threshold signal. 2711 // Should no longer have the write threshold signal.
2708 waiter.Init(); 2712 waiter.Init();
2709 ASSERT_EQ(MOJO_RESULT_OK, 2713 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable(
2710 this->ProducerAddAwakable( 2714 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2711 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr)); 2715 false, 0, nullptr));
2712 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr)); 2716 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
2713 this->ProducerRemoveAwakable(&waiter, nullptr); 2717 this->ProducerRemoveAwakable(&waiter, nullptr);
2714 2718
2715 // Set the write threshold to 0 (which means the element size, 1). 2719 // Set the write threshold to 0 (which means the element size, 1).
2716 this->ProducerSetOptions(0); 2720 this->ProducerSetOptions(0);
2717 write_threshold_num_bytes = 123u; 2721 write_threshold_num_bytes = 123u;
2718 this->ProducerGetOptions(&write_threshold_num_bytes); 2722 this->ProducerGetOptions(&write_threshold_num_bytes);
2719 EXPECT_EQ(0u, write_threshold_num_bytes); 2723 EXPECT_EQ(0u, write_threshold_num_bytes);
2720 2724
2721 // Should still not have the write threshold signal. 2725 // Should still not have the write threshold signal.
2722 waiter.Init(); 2726 waiter.Init();
2723 ASSERT_EQ(MOJO_RESULT_OK, 2727 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable(
2724 this->ProducerAddAwakable( 2728 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2725 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr)); 2729 false, 0, nullptr));
2726 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr)); 2730 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
2727 this->ProducerRemoveAwakable(&waiter, nullptr); 2731 this->ProducerRemoveAwakable(&waiter, nullptr);
2728 2732
2729 // Add a waiter. 2733 // Add a waiter.
2730 waiter.Init(); 2734 waiter.Init();
2731 ASSERT_EQ(MOJO_RESULT_OK, 2735 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable(
2732 this->ProducerAddAwakable( 2736 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2733 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 0, nullptr)); 2737 false, 0, nullptr));
2734 2738
2735 // Close the consumer. 2739 // Close the consumer.
2736 this->ConsumerClose(); 2740 this->ConsumerClose();
2737 2741
2738 // Wait; the condition should now never be satisfiable. 2742 // Wait; the condition should now never be satisfiable.
2739 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 2743 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
2740 waiter.Wait(test::TinyTimeout(), nullptr)); 2744 waiter.Wait(test::TinyTimeout(), nullptr));
2741 hss = HandleSignalsState(); 2745 hss = HandleSignalsState();
2742 this->ProducerRemoveAwakable(&waiter, &hss); 2746 this->ProducerRemoveAwakable(&waiter, &hss);
2743 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2747 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
(...skipping 18 matching lines...) Expand all
2762 this->ConsumerGetOptions(&read_threshold_num_bytes); 2766 this->ConsumerGetOptions(&read_threshold_num_bytes);
2763 EXPECT_EQ(0u, read_threshold_num_bytes); 2767 EXPECT_EQ(0u, read_threshold_num_bytes);
2764 2768
2765 Waiter waiter; 2769 Waiter waiter;
2766 HandleSignalsState hss; 2770 HandleSignalsState hss;
2767 2771
2768 // Add a waiter. 2772 // Add a waiter.
2769 waiter.Init(); 2773 waiter.Init();
2770 ASSERT_EQ(MOJO_RESULT_OK, 2774 ASSERT_EQ(MOJO_RESULT_OK,
2771 this->ConsumerAddAwakable( 2775 this->ConsumerAddAwakable(
2772 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr)); 2776 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, nullptr));
2773 // Trivial wait: it shouldn't have the read threshold signal. 2777 // Trivial wait: it shouldn't have the read threshold signal.
2774 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr)); 2778 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
2775 hss = HandleSignalsState(); 2779 hss = HandleSignalsState();
2776 this->ConsumerRemoveAwakable(&waiter, &hss); 2780 this->ConsumerRemoveAwakable(&waiter, &hss);
2777 EXPECT_EQ(0u, hss.satisfied_signals); 2781 EXPECT_EQ(0u, hss.satisfied_signals);
2778 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2782 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2779 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2783 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2780 hss.satisfiable_signals); 2784 hss.satisfiable_signals);
2781 2785
2782 // Add a waiter. 2786 // Add a waiter.
2783 waiter.Init(); 2787 waiter.Init();
2784 ASSERT_EQ(MOJO_RESULT_OK, 2788 ASSERT_EQ(MOJO_RESULT_OK,
2785 this->ConsumerAddAwakable( 2789 this->ConsumerAddAwakable(
2786 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr)); 2790 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, nullptr));
2787 2791
2788 // Write a byte. 2792 // Write a byte.
2789 const char kTestData[] = {'x'}; 2793 const char kTestData[] = {'x'};
2790 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); 2794 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
2791 uint32_t num_bytes = kTestDataSize; 2795 uint32_t num_bytes = kTestDataSize;
2792 EXPECT_EQ(MOJO_RESULT_OK, 2796 EXPECT_EQ(MOJO_RESULT_OK,
2793 this->ProducerWriteData(UserPointer<const void>(kTestData), 2797 this->ProducerWriteData(UserPointer<const void>(kTestData),
2794 MakeUserPointer(&num_bytes), false)); 2798 MakeUserPointer(&num_bytes), false));
2795 EXPECT_EQ(kTestDataSize, num_bytes); 2799 EXPECT_EQ(kTestDataSize, num_bytes);
2796 2800
(...skipping 12 matching lines...) Expand all
2809 read_threshold_num_bytes = 123u; 2813 read_threshold_num_bytes = 123u;
2810 this->ConsumerGetOptions(&read_threshold_num_bytes); 2814 this->ConsumerGetOptions(&read_threshold_num_bytes);
2811 EXPECT_EQ(1u, read_threshold_num_bytes); 2815 EXPECT_EQ(1u, read_threshold_num_bytes);
2812 2816
2813 // Try to add a waiter: it should (still) already have the read threshold 2817 // Try to add a waiter: it should (still) already have the read threshold
2814 // signal. 2818 // signal.
2815 waiter.Init(); 2819 waiter.Init();
2816 hss = HandleSignalsState(); 2820 hss = HandleSignalsState();
2817 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, 2821 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
2818 this->ConsumerAddAwakable( 2822 this->ConsumerAddAwakable(
2819 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, &hss)); 2823 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, &hss));
2820 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2824 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2821 hss.satisfied_signals); 2825 hss.satisfied_signals);
2822 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2826 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2823 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2827 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2824 hss.satisfiable_signals); 2828 hss.satisfiable_signals);
2825 2829
2826 // Set the read threshold to 2. 2830 // Set the read threshold to 2.
2827 this->ConsumerSetOptions(2); 2831 this->ConsumerSetOptions(2);
2828 read_threshold_num_bytes = 123u; 2832 read_threshold_num_bytes = 123u;
2829 this->ConsumerGetOptions(&read_threshold_num_bytes); 2833 this->ConsumerGetOptions(&read_threshold_num_bytes);
2830 EXPECT_EQ(2u, read_threshold_num_bytes); 2834 EXPECT_EQ(2u, read_threshold_num_bytes);
2831 2835
2832 // Add a waiter. 2836 // Add a waiter.
2833 waiter.Init(); 2837 waiter.Init();
2834 ASSERT_EQ(MOJO_RESULT_OK, 2838 ASSERT_EQ(MOJO_RESULT_OK,
2835 this->ConsumerAddAwakable( 2839 this->ConsumerAddAwakable(
2836 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr)); 2840 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, nullptr));
2837 2841
2838 // Write another byte. 2842 // Write another byte.
2839 num_bytes = kTestDataSize; 2843 num_bytes = kTestDataSize;
2840 EXPECT_EQ(MOJO_RESULT_OK, 2844 EXPECT_EQ(MOJO_RESULT_OK,
2841 this->ProducerWriteData(UserPointer<const void>(kTestData), 2845 this->ProducerWriteData(UserPointer<const void>(kTestData),
2842 MakeUserPointer(&num_bytes), false)); 2846 MakeUserPointer(&num_bytes), false));
2843 EXPECT_EQ(kTestDataSize, num_bytes); 2847 EXPECT_EQ(kTestDataSize, num_bytes);
2844 2848
2845 // Wait for the read threshold signal. 2849 // Wait for the read threshold signal.
2846 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr)); 2850 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
(...skipping 11 matching lines...) Expand all
2858 EXPECT_EQ(MOJO_RESULT_OK, 2862 EXPECT_EQ(MOJO_RESULT_OK,
2859 this->ConsumerReadData(UserPointer<void>(&read_byte), 2863 this->ConsumerReadData(UserPointer<void>(&read_byte),
2860 MakeUserPointer(&num_bytes), true, false)); 2864 MakeUserPointer(&num_bytes), true, false));
2861 EXPECT_EQ(1u, num_bytes); 2865 EXPECT_EQ(1u, num_bytes);
2862 EXPECT_EQ(kTestData[0], read_byte); 2866 EXPECT_EQ(kTestData[0], read_byte);
2863 2867
2864 // Add a waiter. 2868 // Add a waiter.
2865 waiter.Init(); 2869 waiter.Init();
2866 ASSERT_EQ(MOJO_RESULT_OK, 2870 ASSERT_EQ(MOJO_RESULT_OK,
2867 this->ConsumerAddAwakable( 2871 this->ConsumerAddAwakable(
2868 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr)); 2872 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, nullptr));
2869 // Trivial wait: it shouldn't have the read threshold signal. 2873 // Trivial wait: it shouldn't have the read threshold signal.
2870 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr)); 2874 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
2871 hss = HandleSignalsState(); 2875 hss = HandleSignalsState();
2872 this->ConsumerRemoveAwakable(&waiter, &hss); 2876 this->ConsumerRemoveAwakable(&waiter, &hss);
2873 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 2877 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
2874 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2878 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2875 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2879 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2876 hss.satisfiable_signals); 2880 hss.satisfiable_signals);
2877 2881
2878 // Add a waiter. 2882 // Add a waiter.
2879 waiter.Init(); 2883 waiter.Init();
2880 ASSERT_EQ(MOJO_RESULT_OK, 2884 ASSERT_EQ(MOJO_RESULT_OK,
2881 this->ConsumerAddAwakable( 2885 this->ConsumerAddAwakable(
2882 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr)); 2886 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, nullptr));
2883 // Trivial wait: it shouldn't have the read threshold signal. 2887 // Trivial wait: it shouldn't have the read threshold signal.
2884 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr)); 2888 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
2885 hss = HandleSignalsState(); 2889 hss = HandleSignalsState();
2886 this->ConsumerRemoveAwakable(&waiter, &hss); 2890 this->ConsumerRemoveAwakable(&waiter, &hss);
2887 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 2891 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
2888 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2892 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2889 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2893 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2890 hss.satisfiable_signals); 2894 hss.satisfiable_signals);
2891 2895
2892 // Add a waiter. 2896 // Add a waiter.
2893 waiter.Init(); 2897 waiter.Init();
2894 ASSERT_EQ(MOJO_RESULT_OK, 2898 ASSERT_EQ(MOJO_RESULT_OK,
2895 this->ConsumerAddAwakable( 2899 this->ConsumerAddAwakable(
2896 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr)); 2900 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, nullptr));
2897 2901
2898 // Close the producer. 2902 // Close the producer.
2899 this->ProducerClose(); 2903 this->ProducerClose();
2900 2904
2901 // Wait; the current read threshold becomes never satisfiable. 2905 // Wait; the current read threshold becomes never satisfiable.
2902 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 2906 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
2903 waiter.Wait(test::TinyTimeout(), nullptr)); 2907 waiter.Wait(test::TinyTimeout(), nullptr));
2904 hss = HandleSignalsState(); 2908 hss = HandleSignalsState();
2905 this->ConsumerRemoveAwakable(&waiter, &hss); 2909 this->ConsumerRemoveAwakable(&waiter, &hss);
2906 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2910 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
2907 hss.satisfied_signals); 2911 hss.satisfied_signals);
2908 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2912 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
2909 hss.satisfiable_signals); 2913 hss.satisfiable_signals);
2910 2914
2911 // Set the read threshold back to zero to 0. 2915 // Set the read threshold back to zero to 0.
2912 this->ConsumerSetOptions(0); 2916 this->ConsumerSetOptions(0);
2913 read_threshold_num_bytes = 123u; 2917 read_threshold_num_bytes = 123u;
2914 this->ConsumerGetOptions(&read_threshold_num_bytes); 2918 this->ConsumerGetOptions(&read_threshold_num_bytes);
2915 // "Get options" should preserve 0 (and not set it to the element size). 2919 // "Get options" should preserve 0 (and not set it to the element size).
2916 EXPECT_EQ(0u, read_threshold_num_bytes); 2920 EXPECT_EQ(0u, read_threshold_num_bytes);
2917 2921
2918 // Try to add a waiter: it should have the read threshold signal. 2922 // Try to add a waiter: it should have the read threshold signal.
2919 waiter.Init(); 2923 waiter.Init();
2920 hss = HandleSignalsState(); 2924 hss = HandleSignalsState();
2921 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, 2925 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
2922 this->ConsumerAddAwakable( 2926 this->ConsumerAddAwakable(
2923 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, &hss)); 2927 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, &hss));
2924 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2928 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2925 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2929 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2926 hss.satisfied_signals); 2930 hss.satisfied_signals);
2927 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2931 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2928 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2932 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2929 hss.satisfiable_signals); 2933 hss.satisfiable_signals);
2930 2934
2931 // Read the other byte. 2935 // Read the other byte.
2932 read_byte = 'a'; 2936 read_byte = 'a';
2933 num_bytes = sizeof(read_byte); 2937 num_bytes = sizeof(read_byte);
2934 EXPECT_EQ(MOJO_RESULT_OK, 2938 EXPECT_EQ(MOJO_RESULT_OK,
2935 this->ConsumerReadData(UserPointer<void>(&read_byte), 2939 this->ConsumerReadData(UserPointer<void>(&read_byte),
2936 MakeUserPointer(&num_bytes), true, false)); 2940 MakeUserPointer(&num_bytes), true, false));
2937 EXPECT_EQ(1u, num_bytes); 2941 EXPECT_EQ(1u, num_bytes);
2938 EXPECT_EQ(kTestData[0], read_byte); 2942 EXPECT_EQ(kTestData[0], read_byte);
2939 2943
2940 // Try to add a waiter: the read threshold signal should be unsatisfiable. 2944 // Try to add a waiter: the read threshold signal should be unsatisfiable.
2941 waiter.Init(); 2945 waiter.Init();
2942 hss = HandleSignalsState(); 2946 hss = HandleSignalsState();
2943 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 2947 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
2944 this->ConsumerAddAwakable( 2948 this->ConsumerAddAwakable(
2945 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, &hss)); 2949 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, &hss));
2946 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2950 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
2947 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 2951 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
2948 2952
2949 this->ConsumerClose(); 2953 this->ConsumerClose();
2950 } 2954 }
2951 2955
2952 } // namespace 2956 } // namespace
2953 } // namespace system 2957 } // namespace system
2954 } // namespace mojo 2958 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/data_pipe_consumer_dispatcher.cc ('k') | mojo/edk/system/data_pipe_producer_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698