Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(16)

Side by Side Diff: content/child/webmessageportchannel_impl.cc

Issue 2422793002: HTML MessagePort as mojo::MessagePipeHandle (Closed)
Patch Set: Eliminate unnecessary PostTask Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/child/webmessageportchannel_impl.h" 5 #include "content/child/webmessageportchannel_impl.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <utility>
9 8
10 #include "base/bind.h" 9 #include "base/bind.h"
11 #include "content/child/child_process.h" 10 #include "base/logging.h"
12 #include "content/child/child_thread_impl.h" 11 #include "base/memory/ptr_util.h"
13 #include "content/common/message_port_messages.h"
14 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h" 12 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
15 #include "third_party/WebKit/public/platform/WebString.h" 13 #include "third_party/WebKit/public/platform/WebString.h"
16 #include "third_party/WebKit/public/web/WebSerializedScriptValue.h"
17 #include "v8/include/v8.h"
18 14
19 using blink::WebMessagePortChannel; 15 using blink::WebMessagePortChannel;
20 using blink::WebMessagePortChannelArray; 16 using blink::WebMessagePortChannelArray;
21 using blink::WebMessagePortChannelClient; 17 using blink::WebMessagePortChannelClient;
22 using blink::WebString; 18 using blink::WebString;
23 19
24 namespace content { 20 namespace content {
25 21
26 WebMessagePortChannelImpl::WebMessagePortChannelImpl( 22 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
27 const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner) 23 setClient(nullptr);
28 : client_(NULL),
29 route_id_(MSG_ROUTING_NONE),
30 message_port_id_(MSG_ROUTING_NONE),
31 main_thread_task_runner_(main_thread_task_runner) {
32 AddRef();
33 Init();
34 } 24 }
35 25
36 WebMessagePortChannelImpl::WebMessagePortChannelImpl( 26 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
37 int route_id, 27 MessagePort message_port)
38 int port_id, 28 : port_(message_port.ReleaseHandle()) {
39 const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner)
40 : client_(NULL),
41 route_id_(route_id),
42 message_port_id_(port_id),
43 main_thread_task_runner_(main_thread_task_runner) {
44 AddRef();
45 Init();
46 }
47
48 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
49 // If we have any queued messages with attached ports, manually destroy them.
50 while (!message_queue_.empty()) {
51 const WebMessagePortChannelArray& channel_array =
52 message_queue_.front().ports;
53 for (size_t i = 0; i < channel_array.size(); i++) {
54 channel_array[i]->destroy();
55 }
56 message_queue_.pop();
57 }
58
59 if (message_port_id_ != MSG_ROUTING_NONE)
60 Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_));
61
62 if (route_id_ != MSG_ROUTING_NONE)
63 ChildThreadImpl::current()->GetRouter()->RemoveRoute(route_id_);
64 } 29 }
65 30
66 // static 31 // static
67 void WebMessagePortChannelImpl::CreatePair( 32 void WebMessagePortChannelImpl::CreatePair(
68 const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner,
69 blink::WebMessagePortChannel** channel1, 33 blink::WebMessagePortChannel** channel1,
70 blink::WebMessagePortChannel** channel2) { 34 blink::WebMessagePortChannel** channel2) {
71 WebMessagePortChannelImpl* impl1 = 35 mojo::MessagePipe pipe;
72 new WebMessagePortChannelImpl(main_thread_task_runner); 36 *channel1 = new WebMessagePortChannelImpl(std::move(pipe.handle0));
73 WebMessagePortChannelImpl* impl2 = 37 *channel2 = new WebMessagePortChannelImpl(std::move(pipe.handle1));
74 new WebMessagePortChannelImpl(main_thread_task_runner);
75
76 impl1->Entangle(impl2);
77 impl2->Entangle(impl1);
78
79 *channel1 = impl1;
80 *channel2 = impl2;
81 } 38 }
82 39
83 // static 40 // static
84 std::vector<int> 41 std::vector<MessagePort>
85 WebMessagePortChannelImpl::ExtractMessagePortIDs( 42 WebMessagePortChannelImpl::ExtractMessagePorts(
86 std::unique_ptr<WebMessagePortChannelArray> channels) { 43 WebMessagePortChannelArray channels) {
87 std::vector<int> message_ports; 44 std::vector<MessagePort> message_ports(channels.size());
88 if (channels)
89 message_ports = ExtractMessagePortIDs(*channels);
90 return message_ports;
91 }
92
93 // static
94 std::vector<int>
95 WebMessagePortChannelImpl::ExtractMessagePortIDs(
96 const WebMessagePortChannelArray& channels) {
97 std::vector<int> message_ports(channels.size());
98 for (size_t i = 0; i < channels.size(); ++i) { 45 for (size_t i = 0; i < channels.size(); ++i) {
99 WebMessagePortChannelImpl* webchannel = 46 WebMessagePortChannelImpl* channel_impl =
100 static_cast<WebMessagePortChannelImpl*>(channels[i]); 47 static_cast<WebMessagePortChannelImpl*>(channels[i].get());
101 // The message port ids might not be set up yet if this channel 48 message_ports[i] = channel_impl->ReleaseMessagePort();
102 // wasn't created on the main thread. 49 DCHECK(message_ports[i].GetHandle().is_valid());
103 DCHECK(webchannel->main_thread_task_runner_->BelongsToCurrentThread());
104 message_ports[i] = webchannel->message_port_id();
105 webchannel->QueueMessages();
106 DCHECK(message_ports[i] != MSG_ROUTING_NONE);
107 } 50 }
108 return message_ports; 51 return message_ports;
109 } 52 }
110 53
111 // static 54 // static
112 std::vector<int> 55 WebMessagePortChannelArray
113 WebMessagePortChannelImpl::ExtractMessagePortIDsWithoutQueueing( 56 WebMessagePortChannelImpl::CreateFromMessagePorts(
114 std::unique_ptr<WebMessagePortChannelArray> channels) { 57 const std::vector<MessagePort>& message_ports) {
115 if (!channels) 58 WebMessagePortChannelArray channels(message_ports.size());
116 return std::vector<int>(); 59 for (size_t i = 0; i < message_ports.size(); ++i)
117 60 channels[i] = base::MakeUnique<WebMessagePortChannelImpl>(message_ports[i]);
118 std::vector<int> message_ports(channels->size()); 61 return channels;
119 for (size_t i = 0; i < channels->size(); ++i) {
120 WebMessagePortChannelImpl* webchannel =
121 static_cast<WebMessagePortChannelImpl*>((*channels)[i]);
122 // The message port ids might not be set up yet if this channel
123 // wasn't created on the main thread.
124 DCHECK(webchannel->main_thread_task_runner_->BelongsToCurrentThread());
125 message_ports[i] = webchannel->message_port_id();
126 // Don't queue messages, but do increase the child processes ref-count to
127 // ensure this child process stays alive long enough to receive all
128 // in-flight messages.
129 ChildProcess::current()->AddRefProcess();
130 DCHECK(message_ports[i] != MSG_ROUTING_NONE);
131 }
132 return message_ports;
133 } 62 }
134 63
135 // static 64 // static
136 WebMessagePortChannelArray WebMessagePortChannelImpl::CreatePorts( 65 WebMessagePortChannelArray
137 const std::vector<int>& message_ports, 66 WebMessagePortChannelImpl::CreateFromMessagePipeHandles(
138 const std::vector<int>& new_routing_ids, 67 std::vector<mojo::ScopedMessagePipeHandle> handles) {
139 const scoped_refptr<base::SingleThreadTaskRunner>& 68 WebMessagePortChannelArray channels(handles.size());
140 main_thread_task_runner) { 69 for (size_t i = 0; i < handles.size(); ++i) {
141 DCHECK_EQ(message_ports.size(), new_routing_ids.size()); 70 channels[i] = base::MakeUnique<WebMessagePortChannelImpl>(
142 WebMessagePortChannelArray channels(message_ports.size()); 71 MessagePort(std::move(handles[i])));
143 for (size_t i = 0; i < message_ports.size() && i < new_routing_ids.size();
144 ++i) {
145 channels[i] = new WebMessagePortChannelImpl(
146 new_routing_ids[i], message_ports[i],
147 main_thread_task_runner);
148 } 72 }
149 return channels; 73 return channels;
150 } 74 }
151 75
152 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) { 76 MessagePort WebMessagePortChannelImpl::ReleaseMessagePort() {
153 // Must lock here since client_ is called on the main thread. 77 return MessagePort(port_.ReleaseHandle());
154 base::AutoLock auto_lock(lock_);
155 client_ = client;
156 } 78 }
157 79
158 void WebMessagePortChannelImpl::destroy() { 80 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
159 setClient(NULL); 81 mojo::ScopedMessagePipeHandle handle)
82 : port_(std::move(handle)) {
83 }
160 84
161 // Release the object on the main thread, since the destructor might want to 85 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) {
162 // send an IPC, and that has to happen on the main thread. 86 if (client) {
163 main_thread_task_runner_->ReleaseSoon(FROM_HERE, this); 87 port_.SetCallback(
88 base::Bind(&WebMessagePortChannelClient::messageAvailable,
89 base::Unretained(client)));
90 } else {
91 port_.ClearCallback();
92 }
164 } 93 }
165 94
166 void WebMessagePortChannelImpl::postMessage( 95 void WebMessagePortChannelImpl::postMessage(
167 const WebString& message, 96 const WebString& encoded_message,
168 WebMessagePortChannelArray* channels_ptr) { 97 WebMessagePortChannelArray channels) {
169 std::unique_ptr<WebMessagePortChannelArray> channels(channels_ptr); 98 std::vector<MessagePort> ports;
170 if (!main_thread_task_runner_->BelongsToCurrentThread()) { 99 if (!channels.isEmpty()) {
171 // Note: we must construct the base::string16 here and pass that. Otherwise, 100 ports.resize(channels.size());
172 // the WebString will be passed, leading to references to the StringImpl 101 for (size_t i = 0; i < channels.size(); ++i) {
173 // from two threads, which is a data race. 102 ports[i] = static_cast<WebMessagePortChannelImpl*>(channels[i].get())->
174 main_thread_task_runner_->PostTask( 103 ReleaseMessagePort();
175 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::SendPostMessage, this, 104 }
176 base::Passed(message.utf16()),
177 base::Passed(std::move(channels))));
178 } else {
179 SendPostMessage(message.utf16(), std::move(channels));
180 } 105 }
181 } 106 port_.PostMessage(encoded_message.utf16(), std::move(ports));
182
183 void WebMessagePortChannelImpl::SendPostMessage(
184 const base::string16& message,
185 std::unique_ptr<WebMessagePortChannelArray> channels) {
186 IPC::Message* msg = new MessagePortHostMsg_PostMessage(
187 message_port_id_, message, ExtractMessagePortIDs(std::move(channels)));
188 Send(msg);
189 } 107 }
190 108
191 bool WebMessagePortChannelImpl::tryGetMessage( 109 bool WebMessagePortChannelImpl::tryGetMessage(
192 WebString* message, 110 WebString* encoded_message,
193 WebMessagePortChannelArray& channels) { 111 WebMessagePortChannelArray& channels) {
194 base::AutoLock auto_lock(lock_); 112 base::string16 buffer;
195 if (message_queue_.empty()) 113 std::vector<MessagePort> ports;
114 if (!port_.GetMessage(&buffer, &ports))
196 return false; 115 return false;
197 116
198 *message = WebString::fromUTF16(message_queue_.front().message); 117 *encoded_message = WebString::fromUTF16(buffer);
199 channels = message_queue_.front().ports; 118
200 message_queue_.pop(); 119 if (!ports.empty()) {
120 channels = WebMessagePortChannelArray(ports.size());
121 for (size_t i = 0; i < ports.size(); ++i)
122 channels[i] = base::MakeUnique<WebMessagePortChannelImpl>(ports[i]);
123 }
201 return true; 124 return true;
202 } 125 }
203 126
204 void WebMessagePortChannelImpl::Init() {
205 if (!main_thread_task_runner_->BelongsToCurrentThread()) {
206 main_thread_task_runner_->PostTask(
207 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this));
208 return;
209 }
210
211 if (route_id_ == MSG_ROUTING_NONE) {
212 DCHECK(message_port_id_ == MSG_ROUTING_NONE);
213 Send(new MessagePortHostMsg_CreateMessagePort(
214 &route_id_, &message_port_id_));
215 } else if (message_port_id_ != MSG_ROUTING_NONE) {
216 Send(new MessagePortHostMsg_ReleaseMessages(message_port_id_));
217 }
218
219 ChildThreadImpl::current()->GetRouter()->AddRoute(route_id_, this);
220 }
221
222 void WebMessagePortChannelImpl::Entangle(
223 scoped_refptr<WebMessagePortChannelImpl> channel) {
224 // The message port ids might not be set up yet, if this channel wasn't
225 // created on the main thread. So need to wait until we're on the main thread
226 // before getting the other message port id.
227 if (!main_thread_task_runner_->BelongsToCurrentThread()) {
228 main_thread_task_runner_->PostTask(
229 FROM_HERE,
230 base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel));
231 return;
232 }
233
234 Send(new MessagePortHostMsg_Entangle(
235 message_port_id_, channel->message_port_id()));
236 }
237
238 void WebMessagePortChannelImpl::QueueMessages() {
239 if (!main_thread_task_runner_->BelongsToCurrentThread()) {
240 main_thread_task_runner_->PostTask(
241 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this));
242 return;
243 }
244 // This message port is being sent elsewhere (perhaps to another process).
245 // The new endpoint needs to receive the queued messages, including ones that
246 // could still be in-flight. So we tell the browser to queue messages, and it
247 // sends us an ack, whose receipt we know means that no more messages are
248 // in-flight. We then send the queued messages to the browser, which prepends
249 // them to the ones it queued and it sends them to the new endpoint.
250 Send(new MessagePortHostMsg_QueueMessages(message_port_id_));
251
252 // The process could potentially go away while we're still waiting for
253 // in-flight messages. Ensure it stays alive.
254 ChildProcess::current()->AddRefProcess();
255 }
256
257 void WebMessagePortChannelImpl::Send(IPC::Message* message) {
258 if (!main_thread_task_runner_->BelongsToCurrentThread()) {
259 DCHECK(!message->is_sync());
260 main_thread_task_runner_->PostTask(
261 FROM_HERE,
262 base::Bind(&WebMessagePortChannelImpl::Send, this, message));
263 return;
264 }
265
266 ChildThreadImpl::current()->GetRouter()->Send(message);
267 }
268
269 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) {
270 bool handled = true;
271 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message)
272 IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage)
273 IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued)
274 IPC_MESSAGE_UNHANDLED(handled = false)
275 IPC_END_MESSAGE_MAP()
276 return handled;
277 }
278
279 void WebMessagePortChannelImpl::OnMessage(
280 const base::string16& message,
281 const std::vector<int>& sent_message_ports,
282 const std::vector<int>& new_routing_ids) {
283 base::AutoLock auto_lock(lock_);
284 Message msg;
285 msg.message = message;
286 msg.ports = CreatePorts(sent_message_ports, new_routing_ids,
287 main_thread_task_runner_.get());
288
289 bool was_empty = message_queue_.empty();
290 message_queue_.push(msg);
291 if (client_ && was_empty)
292 client_->messageAvailable();
293 }
294
295 void WebMessagePortChannelImpl::OnMessagesQueued() {
296 std::vector<QueuedMessage> queued_messages;
297
298 {
299 base::AutoLock auto_lock(lock_);
300 queued_messages.reserve(message_queue_.size());
301 while (!message_queue_.empty()) {
302 base::string16 message = message_queue_.front().message;
303 std::vector<int> ports =
304 ExtractMessagePortIDs(message_queue_.front().ports);
305 queued_messages.push_back(std::make_pair(message, ports));
306 message_queue_.pop();
307 }
308 }
309
310 Send(new MessagePortHostMsg_SendQueuedMessages(
311 message_port_id_, queued_messages));
312
313 message_port_id_ = MSG_ROUTING_NONE;
314
315 Release();
316 ChildProcess::current()->ReleaseProcess();
317 }
318
319 WebMessagePortChannelImpl::Message::Message() {}
320
321 WebMessagePortChannelImpl::Message::Message(const Message& other) = default;
322
323 WebMessagePortChannelImpl::Message::~Message() {}
324
325 } // namespace content 127 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698