 Chromium Code Reviews
 Chromium Code Reviews Issue 2422793002:
  HTML MessagePort as mojo::MessagePipeHandle  (Closed)
    
  
    Issue 2422793002:
  HTML MessagePort as mojo::MessagePipeHandle  (Closed) 
  | OLD | NEW | 
|---|---|
| 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | 
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be | 
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. | 
| 4 | 4 | 
| 5 #include "content/child/webmessageportchannel_impl.h" | 5 #include "content/child/webmessageportchannel_impl.h" | 
| 6 | 6 | 
| 7 #include <stddef.h> | 7 #include <stddef.h> | 
| 8 #include <utility> | |
| 9 | 8 | 
| 10 #include "base/bind.h" | 9 #include "base/bind.h" | 
| 11 #include "content/child/child_process.h" | 10 #include "base/logging.h" | 
| 12 #include "content/child/child_thread_impl.h" | |
| 13 #include "content/common/message_port_messages.h" | |
| 14 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h" | 11 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h" | 
| 15 #include "third_party/WebKit/public/platform/WebString.h" | 12 #include "third_party/WebKit/public/platform/WebString.h" | 
| 16 #include "third_party/WebKit/public/web/WebSerializedScriptValue.h" | |
| 17 #include "v8/include/v8.h" | |
| 18 | 13 | 
| 19 using blink::WebMessagePortChannel; | 14 using blink::WebMessagePortChannel; | 
| 20 using blink::WebMessagePortChannelArray; | 15 using blink::WebMessagePortChannelArray; | 
| 21 using blink::WebMessagePortChannelClient; | 16 using blink::WebMessagePortChannelClient; | 
| 22 using blink::WebString; | 17 using blink::WebString; | 
| 23 | 18 | 
| 24 namespace content { | 19 namespace content { | 
| 25 | 20 | 
| 26 WebMessagePortChannelImpl::WebMessagePortChannelImpl( | 21 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() { | 
| 27 const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner) | 22 setClient(nullptr); | 
| 28 : client_(NULL), | |
| 29 route_id_(MSG_ROUTING_NONE), | |
| 30 message_port_id_(MSG_ROUTING_NONE), | |
| 31 main_thread_task_runner_(main_thread_task_runner) { | |
| 32 AddRef(); | |
| 33 Init(); | |
| 34 } | 23 } | 
| 35 | 24 | 
| 36 WebMessagePortChannelImpl::WebMessagePortChannelImpl( | 25 WebMessagePortChannelImpl::WebMessagePortChannelImpl( | 
| 37 int route_id, | 26 MessagePort message_port) | 
| 38 int port_id, | 27 : port_(message_port.ReleaseHandle()) { | 
| 39 const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner) | |
| 40 : client_(NULL), | |
| 41 route_id_(route_id), | |
| 42 message_port_id_(port_id), | |
| 43 main_thread_task_runner_(main_thread_task_runner) { | |
| 44 AddRef(); | |
| 45 Init(); | |
| 46 } | |
| 47 | |
| 48 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() { | |
| 49 // If we have any queued messages with attached ports, manually destroy them. | |
| 50 while (!message_queue_.empty()) { | |
| 51 const WebMessagePortChannelArray& channel_array = | |
| 52 message_queue_.front().ports; | |
| 53 for (size_t i = 0; i < channel_array.size(); i++) { | |
| 54 channel_array[i]->destroy(); | |
| 55 } | |
| 56 message_queue_.pop(); | |
| 57 } | |
| 58 | |
| 59 if (message_port_id_ != MSG_ROUTING_NONE) | |
| 60 Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_)); | |
| 61 | |
| 62 if (route_id_ != MSG_ROUTING_NONE) | |
| 63 ChildThreadImpl::current()->GetRouter()->RemoveRoute(route_id_); | |
| 64 } | 28 } | 
| 65 | 29 | 
| 66 // static | 30 // static | 
| 67 void WebMessagePortChannelImpl::CreatePair( | 31 void WebMessagePortChannelImpl::CreatePair( | 
| 68 const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner, | |
| 69 blink::WebMessagePortChannel** channel1, | 32 blink::WebMessagePortChannel** channel1, | 
| 70 blink::WebMessagePortChannel** channel2) { | 33 blink::WebMessagePortChannel** channel2) { | 
| 71 WebMessagePortChannelImpl* impl1 = | 34 mojo::MessagePipe pipe; | 
| 72 new WebMessagePortChannelImpl(main_thread_task_runner); | 35 *channel1 = new WebMessagePortChannelImpl(std::move(pipe.handle0)); | 
| 73 WebMessagePortChannelImpl* impl2 = | 36 *channel2 = new WebMessagePortChannelImpl(std::move(pipe.handle1)); | 
| 74 new WebMessagePortChannelImpl(main_thread_task_runner); | |
| 75 | |
| 76 impl1->Entangle(impl2); | |
| 77 impl2->Entangle(impl1); | |
| 78 | |
| 79 *channel1 = impl1; | |
| 80 *channel2 = impl2; | |
| 81 } | 37 } | 
| 82 | 38 | 
| 83 // static | 39 // static | 
| 84 std::vector<int> | 40 std::vector<MessagePort> | 
| 85 WebMessagePortChannelImpl::ExtractMessagePortIDs( | 41 WebMessagePortChannelImpl::ExtractMessagePorts( | 
| 86 std::unique_ptr<WebMessagePortChannelArray> channels) { | 42 std::unique_ptr<WebMessagePortChannelArray> channels) { | 
| 87 std::vector<int> message_ports; | 43 std::vector<MessagePort> message_ports; | 
| 88 if (channels) | 44 if (channels) | 
| 89 message_ports = ExtractMessagePortIDs(*channels); | 45 message_ports = ExtractMessagePorts(*channels); | 
| 90 return message_ports; | 46 return message_ports; | 
| 91 } | 47 } | 
| 92 | 48 | 
| 93 // static | 49 // static | 
| 94 std::vector<int> | 50 std::vector<MessagePort> | 
| 95 WebMessagePortChannelImpl::ExtractMessagePortIDs( | 51 WebMessagePortChannelImpl::ExtractMessagePorts( | 
| 96 const WebMessagePortChannelArray& channels) { | 52 const WebMessagePortChannelArray& channels) { | 
| 97 std::vector<int> message_ports(channels.size()); | 53 std::vector<MessagePort> message_ports(channels.size()); | 
| 98 for (size_t i = 0; i < channels.size(); ++i) { | 54 for (size_t i = 0; i < channels.size(); ++i) { | 
| 99 WebMessagePortChannelImpl* webchannel = | 55 WebMessagePortChannelImpl* channel_impl = | 
| 100 static_cast<WebMessagePortChannelImpl*>(channels[i]); | 56 static_cast<WebMessagePortChannelImpl*>(channels[i]); | 
| 101 // The message port ids might not be set up yet if this channel | 57 message_ports[i] = channel_impl->ReleaseMessagePort(); | 
| 102 // wasn't created on the main thread. | 58 DCHECK(message_ports[i].GetHandle().is_valid()); | 
| 
kinuko
2017/01/24 12:32:54
I think we're leaking WebMessagePortChannel here n
 
darin (slow to review)
2017/01/26 22:20:03
Oh, wow. I didn't realize ownership of the WebMess
 | |
| 103 DCHECK(webchannel->main_thread_task_runner_->BelongsToCurrentThread()); | |
| 104 message_ports[i] = webchannel->message_port_id(); | |
| 105 webchannel->QueueMessages(); | |
| 106 DCHECK(message_ports[i] != MSG_ROUTING_NONE); | |
| 107 } | 59 } | 
| 108 return message_ports; | 60 return message_ports; | 
| 109 } | 61 } | 
| 110 | 62 | 
| 111 // static | 63 // static | 
| 112 std::vector<int> | 64 WebMessagePortChannelArray | 
| 113 WebMessagePortChannelImpl::ExtractMessagePortIDsWithoutQueueing( | 65 WebMessagePortChannelImpl::CreateFromMessagePorts( | 
| 114 std::unique_ptr<WebMessagePortChannelArray> channels) { | 66 const std::vector<MessagePort>& message_ports) { | 
| 115 if (!channels) | 67 WebMessagePortChannelArray channels(message_ports.size()); | 
| 116 return std::vector<int>(); | 68 for (size_t i = 0; i < message_ports.size(); ++i) | 
| 117 | 69 channels[i] = new WebMessagePortChannelImpl(message_ports[i]); | 
| 118 std::vector<int> message_ports(channels->size()); | 70 return channels; | 
| 119 for (size_t i = 0; i < channels->size(); ++i) { | |
| 120 WebMessagePortChannelImpl* webchannel = | |
| 121 static_cast<WebMessagePortChannelImpl*>((*channels)[i]); | |
| 122 // The message port ids might not be set up yet if this channel | |
| 123 // wasn't created on the main thread. | |
| 124 DCHECK(webchannel->main_thread_task_runner_->BelongsToCurrentThread()); | |
| 125 message_ports[i] = webchannel->message_port_id(); | |
| 126 // Don't queue messages, but do increase the child processes ref-count to | |
| 127 // ensure this child process stays alive long enough to receive all | |
| 128 // in-flight messages. | |
| 129 ChildProcess::current()->AddRefProcess(); | |
| 130 DCHECK(message_ports[i] != MSG_ROUTING_NONE); | |
| 131 } | |
| 132 return message_ports; | |
| 133 } | 71 } | 
| 134 | 72 | 
| 135 // static | 73 // static | 
| 136 WebMessagePortChannelArray WebMessagePortChannelImpl::CreatePorts( | 74 WebMessagePortChannelArray | 
| 137 const std::vector<int>& message_ports, | 75 WebMessagePortChannelImpl::CreateFromMessagePipeHandles( | 
| 138 const std::vector<int>& new_routing_ids, | 76 std::vector<mojo::ScopedMessagePipeHandle> handles) { | 
| 139 const scoped_refptr<base::SingleThreadTaskRunner>& | 77 WebMessagePortChannelArray channels(handles.size()); | 
| 140 main_thread_task_runner) { | 78 for (size_t i = 0; i < handles.size(); ++i) { | 
| 141 DCHECK_EQ(message_ports.size(), new_routing_ids.size()); | 79 channels[i] = | 
| 142 WebMessagePortChannelArray channels(message_ports.size()); | 80 new WebMessagePortChannelImpl(MessagePort(std::move(handles[i]))); | 
| 143 for (size_t i = 0; i < message_ports.size() && i < new_routing_ids.size(); | |
| 144 ++i) { | |
| 145 channels[i] = new WebMessagePortChannelImpl( | |
| 146 new_routing_ids[i], message_ports[i], | |
| 147 main_thread_task_runner); | |
| 148 } | 81 } | 
| 149 return channels; | 82 return channels; | 
| 150 } | 83 } | 
| 151 | 84 | 
| 152 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) { | 85 MessagePort WebMessagePortChannelImpl::ReleaseMessagePort() { | 
| 153 // Must lock here since client_ is called on the main thread. | 86 return MessagePort(port_.ReleaseHandle()); | 
| 154 base::AutoLock auto_lock(lock_); | |
| 155 client_ = client; | |
| 156 } | 87 } | 
| 157 | 88 | 
| 158 void WebMessagePortChannelImpl::destroy() { | 89 WebMessagePortChannelImpl::WebMessagePortChannelImpl( | 
| 159 setClient(NULL); | 90 mojo::ScopedMessagePipeHandle handle) | 
| 91 : port_(std::move(handle)) { | |
| 92 } | |
| 160 | 93 | 
| 161 // Release the object on the main thread, since the destructor might want to | 94 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) { | 
| 162 // send an IPC, and that has to happen on the main thread. | 95 if (client) { | 
| 163 main_thread_task_runner_->ReleaseSoon(FROM_HERE, this); | 96 port_.SetCallback( | 
| 97 base::Bind(&WebMessagePortChannelClient::messageAvailable, | |
| 98 base::Unretained(client))); | |
| 99 } else { | |
| 100 port_.ClearCallback(); | |
| 101 } | |
| 164 } | 102 } | 
| 165 | 103 | 
| 166 void WebMessagePortChannelImpl::postMessage( | 104 void WebMessagePortChannelImpl::postMessage( | 
| 167 const WebString& message, | 105 const WebString& encoded_message, | 
| 168 WebMessagePortChannelArray* channels_ptr) { | 106 WebMessagePortChannelArray* channels_ptr) { | 
| 
kinuko
2017/01/24 12:32:55
This method is supposed to take channels_ptr, shou
 
darin (slow to review)
2017/01/26 22:20:03
Yes, good call
 | |
| 169 std::unique_ptr<WebMessagePortChannelArray> channels(channels_ptr); | 107 // TODO: Address race condition with blob registration. | 
| 
kinuko
2017/01/24 12:32:55
Do we still have race condition now that we use sy
 
darin (slow to review)
2017/01/26 22:20:03
Oh, yes, I can kill this comment now. That was jus
 | |
| 170 if (!main_thread_task_runner_->BelongsToCurrentThread()) { | 108 std::vector<MessagePort> ports; | 
| 171 // Note: we must construct the base::string16 here and pass that. Otherwise, | 109 if (channels_ptr && !channels_ptr->isEmpty()) { | 
| 172 // the WebString will be passed, leading to references to the StringImpl | 110 ports.resize(channels_ptr->size()); | 
| 173 // from two threads, which is a data race. | 111 for (size_t i = 0; i < channels_ptr->size(); ++i) { | 
| 174 main_thread_task_runner_->PostTask( | 112 ports[i] = static_cast<WebMessagePortChannelImpl*>((*channels_ptr)[i])-> | 
| 175 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::SendPostMessage, this, | 113 ReleaseMessagePort(); | 
| 
kinuko
2017/01/24 12:32:54
This probably is leaking WebMessagePortChannel her
 
darin (slow to review)
2017/01/26 22:20:03
Yeah, that would probably be much better.
 | |
| 176 base::Passed(base::string16(message)), | 114 } | 
| 177 base::Passed(std::move(channels)))); | |
| 178 } else { | |
| 179 SendPostMessage(message, std::move(channels)); | |
| 180 } | 115 } | 
| 181 } | 116 port_.PostMessage(encoded_message, std::move(ports)); | 
| 182 | |
| 183 void WebMessagePortChannelImpl::SendPostMessage( | |
| 184 const base::string16& message, | |
| 185 std::unique_ptr<WebMessagePortChannelArray> channels) { | |
| 186 IPC::Message* msg = new MessagePortHostMsg_PostMessage( | |
| 187 message_port_id_, message, ExtractMessagePortIDs(std::move(channels))); | |
| 188 Send(msg); | |
| 189 } | 117 } | 
| 190 | 118 | 
| 191 bool WebMessagePortChannelImpl::tryGetMessage( | 119 bool WebMessagePortChannelImpl::tryGetMessage( | 
| 192 WebString* message, | 120 WebString* encoded_message, | 
| 193 WebMessagePortChannelArray& channels) { | 121 WebMessagePortChannelArray& channels) { | 
| 194 base::AutoLock auto_lock(lock_); | 122 base::string16 buffer; | 
| 195 if (message_queue_.empty()) | 123 std::vector<MessagePort> ports; | 
| 124 if (!port_.GetMessage(&buffer, &ports)) | |
| 196 return false; | 125 return false; | 
| 197 | 126 | 
| 198 *message = message_queue_.front().message; | 127 *encoded_message = buffer; | 
| 199 channels = message_queue_.front().ports; | 128 | 
| 200 message_queue_.pop(); | 129 if (!ports.empty()) { | 
| 130 channels = WebMessagePortChannelArray(ports.size()); | |
| 131 for (size_t i = 0; i < ports.size(); ++i) | |
| 132 channels[i] = new WebMessagePortChannelImpl(ports[i].ReleaseHandle()); | |
| 133 } | |
| 201 return true; | 134 return true; | 
| 202 } | 135 } | 
| 203 | 136 | 
| 204 void WebMessagePortChannelImpl::Init() { | |
| 205 if (!main_thread_task_runner_->BelongsToCurrentThread()) { | |
| 206 main_thread_task_runner_->PostTask( | |
| 207 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this)); | |
| 208 return; | |
| 209 } | |
| 210 | |
| 211 if (route_id_ == MSG_ROUTING_NONE) { | |
| 212 DCHECK(message_port_id_ == MSG_ROUTING_NONE); | |
| 213 Send(new MessagePortHostMsg_CreateMessagePort( | |
| 214 &route_id_, &message_port_id_)); | |
| 215 } else if (message_port_id_ != MSG_ROUTING_NONE) { | |
| 216 Send(new MessagePortHostMsg_ReleaseMessages(message_port_id_)); | |
| 217 } | |
| 218 | |
| 219 ChildThreadImpl::current()->GetRouter()->AddRoute(route_id_, this); | |
| 220 } | |
| 221 | |
| 222 void WebMessagePortChannelImpl::Entangle( | |
| 223 scoped_refptr<WebMessagePortChannelImpl> channel) { | |
| 224 // The message port ids might not be set up yet, if this channel wasn't | |
| 225 // created on the main thread. So need to wait until we're on the main thread | |
| 226 // before getting the other message port id. | |
| 227 if (!main_thread_task_runner_->BelongsToCurrentThread()) { | |
| 228 main_thread_task_runner_->PostTask( | |
| 229 FROM_HERE, | |
| 230 base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel)); | |
| 231 return; | |
| 232 } | |
| 233 | |
| 234 Send(new MessagePortHostMsg_Entangle( | |
| 235 message_port_id_, channel->message_port_id())); | |
| 236 } | |
| 237 | |
| 238 void WebMessagePortChannelImpl::QueueMessages() { | |
| 239 if (!main_thread_task_runner_->BelongsToCurrentThread()) { | |
| 240 main_thread_task_runner_->PostTask( | |
| 241 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this)); | |
| 242 return; | |
| 243 } | |
| 244 // This message port is being sent elsewhere (perhaps to another process). | |
| 245 // The new endpoint needs to receive the queued messages, including ones that | |
| 246 // could still be in-flight. So we tell the browser to queue messages, and it | |
| 247 // sends us an ack, whose receipt we know means that no more messages are | |
| 248 // in-flight. We then send the queued messages to the browser, which prepends | |
| 249 // them to the ones it queued and it sends them to the new endpoint. | |
| 250 Send(new MessagePortHostMsg_QueueMessages(message_port_id_)); | |
| 251 | |
| 252 // The process could potentially go away while we're still waiting for | |
| 253 // in-flight messages. Ensure it stays alive. | |
| 254 ChildProcess::current()->AddRefProcess(); | |
| 255 } | |
| 256 | |
| 257 void WebMessagePortChannelImpl::Send(IPC::Message* message) { | |
| 258 if (!main_thread_task_runner_->BelongsToCurrentThread()) { | |
| 259 DCHECK(!message->is_sync()); | |
| 260 main_thread_task_runner_->PostTask( | |
| 261 FROM_HERE, | |
| 262 base::Bind(&WebMessagePortChannelImpl::Send, this, message)); | |
| 263 return; | |
| 264 } | |
| 265 | |
| 266 ChildThreadImpl::current()->GetRouter()->Send(message); | |
| 267 } | |
| 268 | |
| 269 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) { | |
| 270 bool handled = true; | |
| 271 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message) | |
| 272 IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage) | |
| 273 IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued) | |
| 274 IPC_MESSAGE_UNHANDLED(handled = false) | |
| 275 IPC_END_MESSAGE_MAP() | |
| 276 return handled; | |
| 277 } | |
| 278 | |
| 279 void WebMessagePortChannelImpl::OnMessage( | |
| 280 const base::string16& message, | |
| 281 const std::vector<int>& sent_message_ports, | |
| 282 const std::vector<int>& new_routing_ids) { | |
| 283 base::AutoLock auto_lock(lock_); | |
| 284 Message msg; | |
| 285 msg.message = message; | |
| 286 msg.ports = CreatePorts(sent_message_ports, new_routing_ids, | |
| 287 main_thread_task_runner_.get()); | |
| 288 | |
| 289 bool was_empty = message_queue_.empty(); | |
| 290 message_queue_.push(msg); | |
| 291 if (client_ && was_empty) | |
| 292 client_->messageAvailable(); | |
| 293 } | |
| 294 | |
| 295 void WebMessagePortChannelImpl::OnMessagesQueued() { | |
| 296 std::vector<QueuedMessage> queued_messages; | |
| 297 | |
| 298 { | |
| 299 base::AutoLock auto_lock(lock_); | |
| 300 queued_messages.reserve(message_queue_.size()); | |
| 301 while (!message_queue_.empty()) { | |
| 302 base::string16 message = message_queue_.front().message; | |
| 303 std::vector<int> ports = | |
| 304 ExtractMessagePortIDs(message_queue_.front().ports); | |
| 305 queued_messages.push_back(std::make_pair(message, ports)); | |
| 306 message_queue_.pop(); | |
| 307 } | |
| 308 } | |
| 309 | |
| 310 Send(new MessagePortHostMsg_SendQueuedMessages( | |
| 311 message_port_id_, queued_messages)); | |
| 312 | |
| 313 message_port_id_ = MSG_ROUTING_NONE; | |
| 314 | |
| 315 Release(); | |
| 316 ChildProcess::current()->ReleaseProcess(); | |
| 317 } | |
| 318 | |
| 319 WebMessagePortChannelImpl::Message::Message() {} | |
| 320 | |
| 321 WebMessagePortChannelImpl::Message::Message(const Message& other) = default; | |
| 322 | |
| 323 WebMessagePortChannelImpl::Message::~Message() {} | |
| 324 | |
| 325 } // namespace content | 137 } // namespace content | 
| OLD | NEW |