Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1372)

Unified Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 1526923006: [mojo] Implement data pipe using a shared buffer. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/data_pipe_producer_dispatcher.cc
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
index b66ffe169b48d6b28621e0691bd0ffddc1d6f3d4..0a5557cf12e300216997b1a19136e662bdf50670 100644
--- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
@@ -7,6 +7,7 @@
#include <stddef.h>
#include <stdint.h>
+#include <algorithm>
#include <utility>
#include "base/bind.h"
@@ -16,38 +17,42 @@
#include "mojo/edk/embedder/platform_shared_buffer.h"
#include "mojo/edk/embedder/platform_support.h"
#include "mojo/edk/system/configuration.h"
-#include "mojo/edk/system/data_pipe.h"
+#include "mojo/edk/system/transport_data.h"
namespace mojo {
namespace edk {
-void DataPipeProducerDispatcher::Init(
- ScopedPlatformHandle message_pipe,
- char* serialized_write_buffer, size_t serialized_write_buffer_size) {
- if (message_pipe.is_valid()) {
- channel_ = RawChannel::Create(std::move(message_pipe));
- channel_->SetSerializedData(
- nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size,
- nullptr, nullptr);
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
- } else {
- error_ = true;
+void DataPipeProducerDispatcher::Init(ScopedPlatformHandle message_pipe,
+ char* serialized_write_buffer,
+ size_t serialized_write_buffer_size,
+ char* serialized_read_buffer,
+ size_t serialized_read_buffer_size,
+ ScopedPlatformHandle shared_buffer_handle,
+ size_t ring_buffer_start,
+ size_t ring_buffer_size) {
+ if (!message_pipe.is_valid()) {
+ peer_closed_ = true;
}
+
+ data_pipe_->Init(std::move(message_pipe), serialized_write_buffer,
+ serialized_write_buffer_size, serialized_read_buffer,
+ serialized_read_buffer_size, std::move(shared_buffer_handle),
+ ring_buffer_start, ring_buffer_size, true,
+ base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
}
void DataPipeProducerDispatcher::InitOnIO() {
base::AutoLock locker(lock());
- if (channel_)
- channel_->Init(this);
+ calling_init_ = true;
+ RawChannel* channel = data_pipe_->GetChannel();
+ if (channel)
+ channel->Init(this);
+ calling_init_ = false;
}
void DataPipeProducerDispatcher::CloseOnIO() {
base::AutoLock locker(lock());
- if (channel_) {
- channel_->Shutdown();
- channel_ = nullptr;
- }
+ data_pipe_->Shutdown();
}
Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
@@ -60,42 +65,57 @@ DataPipeProducerDispatcher::Deserialize(
size_t size,
PlatformHandleVector* platform_handles) {
MojoCreateDataPipeOptions options;
- ScopedPlatformHandle shared_memory_handle;
- size_t shared_memory_size = 0;
- ScopedPlatformHandle platform_handle =
- DataPipe::Deserialize(source, size, platform_handles, &options,
- &shared_memory_handle, &shared_memory_size);
+ ScopedPlatformHandle channel_handle, channel_shared_handle,
+ shared_buffer_handle;
+ size_t serialized_read_buffer_size, serialized_write_buffer_size;
+ size_t ring_buffer_start, ring_buffer_size;
+
+ ScopedPlatformHandle platform_handle = DataPipe::Deserialize(
+ source, size, platform_handles, &options, &channel_shared_handle,
+ &serialized_read_buffer_size, &serialized_write_buffer_size,
+ &shared_buffer_handle, &ring_buffer_start, &ring_buffer_size);
scoped_refptr<DataPipeProducerDispatcher> rv(Create(options));
+ // Create shared buffer and pull out the serialised read and write data
+ // for the channel.
+ size_t sz = serialized_write_buffer_size + serialized_read_buffer_size;
+ char* serialized_read_buffer = nullptr;
char* serialized_write_buffer = nullptr;
- size_t serialized_write_buffer_size = 0;
- scoped_refptr<PlatformSharedBuffer> shared_buffer;
+ scoped_refptr<PlatformSharedBuffer> channel_shared_buffer;
scoped_ptr<PlatformSharedBufferMapping> mapping;
- if (shared_memory_size) {
- shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
- shared_memory_size, std::move(shared_memory_handle));
- mapping = shared_buffer->Map(0, shared_memory_size);
- serialized_write_buffer = static_cast<char*>(mapping->GetBase());
- serialized_write_buffer_size = shared_memory_size;
+ if (channel_shared_handle.is_valid()) {
+ channel_shared_buffer =
+ internal::g_platform_support->CreateSharedBufferFromHandle(
+ sz, std::move(channel_shared_handle));
+ mapping = channel_shared_buffer->Map(0, sz);
+
+ serialized_read_buffer = static_cast<char*>(mapping->GetBase());
+ serialized_write_buffer =
+ static_cast<char*>(mapping->GetBase()) + serialized_read_buffer_size;
}
- rv->Init(std::move(platform_handle), serialized_write_buffer,
- serialized_write_buffer_size);
+ rv->Init(std::move(platform_handle), serialized_read_buffer,
+ serialized_read_buffer_size, serialized_write_buffer,
+ serialized_write_buffer_size, std::move(shared_buffer_handle),
+ ring_buffer_start, ring_buffer_size);
return rv;
}
DataPipeProducerDispatcher::DataPipeProducerDispatcher(
const MojoCreateDataPipeOptions& options)
- : options_(options), channel_(nullptr), error_(false), serialized_(false) {
-}
+ : data_pipe_(new DataPipe(options)),
+ calling_init_(false),
+ peer_closed_(false),
+ in_two_phase_write_(false),
+ two_phase_max_bytes_write_(0) {}
DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
// See comment in ~MessagePipeDispatcher.
- if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
- channel_->Shutdown();
+ if (internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
+ data_pipe_->Shutdown();
else
- DCHECK(!channel_);
+ DCHECK(!data_pipe_->GetChannel());
}
void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() {
@@ -111,14 +131,16 @@ void DataPipeProducerDispatcher::CloseImplNoLock() {
scoped_refptr<Dispatcher>
DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
+ // This function is used by TransportData to make sure there are no references
+ // to the dispatcher it is trying to serialise and transport.
lock().AssertAcquired();
- SerializeInternal();
+ scoped_refptr<DataPipeProducerDispatcher> rv =
+ Create(data_pipe_->GetOptions());
+ data_pipe_->CreateEquivalentAndClose(rv->data_pipe_.get());
+
+ DCHECK(!in_two_phase_write_);
- scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_);
- serialized_write_buffer_.swap(rv->serialized_write_buffer_);
- rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
- rv->serialized_ = true;
return scoped_refptr<Dispatcher>(rv.get());
}
@@ -127,41 +149,54 @@ MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
uint32_t* num_bytes,
MojoWriteDataFlags flags) {
lock().AssertAcquired();
- if (InTwoPhaseWrite())
+ if (in_two_phase_write_)
return MOJO_RESULT_BUSY;
- if (error_)
+ if (peer_closed_)
return MOJO_RESULT_FAILED_PRECONDITION;
- if (*num_bytes % options_.element_num_bytes != 0)
+ if (*num_bytes % data_pipe_->GetOptions().element_num_bytes != 0)
return MOJO_RESULT_INVALID_ARGUMENT;
if (*num_bytes == 0)
return MOJO_RESULT_OK; // Nothing to do.
- // For now, we ignore options.capacity_num_bytes as a total of all pending
- // writes (and just treat it per message). We will implement that later if
- // we need to. All current uses want all their data to be sent, and it's not
- // clear that this backpressure should be done at the mojo layer or at a
- // higher application layer.
+ // Don't write non element sized chunks.
+ uint32_t writable = uint32_t(data_pipe_->GetWritableBytes());
+ writable -= writable % data_pipe_->GetOptions().element_num_bytes;
+
bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
- if (min_num_bytes_to_write > options_.capacity_num_bytes) {
+ if (min_num_bytes_to_write > writable) {
// Don't return "should wait" since you can't wait for a specified amount of
// data.
return MOJO_RESULT_OUT_OF_RANGE;
}
- uint32_t num_bytes_to_write =
- std::min(*num_bytes, options_.capacity_num_bytes);
- if (num_bytes_to_write == 0)
+ if (writable == 0)
return MOJO_RESULT_SHOULD_WAIT;
- HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
+ uint32_t num_bytes_to_write = std::min(*num_bytes, writable);
+
+ // The failure case for |WriteDataIntoSharedBuffer| is the shared
+ // buffer not existing, so we should wait.
+ if (!data_pipe_->WriteDataIntoSharedBuffer(elements, num_bytes_to_write)) {
+ return MOJO_RESULT_SHOULD_WAIT;
+ }
+
+ // If we can't tell the other end about the write, pretend this write didn't
+ // happen and mark the other end as closed. We deal with any state changes
+ // due to the other side being closed in OnError.
+ if (!data_pipe_->NotifyWrite(num_bytes_to_write)) {
+ peer_closed_ = true;
+ return MOJO_RESULT_FAILED_PRECONDITION;
+ }
*num_bytes = num_bytes_to_write;
- WriteDataIntoMessages(elements, num_bytes_to_write);
+ HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
+ data_pipe_->UpdateFromWrite(num_bytes_to_write);
HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
if (!new_state.equals(old_state))
awakable_list_.AwakeForStateChange(new_state);
+
return MOJO_RESULT_OK;
}
@@ -170,21 +205,27 @@ MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
uint32_t* buffer_num_bytes,
MojoWriteDataFlags flags) {
lock().AssertAcquired();
- if (InTwoPhaseWrite())
+ if (in_two_phase_write_)
return MOJO_RESULT_BUSY;
- if (error_)
+ if (peer_closed_)
return MOJO_RESULT_FAILED_PRECONDITION;
- // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes.
+ size_t max_num_bytes_to_write;
+ void* temp_buf = data_pipe_->GetWriteBuffer(&max_num_bytes_to_write);
+
+ if (max_num_bytes_to_write == 0)
+ return MOJO_RESULT_SHOULD_WAIT;
+
if (*buffer_num_bytes == 0)
- *buffer_num_bytes = options_.capacity_num_bytes;
+ *buffer_num_bytes = uint32_t(max_num_bytes_to_write);
- two_phase_data_.resize(*buffer_num_bytes);
- *buffer = &two_phase_data_[0];
+ // Don't promise more bytes than we have.
+ *buffer_num_bytes =
+ std::min(uint32_t(max_num_bytes_to_write), *buffer_num_bytes);
- // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes
- // we can construct a MessageInTransit here. But then we need to make
- // MessageInTransit support changing its data size later.
+ two_phase_max_bytes_write_ = *buffer_num_bytes;
+ *buffer = temp_buf;
+ in_two_phase_write_ = true;
return MOJO_RESULT_OK;
}
@@ -192,28 +233,30 @@ MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock(
uint32_t num_bytes_written) {
lock().AssertAcquired();
- if (!InTwoPhaseWrite())
+ if (!in_two_phase_write_)
return MOJO_RESULT_FAILED_PRECONDITION;
- // Note: Allow successful completion of the two-phase write even if the other
- // side has been closed.
- MojoResult rv = MOJO_RESULT_OK;
- if (num_bytes_written > two_phase_data_.size() ||
- num_bytes_written % options_.element_num_bytes != 0) {
- rv = MOJO_RESULT_INVALID_ARGUMENT;
- } else if (channel_) {
- WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written);
+ HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
+ in_two_phase_write_ = false;
+
+ if (num_bytes_written > two_phase_max_bytes_write_ ||
+ num_bytes_written % data_pipe_->GetOptions().element_num_bytes != 0) {
+ return MOJO_RESULT_INVALID_ARGUMENT;
}
- // Two-phase write ended even on failure.
- two_phase_data_.clear();
- // If we're now writable, we *became* writable (since we weren't writable
- // during the two-phase write), so awake producer awakables.
+ data_pipe_->UpdateFromWrite(num_bytes_written);
+
HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
- if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
+ if (!new_state.equals(old_state))
awakable_list_.AwakeForStateChange(new_state);
- return rv;
+ // Note: Allow successful completion of the two-phase write even if the other
+ // side has been closed.
+ // Deal with state changes due to peer being closed in OnError.
+ if (!data_pipe_->NotifyWrite(num_bytes_written))
+ peer_closed_ = true;
+
+ return MOJO_RESULT_OK;
}
HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
@@ -221,13 +264,14 @@ HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
lock().AssertAcquired();
HandleSignalsState rv;
- if (!error_) {
- if (!InTwoPhaseWrite())
+ if (!peer_closed_) {
+ if (!in_two_phase_write_ && data_pipe_->GetWritableBytes())
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
} else {
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
}
+
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
return rv;
}
@@ -238,8 +282,6 @@ MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
uintptr_t context,
HandleSignalsState* signals_state) {
lock().AssertAcquired();
- if (channel_)
- channel_->EnsureLazyInitialized();
HandleSignalsState state = GetHandleSignalsStateImplNoLock();
if (state.satisfies(signals)) {
if (signals_state)
@@ -268,34 +310,14 @@ void DataPipeProducerDispatcher::RemoveAwakableImplNoLock(
void DataPipeProducerDispatcher::StartSerializeImplNoLock(
size_t* max_size,
size_t* max_platform_handles) {
- if (!serialized_)
- SerializeInternal();
-
- DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
- !serialized_write_buffer_.empty(), max_size,
- max_platform_handles);
+ data_pipe_->StartSerialize(max_size, max_platform_handles);
}
bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock(
void* destination,
size_t* actual_size,
PlatformHandleVector* platform_handles) {
- ScopedPlatformHandle shared_memory_handle;
- size_t shared_memory_size = serialized_write_buffer_.size();
- if (shared_memory_size) {
- scoped_refptr<PlatformSharedBuffer> shared_buffer(
- internal::g_platform_support->CreateSharedBuffer(
- shared_memory_size));
- scoped_ptr<PlatformSharedBufferMapping> mapping(
- shared_buffer->Map(0, shared_memory_size));
- memcpy(mapping->GetBase(), &serialized_write_buffer_[0],
- shared_memory_size);
- shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
- }
-
- DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_),
- std::move(shared_memory_handle), shared_memory_size,
- destination, actual_size, platform_handles);
+ data_pipe_->EndSerialize(destination, actual_size, platform_handles);
CloseImplNoLock();
return true;
}
@@ -310,44 +332,82 @@ void DataPipeProducerDispatcher::TransportEnded() {
bool DataPipeProducerDispatcher::IsBusyNoLock() const {
lock().AssertAcquired();
- return InTwoPhaseWrite();
+ return in_two_phase_write_;
+}
+
+bool DataPipeProducerDispatcher::ProcessCommand(
+ const DataPipeCommandHeader& command,
+ ScopedPlatformHandleVectorPtr platform_handles) {
+ // Handles write/read case and shared buffer becoming available case.
+ return data_pipe_->ProcessCommand(command, std::move(platform_handles));
}
void DataPipeProducerDispatcher::OnReadMessage(
const MessageInTransit::View& message_view,
ScopedPlatformHandleVectorPtr platform_handles) {
- CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages.";
+ const DataPipeCommandHeader* command =
+ static_cast<const DataPipeCommandHeader*>(message_view.bytes());
+ DCHECK(message_view.num_bytes() == sizeof(DataPipeCommandHeader));
+
+ if (started_transport_.Try()) {
+ // We're not in the middle of being sent.
+
+ // Can get synchronously called back from RawChannel::Init in InitOnIO if
+ // there was initial data. InitOnIO locks, so don't lock twice.
+ scoped_ptr<base::AutoLock> locker;
+ if (!calling_init_) {
+ locker.reset(new base::AutoLock(lock()));
+ }
+
+ if (ProcessCommand(*command, std::move(platform_handles))) {
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+ }
+ started_transport_.Release();
+ } else {
+ // DataPipe::Serialize calls ReleaseHandle on the channel, which
+ // acquires RawChannel's read_lock_. The function OnReadMessage is only
+ // called while read_lock_ is acquired, and not after ReleaseHandle has been
+ // called. This means this function will only be called before Serialize
+ // calls ReleaseHandle, meaning the serialisation will not have started yet.
+ // We only notify awakables if we're not in the process of being
+ // transported.
+ ProcessCommand(*command, std::move(platform_handles));
+ }
}
void DataPipeProducerDispatcher::OnError(Error error) {
switch (error) {
- case ERROR_READ_BROKEN:
- case ERROR_READ_BAD_MESSAGE:
- case ERROR_READ_UNKNOWN:
- LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error.";
- break;
case ERROR_READ_SHUTDOWN:
// The other side was cleanly closed, so this isn't actually an error.
DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)";
break;
+ case ERROR_READ_BROKEN:
+ LOG(ERROR) << "DataPipeProducerDispatcher read error (connection broken)";
+ break;
+ case ERROR_READ_BAD_MESSAGE:
+ // Receiving a bad message means either a bug, data corruption, or
+ // malicious attack (probably due to some other bug).
+ LOG(ERROR) << "DataPipeProducerDispatcher read error (received bad "
+ << "message)";
+ break;
+ case ERROR_READ_UNKNOWN:
+ LOG(ERROR) << "DataPipeProducerDispatcher read error (unknown)";
+ break;
case ERROR_WRITE:
- // Write errors are slightly notable: they probably shouldn't happen under
- // normal operation (but maybe the other side crashed).
- LOG(WARNING) << "DataPipeProducerDispatcher write error";
+ LOG(ERROR) << "DataPipeProducerDispatcher write error";
break;
}
- error_ = true;
+ peer_closed_ = true;
if (started_transport_.Try()) {
base::AutoLock locker(lock());
// We can get two OnError callbacks before the post task below completes.
// Although RawChannel still has a pointer to this object until Shutdown is
// called, that is safe since this class always does a PostTask to the IO
// thread to self destruct.
- if (channel_) {
+ if (data_pipe_->GetChannel()) {
awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
- channel_->Shutdown();
- channel_ = nullptr;
+ data_pipe_->Shutdown();
}
started_transport_.Release();
} else {
@@ -355,56 +415,5 @@ void DataPipeProducerDispatcher::OnError(Error error) {
}
}
-bool DataPipeProducerDispatcher::InTwoPhaseWrite() const {
- return !two_phase_data_.empty();
-}
-
-bool DataPipeProducerDispatcher::WriteDataIntoMessages(
- const void* elements,
- uint32_t num_bytes) {
- // The maximum amount of data to send per message (make it a multiple of the
- // element size.
- size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
- max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes;
- DCHECK_GT(max_message_num_bytes, 0u);
-
- uint32_t offset = 0;
- while (offset < num_bytes) {
- uint32_t message_num_bytes =
- std::min(static_cast<uint32_t>(max_message_num_bytes),
- num_bytes - offset);
- scoped_ptr<MessageInTransit> message(new MessageInTransit(
- MessageInTransit::Type::MESSAGE, message_num_bytes,
- static_cast<const char*>(elements) + offset));
- if (!channel_->WriteMessage(std::move(message))) {
- error_ = true;
- return false;
- }
-
- offset += message_num_bytes;
- }
-
- return true;
-}
-
-void DataPipeProducerDispatcher::SerializeInternal() {
- // We need to stop watching handle immediately, even though not on IO thread,
- // so that other messages aren't read after this.
- if (channel_) {
- std::vector<char> serialized_read_buffer;
- std::vector<int> fds;
- bool write_error = false;
- serialized_platform_handle_ = channel_->ReleaseHandle(
- &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds,
- &write_error);
- CHECK(serialized_read_buffer.empty());
- CHECK(fds.empty());
- if (write_error)
- serialized_platform_handle_.reset();
- channel_ = nullptr;
- }
- serialized_ = true;
-}
-
} // namespace edk
} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698