Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(185)

Unified Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1880823005: [mojo-edk] Add explicit message object APIs (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/message_pipe_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/message_pipe_dispatcher.cc
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
index eb5892bcecbb777b775c6a8374134b4229f0df07..d55a3c975cea1312c2fd3c6bbbcd8435a1fa72da 100644
--- a/mojo/edk/system/message_pipe_dispatcher.cc
+++ b/mojo/edk/system/message_pipe_dispatcher.cc
@@ -11,6 +11,7 @@
#include "base/memory/ref_counted.h"
#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/system/core.h"
+#include "mojo/edk/system/message_for_transit.h"
#include "mojo/edk/system/node_controller.h"
#include "mojo/edk/system/ports_message.h"
#include "mojo/edk/system/request_context.h"
@@ -20,36 +21,10 @@ namespace edk {
namespace {
-#pragma pack(push, 1)
-
-// Header attached to every message sent over a message pipe.
-struct MessageHeader {
- // The number of serialized dispatchers included in this header.
- uint32_t num_dispatchers;
-
- // Total size of the header, including serialized dispatcher data.
- uint32_t header_size;
-};
-
-static_assert(sizeof(MessageHeader) % 8 == 0, "Invalid MessageHeader size.");
-
-// Header for each dispatcher, immediately following the message header.
-struct DispatcherHeader {
- // The type of the dispatcher, correpsonding to the Dispatcher::Type enum.
- int32_t type;
-
- // The size of the serialized dispatcher, not including this header.
- uint32_t num_bytes;
+using DispatcherHeader = MessageForTransit::DispatcherHeader;
+using MessageHeader = MessageForTransit::MessageHeader;
- // The number of ports needed to deserialize this dispatcher.
- uint32_t num_ports;
-
- // The number of platform handles needed to deserialize this dispatcher.
- uint32_t num_platform_handles;
-};
-
-static_assert(sizeof(DispatcherHeader) % 8 == 0,
- "Invalid DispatcherHeader size.");
+#pragma pack(push, 1)
struct SerializedState {
uint64_t pipe_id;
@@ -157,163 +132,44 @@ MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
}
MojoResult MessagePipeDispatcher::WriteMessage(
- const void* bytes,
- uint32_t num_bytes,
- const DispatcherInTransit* dispatchers,
- uint32_t num_dispatchers,
+ std::unique_ptr<MessageForTransit> message,
MojoWriteMessageFlags flags) {
-
-
if (port_closed_ || in_transit_)
return MOJO_RESULT_INVALID_ARGUMENT;
- // A structure for retaining information about every Dispatcher we're about
- // to send. This information is collected by calling StartSerialize() on
- // each dispatcher in sequence.
- struct DispatcherInfo {
- uint32_t num_bytes;
- uint32_t num_ports;
- uint32_t num_handles;
- };
-
- // This is only the base header size. It will grow as we accumulate the
- // size of serialized state for each dispatcher.
- size_t header_size = sizeof(MessageHeader) +
- num_dispatchers * sizeof(DispatcherHeader);
-
- size_t num_ports = 0;
- size_t num_handles = 0;
-
- std::vector<DispatcherInfo> dispatcher_info(num_dispatchers);
- for (size_t i = 0; i < num_dispatchers; ++i) {
- Dispatcher* d = dispatchers[i].dispatcher.get();
- d->StartSerialize(&dispatcher_info[i].num_bytes,
- &dispatcher_info[i].num_ports,
- &dispatcher_info[i].num_handles);
- header_size += dispatcher_info[i].num_bytes;
- num_ports += dispatcher_info[i].num_ports;
- num_handles += dispatcher_info[i].num_handles;
- }
-
- // We now have enough information to fully allocate the message storage.
- std::unique_ptr<PortsMessage> message = PortsMessage::NewUserMessage(
- header_size + num_bytes, num_ports, num_handles);
- DCHECK(message);
+ size_t num_bytes = message->num_bytes();
+ std::unique_ptr<PortsMessage> msg = message->TakePortsMessage();
+ int rv = node_controller_->SendMessage(port_, &msg);
- // Populate the message header with information about serialized dispatchers.
- //
- // The front of the message is always a MessageHeader followed by a
- // DispatcherHeader for each dispatcher to be sent.
- MessageHeader* header =
- static_cast<MessageHeader*>(message->mutable_payload_bytes());
- DispatcherHeader* dispatcher_headers =
- reinterpret_cast<DispatcherHeader*>(header + 1);
-
- // Serialized dispatcher state immediately follows the series of
- // DispatcherHeaders.
- char* dispatcher_data =
- reinterpret_cast<char*>(dispatcher_headers + num_dispatchers);
-
- header->num_dispatchers = num_dispatchers;
-
- // |header_size| is the total number of bytes preceding the message payload,
- // including all dispatcher headers and serialized dispatcher state.
- DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max());
- header->header_size = static_cast<uint32_t>(header_size);
-
- bool cancel_transit = false;
- if (num_dispatchers > 0) {
- ScopedPlatformHandleVectorPtr handles(
- new PlatformHandleVector(num_handles));
- size_t port_index = 0;
- size_t handle_index = 0;
- for (size_t i = 0; i < num_dispatchers; ++i) {
- Dispatcher* d = dispatchers[i].dispatcher.get();
- DispatcherHeader* dh = &dispatcher_headers[i];
- const DispatcherInfo& info = dispatcher_info[i];
-
- // Fill in the header for this dispatcher.
- dh->type = static_cast<int32_t>(d->GetType());
- dh->num_bytes = info.num_bytes;
- dh->num_ports = info.num_ports;
- dh->num_platform_handles = info.num_handles;
-
- // Fill in serialized state, ports, and platform handles. We'll cancel
- // the send if the dispatcher implementation rejects for some reason.
- if (!d->EndSerialize(static_cast<void*>(dispatcher_data),
- message->mutable_ports() + port_index,
- handles->data() + handle_index)) {
- cancel_transit = true;
- break;
- }
-
- dispatcher_data += info.num_bytes;
- port_index += info.num_ports;
- handle_index += info.num_handles;
- }
-
- if (!cancel_transit) {
- // Take ownership of all the handles and move them into message storage.
- message->SetHandles(std::move(handles));
- } else {
- // Release any platform handles we've accumulated. Their dispatchers
- // retain ownership when transit is canceled, so these are not actually
- // leaking.
- handles->clear();
- }
- }
+ DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
+ << " [port=" << port_.name() << "; rv=" << rv
+ << "; num_bytes=" << num_bytes << "]";
- MojoResult result = MOJO_RESULT_OK;
- if (!cancel_transit) {
- // Copy the message body.
- void* message_body = static_cast<void*>(
- static_cast<char*>(message->mutable_payload_bytes()) + header_size);
- memcpy(message_body, bytes, num_bytes);
-
- int rv = node_controller_->SendMessage(port_, &message);
-
- DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
- << " [port=" << port_.name() << "; rv=" << rv
- << "; num_bytes=" << num_bytes << "]";
-
- if (rv != ports::OK) {
- if (rv == ports::ERROR_PORT_UNKNOWN ||
- rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
- rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
- result = MOJO_RESULT_INVALID_ARGUMENT;
- } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
- base::AutoLock lock(signal_lock_);
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
- result = MOJO_RESULT_FAILED_PRECONDITION;
- } else {
- NOTREACHED();
- result = MOJO_RESULT_UNKNOWN;
- }
- cancel_transit = true;
- } else {
- DCHECK(!message);
+ if (rv != ports::OK) {
+ if (rv == ports::ERROR_PORT_UNKNOWN ||
+ rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
+ rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
+ base::AutoLock lock(signal_lock_);
+ awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
+ return MOJO_RESULT_FAILED_PRECONDITION;
}
- }
- if (cancel_transit) {
- // We ended up not sending the message. Release all the platform handles.
- // Their dipatchers retain ownership when transit is canceled, so these are
- // not actually leaking.
- DCHECK(message);
- Channel::MessagePtr m = message->TakeChannelMessage();
- ScopedPlatformHandleVectorPtr handles = m->TakeHandles();
- if (handles)
- handles->clear();
+ NOTREACHED();
+ return MOJO_RESULT_UNKNOWN;
}
- return result;
+ return MOJO_RESULT_OK;
}
-MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
- uint32_t* num_bytes,
- MojoHandle* handles,
- uint32_t* num_handles,
- MojoReadMessageFlags flags) {
+MojoResult MessagePipeDispatcher::ReadMessage(
+ std::unique_ptr<MessageForTransit>* message,
+ uint32_t* num_bytes,
+ MojoHandle* handles,
+ uint32_t* num_handles,
+ MojoReadMessageFlags flags,
+ bool read_any_size) {
// We can't read from a port that's closed or in transit!
if (port_closed_ || in_transit_)
return MOJO_RESULT_INVALID_ARGUMENT;
@@ -321,15 +177,17 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
bool no_space = false;
bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD;
- // Ensure the provided buffers are large enough to hold the next message.
- // GetMessageIf provides an atomic way to test the next message without
- // committing to removing it from the port's underlying message queue until
- // we are sure we can consume it.
+ // Grab a message if the provided handles buffer is large enough. If the input
+ // |num_bytes| is provided and |read_any_size| is false, we also ensure
+ // that it specifies a size at least as large as the next available payload.
+ //
+ // If |read_any_size| is true, the input value of |*num_bytes| is ignored.
+ // This flag exists to support both new and old API behavior.
ports::ScopedMessage ports_message;
int rv = node_controller_->node()->GetMessageIf(
port_,
- [num_bytes, num_handles, &no_space, &may_discard](
+ [read_any_size, num_bytes, num_handles, &no_space, &may_discard](
const ports::Message& next_message) {
const PortsMessage& message =
static_cast<const PortsMessage&>(next_message);
@@ -355,8 +213,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
*num_handles = handles_available;
}
- if (bytes_to_read < bytes_available ||
- handles_to_read < handles_available) {
+ if (handles_to_read < handles_available ||
+ (!read_any_size && bytes_to_read < bytes_available)) {
no_space = true;
return may_discard;
}
@@ -375,9 +233,9 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
}
if (no_space) {
- // Either |*num_bytes| or |*num_handles| wasn't sufficient to hold this
- // message's data. The message will still be in queue unless
- // MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
+ // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't
+ // sufficient to hold this message's data. The message will still be in
+ // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
return MOJO_RESULT_RESOURCE_EXHAUSTED;
}
@@ -397,11 +255,11 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
// Alright! We have a message and the caller has provided sufficient storage
// in which to receive it.
- std::unique_ptr<PortsMessage> message(
+ std::unique_ptr<PortsMessage> msg(
static_cast<PortsMessage*>(ports_message.release()));
const MessageHeader* header =
- static_cast<const MessageHeader*>(message->payload_bytes());
+ static_cast<const MessageHeader*>( msg->payload_bytes());
const DispatcherHeader* dispatcher_headers =
reinterpret_cast<const DispatcherHeader*>(header + 1);
@@ -420,18 +278,17 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
const DispatcherHeader& dh = dispatcher_headers[i];
Type type = static_cast<Type>(dh.type);
- DCHECK_GE(message->num_payload_bytes(),
+ DCHECK_GE(msg->num_payload_bytes(),
data_payload_index + dh.num_bytes);
- DCHECK_GE(message->num_ports(),
+ DCHECK_GE(msg->num_ports(),
port_index + dh.num_ports);
- DCHECK_GE(message->num_handles(),
+ DCHECK_GE(msg->num_handles(),
platform_handle_index + dh.num_platform_handles);
PlatformHandle* out_handles =
- message->num_handles() ? message->handles() + platform_handle_index
- : nullptr;
+ msg->num_handles() ? msg->handles() + platform_handle_index : nullptr;
dispatchers[i].dispatcher = Dispatcher::Deserialize(
- type, dispatcher_data, dh.num_bytes, message->ports() + port_index,
+ type, dispatcher_data, dh.num_bytes, msg->ports() + port_index,
dh.num_ports, out_handles, dh.num_platform_handles);
if (!dispatchers[i].dispatcher)
return MOJO_RESULT_UNKNOWN;
@@ -447,12 +304,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
return MOJO_RESULT_UNKNOWN;
}
- // Copy message bytes.
- DCHECK_GE(message->num_payload_bytes(), header->header_size);
- const char* payload = reinterpret_cast<const char*>(message->payload_bytes());
- memcpy(bytes, payload + header->header_size,
- message->num_payload_bytes() - header->header_size);
-
+ CHECK(msg);
+ *message = MessageForTransit::WrapPortsMessage(std::move(msg));
return MOJO_RESULT_OK;
}
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/message_pipe_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698