| Index: third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc
|
| diff --git a/third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc b/third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc
|
| index 3ca76d31891b461527876d2fdd334336c566a5c2..938ce68c651637daa55966d3203c57ed753dea54 100644
|
| --- a/third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc
|
| +++ b/third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc
|
| @@ -14,13 +14,30 @@
|
|
|
| #include <algorithm>
|
|
|
| +#include "base/compiler_specific.h"
|
| #include "base/logging.h"
|
| +#include "base/memory/scoped_ptr.h"
|
| +#include "mojo/edk/system/channel.h"
|
| #include "mojo/edk/system/configuration.h"
|
| #include "mojo/edk/system/data_pipe.h"
|
| +#include "mojo/edk/system/message_in_transit.h"
|
| +#include "mojo/edk/system/message_in_transit_queue.h"
|
| +#include "mojo/edk/system/remote_consumer_data_pipe_impl.h"
|
| +#include "mojo/edk/system/remote_producer_data_pipe_impl.h"
|
|
|
| namespace mojo {
|
| namespace system {
|
|
|
| +// Assert some things about some things defined in data_pipe_impl.h (don't make
|
| +// the assertions there, to avoid including message_in_transit.h).
|
| +static_assert(ALIGNOF(SerializedDataPipeConsumerDispatcher) ==
|
| + MessageInTransit::kMessageAlignment,
|
| + "Wrong alignment");
|
| +static_assert(sizeof(SerializedDataPipeConsumerDispatcher) %
|
| + MessageInTransit::kMessageAlignment ==
|
| + 0,
|
| + "Wrong size");
|
| +
|
| LocalDataPipeImpl::LocalDataPipeImpl()
|
| : start_index_(0), current_num_bytes_(0) {
|
| // Note: |buffer_| is lazily allocated, since a common case will be that one
|
| @@ -51,39 +68,25 @@ MojoResult LocalDataPipeImpl::ProducerWriteData(
|
| DCHECK_EQ(max_num_bytes_to_write % element_num_bytes(), 0u);
|
| DCHECK_EQ(min_num_bytes_to_write % element_num_bytes(), 0u);
|
| DCHECK_GT(max_num_bytes_to_write, 0u);
|
| + DCHECK_GE(max_num_bytes_to_write, min_num_bytes_to_write);
|
| DCHECK(consumer_open());
|
|
|
| - size_t num_bytes_to_write = 0;
|
| - if (may_discard()) {
|
| - if (min_num_bytes_to_write > capacity_num_bytes())
|
| - return MOJO_RESULT_OUT_OF_RANGE;
|
| -
|
| - num_bytes_to_write = std::min(static_cast<size_t>(max_num_bytes_to_write),
|
| - capacity_num_bytes());
|
| - if (num_bytes_to_write > capacity_num_bytes() - current_num_bytes_) {
|
| - // Discard as much as needed (discard oldest first).
|
| - MarkDataAsConsumed(num_bytes_to_write -
|
| - (capacity_num_bytes() - current_num_bytes_));
|
| - // No need to wake up write waiters, since we're definitely going to leave
|
| - // the buffer full.
|
| - }
|
| - } else {
|
| - if (min_num_bytes_to_write > capacity_num_bytes() - current_num_bytes_) {
|
| - // Don't return "should wait" since you can't wait for a specified amount
|
| - // of data.
|
| - return MOJO_RESULT_OUT_OF_RANGE;
|
| - }
|
| -
|
| - num_bytes_to_write = std::min(static_cast<size_t>(max_num_bytes_to_write),
|
| - capacity_num_bytes() - current_num_bytes_);
|
| + if (min_num_bytes_to_write > capacity_num_bytes() - current_num_bytes_) {
|
| + // Don't return "should wait" since you can't wait for a specified amount
|
| + // of data.
|
| + return MOJO_RESULT_OUT_OF_RANGE;
|
| }
|
| +
|
| + size_t num_bytes_to_write =
|
| + std::min(static_cast<size_t>(max_num_bytes_to_write),
|
| + capacity_num_bytes() - current_num_bytes_);
|
| if (num_bytes_to_write == 0)
|
| return MOJO_RESULT_SHOULD_WAIT;
|
|
|
| - // The amount we can write in our first |memcpy()|.
|
| + // The amount we can write in our first copy.
|
| size_t num_bytes_to_write_first =
|
| std::min(num_bytes_to_write, GetMaxNumBytesToWrite());
|
| - // Do the first (and possibly only) |memcpy()|.
|
| + // Do the first (and possibly only) copy.
|
| size_t first_write_index =
|
| (start_index_ + current_num_bytes_) % capacity_num_bytes();
|
| EnsureBuffer();
|
| @@ -114,21 +117,9 @@ MojoResult LocalDataPipeImpl::ProducerBeginWriteData(
|
|
|
| size_t max_num_bytes_to_write = GetMaxNumBytesToWrite();
|
| if (min_num_bytes_to_write > max_num_bytes_to_write) {
|
| - // In "may discard" mode, we can always write from the write index to the
|
| - // end of the buffer.
|
| - if (may_discard() &&
|
| - min_num_bytes_to_write <= capacity_num_bytes() - write_index) {
|
| - // To do so, we need to discard an appropriate amount of data.
|
| - // We should only reach here if the start index is after the write index!
|
| - DCHECK_GE(start_index_, write_index);
|
| - DCHECK_GT(min_num_bytes_to_write - max_num_bytes_to_write, 0u);
|
| - MarkDataAsConsumed(min_num_bytes_to_write - max_num_bytes_to_write);
|
| - max_num_bytes_to_write = min_num_bytes_to_write;
|
| - } else {
|
| - // Don't return "should wait" since you can't wait for a specified amount
|
| - // of data.
|
| - return MOJO_RESULT_OUT_OF_RANGE;
|
| - }
|
| + // Don't return "should wait" since you can't wait for a specified amount
|
| + // of data.
|
| + return MOJO_RESULT_OUT_OF_RANGE;
|
| }
|
|
|
| // Don't go into a two-phase write if there's no room.
|
| @@ -145,6 +136,7 @@ MojoResult LocalDataPipeImpl::ProducerBeginWriteData(
|
|
|
| MojoResult LocalDataPipeImpl::ProducerEndWriteData(uint32_t num_bytes_written) {
|
| DCHECK_LE(num_bytes_written, producer_two_phase_max_num_bytes_written());
|
| + DCHECK_EQ(num_bytes_written % element_num_bytes(), 0u);
|
| current_num_bytes_ += num_bytes_written;
|
| DCHECK_LE(current_num_bytes_, capacity_num_bytes());
|
| set_producer_two_phase_max_num_bytes_written(0);
|
| @@ -154,7 +146,7 @@ MojoResult LocalDataPipeImpl::ProducerEndWriteData(uint32_t num_bytes_written) {
|
| HandleSignalsState LocalDataPipeImpl::ProducerGetHandleSignalsState() const {
|
| HandleSignalsState rv;
|
| if (consumer_open()) {
|
| - if ((may_discard() || current_num_bytes_ < capacity_num_bytes()) &&
|
| + if (current_num_bytes_ < capacity_num_bytes() &&
|
| !producer_in_two_phase_write())
|
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| @@ -168,8 +160,8 @@ HandleSignalsState LocalDataPipeImpl::ProducerGetHandleSignalsState() const {
|
| void LocalDataPipeImpl::ProducerStartSerialize(Channel* channel,
|
| size_t* max_size,
|
| size_t* max_platform_handles) {
|
| - // TODO(vtl): Support serializing producer data pipe handles.
|
| - *max_size = 0;
|
| + *max_size = sizeof(SerializedDataPipeProducerDispatcher) +
|
| + channel->GetSerializedEndpointSize();
|
| *max_platform_handles = 0;
|
| }
|
|
|
| @@ -178,9 +170,36 @@ bool LocalDataPipeImpl::ProducerEndSerialize(
|
| void* destination,
|
| size_t* actual_size,
|
| embedder::PlatformHandleVector* platform_handles) {
|
| - // TODO(vtl): Support serializing producer data pipe handles.
|
| - owner()->ProducerCloseNoLock();
|
| - return false;
|
| + SerializedDataPipeProducerDispatcher* s =
|
| + static_cast<SerializedDataPipeProducerDispatcher*>(destination);
|
| + s->validated_options = validated_options();
|
| + void* destination_for_endpoint = static_cast<char*>(destination) +
|
| + sizeof(SerializedDataPipeProducerDispatcher);
|
| +
|
| + if (!consumer_open()) {
|
| + // Case 1: The consumer is closed.
|
| + s->consumer_num_bytes = static_cast<size_t>(-1);
|
| + *actual_size = sizeof(SerializedDataPipeProducerDispatcher);
|
| + return true;
|
| + }
|
| +
|
| + // Case 2: The consumer isn't closed. We'll replace ourselves with a
|
| + // |RemoteProducerDataPipeImpl|.
|
| +
|
| + s->consumer_num_bytes = current_num_bytes_;
|
| + // Note: We don't use |port|.
|
| + scoped_refptr<ChannelEndpoint> channel_endpoint =
|
| + channel->SerializeEndpointWithLocalPeer(destination_for_endpoint, nullptr,
|
| + owner(), 0);
|
| + // Note: Keep |*this| alive until the end of this method, to make things
|
| + // slightly easier on ourselves.
|
| + scoped_ptr<DataPipeImpl> self(owner()->ReplaceImplNoLock(make_scoped_ptr(
|
| + new RemoteProducerDataPipeImpl(channel_endpoint.get(), buffer_.Pass(),
|
| + start_index_, current_num_bytes_))));
|
| +
|
| + *actual_size = sizeof(SerializedDataPipeProducerDispatcher) +
|
| + channel->GetSerializedEndpointSize();
|
| + return true;
|
| }
|
|
|
| void LocalDataPipeImpl::ConsumerClose() {
|
| @@ -215,7 +234,7 @@ MojoResult LocalDataPipeImpl::ConsumerReadData(UserPointer<void> elements,
|
| : MOJO_RESULT_FAILED_PRECONDITION;
|
| }
|
|
|
| - // The amount we can read in our first |memcpy()|.
|
| + // The amount we can read in our first copy.
|
| size_t num_bytes_to_read_first =
|
| std::min(num_bytes_to_read, GetMaxNumBytesToRead());
|
| elements.PutArray(buffer_.get() + start_index_, num_bytes_to_read_first);
|
| @@ -294,6 +313,7 @@ MojoResult LocalDataPipeImpl::ConsumerBeginReadData(
|
|
|
| MojoResult LocalDataPipeImpl::ConsumerEndReadData(uint32_t num_bytes_read) {
|
| DCHECK_LE(num_bytes_read, consumer_two_phase_max_num_bytes_read());
|
| + DCHECK_EQ(num_bytes_read % element_num_bytes(), 0u);
|
| DCHECK_LE(start_index_ + num_bytes_read, capacity_num_bytes());
|
| MarkDataAsConsumed(num_bytes_read);
|
| set_consumer_two_phase_max_num_bytes_read(0);
|
| @@ -318,8 +338,8 @@ HandleSignalsState LocalDataPipeImpl::ConsumerGetHandleSignalsState() const {
|
| void LocalDataPipeImpl::ConsumerStartSerialize(Channel* channel,
|
| size_t* max_size,
|
| size_t* max_platform_handles) {
|
| - // TODO(vtl): Support serializing consumer data pipe handles.
|
| - *max_size = 0;
|
| + *max_size = sizeof(SerializedDataPipeConsumerDispatcher) +
|
| + channel->GetSerializedEndpointSize();
|
| *max_platform_handles = 0;
|
| }
|
|
|
| @@ -328,11 +348,55 @@ bool LocalDataPipeImpl::ConsumerEndSerialize(
|
| void* destination,
|
| size_t* actual_size,
|
| embedder::PlatformHandleVector* platform_handles) {
|
| - // TODO(vtl): Support serializing consumer data pipe handles.
|
| - owner()->ConsumerCloseNoLock();
|
| + SerializedDataPipeConsumerDispatcher* s =
|
| + static_cast<SerializedDataPipeConsumerDispatcher*>(destination);
|
| + s->validated_options = validated_options();
|
| + void* destination_for_endpoint = static_cast<char*>(destination) +
|
| + sizeof(SerializedDataPipeConsumerDispatcher);
|
| +
|
| + size_t old_num_bytes = current_num_bytes_;
|
| + MessageInTransitQueue message_queue;
|
| + ConvertDataToMessages(buffer_.get(), &start_index_, ¤t_num_bytes_,
|
| + &message_queue);
|
| + start_index_ = 0;
|
| + current_num_bytes_ = 0;
|
| +
|
| + if (!producer_open()) {
|
| + // Case 1: The producer is closed.
|
| + channel->SerializeEndpointWithClosedPeer(destination_for_endpoint,
|
| + &message_queue);
|
| + *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) +
|
| + channel->GetSerializedEndpointSize();
|
| + return true;
|
| + }
|
| +
|
| + // Case 2: The producer isn't closed. We'll replace ourselves with a
|
| + // |RemoteConsumerDataPipeImpl|.
|
| +
|
| + // Note: We don't use |port|.
|
| + scoped_refptr<ChannelEndpoint> channel_endpoint =
|
| + channel->SerializeEndpointWithLocalPeer(destination_for_endpoint,
|
| + &message_queue, owner(), 0);
|
| + // Note: Keep |*this| alive until the end of this method, to make things
|
| + // slightly easier on ourselves.
|
| + scoped_ptr<DataPipeImpl> self(owner()->ReplaceImplNoLock(make_scoped_ptr(
|
| + new RemoteConsumerDataPipeImpl(channel_endpoint.get(), old_num_bytes))));
|
| +
|
| + *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) +
|
| + channel->GetSerializedEndpointSize();
|
| + return true;
|
| +}
|
| +
|
| +bool LocalDataPipeImpl::OnReadMessage(unsigned /*port*/,
|
| + MessageInTransit* /*message*/) {
|
| + NOTREACHED();
|
| return false;
|
| }
|
|
|
| +void LocalDataPipeImpl::OnDetachFromChannel(unsigned /*port*/) {
|
| + NOTREACHED();
|
| +}
|
| +
|
| void LocalDataPipeImpl::EnsureBuffer() {
|
| DCHECK(producer_open());
|
| if (buffer_)
|
|
|