Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(380)

Side by Side Diff: mojo/edk/system/data_pipe_impl_unittest.cc

Issue 2084593005: Rationalize AddAwakable...() and RemoveAwakable...() methods. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: doh Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // This file contains tests that are shared between different implementations of 5 // This file contains tests that are shared between different implementations of
6 // |DataPipeImpl|. 6 // |DataPipeImpl|.
7 7
8 #include "mojo/edk/system/data_pipe_impl.h" 8 #include "mojo/edk/system/data_pipe_impl.h"
9 9
10 #include <stdint.h> 10 #include <stdint.h>
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after
133 return dpp()->ProducerWriteData(elements, num_bytes, all_or_none); 133 return dpp()->ProducerWriteData(elements, num_bytes, all_or_none);
134 } 134 }
135 MojoResult ProducerBeginWriteData(UserPointer<void*> buffer, 135 MojoResult ProducerBeginWriteData(UserPointer<void*> buffer,
136 UserPointer<uint32_t> buffer_num_bytes) { 136 UserPointer<uint32_t> buffer_num_bytes) {
137 return dpp()->ProducerBeginWriteData(buffer, buffer_num_bytes); 137 return dpp()->ProducerBeginWriteData(buffer, buffer_num_bytes);
138 } 138 }
139 MojoResult ProducerEndWriteData(uint32_t num_bytes_written) { 139 MojoResult ProducerEndWriteData(uint32_t num_bytes_written) {
140 return dpp()->ProducerEndWriteData(num_bytes_written); 140 return dpp()->ProducerEndWriteData(num_bytes_written);
141 } 141 }
142 MojoResult ProducerAddAwakable(Awakable* awakable, 142 MojoResult ProducerAddAwakable(Awakable* awakable,
143 uint64_t context,
144 bool force,
143 MojoHandleSignals signals, 145 MojoHandleSignals signals,
144 bool force,
145 uint64_t context,
146 HandleSignalsState* signals_state) { 146 HandleSignalsState* signals_state) {
147 return dpp()->ProducerAddAwakable(awakable, signals, force, context, 147 return dpp()->ProducerAddAwakable(awakable, context, force, signals,
148 signals_state); 148 signals_state);
149 } 149 }
150 void ProducerRemoveAwakable(Awakable* awakable, 150 void ProducerRemoveAwakable(bool match_context,
151 Awakable* awakable,
152 uint64_t context,
151 HandleSignalsState* signals_state) { 153 HandleSignalsState* signals_state) {
152 return dpp()->ProducerRemoveAwakable(awakable, signals_state); 154 return dpp()->ProducerRemoveAwakable(match_context, awakable, context,
155 signals_state);
153 } 156 }
154 157
155 void ConsumerClose() { helper_->ConsumerClose(); } 158 void ConsumerClose() { helper_->ConsumerClose(); }
156 MojoResult ConsumerSetOptions(uint32_t read_threshold_num_bytes) { 159 MojoResult ConsumerSetOptions(uint32_t read_threshold_num_bytes) {
157 return dpc()->ConsumerSetOptions(read_threshold_num_bytes); 160 return dpc()->ConsumerSetOptions(read_threshold_num_bytes);
158 } 161 }
159 void ConsumerGetOptions(uint32_t* read_threshold_num_bytes) { 162 void ConsumerGetOptions(uint32_t* read_threshold_num_bytes) {
160 dpc()->ConsumerGetOptions(read_threshold_num_bytes); 163 dpc()->ConsumerGetOptions(read_threshold_num_bytes);
161 } 164 }
162 MojoResult ConsumerReadData(UserPointer<void> elements, 165 MojoResult ConsumerReadData(UserPointer<void> elements,
(...skipping 10 matching lines...) Expand all
173 return dpc()->ConsumerQueryData(num_bytes); 176 return dpc()->ConsumerQueryData(num_bytes);
174 } 177 }
175 MojoResult ConsumerBeginReadData(UserPointer<const void*> buffer, 178 MojoResult ConsumerBeginReadData(UserPointer<const void*> buffer,
176 UserPointer<uint32_t> buffer_num_bytes) { 179 UserPointer<uint32_t> buffer_num_bytes) {
177 return dpc()->ConsumerBeginReadData(buffer, buffer_num_bytes); 180 return dpc()->ConsumerBeginReadData(buffer, buffer_num_bytes);
178 } 181 }
179 MojoResult ConsumerEndReadData(uint32_t num_bytes_read) { 182 MojoResult ConsumerEndReadData(uint32_t num_bytes_read) {
180 return dpc()->ConsumerEndReadData(num_bytes_read); 183 return dpc()->ConsumerEndReadData(num_bytes_read);
181 } 184 }
182 MojoResult ConsumerAddAwakable(Awakable* awakable, 185 MojoResult ConsumerAddAwakable(Awakable* awakable,
186 uint64_t context,
187 bool force,
183 MojoHandleSignals signals, 188 MojoHandleSignals signals,
184 bool force,
185 uint64_t context,
186 HandleSignalsState* signals_state) { 189 HandleSignalsState* signals_state) {
187 return dpc()->ConsumerAddAwakable(awakable, signals, force, context, 190 return dpc()->ConsumerAddAwakable(awakable, context, force, signals,
188 signals_state); 191 signals_state);
189 } 192 }
190 void ConsumerRemoveAwakable(Awakable* awakable, 193 void ConsumerRemoveAwakable(bool match_context,
194 Awakable* awakable,
195 uint64_t context,
191 HandleSignalsState* signals_state) { 196 HandleSignalsState* signals_state) {
192 return dpc()->ConsumerRemoveAwakable(awakable, signals_state); 197 return dpc()->ConsumerRemoveAwakable(match_context, awakable, context,
198 signals_state);
193 } 199 }
194 200
195 private: 201 private:
196 DataPipe* dpp() { return helper_->DataPipeForProducer(); } 202 DataPipe* dpp() { return helper_->DataPipeForProducer(); }
197 DataPipe* dpc() { return helper_->DataPipeForConsumer(); } 203 DataPipe* dpc() { return helper_->DataPipeForConsumer(); }
198 204
199 std::unique_ptr<Helper> helper_; 205 std::unique_ptr<Helper> helper_;
200 206
201 MOJO_DISALLOW_COPY_AND_ASSIGN(DataPipeImplTest); 207 MOJO_DISALLOW_COPY_AND_ASSIGN(DataPipeImplTest);
202 }; 208 };
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
274 Handle* handle_to_receive) { 280 Handle* handle_to_receive) {
275 DCHECK(source_i == 0 || source_i == 1); 281 DCHECK(source_i == 0 || source_i == 1);
276 size_t dest_i = source_i ^ 1; 282 size_t dest_i = source_i ^ 1;
277 283
278 // Write the dispatcher to MP |source_i| (port 0). Wait and receive on MP 284 // Write the dispatcher to MP |source_i| (port 0). Wait and receive on MP
279 // |dest_i| (port 0). (Add the waiter first, to avoid any handling the case 285 // |dest_i| (port 0). (Add the waiter first, to avoid any handling the case
280 // where it's already readable.) 286 // where it's already readable.)
281 Waiter waiter; 287 Waiter waiter;
282 waiter.Init(); 288 waiter.Init();
283 ASSERT_EQ(MOJO_RESULT_OK, message_pipe(dest_i)->AddAwakable( 289 ASSERT_EQ(MOJO_RESULT_OK, message_pipe(dest_i)->AddAwakable(
284 0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 290 0, &waiter, 987, false,
285 false, 987, nullptr)); 291 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
286 { 292 {
287 HandleTransport transport(test::HandleTryStartTransport(handle_to_send)); 293 HandleTransport transport(test::HandleTryStartTransport(handle_to_send));
288 ASSERT_TRUE(transport.is_valid()); 294 ASSERT_TRUE(transport.is_valid());
289 295
290 std::vector<HandleTransport> transports; 296 std::vector<HandleTransport> transports;
291 transports.push_back(transport); 297 transports.push_back(transport);
292 ASSERT_EQ(MOJO_RESULT_OK, message_pipe(source_i)->WriteMessage( 298 ASSERT_EQ(MOJO_RESULT_OK, message_pipe(source_i)->WriteMessage(
293 0, NullUserPointer(), 0, &transports, 299 0, NullUserPointer(), 0, &transports,
294 MOJO_WRITE_MESSAGE_FLAG_NONE)); 300 MOJO_WRITE_MESSAGE_FLAG_NONE));
295 transport.End(); 301 transport.End();
296 } 302 }
297 uint64_t context = 0; 303 uint64_t context = 0;
298 ASSERT_EQ(MOJO_RESULT_OK, 304 ASSERT_EQ(MOJO_RESULT_OK,
299 waiter.Wait(test::ActionTimeout(), &context, nullptr)); 305 waiter.Wait(test::ActionTimeout(), &context, nullptr));
300 EXPECT_EQ(987u, context); 306 EXPECT_EQ(987u, context);
301 HandleSignalsState hss = HandleSignalsState(); 307 HandleSignalsState hss = HandleSignalsState();
302 message_pipe(dest_i)->RemoveAwakable(0, &waiter, &hss); 308 message_pipe(dest_i)->RemoveAwakable(0, false, &waiter, 0, &hss);
303 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 309 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
304 hss.satisfied_signals); 310 hss.satisfied_signals);
305 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE | 311 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
306 MOJO_HANDLE_SIGNAL_PEER_CLOSED, 312 MOJO_HANDLE_SIGNAL_PEER_CLOSED,
307 hss.satisfiable_signals); 313 hss.satisfiable_signals);
308 char read_buffer[100] = {}; 314 char read_buffer[100] = {};
309 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 315 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
310 HandleVector read_handles; 316 HandleVector read_handles;
311 uint32_t read_num_handles = 10; // Maximum to get. 317 uint32_t read_num_handles = 10; // Maximum to get.
312 ASSERT_EQ(MOJO_RESULT_OK, 318 ASSERT_EQ(MOJO_RESULT_OK,
(...skipping 353 matching lines...) Expand 10 before | Expand all | Expand 10 after
666 672
667 // Read with invalid |num_bytes|. 673 // Read with invalid |num_bytes|.
668 num_bytes = sizeof(elements[0]) + 1; 674 num_bytes = sizeof(elements[0]) + 1;
669 EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, 675 EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
670 this->ConsumerReadData(UserPointer<void>(elements), 676 this->ConsumerReadData(UserPointer<void>(elements),
671 MakeUserPointer(&num_bytes), false, false)); 677 MakeUserPointer(&num_bytes), false, false));
672 678
673 // For remote data pipes, we'll have to wait; add the waiter before writing. 679 // For remote data pipes, we'll have to wait; add the waiter before writing.
674 waiter.Init(); 680 waiter.Init();
675 ASSERT_EQ(MOJO_RESULT_OK, 681 ASSERT_EQ(MOJO_RESULT_OK,
676 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 682 this->ConsumerAddAwakable(&waiter, 123, false,
677 false, 123, nullptr)); 683 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
678 684
679 // Write two elements. 685 // Write two elements.
680 elements[0] = 123; 686 elements[0] = 123;
681 elements[1] = 456; 687 elements[1] = 456;
682 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 688 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
683 EXPECT_EQ(MOJO_RESULT_OK, 689 EXPECT_EQ(MOJO_RESULT_OK,
684 this->ProducerWriteData(UserPointer<const void>(elements), 690 this->ProducerWriteData(UserPointer<const void>(elements),
685 MakeUserPointer(&num_bytes), false)); 691 MakeUserPointer(&num_bytes), false));
686 // It should have written everything (even without "all or none"). 692 // It should have written everything (even without "all or none").
687 EXPECT_EQ(2u * sizeof(elements[0]), num_bytes); 693 EXPECT_EQ(2u * sizeof(elements[0]), num_bytes);
688 694
689 // Wait. 695 // Wait.
690 context = 0; 696 context = 0;
691 EXPECT_EQ(MOJO_RESULT_OK, 697 EXPECT_EQ(MOJO_RESULT_OK,
692 waiter.Wait(test::ActionTimeout(), &context, nullptr)); 698 waiter.Wait(test::ActionTimeout(), &context, nullptr));
693 EXPECT_EQ(123u, context); 699 EXPECT_EQ(123u, context);
694 hss = HandleSignalsState(); 700 hss = HandleSignalsState();
695 this->ConsumerRemoveAwakable(&waiter, &hss); 701 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
696 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 702 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
697 hss.satisfied_signals); 703 hss.satisfied_signals);
698 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 704 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
699 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 705 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
700 hss.satisfiable_signals); 706 hss.satisfiable_signals);
701 707
702 // Query. 708 // Query.
703 // TODO(vtl): It's theoretically possible (though not with the current 709 // TODO(vtl): It's theoretically possible (though not with the current
704 // implementation/configured limits) that not all the data has arrived yet. 710 // implementation/configured limits) that not all the data has arrived yet.
705 // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...| 711 // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after
795 801
796 Waiter pwaiter; // For producer. 802 Waiter pwaiter; // For producer.
797 Waiter cwaiter; // For consumer. 803 Waiter cwaiter; // For consumer.
798 HandleSignalsState hss; 804 HandleSignalsState hss;
799 uint64_t context; 805 uint64_t context;
800 806
801 // Never readable. 807 // Never readable.
802 pwaiter.Init(); 808 pwaiter.Init();
803 hss = HandleSignalsState(); 809 hss = HandleSignalsState();
804 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 810 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
805 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_READABLE, 811 this->ProducerAddAwakable(&pwaiter, 12, false,
806 false, 12, &hss)); 812 MOJO_HANDLE_SIGNAL_READABLE, &hss));
807 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 813 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
808 hss.satisfied_signals); 814 hss.satisfied_signals);
809 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 815 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
810 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 816 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
811 hss.satisfiable_signals); 817 hss.satisfiable_signals);
812 818
813 // Already writable. 819 // Already writable.
814 pwaiter.Init(); 820 pwaiter.Init();
815 hss = HandleSignalsState(); 821 hss = HandleSignalsState();
816 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 822 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
817 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 823 this->ProducerAddAwakable(&pwaiter, 34, false,
818 false, 34, &hss)); 824 MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
819 825
820 // We'll need to wait for readability for the remote cases. 826 // We'll need to wait for readability for the remote cases.
821 cwaiter.Init(); 827 cwaiter.Init();
822 ASSERT_EQ(MOJO_RESULT_OK, 828 ASSERT_EQ(MOJO_RESULT_OK,
823 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 829 this->ConsumerAddAwakable(&cwaiter, 1234, false,
824 false, 1234, nullptr)); 830 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
825 831
826 // Write two elements. 832 // Write two elements.
827 int32_t elements[2] = {123, 456}; 833 int32_t elements[2] = {123, 456};
828 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 834 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
829 EXPECT_EQ(MOJO_RESULT_OK, 835 EXPECT_EQ(MOJO_RESULT_OK,
830 this->ProducerWriteData(UserPointer<const void>(elements), 836 this->ProducerWriteData(UserPointer<const void>(elements),
831 MakeUserPointer(&num_bytes), true)); 837 MakeUserPointer(&num_bytes), true));
832 EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes); 838 EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
833 839
834 // Adding a waiter should now succeed. 840 // Adding a waiter should now succeed.
835 pwaiter.Init(); 841 pwaiter.Init();
836 ASSERT_EQ(MOJO_RESULT_OK, 842 ASSERT_EQ(MOJO_RESULT_OK,
837 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 843 this->ProducerAddAwakable(&pwaiter, 56, false,
838 false, 56, nullptr)); 844 MOJO_HANDLE_SIGNAL_WRITABLE, nullptr));
839 // And it shouldn't be writable yet. 845 // And it shouldn't be writable yet.
840 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr, nullptr)); 846 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr, nullptr));
841 hss = HandleSignalsState(); 847 hss = HandleSignalsState();
842 this->ProducerRemoveAwakable(&pwaiter, &hss); 848 this->ProducerRemoveAwakable(false, &pwaiter, 0, &hss);
843 EXPECT_EQ(0u, hss.satisfied_signals); 849 EXPECT_EQ(0u, hss.satisfied_signals);
844 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 850 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
845 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 851 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
846 hss.satisfiable_signals); 852 hss.satisfiable_signals);
847 853
848 // Wait for data to become available to the consumer. 854 // Wait for data to become available to the consumer.
849 context = 0; 855 context = 0;
850 EXPECT_EQ(MOJO_RESULT_OK, 856 EXPECT_EQ(MOJO_RESULT_OK,
851 cwaiter.Wait(test::TinyTimeout(), &context, nullptr)); 857 cwaiter.Wait(test::TinyTimeout(), &context, nullptr));
852 EXPECT_EQ(1234u, context); 858 EXPECT_EQ(1234u, context);
853 hss = HandleSignalsState(); 859 hss = HandleSignalsState();
854 this->ConsumerRemoveAwakable(&cwaiter, &hss); 860 this->ConsumerRemoveAwakable(false, &cwaiter, 0, &hss);
855 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 861 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
856 hss.satisfied_signals); 862 hss.satisfied_signals);
857 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 863 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
858 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 864 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
859 hss.satisfiable_signals); 865 hss.satisfiable_signals);
860 866
861 // Peek one element. 867 // Peek one element.
862 elements[0] = -1; 868 elements[0] = -1;
863 elements[1] = -1; 869 elements[1] = -1;
864 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 870 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
865 EXPECT_EQ(MOJO_RESULT_OK, 871 EXPECT_EQ(MOJO_RESULT_OK,
866 this->ConsumerReadData(UserPointer<void>(elements), 872 this->ConsumerReadData(UserPointer<void>(elements),
867 MakeUserPointer(&num_bytes), true, true)); 873 MakeUserPointer(&num_bytes), true, true));
868 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 874 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
869 EXPECT_EQ(123, elements[0]); 875 EXPECT_EQ(123, elements[0]);
870 EXPECT_EQ(-1, elements[1]); 876 EXPECT_EQ(-1, elements[1]);
871 877
872 // Add a waiter. 878 // Add a waiter.
873 pwaiter.Init(); 879 pwaiter.Init();
874 ASSERT_EQ(MOJO_RESULT_OK, 880 ASSERT_EQ(MOJO_RESULT_OK,
875 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 881 this->ProducerAddAwakable(&pwaiter, 56, false,
876 false, 56, nullptr)); 882 MOJO_HANDLE_SIGNAL_WRITABLE, nullptr));
877 // And it still shouldn't be writable yet. 883 // And it still shouldn't be writable yet.
878 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr, nullptr)); 884 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr, nullptr));
879 hss = HandleSignalsState(); 885 hss = HandleSignalsState();
880 this->ProducerRemoveAwakable(&pwaiter, &hss); 886 this->ProducerRemoveAwakable(false, &pwaiter, 0, &hss);
881 EXPECT_EQ(0u, hss.satisfied_signals); 887 EXPECT_EQ(0u, hss.satisfied_signals);
882 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 888 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
883 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 889 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
884 hss.satisfiable_signals); 890 hss.satisfiable_signals);
885 891
886 // Do it again. 892 // Do it again.
887 pwaiter.Init(); 893 pwaiter.Init();
888 ASSERT_EQ(MOJO_RESULT_OK, 894 ASSERT_EQ(MOJO_RESULT_OK,
889 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 895 this->ProducerAddAwakable(&pwaiter, 78, false,
890 false, 78, nullptr)); 896 MOJO_HANDLE_SIGNAL_WRITABLE, nullptr));
891 897
892 // Read one element. 898 // Read one element.
893 elements[0] = -1; 899 elements[0] = -1;
894 elements[1] = -1; 900 elements[1] = -1;
895 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 901 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
896 EXPECT_EQ(MOJO_RESULT_OK, 902 EXPECT_EQ(MOJO_RESULT_OK,
897 this->ConsumerReadData(UserPointer<void>(elements), 903 this->ConsumerReadData(UserPointer<void>(elements),
898 MakeUserPointer(&num_bytes), true, false)); 904 MakeUserPointer(&num_bytes), true, false));
899 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 905 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
900 EXPECT_EQ(123, elements[0]); 906 EXPECT_EQ(123, elements[0]);
901 EXPECT_EQ(-1, elements[1]); 907 EXPECT_EQ(-1, elements[1]);
902 908
903 // Waiting should now succeed. 909 // Waiting should now succeed.
904 context = 0; 910 context = 0;
905 EXPECT_EQ(MOJO_RESULT_OK, 911 EXPECT_EQ(MOJO_RESULT_OK,
906 pwaiter.Wait(test::TinyTimeout(), &context, nullptr)); 912 pwaiter.Wait(test::TinyTimeout(), &context, nullptr));
907 EXPECT_EQ(78u, context); 913 EXPECT_EQ(78u, context);
908 hss = HandleSignalsState(); 914 hss = HandleSignalsState();
909 this->ProducerRemoveAwakable(&pwaiter, &hss); 915 this->ProducerRemoveAwakable(false, &pwaiter, 0, &hss);
910 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 916 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
911 hss.satisfied_signals); 917 hss.satisfied_signals);
912 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 918 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
913 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 919 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
914 hss.satisfiable_signals); 920 hss.satisfiable_signals);
915 921
916 // Try writing, using a two-phase write. 922 // Try writing, using a two-phase write.
917 void* buffer = nullptr; 923 void* buffer = nullptr;
918 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0])); 924 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
919 EXPECT_EQ(MOJO_RESULT_OK, 925 EXPECT_EQ(MOJO_RESULT_OK,
920 this->ProducerBeginWriteData(MakeUserPointer(&buffer), 926 this->ProducerBeginWriteData(MakeUserPointer(&buffer),
921 MakeUserPointer(&num_bytes))); 927 MakeUserPointer(&num_bytes)));
922 EXPECT_TRUE(buffer); 928 EXPECT_TRUE(buffer);
923 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 929 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
924 930
925 static_cast<int32_t*>(buffer)[0] = 789; 931 static_cast<int32_t*>(buffer)[0] = 789;
926 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(static_cast<uint32_t>( 932 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(static_cast<uint32_t>(
927 1u * sizeof(elements[0])))); 933 1u * sizeof(elements[0]))));
928 934
929 // Add a waiter. 935 // Add a waiter.
930 pwaiter.Init(); 936 pwaiter.Init();
931 ASSERT_EQ(MOJO_RESULT_OK, 937 ASSERT_EQ(MOJO_RESULT_OK,
932 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 938 this->ProducerAddAwakable(&pwaiter, 90, false,
933 false, 90, nullptr)); 939 MOJO_HANDLE_SIGNAL_WRITABLE, nullptr));
934 940
935 // Read one element, using a two-phase read. 941 // Read one element, using a two-phase read.
936 const void* read_buffer = nullptr; 942 const void* read_buffer = nullptr;
937 num_bytes = 0u; 943 num_bytes = 0u;
938 EXPECT_EQ(MOJO_RESULT_OK, 944 EXPECT_EQ(MOJO_RESULT_OK,
939 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer), 945 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer),
940 MakeUserPointer(&num_bytes))); 946 MakeUserPointer(&num_bytes)));
941 EXPECT_TRUE(read_buffer); 947 EXPECT_TRUE(read_buffer);
942 // Since we only read one element (after having written three in all), the 948 // Since we only read one element (after having written three in all), the
943 // two-phase read should only allow us to read one. This checks an 949 // two-phase read should only allow us to read one. This checks an
944 // implementation detail! 950 // implementation detail!
945 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 951 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
946 EXPECT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]); 952 EXPECT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
947 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>( 953 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>(
948 1u * sizeof(elements[0])))); 954 1u * sizeof(elements[0]))));
949 955
950 // Waiting should succeed. 956 // Waiting should succeed.
951 context = 0; 957 context = 0;
952 EXPECT_EQ(MOJO_RESULT_OK, 958 EXPECT_EQ(MOJO_RESULT_OK,
953 pwaiter.Wait(test::TinyTimeout(), &context, nullptr)); 959 pwaiter.Wait(test::TinyTimeout(), &context, nullptr));
954 EXPECT_EQ(90u, context); 960 EXPECT_EQ(90u, context);
955 hss = HandleSignalsState(); 961 hss = HandleSignalsState();
956 this->ProducerRemoveAwakable(&pwaiter, &hss); 962 this->ProducerRemoveAwakable(false, &pwaiter, 0, &hss);
957 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 963 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
958 hss.satisfied_signals); 964 hss.satisfied_signals);
959 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 965 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
960 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 966 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
961 hss.satisfiable_signals); 967 hss.satisfiable_signals);
962 968
963 // Write one element. 969 // Write one element.
964 elements[0] = 123; 970 elements[0] = 123;
965 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 971 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
966 EXPECT_EQ(MOJO_RESULT_OK, 972 EXPECT_EQ(MOJO_RESULT_OK,
967 this->ProducerWriteData(UserPointer<const void>(elements), 973 this->ProducerWriteData(UserPointer<const void>(elements),
968 MakeUserPointer(&num_bytes), false)); 974 MakeUserPointer(&num_bytes), false));
969 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 975 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
970 976
971 // Add a waiter. 977 // Add a waiter.
972 pwaiter.Init(); 978 pwaiter.Init();
973 ASSERT_EQ(MOJO_RESULT_OK, 979 ASSERT_EQ(MOJO_RESULT_OK,
974 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 980 this->ProducerAddAwakable(&pwaiter, 12, false,
975 false, 12, nullptr)); 981 MOJO_HANDLE_SIGNAL_WRITABLE, nullptr));
976 982
977 // Close the consumer. 983 // Close the consumer.
978 this->ConsumerClose(); 984 this->ConsumerClose();
979 985
980 // It should now be never-writable. 986 // It should now be never-writable.
981 context = 0; 987 context = 0;
982 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 988 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
983 pwaiter.Wait(test::TinyTimeout(), &context, nullptr)); 989 pwaiter.Wait(test::TinyTimeout(), &context, nullptr));
984 EXPECT_EQ(12u, context); 990 EXPECT_EQ(12u, context);
985 hss = HandleSignalsState(); 991 hss = HandleSignalsState();
986 this->ProducerRemoveAwakable(&pwaiter, &hss); 992 this->ProducerRemoveAwakable(false, &pwaiter, 0, &hss);
987 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 993 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
988 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 994 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
989 995
990 this->ProducerClose(); 996 this->ProducerClose();
991 } 997 }
992 998
993 TYPED_TEST(DataPipeImplTest, PeerClosedProducerWaiting) { 999 TYPED_TEST(DataPipeImplTest, PeerClosedProducerWaiting) {
994 const MojoCreateDataPipeOptions options = { 1000 const MojoCreateDataPipeOptions options = {
995 kSizeOfOptions, // |struct_size|. 1001 kSizeOfOptions, // |struct_size|.
996 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1002 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
997 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 1003 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
998 2 * sizeof(int32_t) // |capacity_num_bytes|. 1004 2 * sizeof(int32_t) // |capacity_num_bytes|.
999 }; 1005 };
1000 this->Create(options); 1006 this->Create(options);
1001 this->DoTransfer(); 1007 this->DoTransfer();
1002 1008
1003 Waiter waiter; 1009 Waiter waiter;
1004 HandleSignalsState hss; 1010 HandleSignalsState hss;
1005 uint64_t context; 1011 uint64_t context;
1006 1012
1007 // Add a waiter. 1013 // Add a waiter.
1008 waiter.Init(); 1014 waiter.Init();
1009 ASSERT_EQ(MOJO_RESULT_OK, 1015 ASSERT_EQ(MOJO_RESULT_OK,
1010 this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1016 this->ProducerAddAwakable(&waiter, 12, false,
1011 false, 12, nullptr)); 1017 MOJO_HANDLE_SIGNAL_PEER_CLOSED, nullptr));
1012 1018
1013 // Close the consumer. 1019 // Close the consumer.
1014 this->ConsumerClose(); 1020 this->ConsumerClose();
1015 1021
1016 // It should be signaled. 1022 // It should be signaled.
1017 context = 0; 1023 context = 0;
1018 EXPECT_EQ(MOJO_RESULT_OK, 1024 EXPECT_EQ(MOJO_RESULT_OK,
1019 waiter.Wait(test::TinyTimeout(), &context, nullptr)); 1025 waiter.Wait(test::TinyTimeout(), &context, nullptr));
1020 EXPECT_EQ(12u, context); 1026 EXPECT_EQ(12u, context);
1021 hss = HandleSignalsState(); 1027 hss = HandleSignalsState();
1022 this->ProducerRemoveAwakable(&waiter, &hss); 1028 this->ProducerRemoveAwakable(false, &waiter, 0, &hss);
1023 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 1029 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1024 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 1030 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
1025 1031
1026 this->ProducerClose(); 1032 this->ProducerClose();
1027 } 1033 }
1028 1034
1029 TYPED_TEST(DataPipeImplTest, PeerClosedConsumerWaiting) { 1035 TYPED_TEST(DataPipeImplTest, PeerClosedConsumerWaiting) {
1030 const MojoCreateDataPipeOptions options = { 1036 const MojoCreateDataPipeOptions options = {
1031 kSizeOfOptions, // |struct_size|. 1037 kSizeOfOptions, // |struct_size|.
1032 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1038 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1033 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 1039 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
1034 2 * sizeof(int32_t) // |capacity_num_bytes|. 1040 2 * sizeof(int32_t) // |capacity_num_bytes|.
1035 }; 1041 };
1036 this->Create(options); 1042 this->Create(options);
1037 this->DoTransfer(); 1043 this->DoTransfer();
1038 1044
1039 Waiter waiter; 1045 Waiter waiter;
1040 HandleSignalsState hss; 1046 HandleSignalsState hss;
1041 uint64_t context; 1047 uint64_t context;
1042 1048
1043 // Add a waiter. 1049 // Add a waiter.
1044 waiter.Init(); 1050 waiter.Init();
1045 ASSERT_EQ(MOJO_RESULT_OK, 1051 ASSERT_EQ(MOJO_RESULT_OK,
1046 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1052 this->ConsumerAddAwakable(&waiter, 12, false,
1047 false, 12, nullptr)); 1053 MOJO_HANDLE_SIGNAL_PEER_CLOSED, nullptr));
1048 1054
1049 // Close the producer. 1055 // Close the producer.
1050 this->ProducerClose(); 1056 this->ProducerClose();
1051 1057
1052 // It should be signaled. 1058 // It should be signaled.
1053 context = 0; 1059 context = 0;
1054 EXPECT_EQ(MOJO_RESULT_OK, 1060 EXPECT_EQ(MOJO_RESULT_OK,
1055 waiter.Wait(test::TinyTimeout(), &context, nullptr)); 1061 waiter.Wait(test::TinyTimeout(), &context, nullptr));
1056 EXPECT_EQ(12u, context); 1062 EXPECT_EQ(12u, context);
1057 hss = HandleSignalsState(); 1063 hss = HandleSignalsState();
1058 this->ConsumerRemoveAwakable(&waiter, &hss); 1064 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
1059 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 1065 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1060 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 1066 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
1061 1067
1062 this->ConsumerClose(); 1068 this->ConsumerClose();
1063 } 1069 }
1064 1070
1065 TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) { 1071 TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
1066 const MojoCreateDataPipeOptions options = { 1072 const MojoCreateDataPipeOptions options = {
1067 kSizeOfOptions, // |struct_size|. 1073 kSizeOfOptions, // |struct_size|.
1068 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1074 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1069 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 1075 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
1070 1000 * sizeof(int32_t) // |capacity_num_bytes|. 1076 1000 * sizeof(int32_t) // |capacity_num_bytes|.
1071 }; 1077 };
1072 this->Create(options); 1078 this->Create(options);
1073 this->DoTransfer(); 1079 this->DoTransfer();
1074 1080
1075 Waiter waiter; 1081 Waiter waiter;
1076 Waiter waiter2; 1082 Waiter waiter2;
1077 HandleSignalsState hss; 1083 HandleSignalsState hss;
1078 uint64_t context; 1084 uint64_t context;
1079 1085
1080 // Never writable. 1086 // Never writable.
1081 waiter.Init(); 1087 waiter.Init();
1082 hss = HandleSignalsState(); 1088 hss = HandleSignalsState();
1083 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1089 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1084 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 1090 this->ConsumerAddAwakable(&waiter, 12, false,
1085 false, 12, &hss)); 1091 MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
1086 EXPECT_EQ(0u, hss.satisfied_signals); 1092 EXPECT_EQ(0u, hss.satisfied_signals);
1087 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1093 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1088 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1094 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1089 hss.satisfiable_signals); 1095 hss.satisfiable_signals);
1090 1096
1091 // Add waiter: not yet readable. 1097 // Add waiter: not yet readable.
1092 waiter.Init(); 1098 waiter.Init();
1093 ASSERT_EQ(MOJO_RESULT_OK, 1099 ASSERT_EQ(MOJO_RESULT_OK,
1094 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1100 this->ConsumerAddAwakable(&waiter, 34, false,
1095 false, 34, nullptr)); 1101 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
1096 1102
1097 // Write two elements. 1103 // Write two elements.
1098 int32_t elements[2] = {123, 456}; 1104 int32_t elements[2] = {123, 456};
1099 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 1105 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
1100 EXPECT_EQ(MOJO_RESULT_OK, 1106 EXPECT_EQ(MOJO_RESULT_OK,
1101 this->ProducerWriteData(UserPointer<const void>(elements), 1107 this->ProducerWriteData(UserPointer<const void>(elements),
1102 MakeUserPointer(&num_bytes), true)); 1108 MakeUserPointer(&num_bytes), true));
1103 1109
1104 // Wait for readability (needed for remote cases). 1110 // Wait for readability (needed for remote cases).
1105 context = 0; 1111 context = 0;
1106 EXPECT_EQ(MOJO_RESULT_OK, 1112 EXPECT_EQ(MOJO_RESULT_OK,
1107 waiter.Wait(test::TinyTimeout(), &context, nullptr)); 1113 waiter.Wait(test::TinyTimeout(), &context, nullptr));
1108 EXPECT_EQ(34u, context); 1114 EXPECT_EQ(34u, context);
1109 hss = HandleSignalsState(); 1115 hss = HandleSignalsState();
1110 this->ConsumerRemoveAwakable(&waiter, &hss); 1116 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
1111 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1117 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1112 hss.satisfied_signals); 1118 hss.satisfied_signals);
1113 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1119 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1114 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1120 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1115 hss.satisfiable_signals); 1121 hss.satisfiable_signals);
1116 1122
1117 // Discard one element. 1123 // Discard one element.
1118 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 1124 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
1119 EXPECT_EQ(MOJO_RESULT_OK, 1125 EXPECT_EQ(MOJO_RESULT_OK,
1120 this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true)); 1126 this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
1121 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1127 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
1122 1128
1123 // Should still be readable. 1129 // Should still be readable.
1124 waiter.Init(); 1130 waiter.Init();
1125 hss = HandleSignalsState(); 1131 hss = HandleSignalsState();
1126 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1132 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1127 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1133 this->ConsumerAddAwakable(&waiter, 78, false,
1128 false, 78, &hss)); 1134 MOJO_HANDLE_SIGNAL_READABLE, &hss));
1129 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1135 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1130 hss.satisfied_signals); 1136 hss.satisfied_signals);
1131 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1137 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1132 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1138 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1133 hss.satisfiable_signals); 1139 hss.satisfiable_signals);
1134 1140
1135 // Peek one element. 1141 // Peek one element.
1136 elements[0] = -1; 1142 elements[0] = -1;
1137 elements[1] = -1; 1143 elements[1] = -1;
1138 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 1144 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
1139 EXPECT_EQ(MOJO_RESULT_OK, 1145 EXPECT_EQ(MOJO_RESULT_OK,
1140 this->ConsumerReadData(UserPointer<void>(elements), 1146 this->ConsumerReadData(UserPointer<void>(elements),
1141 MakeUserPointer(&num_bytes), true, true)); 1147 MakeUserPointer(&num_bytes), true, true));
1142 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1148 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
1143 EXPECT_EQ(456, elements[0]); 1149 EXPECT_EQ(456, elements[0]);
1144 EXPECT_EQ(-1, elements[1]); 1150 EXPECT_EQ(-1, elements[1]);
1145 1151
1146 // Should still be readable. 1152 // Should still be readable.
1147 waiter.Init(); 1153 waiter.Init();
1148 hss = HandleSignalsState(); 1154 hss = HandleSignalsState();
1149 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1155 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1150 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1156 this->ConsumerAddAwakable(&waiter, 78, false,
1151 false, 78, &hss)); 1157 MOJO_HANDLE_SIGNAL_READABLE, &hss));
1152 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1158 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1153 hss.satisfied_signals); 1159 hss.satisfied_signals);
1154 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1160 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1155 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1161 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1156 hss.satisfiable_signals); 1162 hss.satisfiable_signals);
1157 1163
1158 // Read one element. 1164 // Read one element.
1159 elements[0] = -1; 1165 elements[0] = -1;
1160 elements[1] = -1; 1166 elements[1] = -1;
1161 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 1167 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
1162 EXPECT_EQ(MOJO_RESULT_OK, 1168 EXPECT_EQ(MOJO_RESULT_OK,
1163 this->ConsumerReadData(UserPointer<void>(elements), 1169 this->ConsumerReadData(UserPointer<void>(elements),
1164 MakeUserPointer(&num_bytes), true, false)); 1170 MakeUserPointer(&num_bytes), true, false));
1165 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1171 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
1166 EXPECT_EQ(456, elements[0]); 1172 EXPECT_EQ(456, elements[0]);
1167 EXPECT_EQ(-1, elements[1]); 1173 EXPECT_EQ(-1, elements[1]);
1168 1174
1169 // Adding a waiter should now succeed. 1175 // Adding a waiter should now succeed.
1170 waiter.Init(); 1176 waiter.Init();
1171 ASSERT_EQ(MOJO_RESULT_OK, 1177 ASSERT_EQ(MOJO_RESULT_OK,
1172 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1178 this->ConsumerAddAwakable(&waiter, 90, false,
1173 false, 90, nullptr)); 1179 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
1174 1180
1175 // Write one element. 1181 // Write one element.
1176 elements[0] = 789; 1182 elements[0] = 789;
1177 elements[1] = -1; 1183 elements[1] = -1;
1178 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 1184 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
1179 EXPECT_EQ(MOJO_RESULT_OK, 1185 EXPECT_EQ(MOJO_RESULT_OK,
1180 this->ProducerWriteData(UserPointer<const void>(elements), 1186 this->ProducerWriteData(UserPointer<const void>(elements),
1181 MakeUserPointer(&num_bytes), true)); 1187 MakeUserPointer(&num_bytes), true));
1182 1188
1183 // Waiting should now succeed. 1189 // Waiting should now succeed.
1184 context = 0; 1190 context = 0;
1185 EXPECT_EQ(MOJO_RESULT_OK, 1191 EXPECT_EQ(MOJO_RESULT_OK,
1186 waiter.Wait(test::TinyTimeout(), &context, nullptr)); 1192 waiter.Wait(test::TinyTimeout(), &context, nullptr));
1187 EXPECT_EQ(90u, context); 1193 EXPECT_EQ(90u, context);
1188 hss = HandleSignalsState(); 1194 hss = HandleSignalsState();
1189 this->ConsumerRemoveAwakable(&waiter, &hss); 1195 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
1190 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1196 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1191 hss.satisfied_signals); 1197 hss.satisfied_signals);
1192 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1198 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1193 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1199 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1194 hss.satisfiable_signals); 1200 hss.satisfiable_signals);
1195 1201
1196 // We'll want to wait for the peer closed signal to propagate. 1202 // We'll want to wait for the peer closed signal to propagate.
1197 waiter.Init(); 1203 waiter.Init();
1198 EXPECT_EQ(MOJO_RESULT_OK, 1204 EXPECT_EQ(MOJO_RESULT_OK,
1199 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1205 this->ConsumerAddAwakable(&waiter, 12, false,
1200 false, 12, nullptr)); 1206 MOJO_HANDLE_SIGNAL_PEER_CLOSED, nullptr));
1201 1207
1202 // Close the producer. 1208 // Close the producer.
1203 this->ProducerClose(); 1209 this->ProducerClose();
1204 1210
1205 // Should still be readable, even if the peer closed signal hasn't propagated 1211 // Should still be readable, even if the peer closed signal hasn't propagated
1206 // yet. 1212 // yet.
1207 waiter2.Init(); 1213 waiter2.Init();
1208 hss = HandleSignalsState(); 1214 hss = HandleSignalsState();
1209 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1215 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1210 this->ConsumerAddAwakable(&waiter2, MOJO_HANDLE_SIGNAL_READABLE, 1216 this->ConsumerAddAwakable(&waiter2, 34, false,
1211 false, 34, &hss)); 1217 MOJO_HANDLE_SIGNAL_READABLE, &hss));
1212 // We don't know if the peer closed signal has propagated yet (for the remote 1218 // We don't know if the peer closed signal has propagated yet (for the remote
1213 // cases). 1219 // cases).
1214 EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE)); 1220 EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
1215 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1221 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1216 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1222 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1217 hss.satisfiable_signals); 1223 hss.satisfiable_signals);
1218 1224
1219 // Wait for the peer closed signal. 1225 // Wait for the peer closed signal.
1220 context = 0; 1226 context = 0;
1221 EXPECT_EQ(MOJO_RESULT_OK, 1227 EXPECT_EQ(MOJO_RESULT_OK,
1222 waiter.Wait(test::TinyTimeout(), &context, nullptr)); 1228 waiter.Wait(test::TinyTimeout(), &context, nullptr));
1223 EXPECT_EQ(12u, context); 1229 EXPECT_EQ(12u, context);
1224 hss = HandleSignalsState(); 1230 hss = HandleSignalsState();
1225 this->ConsumerRemoveAwakable(&waiter, &hss); 1231 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
1226 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1232 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1227 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1233 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1228 hss.satisfied_signals); 1234 hss.satisfied_signals);
1229 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1235 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1230 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1236 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1231 hss.satisfiable_signals); 1237 hss.satisfiable_signals);
1232 1238
1233 // Read one element. 1239 // Read one element.
1234 elements[0] = -1; 1240 elements[0] = -1;
1235 elements[1] = -1; 1241 elements[1] = -1;
1236 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 1242 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
1237 EXPECT_EQ(MOJO_RESULT_OK, 1243 EXPECT_EQ(MOJO_RESULT_OK,
1238 this->ConsumerReadData(UserPointer<void>(elements), 1244 this->ConsumerReadData(UserPointer<void>(elements),
1239 MakeUserPointer(&num_bytes), true, false)); 1245 MakeUserPointer(&num_bytes), true, false));
1240 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1246 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
1241 EXPECT_EQ(789, elements[0]); 1247 EXPECT_EQ(789, elements[0]);
1242 EXPECT_EQ(-1, elements[1]); 1248 EXPECT_EQ(-1, elements[1]);
1243 1249
1244 // Should be never-readable. 1250 // Should be never-readable.
1245 waiter.Init(); 1251 waiter.Init();
1246 hss = HandleSignalsState(); 1252 hss = HandleSignalsState();
1247 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1253 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1248 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1254 this->ConsumerAddAwakable(&waiter, 56, false,
1249 false, 56, &hss)); 1255 MOJO_HANDLE_SIGNAL_READABLE, &hss));
1250 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 1256 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1251 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 1257 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
1252 1258
1253 this->ConsumerClose(); 1259 this->ConsumerClose();
1254 } 1260 }
1255 1261
1256 // Test with two-phase APIs and also closing the producer with an active 1262 // Test with two-phase APIs and also closing the producer with an active
1257 // consumer waiter. 1263 // consumer waiter.
1258 TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) { 1264 TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) {
1259 const MojoCreateDataPipeOptions options = { 1265 const MojoCreateDataPipeOptions options = {
1260 kSizeOfOptions, // |struct_size|. 1266 kSizeOfOptions, // |struct_size|.
1261 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1267 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1262 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 1268 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
1263 1000 * sizeof(int32_t) // |capacity_num_bytes|. 1269 1000 * sizeof(int32_t) // |capacity_num_bytes|.
1264 }; 1270 };
1265 this->Create(options); 1271 this->Create(options);
1266 this->DoTransfer(); 1272 this->DoTransfer();
1267 1273
1268 Waiter waiter; 1274 Waiter waiter;
1269 HandleSignalsState hss; 1275 HandleSignalsState hss;
1270 uint64_t context; 1276 uint64_t context;
1271 1277
1272 // Add waiter: not yet readable. 1278 // Add waiter: not yet readable.
1273 waiter.Init(); 1279 waiter.Init();
1274 ASSERT_EQ(MOJO_RESULT_OK, 1280 ASSERT_EQ(MOJO_RESULT_OK,
1275 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1281 this->ConsumerAddAwakable(&waiter, 12, false,
1276 false, 12, nullptr)); 1282 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
1277 1283
1278 // Write two elements. 1284 // Write two elements.
1279 int32_t* elements = nullptr; 1285 int32_t* elements = nullptr;
1280 void* buffer = nullptr; 1286 void* buffer = nullptr;
1281 uint32_t num_bytes = 0u; 1287 uint32_t num_bytes = 0u;
1282 EXPECT_EQ(MOJO_RESULT_OK, 1288 EXPECT_EQ(MOJO_RESULT_OK,
1283 this->ProducerBeginWriteData(MakeUserPointer(&buffer), 1289 this->ProducerBeginWriteData(MakeUserPointer(&buffer),
1284 MakeUserPointer(&num_bytes))); 1290 MakeUserPointer(&num_bytes)));
1285 EXPECT_TRUE(buffer); 1291 EXPECT_TRUE(buffer);
1286 EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0]))); 1292 EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
1287 elements = static_cast<int32_t*>(buffer); 1293 elements = static_cast<int32_t*>(buffer);
1288 elements[0] = 123; 1294 elements[0] = 123;
1289 elements[1] = 456; 1295 elements[1] = 456;
1290 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(static_cast<uint32_t>( 1296 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(static_cast<uint32_t>(
1291 2u * sizeof(elements[0])))); 1297 2u * sizeof(elements[0]))));
1292 1298
1293 // Wait for readability (needed for remote cases). 1299 // Wait for readability (needed for remote cases).
1294 context = 0; 1300 context = 0;
1295 EXPECT_EQ(MOJO_RESULT_OK, 1301 EXPECT_EQ(MOJO_RESULT_OK,
1296 waiter.Wait(test::TinyTimeout(), &context, nullptr)); 1302 waiter.Wait(test::TinyTimeout(), &context, nullptr));
1297 EXPECT_EQ(12u, context); 1303 EXPECT_EQ(12u, context);
1298 hss = HandleSignalsState(); 1304 hss = HandleSignalsState();
1299 this->ConsumerRemoveAwakable(&waiter, &hss); 1305 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
1300 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1306 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1301 hss.satisfied_signals); 1307 hss.satisfied_signals);
1302 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1308 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1303 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1309 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1304 hss.satisfiable_signals); 1310 hss.satisfiable_signals);
1305 1311
1306 // Read one element. 1312 // Read one element.
1307 const void* read_buffer = nullptr; 1313 const void* read_buffer = nullptr;
1308 num_bytes = 0u; 1314 num_bytes = 0u;
1309 EXPECT_EQ(MOJO_RESULT_OK, 1315 EXPECT_EQ(MOJO_RESULT_OK,
1310 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer), 1316 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer),
1311 MakeUserPointer(&num_bytes))); 1317 MakeUserPointer(&num_bytes)));
1312 EXPECT_TRUE(read_buffer); 1318 EXPECT_TRUE(read_buffer);
1313 EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes); 1319 EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
1314 const int32_t* read_elements = static_cast<const int32_t*>(read_buffer); 1320 const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
1315 EXPECT_EQ(123, read_elements[0]); 1321 EXPECT_EQ(123, read_elements[0]);
1316 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>( 1322 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>(
1317 1u * sizeof(elements[0])))); 1323 1u * sizeof(elements[0]))));
1318 1324
1319 // Should still be readable. 1325 // Should still be readable.
1320 waiter.Init(); 1326 waiter.Init();
1321 hss = HandleSignalsState(); 1327 hss = HandleSignalsState();
1322 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1328 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1323 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1329 this->ConsumerAddAwakable(&waiter, 34, false,
1324 false, 34, &hss)); 1330 MOJO_HANDLE_SIGNAL_READABLE, &hss));
1325 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1331 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1326 hss.satisfied_signals); 1332 hss.satisfied_signals);
1327 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1333 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1328 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1334 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1329 hss.satisfiable_signals); 1335 hss.satisfiable_signals);
1330 1336
1331 // Read one element. 1337 // Read one element.
1332 // Request three, but not in all-or-none mode. 1338 // Request three, but not in all-or-none mode.
1333 read_buffer = nullptr; 1339 read_buffer = nullptr;
1334 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0])); 1340 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
1335 EXPECT_EQ(MOJO_RESULT_OK, 1341 EXPECT_EQ(MOJO_RESULT_OK,
1336 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer), 1342 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer),
1337 MakeUserPointer(&num_bytes))); 1343 MakeUserPointer(&num_bytes)));
1338 EXPECT_TRUE(read_buffer); 1344 EXPECT_TRUE(read_buffer);
1339 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 1345 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
1340 read_elements = static_cast<const int32_t*>(read_buffer); 1346 read_elements = static_cast<const int32_t*>(read_buffer);
1341 EXPECT_EQ(456, read_elements[0]); 1347 EXPECT_EQ(456, read_elements[0]);
1342 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>( 1348 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>(
1343 1u * sizeof(elements[0])))); 1349 1u * sizeof(elements[0]))));
1344 1350
1345 // Adding a waiter should now succeed. 1351 // Adding a waiter should now succeed.
1346 waiter.Init(); 1352 waiter.Init();
1347 ASSERT_EQ(MOJO_RESULT_OK, 1353 ASSERT_EQ(MOJO_RESULT_OK,
1348 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1354 this->ConsumerAddAwakable(&waiter, 56, false,
1349 false, 56, nullptr)); 1355 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
1350 1356
1351 // Close the producer. 1357 // Close the producer.
1352 this->ProducerClose(); 1358 this->ProducerClose();
1353 1359
1354 // Should be never-readable. 1360 // Should be never-readable.
1355 context = 0; 1361 context = 0;
1356 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1362 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1357 waiter.Wait(test::TinyTimeout(), &context, nullptr)); 1363 waiter.Wait(test::TinyTimeout(), &context, nullptr));
1358 EXPECT_EQ(56u, context); 1364 EXPECT_EQ(56u, context);
1359 hss = HandleSignalsState(); 1365 hss = HandleSignalsState();
1360 this->ConsumerRemoveAwakable(&waiter, &hss); 1366 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
1361 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 1367 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1362 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 1368 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
1363 1369
1364 this->ConsumerClose(); 1370 this->ConsumerClose();
1365 } 1371 }
1366 1372
1367 // Tests that data pipes aren't writable/readable during two-phase writes/reads. 1373 // Tests that data pipes aren't writable/readable during two-phase writes/reads.
1368 TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) { 1374 TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
1369 const MojoCreateDataPipeOptions options = { 1375 const MojoCreateDataPipeOptions options = {
1370 kSizeOfOptions, // |struct_size|. 1376 kSizeOfOptions, // |struct_size|.
1371 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1377 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1372 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 1378 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
1373 1000 * sizeof(int32_t) // |capacity_num_bytes|. 1379 1000 * sizeof(int32_t) // |capacity_num_bytes|.
1374 }; 1380 };
1375 this->Create(options); 1381 this->Create(options);
1376 this->DoTransfer(); 1382 this->DoTransfer();
1377 1383
1378 Waiter pwaiter; // For producer. 1384 Waiter pwaiter; // For producer.
1379 Waiter cwaiter; // For consumer. 1385 Waiter cwaiter; // For consumer.
1380 HandleSignalsState hss; 1386 HandleSignalsState hss;
1381 1387
1382 // It should be writable. 1388 // It should be writable.
1383 pwaiter.Init(); 1389 pwaiter.Init();
1384 hss = HandleSignalsState(); 1390 hss = HandleSignalsState();
1385 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1391 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1386 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 1392 this->ProducerAddAwakable(&pwaiter, 0, false,
1387 false, 0, &hss)); 1393 MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
1388 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1394 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1389 hss.satisfied_signals); 1395 hss.satisfied_signals);
1390 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1396 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1391 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1397 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1392 hss.satisfiable_signals); 1398 hss.satisfiable_signals);
1393 1399
1394 void* write_ptr = nullptr; 1400 void* write_ptr = nullptr;
1395 uint32_t num_bytes = 0u; 1401 uint32_t num_bytes = 0u;
1396 EXPECT_EQ(MOJO_RESULT_OK, 1402 EXPECT_EQ(MOJO_RESULT_OK,
1397 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr), 1403 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
1398 MakeUserPointer(&num_bytes))); 1404 MakeUserPointer(&num_bytes)));
1399 EXPECT_TRUE(write_ptr); 1405 EXPECT_TRUE(write_ptr);
1400 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t))); 1406 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
1401 1407
1402 // At this point, it shouldn't be writable. 1408 // At this point, it shouldn't be writable.
1403 pwaiter.Init(); 1409 pwaiter.Init();
1404 ASSERT_EQ(MOJO_RESULT_OK, 1410 ASSERT_EQ(MOJO_RESULT_OK,
1405 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 1411 this->ProducerAddAwakable(&pwaiter, 1, false,
1406 false, 1, nullptr)); 1412 MOJO_HANDLE_SIGNAL_WRITABLE, nullptr));
1407 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr, nullptr)); 1413 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr, nullptr));
1408 hss = HandleSignalsState(); 1414 hss = HandleSignalsState();
1409 this->ProducerRemoveAwakable(&pwaiter, &hss); 1415 this->ProducerRemoveAwakable(false, &pwaiter, 0, &hss);
1410 EXPECT_EQ(0u, hss.satisfied_signals); 1416 EXPECT_EQ(0u, hss.satisfied_signals);
1411 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1417 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1412 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1418 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1413 hss.satisfiable_signals); 1419 hss.satisfiable_signals);
1414 1420
1415 // It shouldn't be readable yet either (we'll wait later). 1421 // It shouldn't be readable yet either (we'll wait later).
1416 cwaiter.Init(); 1422 cwaiter.Init();
1417 ASSERT_EQ(MOJO_RESULT_OK, 1423 ASSERT_EQ(MOJO_RESULT_OK,
1418 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 1424 this->ConsumerAddAwakable(&cwaiter, 2, false,
1419 false, 2, nullptr)); 1425 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
1420 1426
1421 static_cast<int32_t*>(write_ptr)[0] = 123; 1427 static_cast<int32_t*>(write_ptr)[0] = 123;
1422 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData( 1428 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(
1423 static_cast<uint32_t>(1u * sizeof(int32_t)))); 1429 static_cast<uint32_t>(1u * sizeof(int32_t))));
1424 1430
1425 // It should immediately be writable again. 1431 // It should immediately be writable again.
1426 pwaiter.Init(); 1432 pwaiter.Init();
1427 hss = HandleSignalsState(); 1433 hss = HandleSignalsState();
1428 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1434 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1429 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 1435 this->ProducerAddAwakable(&pwaiter, 3, false,
1430 false, 3, &hss)); 1436 MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
1431 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1437 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1432 hss.satisfied_signals); 1438 hss.satisfied_signals);
1433 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1439 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1434 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1440 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1435 hss.satisfiable_signals); 1441 hss.satisfiable_signals);
1436 1442
1437 // It should become readable. 1443 // It should become readable.
1438 EXPECT_EQ(MOJO_RESULT_OK, 1444 EXPECT_EQ(MOJO_RESULT_OK,
1439 cwaiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 1445 cwaiter.Wait(test::TinyTimeout(), nullptr, nullptr));
1440 hss = HandleSignalsState(); 1446 hss = HandleSignalsState();
1441 this->ConsumerRemoveAwakable(&cwaiter, &hss); 1447 this->ConsumerRemoveAwakable(false, &cwaiter, 0, &hss);
1442 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1448 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1443 hss.satisfied_signals); 1449 hss.satisfied_signals);
1444 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1450 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1445 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1451 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1446 hss.satisfiable_signals); 1452 hss.satisfiable_signals);
1447 1453
1448 // Start another two-phase write and check that it's readable even in the 1454 // Start another two-phase write and check that it's readable even in the
1449 // middle of it. 1455 // middle of it.
1450 write_ptr = nullptr; 1456 write_ptr = nullptr;
1451 num_bytes = 0u; 1457 num_bytes = 0u;
1452 EXPECT_EQ(MOJO_RESULT_OK, 1458 EXPECT_EQ(MOJO_RESULT_OK,
1453 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr), 1459 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
1454 MakeUserPointer(&num_bytes))); 1460 MakeUserPointer(&num_bytes)));
1455 EXPECT_TRUE(write_ptr); 1461 EXPECT_TRUE(write_ptr);
1456 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t))); 1462 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
1457 1463
1458 // It should be readable. 1464 // It should be readable.
1459 cwaiter.Init(); 1465 cwaiter.Init();
1460 hss = HandleSignalsState(); 1466 hss = HandleSignalsState();
1461 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1467 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1462 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 1468 this->ConsumerAddAwakable(&cwaiter, 5, false,
1463 false, 5, &hss)); 1469 MOJO_HANDLE_SIGNAL_READABLE, &hss));
1464 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1470 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1465 hss.satisfied_signals); 1471 hss.satisfied_signals);
1466 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1472 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1467 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1473 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1468 hss.satisfiable_signals); 1474 hss.satisfiable_signals);
1469 1475
1470 // End the two-phase write without writing anything. 1476 // End the two-phase write without writing anything.
1471 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u)); 1477 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u));
1472 1478
1473 // Start a two-phase read. 1479 // Start a two-phase read.
1474 const void* read_ptr = nullptr; 1480 const void* read_ptr = nullptr;
1475 num_bytes = 0u; 1481 num_bytes = 0u;
1476 EXPECT_EQ(MOJO_RESULT_OK, 1482 EXPECT_EQ(MOJO_RESULT_OK,
1477 this->ConsumerBeginReadData(MakeUserPointer(&read_ptr), 1483 this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
1478 MakeUserPointer(&num_bytes))); 1484 MakeUserPointer(&num_bytes)));
1479 EXPECT_TRUE(read_ptr); 1485 EXPECT_TRUE(read_ptr);
1480 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes); 1486 EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
1481 1487
1482 // At this point, it should still be writable. 1488 // At this point, it should still be writable.
1483 pwaiter.Init(); 1489 pwaiter.Init();
1484 hss = HandleSignalsState(); 1490 hss = HandleSignalsState();
1485 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1491 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1486 this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 1492 this->ProducerAddAwakable(&pwaiter, 6, false,
1487 false, 6, &hss)); 1493 MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
1488 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1494 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1489 hss.satisfied_signals); 1495 hss.satisfied_signals);
1490 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1496 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1491 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 1497 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
1492 hss.satisfiable_signals); 1498 hss.satisfiable_signals);
1493 1499
1494 // But not readable. 1500 // But not readable.
1495 cwaiter.Init(); 1501 cwaiter.Init();
1496 ASSERT_EQ(MOJO_RESULT_OK, 1502 ASSERT_EQ(MOJO_RESULT_OK,
1497 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 1503 this->ConsumerAddAwakable(&cwaiter, 7, false,
1498 false, 7, nullptr)); 1504 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
1499 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, cwaiter.Wait(0, nullptr, nullptr)); 1505 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, cwaiter.Wait(0, nullptr, nullptr));
1500 hss = HandleSignalsState(); 1506 hss = HandleSignalsState();
1501 this->ConsumerRemoveAwakable(&cwaiter, &hss); 1507 this->ConsumerRemoveAwakable(false, &cwaiter, 0, &hss);
1502 EXPECT_EQ(0u, hss.satisfied_signals); 1508 EXPECT_EQ(0u, hss.satisfied_signals);
1503 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1509 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1504 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1510 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1505 hss.satisfiable_signals); 1511 hss.satisfiable_signals);
1506 1512
1507 // End the two-phase read without reading anything. 1513 // End the two-phase read without reading anything.
1508 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(0u)); 1514 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(0u));
1509 1515
1510 // It should be readable again. 1516 // It should be readable again.
1511 cwaiter.Init(); 1517 cwaiter.Init();
1512 hss = HandleSignalsState(); 1518 hss = HandleSignalsState();
1513 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 1519 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
1514 this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 1520 this->ConsumerAddAwakable(&cwaiter, 8, false,
1515 false, 8, &hss)); 1521 MOJO_HANDLE_SIGNAL_READABLE, &hss));
1516 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1522 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1517 hss.satisfied_signals); 1523 hss.satisfied_signals);
1518 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1524 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1519 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1525 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1520 hss.satisfiable_signals); 1526 hss.satisfiable_signals);
1521 1527
1522 this->ProducerClose(); 1528 this->ProducerClose();
1523 this->ConsumerClose(); 1529 this->ConsumerClose();
1524 } 1530 }
1525 1531
(...skipping 25 matching lines...) Expand all
1551 1557
1552 // Should still be empty. 1558 // Should still be empty.
1553 num_bytes = ~0u; 1559 num_bytes = ~0u;
1554 EXPECT_EQ(MOJO_RESULT_OK, 1560 EXPECT_EQ(MOJO_RESULT_OK,
1555 this->ConsumerQueryData(MakeUserPointer(&num_bytes))); 1561 this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1556 EXPECT_EQ(0u, num_bytes); 1562 EXPECT_EQ(0u, num_bytes);
1557 1563
1558 // Add waiter. 1564 // Add waiter.
1559 waiter.Init(); 1565 waiter.Init();
1560 ASSERT_EQ(MOJO_RESULT_OK, 1566 ASSERT_EQ(MOJO_RESULT_OK,
1561 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1567 this->ConsumerAddAwakable(&waiter, 1, false,
1562 false, 1, nullptr)); 1568 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
1563 1569
1564 // Write some data. 1570 // Write some data.
1565 num_bytes = 5u * sizeof(int32_t); 1571 num_bytes = 5u * sizeof(int32_t);
1566 Seq(100, MOJO_ARRAYSIZE(buffer), buffer); 1572 Seq(100, MOJO_ARRAYSIZE(buffer), buffer);
1567 EXPECT_EQ(MOJO_RESULT_OK, 1573 EXPECT_EQ(MOJO_RESULT_OK,
1568 this->ProducerWriteData(UserPointer<const void>(buffer), 1574 this->ProducerWriteData(UserPointer<const void>(buffer),
1569 MakeUserPointer(&num_bytes), true)); 1575 MakeUserPointer(&num_bytes), true));
1570 EXPECT_EQ(5u * sizeof(int32_t), num_bytes); 1576 EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
1571 1577
1572 // Wait for data. 1578 // Wait for data.
1573 // TODO(vtl): There's no real guarantee that all the data will become 1579 // TODO(vtl): There's no real guarantee that all the data will become
1574 // available at once (except that in current implementations, with reasonable 1580 // available at once (except that in current implementations, with reasonable
1575 // limits, it will). Eventually, we'll be able to wait for a specified amount 1581 // limits, it will). Eventually, we'll be able to wait for a specified amount
1576 // of data to become available. 1582 // of data to become available.
1577 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 1583 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
1578 hss = HandleSignalsState(); 1584 hss = HandleSignalsState();
1579 this->ConsumerRemoveAwakable(&waiter, &hss); 1585 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
1580 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1586 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1581 hss.satisfied_signals); 1587 hss.satisfied_signals);
1582 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1588 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1583 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1589 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1584 hss.satisfiable_signals); 1590 hss.satisfiable_signals);
1585 1591
1586 // Half full. 1592 // Half full.
1587 num_bytes = 0u; 1593 num_bytes = 0u;
1588 EXPECT_EQ(MOJO_RESULT_OK, 1594 EXPECT_EQ(MOJO_RESULT_OK,
1589 this->ConsumerQueryData(MakeUserPointer(&num_bytes))); 1595 this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
1673 1679
1674 // Three left. 1680 // Three left.
1675 num_bytes = 0u; 1681 num_bytes = 0u;
1676 EXPECT_EQ(MOJO_RESULT_OK, 1682 EXPECT_EQ(MOJO_RESULT_OK,
1677 this->ConsumerQueryData(MakeUserPointer(&num_bytes))); 1683 this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1678 EXPECT_EQ(3u * sizeof(int32_t), num_bytes); 1684 EXPECT_EQ(3u * sizeof(int32_t), num_bytes);
1679 1685
1680 // We'll need to wait for the peer closed to propagate. 1686 // We'll need to wait for the peer closed to propagate.
1681 waiter.Init(); 1687 waiter.Init();
1682 ASSERT_EQ(MOJO_RESULT_OK, 1688 ASSERT_EQ(MOJO_RESULT_OK,
1683 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1689 this->ConsumerAddAwakable(&waiter, 2, false,
1684 false, 2, nullptr)); 1690 MOJO_HANDLE_SIGNAL_PEER_CLOSED, nullptr));
1685 1691
1686 // Close the producer, then test producer-closed cases. 1692 // Close the producer, then test producer-closed cases.
1687 this->ProducerClose(); 1693 this->ProducerClose();
1688 1694
1689 // Wait. 1695 // Wait.
1690 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 1696 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
1691 hss = HandleSignalsState(); 1697 hss = HandleSignalsState();
1692 this->ConsumerRemoveAwakable(&waiter, &hss); 1698 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
1693 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1699 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1694 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1700 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1695 hss.satisfied_signals); 1701 hss.satisfied_signals);
1696 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1702 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1697 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1703 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1698 hss.satisfiable_signals); 1704 hss.satisfiable_signals);
1699 1705
1700 // Try reading too much; "failed precondition" since the producer is closed. 1706 // Try reading too much; "failed precondition" since the producer is closed.
1701 num_bytes = 4u * sizeof(int32_t); 1707 num_bytes = 4u * sizeof(int32_t);
1702 memset(buffer, 0xab, sizeof(buffer)); 1708 memset(buffer, 0xab, sizeof(buffer));
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
1760 ASSERT_EQ(100u, validated_options.capacity_num_bytes); 1766 ASSERT_EQ(100u, validated_options.capacity_num_bytes);
1761 this->Create(options); 1767 this->Create(options);
1762 this->DoTransfer(); 1768 this->DoTransfer();
1763 1769
1764 Waiter waiter; 1770 Waiter waiter;
1765 HandleSignalsState hss; 1771 HandleSignalsState hss;
1766 1772
1767 // Add waiter. 1773 // Add waiter.
1768 waiter.Init(); 1774 waiter.Init();
1769 ASSERT_EQ(MOJO_RESULT_OK, 1775 ASSERT_EQ(MOJO_RESULT_OK,
1770 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1776 this->ConsumerAddAwakable(&waiter, 1, false,
1771 false, 1, nullptr)); 1777 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
1772 1778
1773 // Write 20 bytes. 1779 // Write 20 bytes.
1774 uint32_t num_bytes = 20u; 1780 uint32_t num_bytes = 20u;
1775 EXPECT_EQ(MOJO_RESULT_OK, 1781 EXPECT_EQ(MOJO_RESULT_OK,
1776 this->ProducerWriteData(UserPointer<const void>(&test_data[0]), 1782 this->ProducerWriteData(UserPointer<const void>(&test_data[0]),
1777 MakeUserPointer(&num_bytes), false)); 1783 MakeUserPointer(&num_bytes), false));
1778 EXPECT_EQ(20u, num_bytes); 1784 EXPECT_EQ(20u, num_bytes);
1779 1785
1780 // Wait for data. 1786 // Wait for data.
1781 // TODO(vtl): (See corresponding TODO in AllOrNone.) 1787 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1782 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 1788 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
1783 hss = HandleSignalsState(); 1789 hss = HandleSignalsState();
1784 this->ConsumerRemoveAwakable(&waiter, &hss); 1790 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
1785 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1791 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1786 hss.satisfied_signals); 1792 hss.satisfied_signals);
1787 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1793 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
1788 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 1794 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
1789 hss.satisfiable_signals); 1795 hss.satisfiable_signals);
1790 1796
1791 // Read 10 bytes. 1797 // Read 10 bytes.
1792 unsigned char read_buffer[1000] = {0}; 1798 unsigned char read_buffer[1000] = {0};
1793 num_bytes = 10u; 1799 num_bytes = 10u;
1794 EXPECT_EQ(MOJO_RESULT_OK, 1800 EXPECT_EQ(MOJO_RESULT_OK,
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after
1965 }; 1971 };
1966 this->Create(options); 1972 this->Create(options);
1967 this->DoTransfer(); 1973 this->DoTransfer();
1968 1974
1969 Waiter waiter; 1975 Waiter waiter;
1970 HandleSignalsState hss; 1976 HandleSignalsState hss;
1971 1977
1972 // Add waiter. 1978 // Add waiter.
1973 waiter.Init(); 1979 waiter.Init();
1974 ASSERT_EQ(MOJO_RESULT_OK, 1980 ASSERT_EQ(MOJO_RESULT_OK,
1975 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1981 this->ConsumerAddAwakable(&waiter, 1, false,
1976 false, 1, nullptr)); 1982 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
1977 1983
1978 // Write some data, so we'll have something to read. 1984 // Write some data, so we'll have something to read.
1979 uint32_t num_bytes = kTestDataSize; 1985 uint32_t num_bytes = kTestDataSize;
1980 EXPECT_EQ(MOJO_RESULT_OK, 1986 EXPECT_EQ(MOJO_RESULT_OK,
1981 this->ProducerWriteData(UserPointer<const void>(kTestData), 1987 this->ProducerWriteData(UserPointer<const void>(kTestData),
1982 MakeUserPointer(&num_bytes), false)); 1988 MakeUserPointer(&num_bytes), false));
1983 EXPECT_EQ(kTestDataSize, num_bytes); 1989 EXPECT_EQ(kTestDataSize, num_bytes);
1984 1990
1985 // Start two-phase write. 1991 // Start two-phase write.
1986 void* write_buffer_ptr = nullptr; 1992 void* write_buffer_ptr = nullptr;
1987 num_bytes = 0u; 1993 num_bytes = 0u;
1988 EXPECT_EQ(MOJO_RESULT_OK, 1994 EXPECT_EQ(MOJO_RESULT_OK,
1989 this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr), 1995 this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
1990 MakeUserPointer(&num_bytes))); 1996 MakeUserPointer(&num_bytes)));
1991 EXPECT_TRUE(write_buffer_ptr); 1997 EXPECT_TRUE(write_buffer_ptr);
1992 ASSERT_GT(num_bytes, kTestDataSize); 1998 ASSERT_GT(num_bytes, kTestDataSize);
1993 1999
1994 // Wait for data. 2000 // Wait for data.
1995 // TODO(vtl): (See corresponding TODO in AllOrNone.) 2001 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1996 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2002 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
1997 hss = HandleSignalsState(); 2003 hss = HandleSignalsState();
1998 this->ConsumerRemoveAwakable(&waiter, &hss); 2004 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
1999 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2005 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2000 hss.satisfied_signals); 2006 hss.satisfied_signals);
2001 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2007 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2002 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2008 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2003 hss.satisfiable_signals); 2009 hss.satisfiable_signals);
2004 2010
2005 // Start two-phase read. 2011 // Start two-phase read.
2006 const void* read_buffer_ptr = nullptr; 2012 const void* read_buffer_ptr = nullptr;
2007 num_bytes = 0u; 2013 num_bytes = 0u;
2008 EXPECT_EQ(MOJO_RESULT_OK, 2014 EXPECT_EQ(MOJO_RESULT_OK,
2009 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr), 2015 this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
2010 MakeUserPointer(&num_bytes))); 2016 MakeUserPointer(&num_bytes)));
2011 EXPECT_TRUE(read_buffer_ptr); 2017 EXPECT_TRUE(read_buffer_ptr);
2012 EXPECT_EQ(kTestDataSize, num_bytes); 2018 EXPECT_EQ(kTestDataSize, num_bytes);
2013 2019
2014 // Add waiter. 2020 // Add waiter.
2015 waiter.Init(); 2021 waiter.Init();
2016 ASSERT_EQ(MOJO_RESULT_OK, 2022 ASSERT_EQ(MOJO_RESULT_OK,
2017 this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2023 this->ProducerAddAwakable(&waiter, 1, false,
2018 false, 1, nullptr)); 2024 MOJO_HANDLE_SIGNAL_PEER_CLOSED, nullptr));
2019 2025
2020 // Close the consumer. 2026 // Close the consumer.
2021 this->ConsumerClose(); 2027 this->ConsumerClose();
2022 2028
2023 // Wait for producer to know that the consumer is closed. 2029 // Wait for producer to know that the consumer is closed.
2024 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2030 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2025 hss = HandleSignalsState(); 2031 hss = HandleSignalsState();
2026 this->ProducerRemoveAwakable(&waiter, &hss); 2032 this->ProducerRemoveAwakable(false, &waiter, 0, &hss);
2027 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2033 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
2028 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 2034 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
2029 2035
2030 // Actually write some data. (Note: Premature freeing of the buffer would 2036 // Actually write some data. (Note: Premature freeing of the buffer would
2031 // probably only be detected under ASAN or similar.) 2037 // probably only be detected under ASAN or similar.)
2032 memcpy(write_buffer_ptr, kTestData, kTestDataSize); 2038 memcpy(write_buffer_ptr, kTestData, kTestDataSize);
2033 // Note: Even though the consumer has been closed, ending the two-phase 2039 // Note: Even though the consumer has been closed, ending the two-phase
2034 // write will report success. 2040 // write will report success.
2035 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(kTestDataSize)); 2041 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(kTestDataSize));
2036 2042
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
2098 // Write some data, so we'll have something to read. 2104 // Write some data, so we'll have something to read.
2099 uint32_t num_bytes = kTestDataSize; 2105 uint32_t num_bytes = kTestDataSize;
2100 EXPECT_EQ(MOJO_RESULT_OK, 2106 EXPECT_EQ(MOJO_RESULT_OK,
2101 this->ProducerWriteData(UserPointer<const void>(kTestData), 2107 this->ProducerWriteData(UserPointer<const void>(kTestData),
2102 MakeUserPointer(&num_bytes), false)); 2108 MakeUserPointer(&num_bytes), false));
2103 EXPECT_EQ(kTestDataSize, num_bytes); 2109 EXPECT_EQ(kTestDataSize, num_bytes);
2104 2110
2105 // Add waiter. 2111 // Add waiter.
2106 waiter.Init(); 2112 waiter.Init();
2107 ASSERT_EQ(MOJO_RESULT_OK, 2113 ASSERT_EQ(MOJO_RESULT_OK,
2108 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2114 this->ConsumerAddAwakable(&waiter, 1, false,
2109 false, 1, nullptr)); 2115 MOJO_HANDLE_SIGNAL_PEER_CLOSED, nullptr));
2110 2116
2111 // Close the producer. 2117 // Close the producer.
2112 this->ProducerClose(); 2118 this->ProducerClose();
2113 2119
2114 // Wait. (Note that once the consumer knows that the producer is closed, it 2120 // Wait. (Note that once the consumer knows that the producer is closed, it
2115 // must also know about all the data that was sent.) 2121 // must also know about all the data that was sent.)
2116 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2122 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2117 hss = HandleSignalsState(); 2123 hss = HandleSignalsState();
2118 this->ConsumerRemoveAwakable(&waiter, &hss); 2124 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
2119 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2125 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2120 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2126 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2121 hss.satisfied_signals); 2127 hss.satisfied_signals);
2122 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2128 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2123 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2129 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2124 hss.satisfiable_signals); 2130 hss.satisfiable_signals);
2125 2131
2126 // Peek that data. 2132 // Peek that data.
2127 char buffer[1000]; 2133 char buffer[1000];
2128 num_bytes = static_cast<uint32_t>(sizeof(buffer)); 2134 num_bytes = static_cast<uint32_t>(sizeof(buffer));
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
2176 }; 2182 };
2177 this->Create(options); 2183 this->Create(options);
2178 this->DoTransfer(); 2184 this->DoTransfer();
2179 2185
2180 Waiter waiter; 2186 Waiter waiter;
2181 HandleSignalsState hss; 2187 HandleSignalsState hss;
2182 2188
2183 // Add waiter. 2189 // Add waiter.
2184 waiter.Init(); 2190 waiter.Init();
2185 ASSERT_EQ(MOJO_RESULT_OK, 2191 ASSERT_EQ(MOJO_RESULT_OK,
2186 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 2192 this->ConsumerAddAwakable(&waiter, 0, false,
2187 false, 0, nullptr)); 2193 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
2188 2194
2189 // Write some data, so we'll have something to read. 2195 // Write some data, so we'll have something to read.
2190 uint32_t num_bytes = kTestDataSize; 2196 uint32_t num_bytes = kTestDataSize;
2191 EXPECT_EQ(MOJO_RESULT_OK, 2197 EXPECT_EQ(MOJO_RESULT_OK,
2192 this->ProducerWriteData(UserPointer<const void>(&kTestData), 2198 this->ProducerWriteData(UserPointer<const void>(&kTestData),
2193 MakeUserPointer(&num_bytes), false)); 2199 MakeUserPointer(&num_bytes), false));
2194 EXPECT_EQ(kTestDataSize, num_bytes); 2200 EXPECT_EQ(kTestDataSize, num_bytes);
2195 2201
2196 // Wait. 2202 // Wait.
2197 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2203 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2198 hss = HandleSignalsState(); 2204 hss = HandleSignalsState();
2199 this->ConsumerRemoveAwakable(&waiter, &hss); 2205 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
2200 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2206 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2201 hss.satisfied_signals); 2207 hss.satisfied_signals);
2202 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2208 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2203 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2209 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2204 hss.satisfiable_signals); 2210 hss.satisfiable_signals);
2205 2211
2206 // Read that data. 2212 // Read that data.
2207 int64_t data[10] = {}; 2213 int64_t data[10] = {};
2208 num_bytes = static_cast<uint32_t>(sizeof(data)); 2214 num_bytes = static_cast<uint32_t>(sizeof(data));
2209 EXPECT_EQ(MOJO_RESULT_OK, 2215 EXPECT_EQ(MOJO_RESULT_OK,
2210 this->ConsumerReadData(UserPointer<void>(data), 2216 this->ConsumerReadData(UserPointer<void>(data),
2211 MakeUserPointer(&num_bytes), false, false)); 2217 MakeUserPointer(&num_bytes), false, false));
2212 EXPECT_EQ(kTestDataSize, num_bytes); 2218 EXPECT_EQ(kTestDataSize, num_bytes);
2213 EXPECT_EQ(kTestData, data[0]); 2219 EXPECT_EQ(kTestData, data[0]);
2214 2220
2215 // Add waiter again. 2221 // Add waiter again.
2216 waiter.Init(); 2222 waiter.Init();
2217 ASSERT_EQ(MOJO_RESULT_OK, 2223 ASSERT_EQ(MOJO_RESULT_OK,
2218 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 2224 this->ConsumerAddAwakable(&waiter, 0, false,
2219 false, 0, nullptr)); 2225 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
2220 2226
2221 // Close the producer. 2227 // Close the producer.
2222 this->ProducerClose(); 2228 this->ProducerClose();
2223 2229
2224 // Wait. 2230 // Wait.
2225 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 2231 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
2226 waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2232 waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2227 hss = HandleSignalsState(); 2233 hss = HandleSignalsState();
2228 this->ConsumerRemoveAwakable(&waiter, &hss); 2234 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
2229 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2235 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
2230 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 2236 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
2231 2237
2232 this->ConsumerClose(); 2238 this->ConsumerClose();
2233 } 2239 }
2234 2240
2235 // During a two-phase read, the consumer is not readable so it may be waited 2241 // During a two-phase read, the consumer is not readable so it may be waited
2236 // upon (to become readable again). If the producer is closed and the two-phase 2242 // upon (to become readable again). If the producer is closed and the two-phase
2237 // read consumes the remaining data, that wait should become unsatisfiable. 2243 // read consumes the remaining data, that wait should become unsatisfiable.
2238 TYPED_TEST(DataPipeImplTest, BeginReadCloseProducerWaitEndReadNoData) { 2244 TYPED_TEST(DataPipeImplTest, BeginReadCloseProducerWaitEndReadNoData) {
2239 const int64_t kTestData = 123456789012345LL; 2245 const int64_t kTestData = 123456789012345LL;
2240 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); 2246 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
2241 2247
2242 const MojoCreateDataPipeOptions options = { 2248 const MojoCreateDataPipeOptions options = {
2243 kSizeOfOptions, // |struct_size|. 2249 kSizeOfOptions, // |struct_size|.
2244 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 2250 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
2245 kTestDataSize, // |element_num_bytes|. 2251 kTestDataSize, // |element_num_bytes|.
2246 100u * kTestDataSize // |capacity_num_bytes|. 2252 100u * kTestDataSize // |capacity_num_bytes|.
2247 }; 2253 };
2248 this->Create(options); 2254 this->Create(options);
2249 this->DoTransfer(); 2255 this->DoTransfer();
2250 2256
2251 Waiter waiter; 2257 Waiter waiter;
2252 HandleSignalsState hss; 2258 HandleSignalsState hss;
2253 2259
2254 // Add waiter (for the consumer to become readable). 2260 // Add waiter (for the consumer to become readable).
2255 waiter.Init(); 2261 waiter.Init();
2256 ASSERT_EQ(MOJO_RESULT_OK, 2262 ASSERT_EQ(MOJO_RESULT_OK,
2257 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 2263 this->ConsumerAddAwakable(&waiter, 0, false,
2258 false, 0, nullptr)); 2264 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
2259 2265
2260 // Write some data, so we'll have something to read. 2266 // Write some data, so we'll have something to read.
2261 uint32_t num_bytes = kTestDataSize; 2267 uint32_t num_bytes = kTestDataSize;
2262 EXPECT_EQ(MOJO_RESULT_OK, 2268 EXPECT_EQ(MOJO_RESULT_OK,
2263 this->ProducerWriteData(UserPointer<const void>(&kTestData), 2269 this->ProducerWriteData(UserPointer<const void>(&kTestData),
2264 MakeUserPointer(&num_bytes), false)); 2270 MakeUserPointer(&num_bytes), false));
2265 EXPECT_EQ(kTestDataSize, num_bytes); 2271 EXPECT_EQ(kTestDataSize, num_bytes);
2266 2272
2267 // Wait. 2273 // Wait.
2268 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2274 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2269 hss = HandleSignalsState(); 2275 hss = HandleSignalsState();
2270 this->ConsumerRemoveAwakable(&waiter, &hss); 2276 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
2271 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2277 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2272 hss.satisfied_signals); 2278 hss.satisfied_signals);
2273 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2279 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2274 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2280 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2275 hss.satisfiable_signals); 2281 hss.satisfiable_signals);
2276 2282
2277 // Start a two-phase read. 2283 // Start a two-phase read.
2278 num_bytes = 0u; 2284 num_bytes = 0u;
2279 const void* read_ptr = nullptr; 2285 const void* read_ptr = nullptr;
2280 EXPECT_EQ(MOJO_RESULT_OK, 2286 EXPECT_EQ(MOJO_RESULT_OK,
2281 this->ConsumerBeginReadData(MakeUserPointer(&read_ptr), 2287 this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
2282 MakeUserPointer(&num_bytes))); 2288 MakeUserPointer(&num_bytes)));
2283 EXPECT_EQ(kTestDataSize, num_bytes); 2289 EXPECT_EQ(kTestDataSize, num_bytes);
2284 EXPECT_EQ(kTestData, static_cast<const int64_t*>(read_ptr)[0]); 2290 EXPECT_EQ(kTestData, static_cast<const int64_t*>(read_ptr)[0]);
2285 2291
2286 // Add waiter (for the producer to be closed). 2292 // Add waiter (for the producer to be closed).
2287 waiter.Init(); 2293 waiter.Init();
2288 ASSERT_EQ(MOJO_RESULT_OK, 2294 ASSERT_EQ(MOJO_RESULT_OK,
2289 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2295 this->ConsumerAddAwakable(&waiter, 0, false,
2290 false, 0, nullptr)); 2296 MOJO_HANDLE_SIGNAL_PEER_CLOSED, nullptr));
2291 2297
2292 // Close the producer. 2298 // Close the producer.
2293 this->ProducerClose(); 2299 this->ProducerClose();
2294 2300
2295 // Wait for producer close to be detected. 2301 // Wait for producer close to be detected.
2296 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2302 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2297 hss = HandleSignalsState(); 2303 hss = HandleSignalsState();
2298 this->ConsumerRemoveAwakable(&waiter, &hss); 2304 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
2299 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2305 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
2300 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2306 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2301 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2307 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2302 hss.satisfiable_signals); 2308 hss.satisfiable_signals);
2303 2309
2304 // Add waiter (for the consumer to become readable). 2310 // Add waiter (for the consumer to become readable).
2305 waiter.Init(); 2311 waiter.Init();
2306 ASSERT_EQ(MOJO_RESULT_OK, 2312 ASSERT_EQ(MOJO_RESULT_OK,
2307 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 2313 this->ConsumerAddAwakable(&waiter, 0, false,
2308 false, 0, nullptr)); 2314 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
2309 2315
2310 // Complete the two-phase read. 2316 // Complete the two-phase read.
2311 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(kTestDataSize)); 2317 EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(kTestDataSize));
2312 2318
2313 // Wait. 2319 // Wait.
2314 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 2320 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
2315 waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2321 waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2316 hss = HandleSignalsState(); 2322 hss = HandleSignalsState();
2317 this->ConsumerRemoveAwakable(&waiter, &hss); 2323 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
2318 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2324 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
2319 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 2325 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
2320 2326
2321 this->ConsumerClose(); 2327 this->ConsumerClose();
2322 } 2328 }
2323 2329
2324 // During a two-phase write, the producer is not writable so it may be waited 2330 // During a two-phase write, the producer is not writable so it may be waited
2325 // upon (to become writable again). If the consumer is closed, that wait should 2331 // upon (to become writable again). If the consumer is closed, that wait should
2326 // become unsatisfiable. 2332 // become unsatisfiable.
2327 TYPED_TEST(DataPipeImplTest, BeginWriteCloseConsumerWaitEndWrite) { 2333 TYPED_TEST(DataPipeImplTest, BeginWriteCloseConsumerWaitEndWrite) {
(...skipping 13 matching lines...) Expand all
2341 // Start a two-phase write. 2347 // Start a two-phase write.
2342 void* write_ptr = nullptr; 2348 void* write_ptr = nullptr;
2343 uint32_t num_bytes = 0u; 2349 uint32_t num_bytes = 0u;
2344 EXPECT_EQ(MOJO_RESULT_OK, 2350 EXPECT_EQ(MOJO_RESULT_OK,
2345 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr), 2351 this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
2346 MakeUserPointer(&num_bytes))); 2352 MakeUserPointer(&num_bytes)));
2347 2353
2348 // Add waiter (for the consumer to be closed). 2354 // Add waiter (for the consumer to be closed).
2349 waiter1.Init(); 2355 waiter1.Init();
2350 ASSERT_EQ(MOJO_RESULT_OK, 2356 ASSERT_EQ(MOJO_RESULT_OK,
2351 this->ProducerAddAwakable(&waiter1, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2357 this->ProducerAddAwakable(&waiter1, 0, false,
2352 false, 0, nullptr)); 2358 MOJO_HANDLE_SIGNAL_PEER_CLOSED, nullptr));
2353 2359
2354 // Add a separate waiter (for the producer to become writable). 2360 // Add a separate waiter (for the producer to become writable).
2355 waiter2.Init(); 2361 waiter2.Init();
2356 ASSERT_EQ(MOJO_RESULT_OK, 2362 ASSERT_EQ(MOJO_RESULT_OK,
2357 this->ProducerAddAwakable(&waiter2, MOJO_HANDLE_SIGNAL_WRITABLE, 2363 this->ProducerAddAwakable(&waiter2, 0, false,
2358 false, 0, nullptr)); 2364 MOJO_HANDLE_SIGNAL_WRITABLE, nullptr));
2359 2365
2360 // Close the consumer. 2366 // Close the consumer.
2361 this->ConsumerClose(); 2367 this->ConsumerClose();
2362 2368
2363 // Wait for the consumer close to be detected. 2369 // Wait for the consumer close to be detected.
2364 // Note: If we didn't wait for the consumer close to be detected before 2370 // Note: If we didn't wait for the consumer close to be detected before
2365 // completing the two-phase write, wait might succeed (in the remote cases). 2371 // completing the two-phase write, wait might succeed (in the remote cases).
2366 // This is because the first |Awake()| "wins". 2372 // This is because the first |Awake()| "wins".
2367 EXPECT_EQ(MOJO_RESULT_OK, 2373 EXPECT_EQ(MOJO_RESULT_OK,
2368 waiter1.Wait(test::TinyTimeout(), nullptr, nullptr)); 2374 waiter1.Wait(test::TinyTimeout(), nullptr, nullptr));
2369 hss = HandleSignalsState(); 2375 hss = HandleSignalsState();
2370 this->ProducerRemoveAwakable(&waiter1, &hss); 2376 this->ProducerRemoveAwakable(false, &waiter1, 0, &hss);
2371 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2377 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
2372 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 2378 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
2373 2379
2374 // Complete the two-phase write (with nothing written). 2380 // Complete the two-phase write (with nothing written).
2375 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u)); 2381 EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u));
2376 2382
2377 // Wait. 2383 // Wait.
2378 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 2384 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
2379 waiter2.Wait(test::TinyTimeout(), nullptr, nullptr)); 2385 waiter2.Wait(test::TinyTimeout(), nullptr, nullptr));
2380 hss = HandleSignalsState(); 2386 hss = HandleSignalsState();
2381 this->ProducerRemoveAwakable(&waiter2, &hss); 2387 this->ProducerRemoveAwakable(false, &waiter2, 0, &hss);
2382 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2388 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
2383 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 2389 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
2384 2390
2385 this->ProducerClose(); 2391 this->ProducerClose();
2386 } 2392 }
2387 2393
2388 // Test that two-phase reads/writes behave correctly when given invalid 2394 // Test that two-phase reads/writes behave correctly when given invalid
2389 // arguments. 2395 // arguments.
2390 TYPED_TEST(DataPipeImplTest, TwoPhaseMoreInvalidArguments) { 2396 TYPED_TEST(DataPipeImplTest, TwoPhaseMoreInvalidArguments) {
2391 const MojoCreateDataPipeOptions options = { 2397 const MojoCreateDataPipeOptions options = {
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
2460 2466
2461 // Still no data. 2467 // Still no data.
2462 num_bytes = 1000u; 2468 num_bytes = 1000u;
2463 EXPECT_EQ(MOJO_RESULT_OK, 2469 EXPECT_EQ(MOJO_RESULT_OK,
2464 this->ConsumerQueryData(MakeUserPointer(&num_bytes))); 2470 this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
2465 EXPECT_EQ(0u, num_bytes); 2471 EXPECT_EQ(0u, num_bytes);
2466 2472
2467 // Add waiter. 2473 // Add waiter.
2468 waiter.Init(); 2474 waiter.Init();
2469 ASSERT_EQ(MOJO_RESULT_OK, 2475 ASSERT_EQ(MOJO_RESULT_OK,
2470 this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 2476 this->ConsumerAddAwakable(&waiter, 1, false,
2471 false, 1, nullptr)); 2477 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
2472 2478
2473 // Now write some data, so we'll be able to try reading. 2479 // Now write some data, so we'll be able to try reading.
2474 int32_t element = 123; 2480 int32_t element = 123;
2475 num_bytes = 1u * sizeof(int32_t); 2481 num_bytes = 1u * sizeof(int32_t);
2476 EXPECT_EQ(MOJO_RESULT_OK, 2482 EXPECT_EQ(MOJO_RESULT_OK,
2477 this->ProducerWriteData(UserPointer<const void>(&element), 2483 this->ProducerWriteData(UserPointer<const void>(&element),
2478 MakeUserPointer(&num_bytes), false)); 2484 MakeUserPointer(&num_bytes), false));
2479 2485
2480 // Wait for data. 2486 // Wait for data.
2481 // TODO(vtl): (See corresponding TODO in AllOrNone.) 2487 // TODO(vtl): (See corresponding TODO in AllOrNone.)
2482 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2488 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2483 hss = HandleSignalsState(); 2489 hss = HandleSignalsState();
2484 this->ConsumerRemoveAwakable(&waiter, &hss); 2490 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
2485 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2491 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2486 hss.satisfied_signals); 2492 hss.satisfied_signals);
2487 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2493 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2488 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2494 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2489 hss.satisfiable_signals); 2495 hss.satisfiable_signals);
2490 2496
2491 // One element available. 2497 // One element available.
2492 num_bytes = 0u; 2498 num_bytes = 0u;
2493 EXPECT_EQ(MOJO_RESULT_OK, 2499 EXPECT_EQ(MOJO_RESULT_OK,
2494 this->ConsumerQueryData(MakeUserPointer(&num_bytes))); 2500 this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
2618 this->ProducerGetOptions(&write_threshold_num_bytes); 2624 this->ProducerGetOptions(&write_threshold_num_bytes);
2619 EXPECT_EQ(0u, write_threshold_num_bytes); 2625 EXPECT_EQ(0u, write_threshold_num_bytes);
2620 2626
2621 Waiter waiter; 2627 Waiter waiter;
2622 HandleSignalsState hss; 2628 HandleSignalsState hss;
2623 2629
2624 // Try to wait to the write threshold signal; it should already have it. 2630 // Try to wait to the write threshold signal; it should already have it.
2625 waiter.Init(); 2631 waiter.Init();
2626 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, 2632 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
2627 this->ProducerAddAwakable( 2633 this->ProducerAddAwakable(
2628 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, false, 0, &hss)); 2634 &waiter, 0, false, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, &hss));
2629 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2635 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2630 hss.satisfied_signals); 2636 hss.satisfied_signals);
2631 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2637 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2632 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2638 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2633 hss.satisfiable_signals); 2639 hss.satisfiable_signals);
2634 2640
2635 // Before writing, add a waiter on the consumer (since we'll need to know when 2641 // Before writing, add a waiter on the consumer (since we'll need to know when
2636 // the written bytes have propagated). 2642 // the written bytes have propagated).
2637 Waiter read_waiter; 2643 Waiter read_waiter;
2638 read_waiter.Init(); 2644 read_waiter.Init();
2639 ASSERT_EQ(MOJO_RESULT_OK, 2645 ASSERT_EQ(MOJO_RESULT_OK,
2640 this->ConsumerAddAwakable(&read_waiter, MOJO_HANDLE_SIGNAL_READABLE, 2646 this->ConsumerAddAwakable(&read_waiter, 0, false,
2641 false, 0, nullptr)); 2647 MOJO_HANDLE_SIGNAL_READABLE, nullptr));
2642 2648
2643 // Write 5 bytes. 2649 // Write 5 bytes.
2644 static const char kTestData[5] = {'A', 'B', 'C', 'D', 'E'}; 2650 static const char kTestData[5] = {'A', 'B', 'C', 'D', 'E'};
2645 uint32_t num_bytes = 5; 2651 uint32_t num_bytes = 5;
2646 EXPECT_EQ(MOJO_RESULT_OK, 2652 EXPECT_EQ(MOJO_RESULT_OK,
2647 this->ProducerWriteData(UserPointer<const void>(kTestData), 2653 this->ProducerWriteData(UserPointer<const void>(kTestData),
2648 MakeUserPointer(&num_bytes), false)); 2654 MakeUserPointer(&num_bytes), false));
2649 EXPECT_EQ(5u, num_bytes); 2655 EXPECT_EQ(5u, num_bytes);
2650 2656
2651 // It should still have the write threshold signal. 2657 // It should still have the write threshold signal.
2652 waiter.Init(); 2658 waiter.Init();
2653 ASSERT_EQ( 2659 ASSERT_EQ(
2654 MOJO_RESULT_ALREADY_EXISTS, 2660 MOJO_RESULT_ALREADY_EXISTS,
2655 this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2661 this->ProducerAddAwakable(&waiter, 0, false,
2656 false, 0, nullptr)); 2662 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, nullptr));
2657 2663
2658 // Set the write threshold to 5. 2664 // Set the write threshold to 5.
2659 this->ProducerSetOptions(5); 2665 this->ProducerSetOptions(5);
2660 write_threshold_num_bytes = 123u; 2666 write_threshold_num_bytes = 123u;
2661 this->ProducerGetOptions(&write_threshold_num_bytes); 2667 this->ProducerGetOptions(&write_threshold_num_bytes);
2662 EXPECT_EQ(5u, write_threshold_num_bytes); 2668 EXPECT_EQ(5u, write_threshold_num_bytes);
2663 2669
2664 // Should still have the write threshold signal. 2670 // Should still have the write threshold signal.
2665 waiter.Init(); 2671 waiter.Init();
2666 ASSERT_EQ( 2672 ASSERT_EQ(
2667 MOJO_RESULT_ALREADY_EXISTS, 2673 MOJO_RESULT_ALREADY_EXISTS,
2668 this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2674 this->ProducerAddAwakable(&waiter, 0, false,
2669 false, 0, nullptr)); 2675 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, nullptr));
2670 2676
2671 // Set the write threshold to 6. 2677 // Set the write threshold to 6.
2672 this->ProducerSetOptions(6); 2678 this->ProducerSetOptions(6);
2673 2679
2674 // Should no longer have the write threshold signal. 2680 // Should no longer have the write threshold signal.
2675 waiter.Init(); 2681 waiter.Init();
2676 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable( 2682 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable(
2677 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2683 &waiter, 0, false,
2678 false, 0, nullptr)); 2684 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, nullptr));
2679 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr, nullptr)); 2685 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr, nullptr));
2680 hss = HandleSignalsState(); 2686 hss = HandleSignalsState();
2681 this->ProducerRemoveAwakable(&waiter, &hss); 2687 this->ProducerRemoveAwakable(false, &waiter, 0, &hss);
2682 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 2688 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
2683 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2689 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2684 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2690 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2685 hss.satisfiable_signals); 2691 hss.satisfiable_signals);
2686 2692
2687 // Add a waiter. 2693 // Add a waiter.
2688 waiter.Init(); 2694 waiter.Init();
2689 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable( 2695 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable(
2690 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2696 &waiter, 0, false,
2691 false, 0, nullptr)); 2697 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, nullptr));
2692 2698
2693 // Wait for the consumer to be readable. 2699 // Wait for the consumer to be readable.
2694 EXPECT_EQ(MOJO_RESULT_OK, 2700 EXPECT_EQ(MOJO_RESULT_OK,
2695 read_waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2701 read_waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2696 this->ConsumerRemoveAwakable(&read_waiter, nullptr); 2702 this->ConsumerRemoveAwakable(false, &read_waiter, 0, nullptr);
2697 2703
2698 // Read a byte. 2704 // Read a byte.
2699 char read_byte = 'a'; 2705 char read_byte = 'a';
2700 num_bytes = sizeof(read_byte); 2706 num_bytes = sizeof(read_byte);
2701 EXPECT_EQ(MOJO_RESULT_OK, 2707 EXPECT_EQ(MOJO_RESULT_OK,
2702 this->ConsumerReadData(UserPointer<void>(&read_byte), 2708 this->ConsumerReadData(UserPointer<void>(&read_byte),
2703 MakeUserPointer(&num_bytes), true, false)); 2709 MakeUserPointer(&num_bytes), true, false));
2704 EXPECT_EQ(1u, num_bytes); 2710 EXPECT_EQ(1u, num_bytes);
2705 EXPECT_EQ(kTestData[0], read_byte); 2711 EXPECT_EQ(kTestData[0], read_byte);
2706 2712
2707 // Wait; should get the write threshold signal. 2713 // Wait; should get the write threshold signal.
2708 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2714 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2709 hss = HandleSignalsState(); 2715 hss = HandleSignalsState();
2710 this->ProducerRemoveAwakable(&waiter, &hss); 2716 this->ProducerRemoveAwakable(false, &waiter, 0, &hss);
2711 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2717 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2712 hss.satisfied_signals); 2718 hss.satisfied_signals);
2713 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2719 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2714 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2720 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD,
2715 hss.satisfiable_signals); 2721 hss.satisfiable_signals);
2716 2722
2717 // Write 6 more bytes. 2723 // Write 6 more bytes.
2718 static const char kMoreTestData[6] = {'1', '2', '3', '4', '5', '6'}; 2724 static const char kMoreTestData[6] = {'1', '2', '3', '4', '5', '6'};
2719 num_bytes = 6; 2725 num_bytes = 6;
2720 EXPECT_EQ(MOJO_RESULT_OK, 2726 EXPECT_EQ(MOJO_RESULT_OK,
2721 this->ProducerWriteData(UserPointer<const void>(kMoreTestData), 2727 this->ProducerWriteData(UserPointer<const void>(kMoreTestData),
2722 MakeUserPointer(&num_bytes), false)); 2728 MakeUserPointer(&num_bytes), false));
2723 EXPECT_EQ(6u, num_bytes); 2729 EXPECT_EQ(6u, num_bytes);
2724 2730
2725 // Should no longer have the write threshold signal. 2731 // Should no longer have the write threshold signal.
2726 waiter.Init(); 2732 waiter.Init();
2727 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable( 2733 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable(
2728 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2734 &waiter, 0, false,
2729 false, 0, nullptr)); 2735 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, nullptr));
2730 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr, nullptr)); 2736 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr, nullptr));
2731 this->ProducerRemoveAwakable(&waiter, nullptr); 2737 this->ProducerRemoveAwakable(false, &waiter, 0, nullptr);
2732 2738
2733 // Set the write threshold to 0 (which means the element size, 1). 2739 // Set the write threshold to 0 (which means the element size, 1).
2734 this->ProducerSetOptions(0); 2740 this->ProducerSetOptions(0);
2735 write_threshold_num_bytes = 123u; 2741 write_threshold_num_bytes = 123u;
2736 this->ProducerGetOptions(&write_threshold_num_bytes); 2742 this->ProducerGetOptions(&write_threshold_num_bytes);
2737 EXPECT_EQ(0u, write_threshold_num_bytes); 2743 EXPECT_EQ(0u, write_threshold_num_bytes);
2738 2744
2739 // Should still not have the write threshold signal. 2745 // Should still not have the write threshold signal.
2740 waiter.Init(); 2746 waiter.Init();
2741 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable( 2747 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable(
2742 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2748 &waiter, 0, false,
2743 false, 0, nullptr)); 2749 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, nullptr));
2744 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr, nullptr)); 2750 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr, nullptr));
2745 this->ProducerRemoveAwakable(&waiter, nullptr); 2751 this->ProducerRemoveAwakable(false, &waiter, 0, nullptr);
2746 2752
2747 // Add a waiter. 2753 // Add a waiter.
2748 waiter.Init(); 2754 waiter.Init();
2749 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable( 2755 ASSERT_EQ(MOJO_RESULT_OK, this->ProducerAddAwakable(
2750 &waiter, MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, 2756 &waiter, 0, false,
2751 false, 0, nullptr)); 2757 MOJO_HANDLE_SIGNAL_WRITE_THRESHOLD, nullptr));
2752 2758
2753 // Close the consumer. 2759 // Close the consumer.
2754 this->ConsumerClose(); 2760 this->ConsumerClose();
2755 2761
2756 // Wait; the condition should now never be satisfiable. 2762 // Wait; the condition should now never be satisfiable.
2757 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 2763 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
2758 waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2764 waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2759 hss = HandleSignalsState(); 2765 hss = HandleSignalsState();
2760 this->ProducerRemoveAwakable(&waiter, &hss); 2766 this->ProducerRemoveAwakable(false, &waiter, 0, &hss);
2761 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2767 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
2762 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 2768 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
2763 2769
2764 this->ProducerClose(); 2770 this->ProducerClose();
2765 } 2771 }
2766 2772
2767 TYPED_TEST(DataPipeImplTest, ReadThreshold) { 2773 TYPED_TEST(DataPipeImplTest, ReadThreshold) {
2768 const MojoCreateDataPipeOptions options = { 2774 const MojoCreateDataPipeOptions options = {
2769 kSizeOfOptions, // |struct_size|. 2775 kSizeOfOptions, // |struct_size|.
2770 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 2776 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
2771 1u, // |element_num_bytes|. 2777 1u, // |element_num_bytes|.
2772 1000u // |capacity_num_bytes|. 2778 1000u // |capacity_num_bytes|.
2773 }; 2779 };
2774 this->Create(options); 2780 this->Create(options);
2775 this->DoTransfer(); 2781 this->DoTransfer();
2776 2782
2777 // The default read threshold should be 0 (which means "default", i.e., one 2783 // The default read threshold should be 0 (which means "default", i.e., one
2778 // element). 2784 // element).
2779 uint32_t read_threshold_num_bytes = 123; 2785 uint32_t read_threshold_num_bytes = 123;
2780 this->ConsumerGetOptions(&read_threshold_num_bytes); 2786 this->ConsumerGetOptions(&read_threshold_num_bytes);
2781 EXPECT_EQ(0u, read_threshold_num_bytes); 2787 EXPECT_EQ(0u, read_threshold_num_bytes);
2782 2788
2783 Waiter waiter; 2789 Waiter waiter;
2784 HandleSignalsState hss; 2790 HandleSignalsState hss;
2785 2791
2786 // Add a waiter. 2792 // Add a waiter.
2787 waiter.Init(); 2793 waiter.Init();
2788 ASSERT_EQ(MOJO_RESULT_OK, 2794 ASSERT_EQ(MOJO_RESULT_OK,
2789 this->ConsumerAddAwakable( 2795 this->ConsumerAddAwakable(
2790 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, nullptr)); 2796 &waiter, 0, false, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, nullptr));
2791 // Trivial wait: it shouldn't have the read threshold signal. 2797 // Trivial wait: it shouldn't have the read threshold signal.
2792 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr, nullptr)); 2798 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr, nullptr));
2793 hss = HandleSignalsState(); 2799 hss = HandleSignalsState();
2794 this->ConsumerRemoveAwakable(&waiter, &hss); 2800 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
2795 EXPECT_EQ(0u, hss.satisfied_signals); 2801 EXPECT_EQ(0u, hss.satisfied_signals);
2796 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2802 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2797 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2803 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2798 hss.satisfiable_signals); 2804 hss.satisfiable_signals);
2799 2805
2800 // Add a waiter. 2806 // Add a waiter.
2801 waiter.Init(); 2807 waiter.Init();
2802 ASSERT_EQ(MOJO_RESULT_OK, 2808 ASSERT_EQ(MOJO_RESULT_OK,
2803 this->ConsumerAddAwakable( 2809 this->ConsumerAddAwakable(
2804 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, nullptr)); 2810 &waiter, 0, false, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, nullptr));
2805 2811
2806 // Write a byte. 2812 // Write a byte.
2807 const char kTestData[] = {'x'}; 2813 const char kTestData[] = {'x'};
2808 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); 2814 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
2809 uint32_t num_bytes = kTestDataSize; 2815 uint32_t num_bytes = kTestDataSize;
2810 EXPECT_EQ(MOJO_RESULT_OK, 2816 EXPECT_EQ(MOJO_RESULT_OK,
2811 this->ProducerWriteData(UserPointer<const void>(kTestData), 2817 this->ProducerWriteData(UserPointer<const void>(kTestData),
2812 MakeUserPointer(&num_bytes), false)); 2818 MakeUserPointer(&num_bytes), false));
2813 EXPECT_EQ(kTestDataSize, num_bytes); 2819 EXPECT_EQ(kTestDataSize, num_bytes);
2814 2820
2815 // Wait for the read threshold signal. 2821 // Wait for the read threshold signal.
2816 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2822 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2817 hss = HandleSignalsState(); 2823 hss = HandleSignalsState();
2818 this->ConsumerRemoveAwakable(&waiter, &hss); 2824 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
2819 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2825 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2820 hss.satisfied_signals); 2826 hss.satisfied_signals);
2821 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2827 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2822 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2828 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2823 hss.satisfiable_signals); 2829 hss.satisfiable_signals);
2824 2830
2825 // Set the read threshold to 1. 2831 // Set the read threshold to 1.
2826 this->ConsumerSetOptions(1); 2832 this->ConsumerSetOptions(1);
2827 read_threshold_num_bytes = 123u; 2833 read_threshold_num_bytes = 123u;
2828 this->ConsumerGetOptions(&read_threshold_num_bytes); 2834 this->ConsumerGetOptions(&read_threshold_num_bytes);
2829 EXPECT_EQ(1u, read_threshold_num_bytes); 2835 EXPECT_EQ(1u, read_threshold_num_bytes);
2830 2836
2831 // Try to add a waiter: it should (still) already have the read threshold 2837 // Try to add a waiter: it should (still) already have the read threshold
2832 // signal. 2838 // signal.
2833 waiter.Init(); 2839 waiter.Init();
2834 hss = HandleSignalsState(); 2840 hss = HandleSignalsState();
2835 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, 2841 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
2836 this->ConsumerAddAwakable( 2842 this->ConsumerAddAwakable(&waiter, 0, false,
2837 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, &hss)); 2843 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, &hss));
2838 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2844 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2839 hss.satisfied_signals); 2845 hss.satisfied_signals);
2840 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2846 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2841 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2847 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2842 hss.satisfiable_signals); 2848 hss.satisfiable_signals);
2843 2849
2844 // Set the read threshold to 2. 2850 // Set the read threshold to 2.
2845 this->ConsumerSetOptions(2); 2851 this->ConsumerSetOptions(2);
2846 read_threshold_num_bytes = 123u; 2852 read_threshold_num_bytes = 123u;
2847 this->ConsumerGetOptions(&read_threshold_num_bytes); 2853 this->ConsumerGetOptions(&read_threshold_num_bytes);
2848 EXPECT_EQ(2u, read_threshold_num_bytes); 2854 EXPECT_EQ(2u, read_threshold_num_bytes);
2849 2855
2850 // Add a waiter. 2856 // Add a waiter.
2851 waiter.Init(); 2857 waiter.Init();
2852 ASSERT_EQ(MOJO_RESULT_OK, 2858 ASSERT_EQ(MOJO_RESULT_OK,
2853 this->ConsumerAddAwakable( 2859 this->ConsumerAddAwakable(
2854 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, nullptr)); 2860 &waiter, 0, false, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, nullptr));
2855 2861
2856 // Write another byte. 2862 // Write another byte.
2857 num_bytes = kTestDataSize; 2863 num_bytes = kTestDataSize;
2858 EXPECT_EQ(MOJO_RESULT_OK, 2864 EXPECT_EQ(MOJO_RESULT_OK,
2859 this->ProducerWriteData(UserPointer<const void>(kTestData), 2865 this->ProducerWriteData(UserPointer<const void>(kTestData),
2860 MakeUserPointer(&num_bytes), false)); 2866 MakeUserPointer(&num_bytes), false));
2861 EXPECT_EQ(kTestDataSize, num_bytes); 2867 EXPECT_EQ(kTestDataSize, num_bytes);
2862 2868
2863 // Wait for the read threshold signal. 2869 // Wait for the read threshold signal.
2864 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2870 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2865 hss = HandleSignalsState(); 2871 hss = HandleSignalsState();
2866 this->ConsumerRemoveAwakable(&waiter, &hss); 2872 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
2867 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2873 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2868 hss.satisfied_signals); 2874 hss.satisfied_signals);
2869 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2875 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2870 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2876 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2871 hss.satisfiable_signals); 2877 hss.satisfiable_signals);
2872 2878
2873 // Read one byte. 2879 // Read one byte.
2874 char read_byte = 'a'; 2880 char read_byte = 'a';
2875 num_bytes = sizeof(read_byte); 2881 num_bytes = sizeof(read_byte);
2876 EXPECT_EQ(MOJO_RESULT_OK, 2882 EXPECT_EQ(MOJO_RESULT_OK,
2877 this->ConsumerReadData(UserPointer<void>(&read_byte), 2883 this->ConsumerReadData(UserPointer<void>(&read_byte),
2878 MakeUserPointer(&num_bytes), true, false)); 2884 MakeUserPointer(&num_bytes), true, false));
2879 EXPECT_EQ(1u, num_bytes); 2885 EXPECT_EQ(1u, num_bytes);
2880 EXPECT_EQ(kTestData[0], read_byte); 2886 EXPECT_EQ(kTestData[0], read_byte);
2881 2887
2882 // Add a waiter. 2888 // Add a waiter.
2883 waiter.Init(); 2889 waiter.Init();
2884 ASSERT_EQ(MOJO_RESULT_OK, 2890 ASSERT_EQ(MOJO_RESULT_OK,
2885 this->ConsumerAddAwakable( 2891 this->ConsumerAddAwakable(
2886 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, nullptr)); 2892 &waiter, 0, false, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, nullptr));
2887 // Trivial wait: it shouldn't have the read threshold signal. 2893 // Trivial wait: it shouldn't have the read threshold signal.
2888 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr, nullptr)); 2894 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr, nullptr));
2889 hss = HandleSignalsState(); 2895 hss = HandleSignalsState();
2890 this->ConsumerRemoveAwakable(&waiter, &hss); 2896 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
2891 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 2897 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
2892 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2898 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2893 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2899 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2894 hss.satisfiable_signals); 2900 hss.satisfiable_signals);
2895 2901
2896 // Add a waiter. 2902 // Add a waiter.
2897 waiter.Init(); 2903 waiter.Init();
2898 ASSERT_EQ(MOJO_RESULT_OK, 2904 ASSERT_EQ(MOJO_RESULT_OK,
2899 this->ConsumerAddAwakable( 2905 this->ConsumerAddAwakable(
2900 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, nullptr)); 2906 &waiter, 0, false, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, nullptr));
2901 // Trivial wait: it shouldn't have the read threshold signal. 2907 // Trivial wait: it shouldn't have the read threshold signal.
2902 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr, nullptr)); 2908 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr, nullptr));
2903 hss = HandleSignalsState(); 2909 hss = HandleSignalsState();
2904 this->ConsumerRemoveAwakable(&waiter, &hss); 2910 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
2905 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 2911 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
2906 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2912 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2907 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2913 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2908 hss.satisfiable_signals); 2914 hss.satisfiable_signals);
2909 2915
2910 // Add a waiter. 2916 // Add a waiter.
2911 waiter.Init(); 2917 waiter.Init();
2912 ASSERT_EQ(MOJO_RESULT_OK, 2918 ASSERT_EQ(MOJO_RESULT_OK,
2913 this->ConsumerAddAwakable( 2919 this->ConsumerAddAwakable(
2914 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, nullptr)); 2920 &waiter, 0, false, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, nullptr));
2915 2921
2916 // Close the producer. 2922 // Close the producer.
2917 this->ProducerClose(); 2923 this->ProducerClose();
2918 2924
2919 // Wait; the current read threshold becomes never satisfiable. 2925 // Wait; the current read threshold becomes never satisfiable.
2920 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 2926 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
2921 waiter.Wait(test::TinyTimeout(), nullptr, nullptr)); 2927 waiter.Wait(test::TinyTimeout(), nullptr, nullptr));
2922 hss = HandleSignalsState(); 2928 hss = HandleSignalsState();
2923 this->ConsumerRemoveAwakable(&waiter, &hss); 2929 this->ConsumerRemoveAwakable(false, &waiter, 0, &hss);
2924 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2930 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
2925 hss.satisfied_signals); 2931 hss.satisfied_signals);
2926 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2932 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
2927 hss.satisfiable_signals); 2933 hss.satisfiable_signals);
2928 2934
2929 // Set the read threshold back to zero to 0. 2935 // Set the read threshold back to zero to 0.
2930 this->ConsumerSetOptions(0); 2936 this->ConsumerSetOptions(0);
2931 read_threshold_num_bytes = 123u; 2937 read_threshold_num_bytes = 123u;
2932 this->ConsumerGetOptions(&read_threshold_num_bytes); 2938 this->ConsumerGetOptions(&read_threshold_num_bytes);
2933 // "Get options" should preserve 0 (and not set it to the element size). 2939 // "Get options" should preserve 0 (and not set it to the element size).
2934 EXPECT_EQ(0u, read_threshold_num_bytes); 2940 EXPECT_EQ(0u, read_threshold_num_bytes);
2935 2941
2936 // Try to add a waiter: it should have the read threshold signal. 2942 // Try to add a waiter: it should have the read threshold signal.
2937 waiter.Init(); 2943 waiter.Init();
2938 hss = HandleSignalsState(); 2944 hss = HandleSignalsState();
2939 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, 2945 ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
2940 this->ConsumerAddAwakable( 2946 this->ConsumerAddAwakable(&waiter, 0, false,
2941 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, &hss)); 2947 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, &hss));
2942 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2948 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2943 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2949 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2944 hss.satisfied_signals); 2950 hss.satisfied_signals);
2945 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 2951 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
2946 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 2952 MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
2947 hss.satisfiable_signals); 2953 hss.satisfiable_signals);
2948 2954
2949 // Read the other byte. 2955 // Read the other byte.
2950 read_byte = 'a'; 2956 read_byte = 'a';
2951 num_bytes = sizeof(read_byte); 2957 num_bytes = sizeof(read_byte);
2952 EXPECT_EQ(MOJO_RESULT_OK, 2958 EXPECT_EQ(MOJO_RESULT_OK,
2953 this->ConsumerReadData(UserPointer<void>(&read_byte), 2959 this->ConsumerReadData(UserPointer<void>(&read_byte),
2954 MakeUserPointer(&num_bytes), true, false)); 2960 MakeUserPointer(&num_bytes), true, false));
2955 EXPECT_EQ(1u, num_bytes); 2961 EXPECT_EQ(1u, num_bytes);
2956 EXPECT_EQ(kTestData[0], read_byte); 2962 EXPECT_EQ(kTestData[0], read_byte);
2957 2963
2958 // Try to add a waiter: the read threshold signal should be unsatisfiable. 2964 // Try to add a waiter: the read threshold signal should be unsatisfiable.
2959 waiter.Init(); 2965 waiter.Init();
2960 hss = HandleSignalsState(); 2966 hss = HandleSignalsState();
2961 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 2967 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
2962 this->ConsumerAddAwakable( 2968 this->ConsumerAddAwakable(&waiter, 0, false,
2963 &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, false, 0, &hss)); 2969 MOJO_HANDLE_SIGNAL_READ_THRESHOLD, &hss));
2964 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 2970 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
2965 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 2971 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
2966 2972
2967 this->ConsumerClose(); 2973 this->ConsumerClose();
2968 } 2974 }
2969 2975
2970 } // namespace 2976 } // namespace
2971 } // namespace system 2977 } // namespace system
2972 } // namespace mojo 2978 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/data_pipe_consumer_dispatcher.cc ('k') | mojo/edk/system/data_pipe_producer_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698