Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(53)

Unified Diff: third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc

Issue 975973002: Update mojo sdk to rev f68e697e389943cd9bf9652397312280e96b127a (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: shake fist at msvc Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc
diff --git a/third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc b/third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc
index 3ca76d31891b461527876d2fdd334336c566a5c2..938ce68c651637daa55966d3203c57ed753dea54 100644
--- a/third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc
+++ b/third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.cc
@@ -14,13 +14,30 @@
#include <algorithm>
+#include "base/compiler_specific.h"
#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/configuration.h"
#include "mojo/edk/system/data_pipe.h"
+#include "mojo/edk/system/message_in_transit.h"
+#include "mojo/edk/system/message_in_transit_queue.h"
+#include "mojo/edk/system/remote_consumer_data_pipe_impl.h"
+#include "mojo/edk/system/remote_producer_data_pipe_impl.h"
namespace mojo {
namespace system {
+// Assert some things about some things defined in data_pipe_impl.h (don't make
+// the assertions there, to avoid including message_in_transit.h).
+static_assert(ALIGNOF(SerializedDataPipeConsumerDispatcher) ==
+ MessageInTransit::kMessageAlignment,
+ "Wrong alignment");
+static_assert(sizeof(SerializedDataPipeConsumerDispatcher) %
+ MessageInTransit::kMessageAlignment ==
+ 0,
+ "Wrong size");
+
LocalDataPipeImpl::LocalDataPipeImpl()
: start_index_(0), current_num_bytes_(0) {
// Note: |buffer_| is lazily allocated, since a common case will be that one
@@ -51,39 +68,25 @@ MojoResult LocalDataPipeImpl::ProducerWriteData(
DCHECK_EQ(max_num_bytes_to_write % element_num_bytes(), 0u);
DCHECK_EQ(min_num_bytes_to_write % element_num_bytes(), 0u);
DCHECK_GT(max_num_bytes_to_write, 0u);
+ DCHECK_GE(max_num_bytes_to_write, min_num_bytes_to_write);
DCHECK(consumer_open());
- size_t num_bytes_to_write = 0;
- if (may_discard()) {
- if (min_num_bytes_to_write > capacity_num_bytes())
- return MOJO_RESULT_OUT_OF_RANGE;
-
- num_bytes_to_write = std::min(static_cast<size_t>(max_num_bytes_to_write),
- capacity_num_bytes());
- if (num_bytes_to_write > capacity_num_bytes() - current_num_bytes_) {
- // Discard as much as needed (discard oldest first).
- MarkDataAsConsumed(num_bytes_to_write -
- (capacity_num_bytes() - current_num_bytes_));
- // No need to wake up write waiters, since we're definitely going to leave
- // the buffer full.
- }
- } else {
- if (min_num_bytes_to_write > capacity_num_bytes() - current_num_bytes_) {
- // Don't return "should wait" since you can't wait for a specified amount
- // of data.
- return MOJO_RESULT_OUT_OF_RANGE;
- }
-
- num_bytes_to_write = std::min(static_cast<size_t>(max_num_bytes_to_write),
- capacity_num_bytes() - current_num_bytes_);
+ if (min_num_bytes_to_write > capacity_num_bytes() - current_num_bytes_) {
+ // Don't return "should wait" since you can't wait for a specified amount
+ // of data.
+ return MOJO_RESULT_OUT_OF_RANGE;
}
+
+ size_t num_bytes_to_write =
+ std::min(static_cast<size_t>(max_num_bytes_to_write),
+ capacity_num_bytes() - current_num_bytes_);
if (num_bytes_to_write == 0)
return MOJO_RESULT_SHOULD_WAIT;
- // The amount we can write in our first |memcpy()|.
+ // The amount we can write in our first copy.
size_t num_bytes_to_write_first =
std::min(num_bytes_to_write, GetMaxNumBytesToWrite());
- // Do the first (and possibly only) |memcpy()|.
+ // Do the first (and possibly only) copy.
size_t first_write_index =
(start_index_ + current_num_bytes_) % capacity_num_bytes();
EnsureBuffer();
@@ -114,21 +117,9 @@ MojoResult LocalDataPipeImpl::ProducerBeginWriteData(
size_t max_num_bytes_to_write = GetMaxNumBytesToWrite();
if (min_num_bytes_to_write > max_num_bytes_to_write) {
- // In "may discard" mode, we can always write from the write index to the
- // end of the buffer.
- if (may_discard() &&
- min_num_bytes_to_write <= capacity_num_bytes() - write_index) {
- // To do so, we need to discard an appropriate amount of data.
- // We should only reach here if the start index is after the write index!
- DCHECK_GE(start_index_, write_index);
- DCHECK_GT(min_num_bytes_to_write - max_num_bytes_to_write, 0u);
- MarkDataAsConsumed(min_num_bytes_to_write - max_num_bytes_to_write);
- max_num_bytes_to_write = min_num_bytes_to_write;
- } else {
- // Don't return "should wait" since you can't wait for a specified amount
- // of data.
- return MOJO_RESULT_OUT_OF_RANGE;
- }
+ // Don't return "should wait" since you can't wait for a specified amount
+ // of data.
+ return MOJO_RESULT_OUT_OF_RANGE;
}
// Don't go into a two-phase write if there's no room.
@@ -145,6 +136,7 @@ MojoResult LocalDataPipeImpl::ProducerBeginWriteData(
MojoResult LocalDataPipeImpl::ProducerEndWriteData(uint32_t num_bytes_written) {
DCHECK_LE(num_bytes_written, producer_two_phase_max_num_bytes_written());
+ DCHECK_EQ(num_bytes_written % element_num_bytes(), 0u);
current_num_bytes_ += num_bytes_written;
DCHECK_LE(current_num_bytes_, capacity_num_bytes());
set_producer_two_phase_max_num_bytes_written(0);
@@ -154,7 +146,7 @@ MojoResult LocalDataPipeImpl::ProducerEndWriteData(uint32_t num_bytes_written) {
HandleSignalsState LocalDataPipeImpl::ProducerGetHandleSignalsState() const {
HandleSignalsState rv;
if (consumer_open()) {
- if ((may_discard() || current_num_bytes_ < capacity_num_bytes()) &&
+ if (current_num_bytes_ < capacity_num_bytes() &&
!producer_in_two_phase_write())
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
@@ -168,8 +160,8 @@ HandleSignalsState LocalDataPipeImpl::ProducerGetHandleSignalsState() const {
void LocalDataPipeImpl::ProducerStartSerialize(Channel* channel,
size_t* max_size,
size_t* max_platform_handles) {
- // TODO(vtl): Support serializing producer data pipe handles.
- *max_size = 0;
+ *max_size = sizeof(SerializedDataPipeProducerDispatcher) +
+ channel->GetSerializedEndpointSize();
*max_platform_handles = 0;
}
@@ -178,9 +170,36 @@ bool LocalDataPipeImpl::ProducerEndSerialize(
void* destination,
size_t* actual_size,
embedder::PlatformHandleVector* platform_handles) {
- // TODO(vtl): Support serializing producer data pipe handles.
- owner()->ProducerCloseNoLock();
- return false;
+ SerializedDataPipeProducerDispatcher* s =
+ static_cast<SerializedDataPipeProducerDispatcher*>(destination);
+ s->validated_options = validated_options();
+ void* destination_for_endpoint = static_cast<char*>(destination) +
+ sizeof(SerializedDataPipeProducerDispatcher);
+
+ if (!consumer_open()) {
+ // Case 1: The consumer is closed.
+ s->consumer_num_bytes = static_cast<size_t>(-1);
+ *actual_size = sizeof(SerializedDataPipeProducerDispatcher);
+ return true;
+ }
+
+ // Case 2: The consumer isn't closed. We'll replace ourselves with a
+ // |RemoteProducerDataPipeImpl|.
+
+ s->consumer_num_bytes = current_num_bytes_;
+ // Note: We don't use |port|.
+ scoped_refptr<ChannelEndpoint> channel_endpoint =
+ channel->SerializeEndpointWithLocalPeer(destination_for_endpoint, nullptr,
+ owner(), 0);
+ // Note: Keep |*this| alive until the end of this method, to make things
+ // slightly easier on ourselves.
+ scoped_ptr<DataPipeImpl> self(owner()->ReplaceImplNoLock(make_scoped_ptr(
+ new RemoteProducerDataPipeImpl(channel_endpoint.get(), buffer_.Pass(),
+ start_index_, current_num_bytes_))));
+
+ *actual_size = sizeof(SerializedDataPipeProducerDispatcher) +
+ channel->GetSerializedEndpointSize();
+ return true;
}
void LocalDataPipeImpl::ConsumerClose() {
@@ -215,7 +234,7 @@ MojoResult LocalDataPipeImpl::ConsumerReadData(UserPointer<void> elements,
: MOJO_RESULT_FAILED_PRECONDITION;
}
- // The amount we can read in our first |memcpy()|.
+ // The amount we can read in our first copy.
size_t num_bytes_to_read_first =
std::min(num_bytes_to_read, GetMaxNumBytesToRead());
elements.PutArray(buffer_.get() + start_index_, num_bytes_to_read_first);
@@ -294,6 +313,7 @@ MojoResult LocalDataPipeImpl::ConsumerBeginReadData(
MojoResult LocalDataPipeImpl::ConsumerEndReadData(uint32_t num_bytes_read) {
DCHECK_LE(num_bytes_read, consumer_two_phase_max_num_bytes_read());
+ DCHECK_EQ(num_bytes_read % element_num_bytes(), 0u);
DCHECK_LE(start_index_ + num_bytes_read, capacity_num_bytes());
MarkDataAsConsumed(num_bytes_read);
set_consumer_two_phase_max_num_bytes_read(0);
@@ -318,8 +338,8 @@ HandleSignalsState LocalDataPipeImpl::ConsumerGetHandleSignalsState() const {
void LocalDataPipeImpl::ConsumerStartSerialize(Channel* channel,
size_t* max_size,
size_t* max_platform_handles) {
- // TODO(vtl): Support serializing consumer data pipe handles.
- *max_size = 0;
+ *max_size = sizeof(SerializedDataPipeConsumerDispatcher) +
+ channel->GetSerializedEndpointSize();
*max_platform_handles = 0;
}
@@ -328,11 +348,55 @@ bool LocalDataPipeImpl::ConsumerEndSerialize(
void* destination,
size_t* actual_size,
embedder::PlatformHandleVector* platform_handles) {
- // TODO(vtl): Support serializing consumer data pipe handles.
- owner()->ConsumerCloseNoLock();
+ SerializedDataPipeConsumerDispatcher* s =
+ static_cast<SerializedDataPipeConsumerDispatcher*>(destination);
+ s->validated_options = validated_options();
+ void* destination_for_endpoint = static_cast<char*>(destination) +
+ sizeof(SerializedDataPipeConsumerDispatcher);
+
+ size_t old_num_bytes = current_num_bytes_;
+ MessageInTransitQueue message_queue;
+ ConvertDataToMessages(buffer_.get(), &start_index_, &current_num_bytes_,
+ &message_queue);
+ start_index_ = 0;
+ current_num_bytes_ = 0;
+
+ if (!producer_open()) {
+ // Case 1: The producer is closed.
+ channel->SerializeEndpointWithClosedPeer(destination_for_endpoint,
+ &message_queue);
+ *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) +
+ channel->GetSerializedEndpointSize();
+ return true;
+ }
+
+ // Case 2: The producer isn't closed. We'll replace ourselves with a
+ // |RemoteConsumerDataPipeImpl|.
+
+ // Note: We don't use |port|.
+ scoped_refptr<ChannelEndpoint> channel_endpoint =
+ channel->SerializeEndpointWithLocalPeer(destination_for_endpoint,
+ &message_queue, owner(), 0);
+ // Note: Keep |*this| alive until the end of this method, to make things
+ // slightly easier on ourselves.
+ scoped_ptr<DataPipeImpl> self(owner()->ReplaceImplNoLock(make_scoped_ptr(
+ new RemoteConsumerDataPipeImpl(channel_endpoint.get(), old_num_bytes))));
+
+ *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) +
+ channel->GetSerializedEndpointSize();
+ return true;
+}
+
+bool LocalDataPipeImpl::OnReadMessage(unsigned /*port*/,
+ MessageInTransit* /*message*/) {
+ NOTREACHED();
return false;
}
+void LocalDataPipeImpl::OnDetachFromChannel(unsigned /*port*/) {
+ NOTREACHED();
+}
+
void LocalDataPipeImpl::EnsureBuffer() {
DCHECK(producer_open());
if (buffer_)

Powered by Google App Engine
This is Rietveld 408576698