Chromium Code Reviews| Index: content/child/webmessageportchannel_impl.cc |
| diff --git a/content/child/webmessageportchannel_impl.cc b/content/child/webmessageportchannel_impl.cc |
| index 2b4d265a895c981473ee0968e714490b6d43f3e1..c3644cf4bf8392acad630e440d6f2099b3603883 100644 |
| --- a/content/child/webmessageportchannel_impl.cc |
| +++ b/content/child/webmessageportchannel_impl.cc |
| @@ -5,16 +5,11 @@ |
| #include "content/child/webmessageportchannel_impl.h" |
| #include <stddef.h> |
| -#include <utility> |
| #include "base/bind.h" |
| -#include "content/child/child_process.h" |
| -#include "content/child/child_thread_impl.h" |
| -#include "content/common/message_port_messages.h" |
| +#include "base/logging.h" |
| #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h" |
| #include "third_party/WebKit/public/platform/WebString.h" |
| -#include "third_party/WebKit/public/web/WebSerializedScriptValue.h" |
| -#include "v8/include/v8.h" |
| using blink::WebMessagePortChannel; |
| using blink::WebMessagePortChannelArray; |
| @@ -23,303 +18,120 @@ using blink::WebString; |
| namespace content { |
| -WebMessagePortChannelImpl::WebMessagePortChannelImpl( |
| - const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner) |
| - : client_(NULL), |
| - route_id_(MSG_ROUTING_NONE), |
| - message_port_id_(MSG_ROUTING_NONE), |
| - main_thread_task_runner_(main_thread_task_runner) { |
| - AddRef(); |
| - Init(); |
| +WebMessagePortChannelImpl::~WebMessagePortChannelImpl() { |
| + setClient(nullptr); |
| } |
| WebMessagePortChannelImpl::WebMessagePortChannelImpl( |
| - int route_id, |
| - int port_id, |
| - const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner) |
| - : client_(NULL), |
| - route_id_(route_id), |
| - message_port_id_(port_id), |
| - main_thread_task_runner_(main_thread_task_runner) { |
| - AddRef(); |
| - Init(); |
| -} |
| - |
| -WebMessagePortChannelImpl::~WebMessagePortChannelImpl() { |
| - // If we have any queued messages with attached ports, manually destroy them. |
| - while (!message_queue_.empty()) { |
| - const WebMessagePortChannelArray& channel_array = |
| - message_queue_.front().ports; |
| - for (size_t i = 0; i < channel_array.size(); i++) { |
| - channel_array[i]->destroy(); |
| - } |
| - message_queue_.pop(); |
| - } |
| - |
| - if (message_port_id_ != MSG_ROUTING_NONE) |
| - Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_)); |
| - |
| - if (route_id_ != MSG_ROUTING_NONE) |
| - ChildThreadImpl::current()->GetRouter()->RemoveRoute(route_id_); |
| + MessagePort message_port) |
| + : port_(message_port.ReleaseHandle()) { |
| } |
| // static |
| void WebMessagePortChannelImpl::CreatePair( |
| - const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner, |
| blink::WebMessagePortChannel** channel1, |
| blink::WebMessagePortChannel** channel2) { |
| - WebMessagePortChannelImpl* impl1 = |
| - new WebMessagePortChannelImpl(main_thread_task_runner); |
| - WebMessagePortChannelImpl* impl2 = |
| - new WebMessagePortChannelImpl(main_thread_task_runner); |
| - |
| - impl1->Entangle(impl2); |
| - impl2->Entangle(impl1); |
| - |
| - *channel1 = impl1; |
| - *channel2 = impl2; |
| + mojo::MessagePipe pipe; |
| + *channel1 = new WebMessagePortChannelImpl(std::move(pipe.handle0)); |
| + *channel2 = new WebMessagePortChannelImpl(std::move(pipe.handle1)); |
| } |
| // static |
| -std::vector<int> |
| -WebMessagePortChannelImpl::ExtractMessagePortIDs( |
| +std::vector<MessagePort> |
| +WebMessagePortChannelImpl::ExtractMessagePorts( |
| std::unique_ptr<WebMessagePortChannelArray> channels) { |
| - std::vector<int> message_ports; |
| + std::vector<MessagePort> message_ports; |
| if (channels) |
| - message_ports = ExtractMessagePortIDs(*channels); |
| + message_ports = ExtractMessagePorts(*channels); |
| return message_ports; |
| } |
| // static |
| -std::vector<int> |
| -WebMessagePortChannelImpl::ExtractMessagePortIDs( |
| +std::vector<MessagePort> |
| +WebMessagePortChannelImpl::ExtractMessagePorts( |
| const WebMessagePortChannelArray& channels) { |
| - std::vector<int> message_ports(channels.size()); |
| + std::vector<MessagePort> message_ports(channels.size()); |
| for (size_t i = 0; i < channels.size(); ++i) { |
| - WebMessagePortChannelImpl* webchannel = |
| + WebMessagePortChannelImpl* channel_impl = |
| static_cast<WebMessagePortChannelImpl*>(channels[i]); |
| - // The message port ids might not be set up yet if this channel |
| - // wasn't created on the main thread. |
| - DCHECK(webchannel->main_thread_task_runner_->BelongsToCurrentThread()); |
| - message_ports[i] = webchannel->message_port_id(); |
| - webchannel->QueueMessages(); |
| - DCHECK(message_ports[i] != MSG_ROUTING_NONE); |
| + message_ports[i] = channel_impl->ReleaseMessagePort(); |
| + DCHECK(message_ports[i].GetHandle().is_valid()); |
|
kinuko
2017/01/24 12:32:54
I think we're leaking WebMessagePortChannel here n
darin (slow to review)
2017/01/26 22:20:03
Oh, wow. I didn't realize ownership of the WebMess
|
| } |
| return message_ports; |
| } |
| // static |
| -std::vector<int> |
| -WebMessagePortChannelImpl::ExtractMessagePortIDsWithoutQueueing( |
| - std::unique_ptr<WebMessagePortChannelArray> channels) { |
| - if (!channels) |
| - return std::vector<int>(); |
| - |
| - std::vector<int> message_ports(channels->size()); |
| - for (size_t i = 0; i < channels->size(); ++i) { |
| - WebMessagePortChannelImpl* webchannel = |
| - static_cast<WebMessagePortChannelImpl*>((*channels)[i]); |
| - // The message port ids might not be set up yet if this channel |
| - // wasn't created on the main thread. |
| - DCHECK(webchannel->main_thread_task_runner_->BelongsToCurrentThread()); |
| - message_ports[i] = webchannel->message_port_id(); |
| - // Don't queue messages, but do increase the child processes ref-count to |
| - // ensure this child process stays alive long enough to receive all |
| - // in-flight messages. |
| - ChildProcess::current()->AddRefProcess(); |
| - DCHECK(message_ports[i] != MSG_ROUTING_NONE); |
| - } |
| - return message_ports; |
| +WebMessagePortChannelArray |
| +WebMessagePortChannelImpl::CreateFromMessagePorts( |
| + const std::vector<MessagePort>& message_ports) { |
| + WebMessagePortChannelArray channels(message_ports.size()); |
| + for (size_t i = 0; i < message_ports.size(); ++i) |
| + channels[i] = new WebMessagePortChannelImpl(message_ports[i]); |
| + return channels; |
| } |
| // static |
| -WebMessagePortChannelArray WebMessagePortChannelImpl::CreatePorts( |
| - const std::vector<int>& message_ports, |
| - const std::vector<int>& new_routing_ids, |
| - const scoped_refptr<base::SingleThreadTaskRunner>& |
| - main_thread_task_runner) { |
| - DCHECK_EQ(message_ports.size(), new_routing_ids.size()); |
| - WebMessagePortChannelArray channels(message_ports.size()); |
| - for (size_t i = 0; i < message_ports.size() && i < new_routing_ids.size(); |
| - ++i) { |
| - channels[i] = new WebMessagePortChannelImpl( |
| - new_routing_ids[i], message_ports[i], |
| - main_thread_task_runner); |
| +WebMessagePortChannelArray |
| +WebMessagePortChannelImpl::CreateFromMessagePipeHandles( |
| + std::vector<mojo::ScopedMessagePipeHandle> handles) { |
| + WebMessagePortChannelArray channels(handles.size()); |
| + for (size_t i = 0; i < handles.size(); ++i) { |
| + channels[i] = |
| + new WebMessagePortChannelImpl(MessagePort(std::move(handles[i]))); |
| } |
| return channels; |
| } |
| -void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) { |
| - // Must lock here since client_ is called on the main thread. |
| - base::AutoLock auto_lock(lock_); |
| - client_ = client; |
| +MessagePort WebMessagePortChannelImpl::ReleaseMessagePort() { |
| + return MessagePort(port_.ReleaseHandle()); |
| } |
| -void WebMessagePortChannelImpl::destroy() { |
| - setClient(NULL); |
| - |
| - // Release the object on the main thread, since the destructor might want to |
| - // send an IPC, and that has to happen on the main thread. |
| - main_thread_task_runner_->ReleaseSoon(FROM_HERE, this); |
| +WebMessagePortChannelImpl::WebMessagePortChannelImpl( |
| + mojo::ScopedMessagePipeHandle handle) |
| + : port_(std::move(handle)) { |
| } |
| -void WebMessagePortChannelImpl::postMessage( |
| - const WebString& message, |
| - WebMessagePortChannelArray* channels_ptr) { |
| - std::unique_ptr<WebMessagePortChannelArray> channels(channels_ptr); |
| - if (!main_thread_task_runner_->BelongsToCurrentThread()) { |
| - // Note: we must construct the base::string16 here and pass that. Otherwise, |
| - // the WebString will be passed, leading to references to the StringImpl |
| - // from two threads, which is a data race. |
| - main_thread_task_runner_->PostTask( |
| - FROM_HERE, base::Bind(&WebMessagePortChannelImpl::SendPostMessage, this, |
| - base::Passed(base::string16(message)), |
| - base::Passed(std::move(channels)))); |
| +void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) { |
| + if (client) { |
| + port_.SetCallback( |
| + base::Bind(&WebMessagePortChannelClient::messageAvailable, |
| + base::Unretained(client))); |
| } else { |
| - SendPostMessage(message, std::move(channels)); |
| + port_.ClearCallback(); |
| } |
| } |
| -void WebMessagePortChannelImpl::SendPostMessage( |
| - const base::string16& message, |
| - std::unique_ptr<WebMessagePortChannelArray> channels) { |
| - IPC::Message* msg = new MessagePortHostMsg_PostMessage( |
| - message_port_id_, message, ExtractMessagePortIDs(std::move(channels))); |
| - Send(msg); |
| +void WebMessagePortChannelImpl::postMessage( |
| + const WebString& encoded_message, |
| + WebMessagePortChannelArray* channels_ptr) { |
|
kinuko
2017/01/24 12:32:55
This method is supposed to take channels_ptr, shou
darin (slow to review)
2017/01/26 22:20:03
Yes, good call
|
| + // TODO: Address race condition with blob registration. |
|
kinuko
2017/01/24 12:32:55
Do we still have race condition now that we use sy
darin (slow to review)
2017/01/26 22:20:03
Oh, yes, I can kill this comment now. That was jus
|
| + std::vector<MessagePort> ports; |
| + if (channels_ptr && !channels_ptr->isEmpty()) { |
| + ports.resize(channels_ptr->size()); |
| + for (size_t i = 0; i < channels_ptr->size(); ++i) { |
| + ports[i] = static_cast<WebMessagePortChannelImpl*>((*channels_ptr)[i])-> |
| + ReleaseMessagePort(); |
|
kinuko
2017/01/24 12:32:54
This probably is leaking WebMessagePortChannel her
darin (slow to review)
2017/01/26 22:20:03
Yeah, that would probably be much better.
|
| + } |
| + } |
| + port_.PostMessage(encoded_message, std::move(ports)); |
| } |
| bool WebMessagePortChannelImpl::tryGetMessage( |
| - WebString* message, |
| + WebString* encoded_message, |
| WebMessagePortChannelArray& channels) { |
| - base::AutoLock auto_lock(lock_); |
| - if (message_queue_.empty()) |
| + base::string16 buffer; |
| + std::vector<MessagePort> ports; |
| + if (!port_.GetMessage(&buffer, &ports)) |
| return false; |
| - *message = message_queue_.front().message; |
| - channels = message_queue_.front().ports; |
| - message_queue_.pop(); |
| - return true; |
| -} |
| + *encoded_message = buffer; |
| -void WebMessagePortChannelImpl::Init() { |
| - if (!main_thread_task_runner_->BelongsToCurrentThread()) { |
| - main_thread_task_runner_->PostTask( |
| - FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this)); |
| - return; |
| + if (!ports.empty()) { |
| + channels = WebMessagePortChannelArray(ports.size()); |
| + for (size_t i = 0; i < ports.size(); ++i) |
| + channels[i] = new WebMessagePortChannelImpl(ports[i].ReleaseHandle()); |
| } |
| - |
| - if (route_id_ == MSG_ROUTING_NONE) { |
| - DCHECK(message_port_id_ == MSG_ROUTING_NONE); |
| - Send(new MessagePortHostMsg_CreateMessagePort( |
| - &route_id_, &message_port_id_)); |
| - } else if (message_port_id_ != MSG_ROUTING_NONE) { |
| - Send(new MessagePortHostMsg_ReleaseMessages(message_port_id_)); |
| - } |
| - |
| - ChildThreadImpl::current()->GetRouter()->AddRoute(route_id_, this); |
| -} |
| - |
| -void WebMessagePortChannelImpl::Entangle( |
| - scoped_refptr<WebMessagePortChannelImpl> channel) { |
| - // The message port ids might not be set up yet, if this channel wasn't |
| - // created on the main thread. So need to wait until we're on the main thread |
| - // before getting the other message port id. |
| - if (!main_thread_task_runner_->BelongsToCurrentThread()) { |
| - main_thread_task_runner_->PostTask( |
| - FROM_HERE, |
| - base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel)); |
| - return; |
| - } |
| - |
| - Send(new MessagePortHostMsg_Entangle( |
| - message_port_id_, channel->message_port_id())); |
| -} |
| - |
| -void WebMessagePortChannelImpl::QueueMessages() { |
| - if (!main_thread_task_runner_->BelongsToCurrentThread()) { |
| - main_thread_task_runner_->PostTask( |
| - FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this)); |
| - return; |
| - } |
| - // This message port is being sent elsewhere (perhaps to another process). |
| - // The new endpoint needs to receive the queued messages, including ones that |
| - // could still be in-flight. So we tell the browser to queue messages, and it |
| - // sends us an ack, whose receipt we know means that no more messages are |
| - // in-flight. We then send the queued messages to the browser, which prepends |
| - // them to the ones it queued and it sends them to the new endpoint. |
| - Send(new MessagePortHostMsg_QueueMessages(message_port_id_)); |
| - |
| - // The process could potentially go away while we're still waiting for |
| - // in-flight messages. Ensure it stays alive. |
| - ChildProcess::current()->AddRefProcess(); |
| -} |
| - |
| -void WebMessagePortChannelImpl::Send(IPC::Message* message) { |
| - if (!main_thread_task_runner_->BelongsToCurrentThread()) { |
| - DCHECK(!message->is_sync()); |
| - main_thread_task_runner_->PostTask( |
| - FROM_HERE, |
| - base::Bind(&WebMessagePortChannelImpl::Send, this, message)); |
| - return; |
| - } |
| - |
| - ChildThreadImpl::current()->GetRouter()->Send(message); |
| -} |
| - |
| -bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) { |
| - bool handled = true; |
| - IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message) |
| - IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage) |
| - IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued) |
| - IPC_MESSAGE_UNHANDLED(handled = false) |
| - IPC_END_MESSAGE_MAP() |
| - return handled; |
| -} |
| - |
| -void WebMessagePortChannelImpl::OnMessage( |
| - const base::string16& message, |
| - const std::vector<int>& sent_message_ports, |
| - const std::vector<int>& new_routing_ids) { |
| - base::AutoLock auto_lock(lock_); |
| - Message msg; |
| - msg.message = message; |
| - msg.ports = CreatePorts(sent_message_ports, new_routing_ids, |
| - main_thread_task_runner_.get()); |
| - |
| - bool was_empty = message_queue_.empty(); |
| - message_queue_.push(msg); |
| - if (client_ && was_empty) |
| - client_->messageAvailable(); |
| -} |
| - |
| -void WebMessagePortChannelImpl::OnMessagesQueued() { |
| - std::vector<QueuedMessage> queued_messages; |
| - |
| - { |
| - base::AutoLock auto_lock(lock_); |
| - queued_messages.reserve(message_queue_.size()); |
| - while (!message_queue_.empty()) { |
| - base::string16 message = message_queue_.front().message; |
| - std::vector<int> ports = |
| - ExtractMessagePortIDs(message_queue_.front().ports); |
| - queued_messages.push_back(std::make_pair(message, ports)); |
| - message_queue_.pop(); |
| - } |
| - } |
| - |
| - Send(new MessagePortHostMsg_SendQueuedMessages( |
| - message_port_id_, queued_messages)); |
| - |
| - message_port_id_ = MSG_ROUTING_NONE; |
| - |
| - Release(); |
| - ChildProcess::current()->ReleaseProcess(); |
| + return true; |
| } |
| -WebMessagePortChannelImpl::Message::Message() {} |
| - |
| -WebMessagePortChannelImpl::Message::Message(const Message& other) = default; |
| - |
| -WebMessagePortChannelImpl::Message::~Message() {} |
| - |
| } // namespace content |