Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(84)

Unified Diff: content/child/webmessageportchannel_impl.cc

Issue 2422793002: HTML MessagePort as mojo::MessagePipeHandle (Closed)
Patch Set: Add missing ScopedAsyncTaskScheduler instance for the new unit tests; required by a recent change t… Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « content/child/webmessageportchannel_impl.h ('k') | content/common/BUILD.gn » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: content/child/webmessageportchannel_impl.cc
diff --git a/content/child/webmessageportchannel_impl.cc b/content/child/webmessageportchannel_impl.cc
index 26de409df251f612d9b4d849950dbfbfceaa290a..3c5e8d90ed75c5523c12f432d50589b24f897a5d 100644
--- a/content/child/webmessageportchannel_impl.cc
+++ b/content/child/webmessageportchannel_impl.cc
@@ -5,16 +5,12 @@
#include "content/child/webmessageportchannel_impl.h"
#include <stddef.h>
-#include <utility>
#include "base/bind.h"
-#include "content/child/child_process.h"
-#include "content/child/child_thread_impl.h"
-#include "content/common/message_port_messages.h"
+#include "base/logging.h"
+#include "base/memory/ptr_util.h"
#include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
#include "third_party/WebKit/public/platform/WebString.h"
-#include "third_party/WebKit/public/web/WebSerializedScriptValue.h"
-#include "v8/include/v8.h"
using blink::WebMessagePortChannel;
using blink::WebMessagePortChannelArray;
@@ -23,303 +19,109 @@ using blink::WebString;
namespace content {
-WebMessagePortChannelImpl::WebMessagePortChannelImpl(
- const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner)
- : client_(NULL),
- route_id_(MSG_ROUTING_NONE),
- message_port_id_(MSG_ROUTING_NONE),
- main_thread_task_runner_(main_thread_task_runner) {
- AddRef();
- Init();
+WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
+ setClient(nullptr);
}
WebMessagePortChannelImpl::WebMessagePortChannelImpl(
- int route_id,
- int port_id,
- const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner)
- : client_(NULL),
- route_id_(route_id),
- message_port_id_(port_id),
- main_thread_task_runner_(main_thread_task_runner) {
- AddRef();
- Init();
-}
-
-WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
- // If we have any queued messages with attached ports, manually destroy them.
- while (!message_queue_.empty()) {
- const WebMessagePortChannelArray& channel_array =
- message_queue_.front().ports;
- for (size_t i = 0; i < channel_array.size(); i++) {
- channel_array[i]->destroy();
- }
- message_queue_.pop();
- }
-
- if (message_port_id_ != MSG_ROUTING_NONE)
- Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_));
-
- if (route_id_ != MSG_ROUTING_NONE)
- ChildThreadImpl::current()->GetRouter()->RemoveRoute(route_id_);
+ MessagePort message_port)
+ : port_(message_port.ReleaseHandle()) {
}
// static
void WebMessagePortChannelImpl::CreatePair(
- const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner,
blink::WebMessagePortChannel** channel1,
blink::WebMessagePortChannel** channel2) {
- WebMessagePortChannelImpl* impl1 =
- new WebMessagePortChannelImpl(main_thread_task_runner);
- WebMessagePortChannelImpl* impl2 =
- new WebMessagePortChannelImpl(main_thread_task_runner);
-
- impl1->Entangle(impl2);
- impl2->Entangle(impl1);
-
- *channel1 = impl1;
- *channel2 = impl2;
-}
-
-// static
-std::vector<int>
-WebMessagePortChannelImpl::ExtractMessagePortIDs(
- std::unique_ptr<WebMessagePortChannelArray> channels) {
- std::vector<int> message_ports;
- if (channels)
- message_ports = ExtractMessagePortIDs(*channels);
- return message_ports;
+ mojo::MessagePipe pipe;
+ *channel1 = new WebMessagePortChannelImpl(std::move(pipe.handle0));
+ *channel2 = new WebMessagePortChannelImpl(std::move(pipe.handle1));
}
// static
-std::vector<int>
-WebMessagePortChannelImpl::ExtractMessagePortIDs(
- const WebMessagePortChannelArray& channels) {
- std::vector<int> message_ports(channels.size());
+std::vector<MessagePort>
+WebMessagePortChannelImpl::ExtractMessagePorts(
+ WebMessagePortChannelArray channels) {
+ std::vector<MessagePort> message_ports(channels.size());
for (size_t i = 0; i < channels.size(); ++i) {
- WebMessagePortChannelImpl* webchannel =
- static_cast<WebMessagePortChannelImpl*>(channels[i]);
- // The message port ids might not be set up yet if this channel
- // wasn't created on the main thread.
- DCHECK(webchannel->main_thread_task_runner_->BelongsToCurrentThread());
- message_ports[i] = webchannel->message_port_id();
- webchannel->QueueMessages();
- DCHECK(message_ports[i] != MSG_ROUTING_NONE);
+ WebMessagePortChannelImpl* channel_impl =
+ static_cast<WebMessagePortChannelImpl*>(channels[i].get());
+ message_ports[i] = channel_impl->ReleaseMessagePort();
+ DCHECK(message_ports[i].GetHandle().is_valid());
}
return message_ports;
}
// static
-std::vector<int>
-WebMessagePortChannelImpl::ExtractMessagePortIDsWithoutQueueing(
- std::unique_ptr<WebMessagePortChannelArray> channels) {
- if (!channels)
- return std::vector<int>();
-
- std::vector<int> message_ports(channels->size());
- for (size_t i = 0; i < channels->size(); ++i) {
- WebMessagePortChannelImpl* webchannel =
- static_cast<WebMessagePortChannelImpl*>((*channels)[i]);
- // The message port ids might not be set up yet if this channel
- // wasn't created on the main thread.
- DCHECK(webchannel->main_thread_task_runner_->BelongsToCurrentThread());
- message_ports[i] = webchannel->message_port_id();
- // Don't queue messages, but do increase the child processes ref-count to
- // ensure this child process stays alive long enough to receive all
- // in-flight messages.
- ChildProcess::current()->AddRefProcess();
- DCHECK(message_ports[i] != MSG_ROUTING_NONE);
- }
- return message_ports;
+WebMessagePortChannelArray
+WebMessagePortChannelImpl::CreateFromMessagePorts(
+ const std::vector<MessagePort>& message_ports) {
+ WebMessagePortChannelArray channels(message_ports.size());
+ for (size_t i = 0; i < message_ports.size(); ++i)
+ channels[i] = base::MakeUnique<WebMessagePortChannelImpl>(message_ports[i]);
+ return channels;
}
// static
-WebMessagePortChannelArray WebMessagePortChannelImpl::CreatePorts(
- const std::vector<int>& message_ports,
- const std::vector<int>& new_routing_ids,
- const scoped_refptr<base::SingleThreadTaskRunner>&
- main_thread_task_runner) {
- DCHECK_EQ(message_ports.size(), new_routing_ids.size());
- WebMessagePortChannelArray channels(message_ports.size());
- for (size_t i = 0; i < message_ports.size() && i < new_routing_ids.size();
- ++i) {
- channels[i] = new WebMessagePortChannelImpl(
- new_routing_ids[i], message_ports[i],
- main_thread_task_runner);
+WebMessagePortChannelArray
+WebMessagePortChannelImpl::CreateFromMessagePipeHandles(
+ std::vector<mojo::ScopedMessagePipeHandle> handles) {
+ WebMessagePortChannelArray channels(handles.size());
+ for (size_t i = 0; i < handles.size(); ++i) {
+ channels[i] = base::MakeUnique<WebMessagePortChannelImpl>(
+ MessagePort(std::move(handles[i])));
}
return channels;
}
-void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) {
- // Must lock here since client_ is called on the main thread.
- base::AutoLock auto_lock(lock_);
- client_ = client;
+MessagePort WebMessagePortChannelImpl::ReleaseMessagePort() {
+ return MessagePort(port_.ReleaseHandle());
}
-void WebMessagePortChannelImpl::destroy() {
- setClient(NULL);
-
- // Release the object on the main thread, since the destructor might want to
- // send an IPC, and that has to happen on the main thread.
- main_thread_task_runner_->ReleaseSoon(FROM_HERE, this);
+WebMessagePortChannelImpl::WebMessagePortChannelImpl(
+ mojo::ScopedMessagePipeHandle handle)
+ : port_(std::move(handle)) {
}
-void WebMessagePortChannelImpl::postMessage(
- const WebString& message,
- WebMessagePortChannelArray* channels_ptr) {
- std::unique_ptr<WebMessagePortChannelArray> channels(channels_ptr);
- if (!main_thread_task_runner_->BelongsToCurrentThread()) {
- // Note: we must construct the base::string16 here and pass that. Otherwise,
- // the WebString will be passed, leading to references to the StringImpl
- // from two threads, which is a data race.
- main_thread_task_runner_->PostTask(
- FROM_HERE, base::Bind(&WebMessagePortChannelImpl::SendPostMessage, this,
- base::Passed(message.utf16()),
- base::Passed(std::move(channels))));
+void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) {
+ if (client) {
+ port_.SetCallback(
+ base::Bind(&WebMessagePortChannelClient::messageAvailable,
+ base::Unretained(client)));
} else {
- SendPostMessage(message.utf16(), std::move(channels));
+ port_.ClearCallback();
}
}
-void WebMessagePortChannelImpl::SendPostMessage(
- const base::string16& message,
- std::unique_ptr<WebMessagePortChannelArray> channels) {
- IPC::Message* msg = new MessagePortHostMsg_PostMessage(
- message_port_id_, message, ExtractMessagePortIDs(std::move(channels)));
- Send(msg);
+void WebMessagePortChannelImpl::postMessage(
+ const WebString& encoded_message,
+ WebMessagePortChannelArray channels) {
+ std::vector<MessagePort> ports;
+ if (!channels.isEmpty()) {
+ ports.resize(channels.size());
+ for (size_t i = 0; i < channels.size(); ++i) {
+ ports[i] = static_cast<WebMessagePortChannelImpl*>(channels[i].get())->
+ ReleaseMessagePort();
+ }
+ }
+ port_.PostMessage(encoded_message.utf16(), std::move(ports));
}
bool WebMessagePortChannelImpl::tryGetMessage(
- WebString* message,
+ WebString* encoded_message,
WebMessagePortChannelArray& channels) {
- base::AutoLock auto_lock(lock_);
- if (message_queue_.empty())
+ base::string16 buffer;
+ std::vector<MessagePort> ports;
+ if (!port_.GetMessage(&buffer, &ports))
return false;
- *message = WebString::fromUTF16(message_queue_.front().message);
- channels = message_queue_.front().ports;
- message_queue_.pop();
- return true;
-}
+ *encoded_message = WebString::fromUTF16(buffer);
-void WebMessagePortChannelImpl::Init() {
- if (!main_thread_task_runner_->BelongsToCurrentThread()) {
- main_thread_task_runner_->PostTask(
- FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this));
- return;
+ if (!ports.empty()) {
+ channels = WebMessagePortChannelArray(ports.size());
+ for (size_t i = 0; i < ports.size(); ++i)
+ channels[i] = base::MakeUnique<WebMessagePortChannelImpl>(ports[i]);
}
-
- if (route_id_ == MSG_ROUTING_NONE) {
- DCHECK(message_port_id_ == MSG_ROUTING_NONE);
- Send(new MessagePortHostMsg_CreateMessagePort(
- &route_id_, &message_port_id_));
- } else if (message_port_id_ != MSG_ROUTING_NONE) {
- Send(new MessagePortHostMsg_ReleaseMessages(message_port_id_));
- }
-
- ChildThreadImpl::current()->GetRouter()->AddRoute(route_id_, this);
-}
-
-void WebMessagePortChannelImpl::Entangle(
- scoped_refptr<WebMessagePortChannelImpl> channel) {
- // The message port ids might not be set up yet, if this channel wasn't
- // created on the main thread. So need to wait until we're on the main thread
- // before getting the other message port id.
- if (!main_thread_task_runner_->BelongsToCurrentThread()) {
- main_thread_task_runner_->PostTask(
- FROM_HERE,
- base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel));
- return;
- }
-
- Send(new MessagePortHostMsg_Entangle(
- message_port_id_, channel->message_port_id()));
-}
-
-void WebMessagePortChannelImpl::QueueMessages() {
- if (!main_thread_task_runner_->BelongsToCurrentThread()) {
- main_thread_task_runner_->PostTask(
- FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this));
- return;
- }
- // This message port is being sent elsewhere (perhaps to another process).
- // The new endpoint needs to receive the queued messages, including ones that
- // could still be in-flight. So we tell the browser to queue messages, and it
- // sends us an ack, whose receipt we know means that no more messages are
- // in-flight. We then send the queued messages to the browser, which prepends
- // them to the ones it queued and it sends them to the new endpoint.
- Send(new MessagePortHostMsg_QueueMessages(message_port_id_));
-
- // The process could potentially go away while we're still waiting for
- // in-flight messages. Ensure it stays alive.
- ChildProcess::current()->AddRefProcess();
-}
-
-void WebMessagePortChannelImpl::Send(IPC::Message* message) {
- if (!main_thread_task_runner_->BelongsToCurrentThread()) {
- DCHECK(!message->is_sync());
- main_thread_task_runner_->PostTask(
- FROM_HERE,
- base::Bind(&WebMessagePortChannelImpl::Send, this, message));
- return;
- }
-
- ChildThreadImpl::current()->GetRouter()->Send(message);
-}
-
-bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) {
- bool handled = true;
- IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message)
- IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage)
- IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued)
- IPC_MESSAGE_UNHANDLED(handled = false)
- IPC_END_MESSAGE_MAP()
- return handled;
-}
-
-void WebMessagePortChannelImpl::OnMessage(
- const base::string16& message,
- const std::vector<int>& sent_message_ports,
- const std::vector<int>& new_routing_ids) {
- base::AutoLock auto_lock(lock_);
- Message msg;
- msg.message = message;
- msg.ports = CreatePorts(sent_message_ports, new_routing_ids,
- main_thread_task_runner_.get());
-
- bool was_empty = message_queue_.empty();
- message_queue_.push(msg);
- if (client_ && was_empty)
- client_->messageAvailable();
-}
-
-void WebMessagePortChannelImpl::OnMessagesQueued() {
- std::vector<QueuedMessage> queued_messages;
-
- {
- base::AutoLock auto_lock(lock_);
- queued_messages.reserve(message_queue_.size());
- while (!message_queue_.empty()) {
- base::string16 message = message_queue_.front().message;
- std::vector<int> ports =
- ExtractMessagePortIDs(message_queue_.front().ports);
- queued_messages.push_back(std::make_pair(message, ports));
- message_queue_.pop();
- }
- }
-
- Send(new MessagePortHostMsg_SendQueuedMessages(
- message_port_id_, queued_messages));
-
- message_port_id_ = MSG_ROUTING_NONE;
-
- Release();
- ChildProcess::current()->ReleaseProcess();
+ return true;
}
-WebMessagePortChannelImpl::Message::Message() {}
-
-WebMessagePortChannelImpl::Message::Message(const Message& other) = default;
-
-WebMessagePortChannelImpl::Message::~Message() {}
-
} // namespace content
« no previous file with comments | « content/child/webmessageportchannel_impl.h ('k') | content/common/BUILD.gn » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698