Index: mojo/system/data_pipe.cc |
diff --git a/mojo/system/data_pipe.cc b/mojo/system/data_pipe.cc |
index dac187ecc3fe7d4129716a69aa5a4825a4019fcb..639bb5e7d20b3278060658d69176291c76edbc9b 100644 |
--- a/mojo/system/data_pipe.cc |
+++ b/mojo/system/data_pipe.cc |
@@ -79,6 +79,7 @@ void DataPipe::ProducerClose() { |
<< "Producer closed with active two-phase write"; |
producer_two_phase_max_num_bytes_written_ = 0; |
ProducerCloseImplNoLock(); |
+ AwakeConsumerWaitersForStateChangeNoLock(); |
} |
MojoResult DataPipe::ProducerWriteData(const void* elements, |
@@ -99,7 +100,11 @@ MojoResult DataPipe::ProducerWriteData(const void* elements, |
if (*num_bytes == 0) |
return MOJO_RESULT_OK; // Nothing to do. |
- return ProducerWriteDataImplNoLock(elements, num_bytes, all_or_none); |
+ MojoWaitFlags old_consumer_satisfied_flags = ConsumerSatisfiedFlagsNoLock(); |
+ MojoResult rv = ProducerWriteDataImplNoLock(elements, num_bytes, all_or_none); |
+ if (ConsumerSatisfiedFlagsNoLock() != old_consumer_satisfied_flags) |
+ AwakeConsumerWaitersForStateChangeNoLock(); |
+ return rv; |
} |
MojoResult DataPipe::ProducerBeginWriteData(void** buffer, |
@@ -117,7 +122,10 @@ MojoResult DataPipe::ProducerBeginWriteData(void** buffer, |
all_or_none); |
if (rv != MOJO_RESULT_OK) |
return rv; |
- |
+ // Note: No need to awake producer waiters, even though we're going from |
+ // writable to non-writable (since you can't wait on non-writability). |
+ // Similarly, though this may have discarded data (in "may discard" mode), |
+ // making it non-readable, there's still no need to awake consumer waiters. |
DCHECK(producer_in_two_phase_write_no_lock()); |
return MOJO_RESULT_OK; |
} |
@@ -131,9 +139,16 @@ MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { |
// Note: Allow successful completion of the two-phase write even if the |
// consumer has been closed. |
+ MojoWaitFlags old_consumer_satisfied_flags = ConsumerSatisfiedFlagsNoLock(); |
MojoResult rv = ProducerEndWriteDataImplNoLock(num_bytes_written); |
// Two-phase write ended even on failure. |
DCHECK(!producer_in_two_phase_write_no_lock()); |
+ // If we're now writable, we *became* writable (since we weren't writable |
+ // during the two-phase write), so awake producer waiters. |
+ if ((ProducerSatisfiedFlagsNoLock() & MOJO_WAIT_FLAG_WRITABLE)) |
+ AwakeProducerWaitersForStateChangeNoLock(); |
+ if (ConsumerSatisfiedFlagsNoLock() != old_consumer_satisfied_flags) |
+ AwakeConsumerWaitersForStateChangeNoLock(); |
return rv; |
} |
@@ -175,6 +190,7 @@ void DataPipe::ConsumerClose() { |
<< "Consumer closed with active two-phase read"; |
consumer_two_phase_max_num_bytes_read_ = 0; |
ConsumerCloseImplNoLock(); |
+ AwakeProducerWaitersForStateChangeNoLock(); |
} |
MojoResult DataPipe::ConsumerReadData(void* elements, |
@@ -192,7 +208,11 @@ MojoResult DataPipe::ConsumerReadData(void* elements, |
if (*num_bytes == 0) |
return MOJO_RESULT_OK; // Nothing to do. |
- return ConsumerReadDataImplNoLock(elements, num_bytes, all_or_none); |
+ MojoWaitFlags old_producer_satisfied_flags = ProducerSatisfiedFlagsNoLock(); |
+ MojoResult rv = ConsumerReadDataImplNoLock(elements, num_bytes, all_or_none); |
+ if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags) |
+ AwakeProducerWaitersForStateChangeNoLock(); |
+ return rv; |
} |
MojoResult DataPipe::ConsumerDiscardData(uint32_t* num_bytes, |
@@ -209,7 +229,11 @@ MojoResult DataPipe::ConsumerDiscardData(uint32_t* num_bytes, |
if (*num_bytes == 0) |
return MOJO_RESULT_OK; // Nothing to do. |
- return ConsumerDiscardDataImplNoLock(num_bytes, all_or_none); |
+ MojoWaitFlags old_producer_satisfied_flags = ProducerSatisfiedFlagsNoLock(); |
+ MojoResult rv = ConsumerDiscardDataImplNoLock(num_bytes, all_or_none); |
+ if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags) |
+ AwakeProducerWaitersForStateChangeNoLock(); |
+ return rv; |
} |
MojoResult DataPipe::ConsumerQueryData(uint32_t* num_bytes) { |
@@ -236,7 +260,6 @@ MojoResult DataPipe::ConsumerBeginReadData(const void** buffer, |
all_or_none); |
if (rv != MOJO_RESULT_OK) |
return rv; |
- |
DCHECK(consumer_in_two_phase_read_no_lock()); |
return MOJO_RESULT_OK; |
} |
@@ -248,9 +271,16 @@ MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { |
if (!consumer_in_two_phase_read_no_lock()) |
return MOJO_RESULT_FAILED_PRECONDITION; |
+ MojoWaitFlags old_producer_satisfied_flags = ProducerSatisfiedFlagsNoLock(); |
MojoResult rv = ConsumerEndReadDataImplNoLock(num_bytes_read); |
// Two-phase read ended even on failure. |
DCHECK(!consumer_in_two_phase_read_no_lock()); |
+ // If we're now readable, we *became* readable (since we weren't readable |
+ // during the two-phase read), so awake consumer waiters. |
+ if ((ConsumerSatisfiedFlagsNoLock() & MOJO_WAIT_FLAG_READABLE)) |
+ AwakeConsumerWaitersForStateChangeNoLock(); |
+ if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags) |
+ AwakeProducerWaitersForStateChangeNoLock(); |
return rv; |
} |