| Index: mojo/system/data_pipe.cc
|
| diff --git a/mojo/system/data_pipe.cc b/mojo/system/data_pipe.cc
|
| index dac187ecc3fe7d4129716a69aa5a4825a4019fcb..639bb5e7d20b3278060658d69176291c76edbc9b 100644
|
| --- a/mojo/system/data_pipe.cc
|
| +++ b/mojo/system/data_pipe.cc
|
| @@ -79,6 +79,7 @@ void DataPipe::ProducerClose() {
|
| << "Producer closed with active two-phase write";
|
| producer_two_phase_max_num_bytes_written_ = 0;
|
| ProducerCloseImplNoLock();
|
| + AwakeConsumerWaitersForStateChangeNoLock();
|
| }
|
|
|
| MojoResult DataPipe::ProducerWriteData(const void* elements,
|
| @@ -99,7 +100,11 @@ MojoResult DataPipe::ProducerWriteData(const void* elements,
|
| if (*num_bytes == 0)
|
| return MOJO_RESULT_OK; // Nothing to do.
|
|
|
| - return ProducerWriteDataImplNoLock(elements, num_bytes, all_or_none);
|
| + MojoWaitFlags old_consumer_satisfied_flags = ConsumerSatisfiedFlagsNoLock();
|
| + MojoResult rv = ProducerWriteDataImplNoLock(elements, num_bytes, all_or_none);
|
| + if (ConsumerSatisfiedFlagsNoLock() != old_consumer_satisfied_flags)
|
| + AwakeConsumerWaitersForStateChangeNoLock();
|
| + return rv;
|
| }
|
|
|
| MojoResult DataPipe::ProducerBeginWriteData(void** buffer,
|
| @@ -117,7 +122,10 @@ MojoResult DataPipe::ProducerBeginWriteData(void** buffer,
|
| all_or_none);
|
| if (rv != MOJO_RESULT_OK)
|
| return rv;
|
| -
|
| + // Note: No need to awake producer waiters, even though we're going from
|
| + // writable to non-writable (since you can't wait on non-writability).
|
| + // Similarly, though this may have discarded data (in "may discard" mode),
|
| + // making it non-readable, there's still no need to awake consumer waiters.
|
| DCHECK(producer_in_two_phase_write_no_lock());
|
| return MOJO_RESULT_OK;
|
| }
|
| @@ -131,9 +139,16 @@ MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
|
| // Note: Allow successful completion of the two-phase write even if the
|
| // consumer has been closed.
|
|
|
| + MojoWaitFlags old_consumer_satisfied_flags = ConsumerSatisfiedFlagsNoLock();
|
| MojoResult rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
|
| // Two-phase write ended even on failure.
|
| DCHECK(!producer_in_two_phase_write_no_lock());
|
| + // If we're now writable, we *became* writable (since we weren't writable
|
| + // during the two-phase write), so awake producer waiters.
|
| + if ((ProducerSatisfiedFlagsNoLock() & MOJO_WAIT_FLAG_WRITABLE))
|
| + AwakeProducerWaitersForStateChangeNoLock();
|
| + if (ConsumerSatisfiedFlagsNoLock() != old_consumer_satisfied_flags)
|
| + AwakeConsumerWaitersForStateChangeNoLock();
|
| return rv;
|
| }
|
|
|
| @@ -175,6 +190,7 @@ void DataPipe::ConsumerClose() {
|
| << "Consumer closed with active two-phase read";
|
| consumer_two_phase_max_num_bytes_read_ = 0;
|
| ConsumerCloseImplNoLock();
|
| + AwakeProducerWaitersForStateChangeNoLock();
|
| }
|
|
|
| MojoResult DataPipe::ConsumerReadData(void* elements,
|
| @@ -192,7 +208,11 @@ MojoResult DataPipe::ConsumerReadData(void* elements,
|
| if (*num_bytes == 0)
|
| return MOJO_RESULT_OK; // Nothing to do.
|
|
|
| - return ConsumerReadDataImplNoLock(elements, num_bytes, all_or_none);
|
| + MojoWaitFlags old_producer_satisfied_flags = ProducerSatisfiedFlagsNoLock();
|
| + MojoResult rv = ConsumerReadDataImplNoLock(elements, num_bytes, all_or_none);
|
| + if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags)
|
| + AwakeProducerWaitersForStateChangeNoLock();
|
| + return rv;
|
| }
|
|
|
| MojoResult DataPipe::ConsumerDiscardData(uint32_t* num_bytes,
|
| @@ -209,7 +229,11 @@ MojoResult DataPipe::ConsumerDiscardData(uint32_t* num_bytes,
|
| if (*num_bytes == 0)
|
| return MOJO_RESULT_OK; // Nothing to do.
|
|
|
| - return ConsumerDiscardDataImplNoLock(num_bytes, all_or_none);
|
| + MojoWaitFlags old_producer_satisfied_flags = ProducerSatisfiedFlagsNoLock();
|
| + MojoResult rv = ConsumerDiscardDataImplNoLock(num_bytes, all_or_none);
|
| + if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags)
|
| + AwakeProducerWaitersForStateChangeNoLock();
|
| + return rv;
|
| }
|
|
|
| MojoResult DataPipe::ConsumerQueryData(uint32_t* num_bytes) {
|
| @@ -236,7 +260,6 @@ MojoResult DataPipe::ConsumerBeginReadData(const void** buffer,
|
| all_or_none);
|
| if (rv != MOJO_RESULT_OK)
|
| return rv;
|
| -
|
| DCHECK(consumer_in_two_phase_read_no_lock());
|
| return MOJO_RESULT_OK;
|
| }
|
| @@ -248,9 +271,16 @@ MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
|
| if (!consumer_in_two_phase_read_no_lock())
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
|
|
| + MojoWaitFlags old_producer_satisfied_flags = ProducerSatisfiedFlagsNoLock();
|
| MojoResult rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
|
| // Two-phase read ended even on failure.
|
| DCHECK(!consumer_in_two_phase_read_no_lock());
|
| + // If we're now readable, we *became* readable (since we weren't readable
|
| + // during the two-phase read), so awake consumer waiters.
|
| + if ((ConsumerSatisfiedFlagsNoLock() & MOJO_WAIT_FLAG_READABLE))
|
| + AwakeConsumerWaitersForStateChangeNoLock();
|
| + if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags)
|
| + AwakeProducerWaitersForStateChangeNoLock();
|
| return rv;
|
| }
|
|
|
|
|