Index: third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc |
diff --git a/third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc b/third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc |
index 3ca76d31891b461527876d2fdd334336c566a5c2..938ce68c651637daa55966d3203c57ed753dea54 100644 |
--- a/third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc |
+++ b/third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc |
@@ -14,13 +14,30 @@ |
#include <algorithm> |
+#include "base/compiler_specific.h" |
#include "base/logging.h" |
+#include "base/memory/scoped_ptr.h" |
+#include "mojo/edk/system/channel.h" |
#include "mojo/edk/system/configuration.h" |
#include "mojo/edk/system/data_pipe.h" |
+#include "mojo/edk/system/message_in_transit.h" |
+#include "mojo/edk/system/message_in_transit_queue.h" |
+#include "mojo/edk/system/remote_consumer_data_pipe_impl.h" |
+#include "mojo/edk/system/remote_producer_data_pipe_impl.h" |
namespace mojo { |
namespace system { |
+// Assert some things about some things defined in data_pipe_impl.h (don't make |
+// the assertions there, to avoid including message_in_transit.h). |
+static_assert(ALIGNOF(SerializedDataPipeConsumerDispatcher) == |
+ MessageInTransit::kMessageAlignment, |
+ "Wrong alignment"); |
+static_assert(sizeof(SerializedDataPipeConsumerDispatcher) % |
+ MessageInTransit::kMessageAlignment == |
+ 0, |
+ "Wrong size"); |
+ |
LocalDataPipeImpl::LocalDataPipeImpl() |
: start_index_(0), current_num_bytes_(0) { |
// Note: |buffer_| is lazily allocated, since a common case will be that one |
@@ -51,39 +68,25 @@ MojoResult LocalDataPipeImpl::ProducerWriteData( |
DCHECK_EQ(max_num_bytes_to_write % element_num_bytes(), 0u); |
DCHECK_EQ(min_num_bytes_to_write % element_num_bytes(), 0u); |
DCHECK_GT(max_num_bytes_to_write, 0u); |
+ DCHECK_GE(max_num_bytes_to_write, min_num_bytes_to_write); |
DCHECK(consumer_open()); |
- size_t num_bytes_to_write = 0; |
- if (may_discard()) { |
- if (min_num_bytes_to_write > capacity_num_bytes()) |
- return MOJO_RESULT_OUT_OF_RANGE; |
- |
- num_bytes_to_write = std::min(static_cast<size_t>(max_num_bytes_to_write), |
- capacity_num_bytes()); |
- if (num_bytes_to_write > capacity_num_bytes() - current_num_bytes_) { |
- // Discard as much as needed (discard oldest first). |
- MarkDataAsConsumed(num_bytes_to_write - |
- (capacity_num_bytes() - current_num_bytes_)); |
- // No need to wake up write waiters, since we're definitely going to leave |
- // the buffer full. |
- } |
- } else { |
- if (min_num_bytes_to_write > capacity_num_bytes() - current_num_bytes_) { |
- // Don't return "should wait" since you can't wait for a specified amount |
- // of data. |
- return MOJO_RESULT_OUT_OF_RANGE; |
- } |
- |
- num_bytes_to_write = std::min(static_cast<size_t>(max_num_bytes_to_write), |
- capacity_num_bytes() - current_num_bytes_); |
+ if (min_num_bytes_to_write > capacity_num_bytes() - current_num_bytes_) { |
+ // Don't return "should wait" since you can't wait for a specified amount |
+ // of data. |
+ return MOJO_RESULT_OUT_OF_RANGE; |
} |
+ |
+ size_t num_bytes_to_write = |
+ std::min(static_cast<size_t>(max_num_bytes_to_write), |
+ capacity_num_bytes() - current_num_bytes_); |
if (num_bytes_to_write == 0) |
return MOJO_RESULT_SHOULD_WAIT; |
- // The amount we can write in our first |memcpy()|. |
+ // The amount we can write in our first copy. |
size_t num_bytes_to_write_first = |
std::min(num_bytes_to_write, GetMaxNumBytesToWrite()); |
- // Do the first (and possibly only) |memcpy()|. |
+ // Do the first (and possibly only) copy. |
size_t first_write_index = |
(start_index_ + current_num_bytes_) % capacity_num_bytes(); |
EnsureBuffer(); |
@@ -114,21 +117,9 @@ MojoResult LocalDataPipeImpl::ProducerBeginWriteData( |
size_t max_num_bytes_to_write = GetMaxNumBytesToWrite(); |
if (min_num_bytes_to_write > max_num_bytes_to_write) { |
- // In "may discard" mode, we can always write from the write index to the |
- // end of the buffer. |
- if (may_discard() && |
- min_num_bytes_to_write <= capacity_num_bytes() - write_index) { |
- // To do so, we need to discard an appropriate amount of data. |
- // We should only reach here if the start index is after the write index! |
- DCHECK_GE(start_index_, write_index); |
- DCHECK_GT(min_num_bytes_to_write - max_num_bytes_to_write, 0u); |
- MarkDataAsConsumed(min_num_bytes_to_write - max_num_bytes_to_write); |
- max_num_bytes_to_write = min_num_bytes_to_write; |
- } else { |
- // Don't return "should wait" since you can't wait for a specified amount |
- // of data. |
- return MOJO_RESULT_OUT_OF_RANGE; |
- } |
+ // Don't return "should wait" since you can't wait for a specified amount |
+ // of data. |
+ return MOJO_RESULT_OUT_OF_RANGE; |
} |
// Don't go into a two-phase write if there's no room. |
@@ -145,6 +136,7 @@ MojoResult LocalDataPipeImpl::ProducerBeginWriteData( |
MojoResult LocalDataPipeImpl::ProducerEndWriteData(uint32_t num_bytes_written) { |
DCHECK_LE(num_bytes_written, producer_two_phase_max_num_bytes_written()); |
+ DCHECK_EQ(num_bytes_written % element_num_bytes(), 0u); |
current_num_bytes_ += num_bytes_written; |
DCHECK_LE(current_num_bytes_, capacity_num_bytes()); |
set_producer_two_phase_max_num_bytes_written(0); |
@@ -154,7 +146,7 @@ MojoResult LocalDataPipeImpl::ProducerEndWriteData(uint32_t num_bytes_written) { |
HandleSignalsState LocalDataPipeImpl::ProducerGetHandleSignalsState() const { |
HandleSignalsState rv; |
if (consumer_open()) { |
- if ((may_discard() || current_num_bytes_ < capacity_num_bytes()) && |
+ if (current_num_bytes_ < capacity_num_bytes() && |
!producer_in_two_phase_write()) |
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
@@ -168,8 +160,8 @@ HandleSignalsState LocalDataPipeImpl::ProducerGetHandleSignalsState() const { |
void LocalDataPipeImpl::ProducerStartSerialize(Channel* channel, |
size_t* max_size, |
size_t* max_platform_handles) { |
- // TODO(vtl): Support serializing producer data pipe handles. |
- *max_size = 0; |
+ *max_size = sizeof(SerializedDataPipeProducerDispatcher) + |
+ channel->GetSerializedEndpointSize(); |
*max_platform_handles = 0; |
} |
@@ -178,9 +170,36 @@ bool LocalDataPipeImpl::ProducerEndSerialize( |
void* destination, |
size_t* actual_size, |
embedder::PlatformHandleVector* platform_handles) { |
- // TODO(vtl): Support serializing producer data pipe handles. |
- owner()->ProducerCloseNoLock(); |
- return false; |
+ SerializedDataPipeProducerDispatcher* s = |
+ static_cast<SerializedDataPipeProducerDispatcher*>(destination); |
+ s->validated_options = validated_options(); |
+ void* destination_for_endpoint = static_cast<char*>(destination) + |
+ sizeof(SerializedDataPipeProducerDispatcher); |
+ |
+ if (!consumer_open()) { |
+ // Case 1: The consumer is closed. |
+ s->consumer_num_bytes = static_cast<size_t>(-1); |
+ *actual_size = sizeof(SerializedDataPipeProducerDispatcher); |
+ return true; |
+ } |
+ |
+ // Case 2: The consumer isn't closed. We'll replace ourselves with a |
+ // |RemoteProducerDataPipeImpl|. |
+ |
+ s->consumer_num_bytes = current_num_bytes_; |
+ // Note: We don't use |port|. |
+ scoped_refptr<ChannelEndpoint> channel_endpoint = |
+ channel->SerializeEndpointWithLocalPeer(destination_for_endpoint, nullptr, |
+ owner(), 0); |
+ // Note: Keep |*this| alive until the end of this method, to make things |
+ // slightly easier on ourselves. |
+ scoped_ptr<DataPipeImpl> self(owner()->ReplaceImplNoLock(make_scoped_ptr( |
+ new RemoteProducerDataPipeImpl(channel_endpoint.get(), buffer_.Pass(), |
+ start_index_, current_num_bytes_)))); |
+ |
+ *actual_size = sizeof(SerializedDataPipeProducerDispatcher) + |
+ channel->GetSerializedEndpointSize(); |
+ return true; |
} |
void LocalDataPipeImpl::ConsumerClose() { |
@@ -215,7 +234,7 @@ MojoResult LocalDataPipeImpl::ConsumerReadData(UserPointer<void> elements, |
: MOJO_RESULT_FAILED_PRECONDITION; |
} |
- // The amount we can read in our first |memcpy()|. |
+ // The amount we can read in our first copy. |
size_t num_bytes_to_read_first = |
std::min(num_bytes_to_read, GetMaxNumBytesToRead()); |
elements.PutArray(buffer_.get() + start_index_, num_bytes_to_read_first); |
@@ -294,6 +313,7 @@ MojoResult LocalDataPipeImpl::ConsumerBeginReadData( |
MojoResult LocalDataPipeImpl::ConsumerEndReadData(uint32_t num_bytes_read) { |
DCHECK_LE(num_bytes_read, consumer_two_phase_max_num_bytes_read()); |
+ DCHECK_EQ(num_bytes_read % element_num_bytes(), 0u); |
DCHECK_LE(start_index_ + num_bytes_read, capacity_num_bytes()); |
MarkDataAsConsumed(num_bytes_read); |
set_consumer_two_phase_max_num_bytes_read(0); |
@@ -318,8 +338,8 @@ HandleSignalsState LocalDataPipeImpl::ConsumerGetHandleSignalsState() const { |
void LocalDataPipeImpl::ConsumerStartSerialize(Channel* channel, |
size_t* max_size, |
size_t* max_platform_handles) { |
- // TODO(vtl): Support serializing consumer data pipe handles. |
- *max_size = 0; |
+ *max_size = sizeof(SerializedDataPipeConsumerDispatcher) + |
+ channel->GetSerializedEndpointSize(); |
*max_platform_handles = 0; |
} |
@@ -328,11 +348,55 @@ bool LocalDataPipeImpl::ConsumerEndSerialize( |
void* destination, |
size_t* actual_size, |
embedder::PlatformHandleVector* platform_handles) { |
- // TODO(vtl): Support serializing consumer data pipe handles. |
- owner()->ConsumerCloseNoLock(); |
+ SerializedDataPipeConsumerDispatcher* s = |
+ static_cast<SerializedDataPipeConsumerDispatcher*>(destination); |
+ s->validated_options = validated_options(); |
+ void* destination_for_endpoint = static_cast<char*>(destination) + |
+ sizeof(SerializedDataPipeConsumerDispatcher); |
+ |
+ size_t old_num_bytes = current_num_bytes_; |
+ MessageInTransitQueue message_queue; |
+ ConvertDataToMessages(buffer_.get(), &start_index_, ¤t_num_bytes_, |
+ &message_queue); |
+ start_index_ = 0; |
+ current_num_bytes_ = 0; |
+ |
+ if (!producer_open()) { |
+ // Case 1: The producer is closed. |
+ channel->SerializeEndpointWithClosedPeer(destination_for_endpoint, |
+ &message_queue); |
+ *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + |
+ channel->GetSerializedEndpointSize(); |
+ return true; |
+ } |
+ |
+ // Case 2: The producer isn't closed. We'll replace ourselves with a |
+ // |RemoteConsumerDataPipeImpl|. |
+ |
+ // Note: We don't use |port|. |
+ scoped_refptr<ChannelEndpoint> channel_endpoint = |
+ channel->SerializeEndpointWithLocalPeer(destination_for_endpoint, |
+ &message_queue, owner(), 0); |
+ // Note: Keep |*this| alive until the end of this method, to make things |
+ // slightly easier on ourselves. |
+ scoped_ptr<DataPipeImpl> self(owner()->ReplaceImplNoLock(make_scoped_ptr( |
+ new RemoteConsumerDataPipeImpl(channel_endpoint.get(), old_num_bytes)))); |
+ |
+ *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + |
+ channel->GetSerializedEndpointSize(); |
+ return true; |
+} |
+ |
+bool LocalDataPipeImpl::OnReadMessage(unsigned /*port*/, |
+ MessageInTransit* /*message*/) { |
+ NOTREACHED(); |
return false; |
} |
+void LocalDataPipeImpl::OnDetachFromChannel(unsigned /*port*/) { |
+ NOTREACHED(); |
+} |
+ |
void LocalDataPipeImpl::EnsureBuffer() { |
DCHECK(producer_open()); |
if (buffer_) |