Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(75)

Unified Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 1526923006: [mojo] Implement data pipe using a shared buffer. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/data_pipe_producer_dispatcher.h ('k') | mojo/edk/system/data_pipe_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/data_pipe_producer_dispatcher.cc
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
index b66ffe169b48d6b28621e0691bd0ffddc1d6f3d4..b35517878947e336d86c0f7504466f5a004bfdc4 100644
--- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
@@ -7,6 +7,7 @@
#include <stddef.h>
#include <stdint.h>
+#include <algorithm>
#include <utility>
#include "base/bind.h"
@@ -16,38 +17,42 @@
#include "mojo/edk/embedder/platform_shared_buffer.h"
#include "mojo/edk/embedder/platform_support.h"
#include "mojo/edk/system/configuration.h"
-#include "mojo/edk/system/data_pipe.h"
+#include "mojo/edk/system/transport_data.h"
namespace mojo {
namespace edk {
void DataPipeProducerDispatcher::Init(
- ScopedPlatformHandle message_pipe,
- char* serialized_write_buffer, size_t serialized_write_buffer_size) {
- if (message_pipe.is_valid()) {
- channel_ = RawChannel::Create(std::move(message_pipe));
- channel_->SetSerializedData(
- nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size,
- nullptr, nullptr);
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
- } else {
- error_ = true;
+ ScopedPlatformHandle channel_handle,
+ scoped_refptr<PlatformSharedBuffer> shared_buffer) {
+ CHECK(shared_buffer);
+ if (channel_handle.is_valid()) {
+ RawChannel* channel = RawChannel::Create(std::move(channel_handle));
+ data_pipe_->set_channel(channel);
}
+ data_pipe_->set_shared_buffer(shared_buffer);
+ InitInternal();
+}
+
+void DataPipeProducerDispatcher::InitInternal() {
+ peer_closed_ = data_pipe_->channel() == nullptr;
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
+ data_pipe_->Init();
}
void DataPipeProducerDispatcher::InitOnIO() {
base::AutoLock locker(lock());
- if (channel_)
- channel_->Init(this);
+ calling_init_ = true;
+ RawChannel* channel = data_pipe_->channel();
+ if (channel)
+ channel->Init(this);
+ calling_init_ = false;
}
void DataPipeProducerDispatcher::CloseOnIO() {
base::AutoLock locker(lock());
- if (channel_) {
- channel_->Shutdown();
- channel_ = nullptr;
- }
+ data_pipe_->Shutdown();
}
Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
@@ -59,43 +64,32 @@ DataPipeProducerDispatcher::Deserialize(
const void* source,
size_t size,
PlatformHandleVector* platform_handles) {
- MojoCreateDataPipeOptions options;
- ScopedPlatformHandle shared_memory_handle;
- size_t shared_memory_size = 0;
- ScopedPlatformHandle platform_handle =
- DataPipe::Deserialize(source, size, platform_handles, &options,
- &shared_memory_handle, &shared_memory_size);
-
- scoped_refptr<DataPipeProducerDispatcher> rv(Create(options));
-
- char* serialized_write_buffer = nullptr;
- size_t serialized_write_buffer_size = 0;
- scoped_refptr<PlatformSharedBuffer> shared_buffer;
- scoped_ptr<PlatformSharedBufferMapping> mapping;
- if (shared_memory_size) {
- shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
- shared_memory_size, std::move(shared_memory_handle));
- mapping = shared_buffer->Map(0, shared_memory_size);
- serialized_write_buffer = static_cast<char*>(mapping->GetBase());
- serialized_write_buffer_size = shared_memory_size;
- }
-
- rv->Init(std::move(platform_handle), serialized_write_buffer,
- serialized_write_buffer_size);
+ scoped_refptr<DataPipe> data_pipe(
+ DataPipe::Deserialize(source, size, platform_handles));
+ scoped_refptr<DataPipeProducerDispatcher> rv(
+ new DataPipeProducerDispatcher(data_pipe));
+ rv->InitInternal();
return rv;
}
DataPipeProducerDispatcher::DataPipeProducerDispatcher(
const MojoCreateDataPipeOptions& options)
- : options_(options), channel_(nullptr), error_(false), serialized_(false) {
-}
+ : DataPipeProducerDispatcher(new DataPipe(options)) {}
+
+DataPipeProducerDispatcher::DataPipeProducerDispatcher(
+ scoped_refptr<DataPipe> data_pipe)
+ : data_pipe_(data_pipe),
+ calling_init_(false),
+ peer_closed_(false),
+ in_two_phase_write_(false),
+ two_phase_max_bytes_write_(0u) {}
DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
// See comment in ~MessagePipeDispatcher.
- if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
- channel_->Shutdown();
+ if (internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
+ data_pipe_->Shutdown();
else
- DCHECK(!channel_);
+ DCHECK(!data_pipe_->channel());
}
void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() {
@@ -111,14 +105,15 @@ void DataPipeProducerDispatcher::CloseImplNoLock() {
scoped_refptr<Dispatcher>
DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
+ // This function is used by TransportData to make sure there are no references
+ // to the dispatcher it is trying to serialize and transport.
lock().AssertAcquired();
- SerializeInternal();
+ scoped_refptr<DataPipeProducerDispatcher> rv = Create(data_pipe_->options());
+ data_pipe_->CreateEquivalentAndClose(rv->data_pipe_.get());
+
+ DCHECK(!in_two_phase_write_);
- scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_);
- serialized_write_buffer_.swap(rv->serialized_write_buffer_);
- rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
- rv->serialized_ = true;
return scoped_refptr<Dispatcher>(rv.get());
}
@@ -127,41 +122,54 @@ MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
uint32_t* num_bytes,
MojoWriteDataFlags flags) {
lock().AssertAcquired();
- if (InTwoPhaseWrite())
+ if (in_two_phase_write_)
return MOJO_RESULT_BUSY;
- if (error_)
+ if (peer_closed_)
return MOJO_RESULT_FAILED_PRECONDITION;
- if (*num_bytes % options_.element_num_bytes != 0)
+ if (*num_bytes % data_pipe_->options().element_num_bytes != 0)
return MOJO_RESULT_INVALID_ARGUMENT;
if (*num_bytes == 0)
return MOJO_RESULT_OK; // Nothing to do.
- // For now, we ignore options.capacity_num_bytes as a total of all pending
- // writes (and just treat it per message). We will implement that later if
- // we need to. All current uses want all their data to be sent, and it's not
- // clear that this backpressure should be done at the mojo layer or at a
- // higher application layer.
+ // Don't write non element sized chunks.
+ uint32_t writable = data_pipe_->GetWritableBytes();
+ writable -= writable % data_pipe_->options().element_num_bytes;
+
bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
- if (min_num_bytes_to_write > options_.capacity_num_bytes) {
+ if (min_num_bytes_to_write > writable) {
// Don't return "should wait" since you can't wait for a specified amount of
// data.
return MOJO_RESULT_OUT_OF_RANGE;
}
- uint32_t num_bytes_to_write =
- std::min(*num_bytes, options_.capacity_num_bytes);
- if (num_bytes_to_write == 0)
+ if (writable == 0)
return MOJO_RESULT_SHOULD_WAIT;
- HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
+ uint32_t num_bytes_to_write = std::min(*num_bytes, writable);
+
+ // The failure case for |WriteDataIntoSharedBuffer| is the shared
+ // buffer not existing, so we should wait.
+ if (!data_pipe_->WriteDataIntoSharedBuffer(elements, num_bytes_to_write)) {
+ return MOJO_RESULT_SHOULD_WAIT;
+ }
+
+ // If we can't tell the other end about the write, pretend this write didn't
+ // happen and mark the other end as closed. We deal with any state changes
+ // due to the other side being closed in OnError.
+ if (!data_pipe_->NotifyWrite(num_bytes_to_write)) {
+ peer_closed_ = true;
+ return MOJO_RESULT_FAILED_PRECONDITION;
+ }
*num_bytes = num_bytes_to_write;
- WriteDataIntoMessages(elements, num_bytes_to_write);
+ HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
+ data_pipe_->UpdateFromWrite(num_bytes_to_write);
HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
if (!new_state.equals(old_state))
awakable_list_.AwakeForStateChange(new_state);
+
return MOJO_RESULT_OK;
}
@@ -170,21 +178,26 @@ MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
uint32_t* buffer_num_bytes,
MojoWriteDataFlags flags) {
lock().AssertAcquired();
- if (InTwoPhaseWrite())
+ if (in_two_phase_write_)
return MOJO_RESULT_BUSY;
- if (error_)
+ if (peer_closed_)
return MOJO_RESULT_FAILED_PRECONDITION;
- // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes.
+ uint32_t max_num_bytes_to_write;
+ void* temp_buf = data_pipe_->GetWriteBuffer(&max_num_bytes_to_write);
+
+ if (max_num_bytes_to_write == 0)
+ return MOJO_RESULT_SHOULD_WAIT;
+
if (*buffer_num_bytes == 0)
- *buffer_num_bytes = options_.capacity_num_bytes;
+ *buffer_num_bytes = max_num_bytes_to_write;
- two_phase_data_.resize(*buffer_num_bytes);
- *buffer = &two_phase_data_[0];
+ // Don't promise more bytes than we have.
+ *buffer_num_bytes = std::min(max_num_bytes_to_write, *buffer_num_bytes);
- // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes
- // we can construct a MessageInTransit here. But then we need to make
- // MessageInTransit support changing its data size later.
+ two_phase_max_bytes_write_ = *buffer_num_bytes;
+ *buffer = temp_buf;
+ in_two_phase_write_ = true;
return MOJO_RESULT_OK;
}
@@ -192,28 +205,30 @@ MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock(
uint32_t num_bytes_written) {
lock().AssertAcquired();
- if (!InTwoPhaseWrite())
+ if (!in_two_phase_write_)
return MOJO_RESULT_FAILED_PRECONDITION;
- // Note: Allow successful completion of the two-phase write even if the other
- // side has been closed.
- MojoResult rv = MOJO_RESULT_OK;
- if (num_bytes_written > two_phase_data_.size() ||
- num_bytes_written % options_.element_num_bytes != 0) {
- rv = MOJO_RESULT_INVALID_ARGUMENT;
- } else if (channel_) {
- WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written);
+ HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
+ in_two_phase_write_ = false;
+
+ if (num_bytes_written > two_phase_max_bytes_write_ ||
+ num_bytes_written % data_pipe_->options().element_num_bytes != 0) {
+ return MOJO_RESULT_INVALID_ARGUMENT;
}
- // Two-phase write ended even on failure.
- two_phase_data_.clear();
- // If we're now writable, we *became* writable (since we weren't writable
- // during the two-phase write), so awake producer awakables.
+ data_pipe_->UpdateFromWrite(num_bytes_written);
+
HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
- if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
+ if (!new_state.equals(old_state))
awakable_list_.AwakeForStateChange(new_state);
- return rv;
+ // Note: Allow successful completion of the two-phase write even if the other
+ // side has been closed.
+ // Deal with state changes due to peer being closed in OnError.
+ if (!data_pipe_->NotifyWrite(num_bytes_written))
+ peer_closed_ = true;
+
+ return MOJO_RESULT_OK;
}
HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
@@ -221,13 +236,14 @@ HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
lock().AssertAcquired();
HandleSignalsState rv;
- if (!error_) {
- if (!InTwoPhaseWrite())
+ if (!peer_closed_) {
+ if (!in_two_phase_write_ && data_pipe_->GetWritableBytes())
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
} else {
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
}
+
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
return rv;
}
@@ -238,8 +254,6 @@ MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
uintptr_t context,
HandleSignalsState* signals_state) {
lock().AssertAcquired();
- if (channel_)
- channel_->EnsureLazyInitialized();
HandleSignalsState state = GetHandleSignalsStateImplNoLock();
if (state.satisfies(signals)) {
if (signals_state)
@@ -268,34 +282,14 @@ void DataPipeProducerDispatcher::RemoveAwakableImplNoLock(
void DataPipeProducerDispatcher::StartSerializeImplNoLock(
size_t* max_size,
size_t* max_platform_handles) {
- if (!serialized_)
- SerializeInternal();
-
- DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
- !serialized_write_buffer_.empty(), max_size,
- max_platform_handles);
+ data_pipe_->StartSerialize(max_size, max_platform_handles);
}
bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock(
void* destination,
size_t* actual_size,
PlatformHandleVector* platform_handles) {
- ScopedPlatformHandle shared_memory_handle;
- size_t shared_memory_size = serialized_write_buffer_.size();
- if (shared_memory_size) {
- scoped_refptr<PlatformSharedBuffer> shared_buffer(
- internal::g_platform_support->CreateSharedBuffer(
- shared_memory_size));
- scoped_ptr<PlatformSharedBufferMapping> mapping(
- shared_buffer->Map(0, shared_memory_size));
- memcpy(mapping->GetBase(), &serialized_write_buffer_[0],
- shared_memory_size);
- shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
- }
-
- DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_),
- std::move(shared_memory_handle), shared_memory_size,
- destination, actual_size, platform_handles);
+ data_pipe_->EndSerialize(destination, actual_size, platform_handles);
CloseImplNoLock();
return true;
}
@@ -310,44 +304,82 @@ void DataPipeProducerDispatcher::TransportEnded() {
bool DataPipeProducerDispatcher::IsBusyNoLock() const {
lock().AssertAcquired();
- return InTwoPhaseWrite();
+ return in_two_phase_write_;
+}
+
+bool DataPipeProducerDispatcher::ProcessCommand(
+ const DataPipeCommandHeader& command,
+ ScopedPlatformHandleVectorPtr platform_handles) {
+ // Handles write/read case and shared buffer becoming available case.
+ return data_pipe_->ProcessCommand(command, std::move(platform_handles));
}
void DataPipeProducerDispatcher::OnReadMessage(
const MessageInTransit::View& message_view,
ScopedPlatformHandleVectorPtr platform_handles) {
- CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages.";
+ const DataPipeCommandHeader* command =
+ static_cast<const DataPipeCommandHeader*>(message_view.bytes());
+ DCHECK(message_view.num_bytes() == sizeof(DataPipeCommandHeader));
+
+ if (started_transport_.Try()) {
+ // We're not in the middle of being sent.
+
+ // Can get synchronously called back from RawChannel::Init in InitOnIO if
+ // there was initial data. InitOnIO locks, so don't lock twice.
+ scoped_ptr<base::AutoLock> locker;
+ if (!calling_init_) {
+ locker.reset(new base::AutoLock(lock()));
+ }
+
+ if (ProcessCommand(*command, std::move(platform_handles))) {
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+ }
+ started_transport_.Release();
+ } else {
+ // DataPipe::Serialize calls ReleaseHandle on the channel, which
+ // acquires RawChannel's read_lock_. The function OnReadMessage is only
+ // called while read_lock_ is acquired, and not after ReleaseHandle has been
+ // called. This means this function will only be called before Serialize
+ // calls ReleaseHandle, meaning the serialisation will not have started yet.
+ // We only notify awakables if we're not in the process of being
+ // transported.
+ ProcessCommand(*command, std::move(platform_handles));
+ }
}
void DataPipeProducerDispatcher::OnError(Error error) {
switch (error) {
- case ERROR_READ_BROKEN:
- case ERROR_READ_BAD_MESSAGE:
- case ERROR_READ_UNKNOWN:
- LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error.";
- break;
case ERROR_READ_SHUTDOWN:
// The other side was cleanly closed, so this isn't actually an error.
DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)";
break;
+ case ERROR_READ_BROKEN:
+ LOG(ERROR) << "DataPipeProducerDispatcher read error (connection broken)";
+ break;
+ case ERROR_READ_BAD_MESSAGE:
+ // Receiving a bad message means either a bug, data corruption, or
+ // malicious attack (probably due to some other bug).
+ LOG(ERROR) << "DataPipeProducerDispatcher read error (received bad "
+ << "message)";
+ break;
+ case ERROR_READ_UNKNOWN:
+ LOG(ERROR) << "DataPipeProducerDispatcher read error (unknown)";
+ break;
case ERROR_WRITE:
- // Write errors are slightly notable: they probably shouldn't happen under
- // normal operation (but maybe the other side crashed).
- LOG(WARNING) << "DataPipeProducerDispatcher write error";
+ LOG(ERROR) << "DataPipeProducerDispatcher write error";
break;
}
- error_ = true;
+ peer_closed_ = true;
if (started_transport_.Try()) {
base::AutoLock locker(lock());
// We can get two OnError callbacks before the post task below completes.
// Although RawChannel still has a pointer to this object until Shutdown is
// called, that is safe since this class always does a PostTask to the IO
// thread to self destruct.
- if (channel_) {
+ if (data_pipe_->channel()) {
awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
- channel_->Shutdown();
- channel_ = nullptr;
+ data_pipe_->Shutdown();
}
started_transport_.Release();
} else {
@@ -355,56 +387,5 @@ void DataPipeProducerDispatcher::OnError(Error error) {
}
}
-bool DataPipeProducerDispatcher::InTwoPhaseWrite() const {
- return !two_phase_data_.empty();
-}
-
-bool DataPipeProducerDispatcher::WriteDataIntoMessages(
- const void* elements,
- uint32_t num_bytes) {
- // The maximum amount of data to send per message (make it a multiple of the
- // element size.
- size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
- max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes;
- DCHECK_GT(max_message_num_bytes, 0u);
-
- uint32_t offset = 0;
- while (offset < num_bytes) {
- uint32_t message_num_bytes =
- std::min(static_cast<uint32_t>(max_message_num_bytes),
- num_bytes - offset);
- scoped_ptr<MessageInTransit> message(new MessageInTransit(
- MessageInTransit::Type::MESSAGE, message_num_bytes,
- static_cast<const char*>(elements) + offset));
- if (!channel_->WriteMessage(std::move(message))) {
- error_ = true;
- return false;
- }
-
- offset += message_num_bytes;
- }
-
- return true;
-}
-
-void DataPipeProducerDispatcher::SerializeInternal() {
- // We need to stop watching handle immediately, even though not on IO thread,
- // so that other messages aren't read after this.
- if (channel_) {
- std::vector<char> serialized_read_buffer;
- std::vector<int> fds;
- bool write_error = false;
- serialized_platform_handle_ = channel_->ReleaseHandle(
- &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds,
- &write_error);
- CHECK(serialized_read_buffer.empty());
- CHECK(fds.empty());
- if (write_error)
- serialized_platform_handle_.reset();
- channel_ = nullptr;
- }
- serialized_ = true;
-}
-
} // namespace edk
} // namespace mojo
« no previous file with comments | « mojo/edk/system/data_pipe_producer_dispatcher.h ('k') | mojo/edk/system/data_pipe_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698