Index: mojo/edk/system/data_pipe_producer_dispatcher.cc |
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc |
index b66ffe169b48d6b28621e0691bd0ffddc1d6f3d4..b35517878947e336d86c0f7504466f5a004bfdc4 100644 |
--- a/mojo/edk/system/data_pipe_producer_dispatcher.cc |
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc |
@@ -7,6 +7,7 @@ |
#include <stddef.h> |
#include <stdint.h> |
+#include <algorithm> |
#include <utility> |
#include "base/bind.h" |
@@ -16,38 +17,42 @@ |
#include "mojo/edk/embedder/platform_shared_buffer.h" |
#include "mojo/edk/embedder/platform_support.h" |
#include "mojo/edk/system/configuration.h" |
-#include "mojo/edk/system/data_pipe.h" |
+#include "mojo/edk/system/transport_data.h" |
namespace mojo { |
namespace edk { |
void DataPipeProducerDispatcher::Init( |
- ScopedPlatformHandle message_pipe, |
- char* serialized_write_buffer, size_t serialized_write_buffer_size) { |
- if (message_pipe.is_valid()) { |
- channel_ = RawChannel::Create(std::move(message_pipe)); |
- channel_->SetSerializedData( |
- nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size, |
- nullptr, nullptr); |
- internal::g_io_thread_task_runner->PostTask( |
- FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); |
- } else { |
- error_ = true; |
+ ScopedPlatformHandle channel_handle, |
+ scoped_refptr<PlatformSharedBuffer> shared_buffer) { |
+ CHECK(shared_buffer); |
+ if (channel_handle.is_valid()) { |
+ RawChannel* channel = RawChannel::Create(std::move(channel_handle)); |
+ data_pipe_->set_channel(channel); |
} |
+ data_pipe_->set_shared_buffer(shared_buffer); |
+ InitInternal(); |
+} |
+ |
+void DataPipeProducerDispatcher::InitInternal() { |
+ peer_closed_ = data_pipe_->channel() == nullptr; |
+ internal::g_io_thread_task_runner->PostTask( |
+ FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); |
+ data_pipe_->Init(); |
} |
void DataPipeProducerDispatcher::InitOnIO() { |
base::AutoLock locker(lock()); |
- if (channel_) |
- channel_->Init(this); |
+ calling_init_ = true; |
+ RawChannel* channel = data_pipe_->channel(); |
+ if (channel) |
+ channel->Init(this); |
+ calling_init_ = false; |
} |
void DataPipeProducerDispatcher::CloseOnIO() { |
base::AutoLock locker(lock()); |
- if (channel_) { |
- channel_->Shutdown(); |
- channel_ = nullptr; |
- } |
+ data_pipe_->Shutdown(); |
} |
Dispatcher::Type DataPipeProducerDispatcher::GetType() const { |
@@ -59,43 +64,32 @@ DataPipeProducerDispatcher::Deserialize( |
const void* source, |
size_t size, |
PlatformHandleVector* platform_handles) { |
- MojoCreateDataPipeOptions options; |
- ScopedPlatformHandle shared_memory_handle; |
- size_t shared_memory_size = 0; |
- ScopedPlatformHandle platform_handle = |
- DataPipe::Deserialize(source, size, platform_handles, &options, |
- &shared_memory_handle, &shared_memory_size); |
- |
- scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); |
- |
- char* serialized_write_buffer = nullptr; |
- size_t serialized_write_buffer_size = 0; |
- scoped_refptr<PlatformSharedBuffer> shared_buffer; |
- scoped_ptr<PlatformSharedBufferMapping> mapping; |
- if (shared_memory_size) { |
- shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( |
- shared_memory_size, std::move(shared_memory_handle)); |
- mapping = shared_buffer->Map(0, shared_memory_size); |
- serialized_write_buffer = static_cast<char*>(mapping->GetBase()); |
- serialized_write_buffer_size = shared_memory_size; |
- } |
- |
- rv->Init(std::move(platform_handle), serialized_write_buffer, |
- serialized_write_buffer_size); |
+ scoped_refptr<DataPipe> data_pipe( |
+ DataPipe::Deserialize(source, size, platform_handles)); |
+ scoped_refptr<DataPipeProducerDispatcher> rv( |
+ new DataPipeProducerDispatcher(data_pipe)); |
+ rv->InitInternal(); |
return rv; |
} |
DataPipeProducerDispatcher::DataPipeProducerDispatcher( |
const MojoCreateDataPipeOptions& options) |
- : options_(options), channel_(nullptr), error_(false), serialized_(false) { |
-} |
+ : DataPipeProducerDispatcher(new DataPipe(options)) {} |
+ |
+DataPipeProducerDispatcher::DataPipeProducerDispatcher( |
+ scoped_refptr<DataPipe> data_pipe) |
+ : data_pipe_(data_pipe), |
+ calling_init_(false), |
+ peer_closed_(false), |
+ in_two_phase_write_(false), |
+ two_phase_max_bytes_write_(0u) {} |
DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { |
// See comment in ~MessagePipeDispatcher. |
- if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
- channel_->Shutdown(); |
+ if (internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
+ data_pipe_->Shutdown(); |
else |
- DCHECK(!channel_); |
+ DCHECK(!data_pipe_->channel()); |
} |
void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { |
@@ -111,14 +105,15 @@ void DataPipeProducerDispatcher::CloseImplNoLock() { |
scoped_refptr<Dispatcher> |
DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
+ // This function is used by TransportData to make sure there are no references |
+ // to the dispatcher it is trying to serialize and transport. |
lock().AssertAcquired(); |
- SerializeInternal(); |
+ scoped_refptr<DataPipeProducerDispatcher> rv = Create(data_pipe_->options()); |
+ data_pipe_->CreateEquivalentAndClose(rv->data_pipe_.get()); |
+ |
+ DCHECK(!in_two_phase_write_); |
- scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); |
- serialized_write_buffer_.swap(rv->serialized_write_buffer_); |
- rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); |
- rv->serialized_ = true; |
return scoped_refptr<Dispatcher>(rv.get()); |
} |
@@ -127,41 +122,54 @@ MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( |
uint32_t* num_bytes, |
MojoWriteDataFlags flags) { |
lock().AssertAcquired(); |
- if (InTwoPhaseWrite()) |
+ if (in_two_phase_write_) |
return MOJO_RESULT_BUSY; |
- if (error_) |
+ if (peer_closed_) |
return MOJO_RESULT_FAILED_PRECONDITION; |
- if (*num_bytes % options_.element_num_bytes != 0) |
+ if (*num_bytes % data_pipe_->options().element_num_bytes != 0) |
return MOJO_RESULT_INVALID_ARGUMENT; |
if (*num_bytes == 0) |
return MOJO_RESULT_OK; // Nothing to do. |
- // For now, we ignore options.capacity_num_bytes as a total of all pending |
- // writes (and just treat it per message). We will implement that later if |
- // we need to. All current uses want all their data to be sent, and it's not |
- // clear that this backpressure should be done at the mojo layer or at a |
- // higher application layer. |
+ // Don't write non element sized chunks. |
+ uint32_t writable = data_pipe_->GetWritableBytes(); |
+ writable -= writable % data_pipe_->options().element_num_bytes; |
+ |
bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; |
uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; |
- if (min_num_bytes_to_write > options_.capacity_num_bytes) { |
+ if (min_num_bytes_to_write > writable) { |
// Don't return "should wait" since you can't wait for a specified amount of |
// data. |
return MOJO_RESULT_OUT_OF_RANGE; |
} |
- uint32_t num_bytes_to_write = |
- std::min(*num_bytes, options_.capacity_num_bytes); |
- if (num_bytes_to_write == 0) |
+ if (writable == 0) |
return MOJO_RESULT_SHOULD_WAIT; |
- HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
+ uint32_t num_bytes_to_write = std::min(*num_bytes, writable); |
+ |
+ // The failure case for |WriteDataIntoSharedBuffer| is the shared |
+ // buffer not existing, so we should wait. |
+ if (!data_pipe_->WriteDataIntoSharedBuffer(elements, num_bytes_to_write)) { |
+ return MOJO_RESULT_SHOULD_WAIT; |
+ } |
+ |
+ // If we can't tell the other end about the write, pretend this write didn't |
+ // happen and mark the other end as closed. We deal with any state changes |
+ // due to the other side being closed in OnError. |
+ if (!data_pipe_->NotifyWrite(num_bytes_to_write)) { |
+ peer_closed_ = true; |
+ return MOJO_RESULT_FAILED_PRECONDITION; |
+ } |
*num_bytes = num_bytes_to_write; |
- WriteDataIntoMessages(elements, num_bytes_to_write); |
+ HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
+ data_pipe_->UpdateFromWrite(num_bytes_to_write); |
HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
if (!new_state.equals(old_state)) |
awakable_list_.AwakeForStateChange(new_state); |
+ |
return MOJO_RESULT_OK; |
} |
@@ -170,21 +178,26 @@ MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( |
uint32_t* buffer_num_bytes, |
MojoWriteDataFlags flags) { |
lock().AssertAcquired(); |
- if (InTwoPhaseWrite()) |
+ if (in_two_phase_write_) |
return MOJO_RESULT_BUSY; |
- if (error_) |
+ if (peer_closed_) |
return MOJO_RESULT_FAILED_PRECONDITION; |
- // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes. |
+ uint32_t max_num_bytes_to_write; |
+ void* temp_buf = data_pipe_->GetWriteBuffer(&max_num_bytes_to_write); |
+ |
+ if (max_num_bytes_to_write == 0) |
+ return MOJO_RESULT_SHOULD_WAIT; |
+ |
if (*buffer_num_bytes == 0) |
- *buffer_num_bytes = options_.capacity_num_bytes; |
+ *buffer_num_bytes = max_num_bytes_to_write; |
- two_phase_data_.resize(*buffer_num_bytes); |
- *buffer = &two_phase_data_[0]; |
+ // Don't promise more bytes than we have. |
+ *buffer_num_bytes = std::min(max_num_bytes_to_write, *buffer_num_bytes); |
- // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes |
- // we can construct a MessageInTransit here. But then we need to make |
- // MessageInTransit support changing its data size later. |
+ two_phase_max_bytes_write_ = *buffer_num_bytes; |
+ *buffer = temp_buf; |
+ in_two_phase_write_ = true; |
return MOJO_RESULT_OK; |
} |
@@ -192,28 +205,30 @@ MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( |
MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( |
uint32_t num_bytes_written) { |
lock().AssertAcquired(); |
- if (!InTwoPhaseWrite()) |
+ if (!in_two_phase_write_) |
return MOJO_RESULT_FAILED_PRECONDITION; |
- // Note: Allow successful completion of the two-phase write even if the other |
- // side has been closed. |
- MojoResult rv = MOJO_RESULT_OK; |
- if (num_bytes_written > two_phase_data_.size() || |
- num_bytes_written % options_.element_num_bytes != 0) { |
- rv = MOJO_RESULT_INVALID_ARGUMENT; |
- } else if (channel_) { |
- WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written); |
+ HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
+ in_two_phase_write_ = false; |
+ |
+ if (num_bytes_written > two_phase_max_bytes_write_ || |
+ num_bytes_written % data_pipe_->options().element_num_bytes != 0) { |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
} |
- // Two-phase write ended even on failure. |
- two_phase_data_.clear(); |
- // If we're now writable, we *became* writable (since we weren't writable |
- // during the two-phase write), so awake producer awakables. |
+ data_pipe_->UpdateFromWrite(num_bytes_written); |
+ |
HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
- if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
+ if (!new_state.equals(old_state)) |
awakable_list_.AwakeForStateChange(new_state); |
- return rv; |
+ // Note: Allow successful completion of the two-phase write even if the other |
+ // side has been closed. |
+ // Deal with state changes due to peer being closed in OnError. |
+ if (!data_pipe_->NotifyWrite(num_bytes_written)) |
+ peer_closed_ = true; |
+ |
+ return MOJO_RESULT_OK; |
} |
HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() |
@@ -221,13 +236,14 @@ HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() |
lock().AssertAcquired(); |
HandleSignalsState rv; |
- if (!error_) { |
- if (!InTwoPhaseWrite()) |
+ if (!peer_closed_) { |
+ if (!in_two_phase_write_ && data_pipe_->GetWritableBytes()) |
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
} else { |
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
} |
+ |
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
return rv; |
} |
@@ -238,8 +254,6 @@ MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( |
uintptr_t context, |
HandleSignalsState* signals_state) { |
lock().AssertAcquired(); |
- if (channel_) |
- channel_->EnsureLazyInitialized(); |
HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
if (state.satisfies(signals)) { |
if (signals_state) |
@@ -268,34 +282,14 @@ void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( |
void DataPipeProducerDispatcher::StartSerializeImplNoLock( |
size_t* max_size, |
size_t* max_platform_handles) { |
- if (!serialized_) |
- SerializeInternal(); |
- |
- DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), |
- !serialized_write_buffer_.empty(), max_size, |
- max_platform_handles); |
+ data_pipe_->StartSerialize(max_size, max_platform_handles); |
} |
bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( |
void* destination, |
size_t* actual_size, |
PlatformHandleVector* platform_handles) { |
- ScopedPlatformHandle shared_memory_handle; |
- size_t shared_memory_size = serialized_write_buffer_.size(); |
- if (shared_memory_size) { |
- scoped_refptr<PlatformSharedBuffer> shared_buffer( |
- internal::g_platform_support->CreateSharedBuffer( |
- shared_memory_size)); |
- scoped_ptr<PlatformSharedBufferMapping> mapping( |
- shared_buffer->Map(0, shared_memory_size)); |
- memcpy(mapping->GetBase(), &serialized_write_buffer_[0], |
- shared_memory_size); |
- shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release()); |
- } |
- |
- DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_), |
- std::move(shared_memory_handle), shared_memory_size, |
- destination, actual_size, platform_handles); |
+ data_pipe_->EndSerialize(destination, actual_size, platform_handles); |
CloseImplNoLock(); |
return true; |
} |
@@ -310,44 +304,82 @@ void DataPipeProducerDispatcher::TransportEnded() { |
bool DataPipeProducerDispatcher::IsBusyNoLock() const { |
lock().AssertAcquired(); |
- return InTwoPhaseWrite(); |
+ return in_two_phase_write_; |
+} |
+ |
+bool DataPipeProducerDispatcher::ProcessCommand( |
+ const DataPipeCommandHeader& command, |
+ ScopedPlatformHandleVectorPtr platform_handles) { |
+ // Handles write/read case and shared buffer becoming available case. |
+ return data_pipe_->ProcessCommand(command, std::move(platform_handles)); |
} |
void DataPipeProducerDispatcher::OnReadMessage( |
const MessageInTransit::View& message_view, |
ScopedPlatformHandleVectorPtr platform_handles) { |
- CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages."; |
+ const DataPipeCommandHeader* command = |
+ static_cast<const DataPipeCommandHeader*>(message_view.bytes()); |
+ DCHECK(message_view.num_bytes() == sizeof(DataPipeCommandHeader)); |
+ |
+ if (started_transport_.Try()) { |
+ // We're not in the middle of being sent. |
+ |
+ // Can get synchronously called back from RawChannel::Init in InitOnIO if |
+ // there was initial data. InitOnIO locks, so don't lock twice. |
+ scoped_ptr<base::AutoLock> locker; |
+ if (!calling_init_) { |
+ locker.reset(new base::AutoLock(lock())); |
+ } |
+ |
+ if (ProcessCommand(*command, std::move(platform_handles))) { |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+ } |
+ started_transport_.Release(); |
+ } else { |
+ // DataPipe::Serialize calls ReleaseHandle on the channel, which |
+ // acquires RawChannel's read_lock_. The function OnReadMessage is only |
+ // called while read_lock_ is acquired, and not after ReleaseHandle has been |
+ // called. This means this function will only be called before Serialize |
+ // calls ReleaseHandle, meaning the serialisation will not have started yet. |
+ // We only notify awakables if we're not in the process of being |
+ // transported. |
+ ProcessCommand(*command, std::move(platform_handles)); |
+ } |
} |
void DataPipeProducerDispatcher::OnError(Error error) { |
switch (error) { |
- case ERROR_READ_BROKEN: |
- case ERROR_READ_BAD_MESSAGE: |
- case ERROR_READ_UNKNOWN: |
- LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error."; |
- break; |
case ERROR_READ_SHUTDOWN: |
// The other side was cleanly closed, so this isn't actually an error. |
DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)"; |
break; |
+ case ERROR_READ_BROKEN: |
+ LOG(ERROR) << "DataPipeProducerDispatcher read error (connection broken)"; |
+ break; |
+ case ERROR_READ_BAD_MESSAGE: |
+ // Receiving a bad message means either a bug, data corruption, or |
+ // malicious attack (probably due to some other bug). |
+ LOG(ERROR) << "DataPipeProducerDispatcher read error (received bad " |
+ << "message)"; |
+ break; |
+ case ERROR_READ_UNKNOWN: |
+ LOG(ERROR) << "DataPipeProducerDispatcher read error (unknown)"; |
+ break; |
case ERROR_WRITE: |
- // Write errors are slightly notable: they probably shouldn't happen under |
- // normal operation (but maybe the other side crashed). |
- LOG(WARNING) << "DataPipeProducerDispatcher write error"; |
+ LOG(ERROR) << "DataPipeProducerDispatcher write error"; |
break; |
} |
- error_ = true; |
+ peer_closed_ = true; |
if (started_transport_.Try()) { |
base::AutoLock locker(lock()); |
// We can get two OnError callbacks before the post task below completes. |
// Although RawChannel still has a pointer to this object until Shutdown is |
// called, that is safe since this class always does a PostTask to the IO |
// thread to self destruct. |
- if (channel_) { |
+ if (data_pipe_->channel()) { |
awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
- channel_->Shutdown(); |
- channel_ = nullptr; |
+ data_pipe_->Shutdown(); |
} |
started_transport_.Release(); |
} else { |
@@ -355,56 +387,5 @@ void DataPipeProducerDispatcher::OnError(Error error) { |
} |
} |
-bool DataPipeProducerDispatcher::InTwoPhaseWrite() const { |
- return !two_phase_data_.empty(); |
-} |
- |
-bool DataPipeProducerDispatcher::WriteDataIntoMessages( |
- const void* elements, |
- uint32_t num_bytes) { |
- // The maximum amount of data to send per message (make it a multiple of the |
- // element size. |
- size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; |
- max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes; |
- DCHECK_GT(max_message_num_bytes, 0u); |
- |
- uint32_t offset = 0; |
- while (offset < num_bytes) { |
- uint32_t message_num_bytes = |
- std::min(static_cast<uint32_t>(max_message_num_bytes), |
- num_bytes - offset); |
- scoped_ptr<MessageInTransit> message(new MessageInTransit( |
- MessageInTransit::Type::MESSAGE, message_num_bytes, |
- static_cast<const char*>(elements) + offset)); |
- if (!channel_->WriteMessage(std::move(message))) { |
- error_ = true; |
- return false; |
- } |
- |
- offset += message_num_bytes; |
- } |
- |
- return true; |
-} |
- |
-void DataPipeProducerDispatcher::SerializeInternal() { |
- // We need to stop watching handle immediately, even though not on IO thread, |
- // so that other messages aren't read after this. |
- if (channel_) { |
- std::vector<char> serialized_read_buffer; |
- std::vector<int> fds; |
- bool write_error = false; |
- serialized_platform_handle_ = channel_->ReleaseHandle( |
- &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds, |
- &write_error); |
- CHECK(serialized_read_buffer.empty()); |
- CHECK(fds.empty()); |
- if (write_error) |
- serialized_platform_handle_.reset(); |
- channel_ = nullptr; |
- } |
- serialized_ = true; |
-} |
- |
} // namespace edk |
} // namespace mojo |