OLD | NEW |
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "content/child/webmessageportchannel_impl.h" | 5 #include "content/child/webmessageportchannel_impl.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <utility> | |
9 | 8 |
10 #include "base/bind.h" | 9 #include "base/bind.h" |
11 #include "content/child/child_process.h" | 10 #include "base/logging.h" |
12 #include "content/child/child_thread_impl.h" | 11 #include "base/memory/ptr_util.h" |
13 #include "content/common/message_port_messages.h" | |
14 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h" | 12 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h" |
15 #include "third_party/WebKit/public/platform/WebString.h" | 13 #include "third_party/WebKit/public/platform/WebString.h" |
16 #include "third_party/WebKit/public/web/WebSerializedScriptValue.h" | |
17 #include "v8/include/v8.h" | |
18 | 14 |
19 using blink::WebMessagePortChannel; | 15 using blink::WebMessagePortChannel; |
20 using blink::WebMessagePortChannelArray; | 16 using blink::WebMessagePortChannelArray; |
21 using blink::WebMessagePortChannelClient; | 17 using blink::WebMessagePortChannelClient; |
22 using blink::WebString; | 18 using blink::WebString; |
23 | 19 |
24 namespace content { | 20 namespace content { |
25 | 21 |
26 WebMessagePortChannelImpl::WebMessagePortChannelImpl( | 22 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() { |
27 const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner) | 23 setClient(nullptr); |
28 : client_(NULL), | |
29 route_id_(MSG_ROUTING_NONE), | |
30 message_port_id_(MSG_ROUTING_NONE), | |
31 main_thread_task_runner_(main_thread_task_runner) { | |
32 AddRef(); | |
33 Init(); | |
34 } | 24 } |
35 | 25 |
36 WebMessagePortChannelImpl::WebMessagePortChannelImpl( | 26 WebMessagePortChannelImpl::WebMessagePortChannelImpl( |
37 int route_id, | 27 MessagePort message_port) |
38 int port_id, | 28 : port_(message_port.ReleaseHandle()) { |
39 const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner) | |
40 : client_(NULL), | |
41 route_id_(route_id), | |
42 message_port_id_(port_id), | |
43 main_thread_task_runner_(main_thread_task_runner) { | |
44 AddRef(); | |
45 Init(); | |
46 } | |
47 | |
48 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() { | |
49 // If we have any queued messages with attached ports, manually destroy them. | |
50 while (!message_queue_.empty()) { | |
51 const WebMessagePortChannelArray& channel_array = | |
52 message_queue_.front().ports; | |
53 for (size_t i = 0; i < channel_array.size(); i++) { | |
54 channel_array[i]->destroy(); | |
55 } | |
56 message_queue_.pop(); | |
57 } | |
58 | |
59 if (message_port_id_ != MSG_ROUTING_NONE) | |
60 Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_)); | |
61 | |
62 if (route_id_ != MSG_ROUTING_NONE) | |
63 ChildThreadImpl::current()->GetRouter()->RemoveRoute(route_id_); | |
64 } | 29 } |
65 | 30 |
66 // static | 31 // static |
67 void WebMessagePortChannelImpl::CreatePair( | 32 void WebMessagePortChannelImpl::CreatePair( |
68 const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner, | |
69 blink::WebMessagePortChannel** channel1, | 33 blink::WebMessagePortChannel** channel1, |
70 blink::WebMessagePortChannel** channel2) { | 34 blink::WebMessagePortChannel** channel2) { |
71 WebMessagePortChannelImpl* impl1 = | 35 mojo::MessagePipe pipe; |
72 new WebMessagePortChannelImpl(main_thread_task_runner); | 36 *channel1 = new WebMessagePortChannelImpl(std::move(pipe.handle0)); |
73 WebMessagePortChannelImpl* impl2 = | 37 *channel2 = new WebMessagePortChannelImpl(std::move(pipe.handle1)); |
74 new WebMessagePortChannelImpl(main_thread_task_runner); | |
75 | |
76 impl1->Entangle(impl2); | |
77 impl2->Entangle(impl1); | |
78 | |
79 *channel1 = impl1; | |
80 *channel2 = impl2; | |
81 } | 38 } |
82 | 39 |
83 // static | 40 // static |
84 std::vector<int> | 41 std::vector<MessagePort> |
85 WebMessagePortChannelImpl::ExtractMessagePortIDs( | 42 WebMessagePortChannelImpl::ExtractMessagePorts( |
86 std::unique_ptr<WebMessagePortChannelArray> channels) { | 43 WebMessagePortChannelArray channels) { |
87 std::vector<int> message_ports; | 44 std::vector<MessagePort> message_ports(channels.size()); |
88 if (channels) | |
89 message_ports = ExtractMessagePortIDs(*channels); | |
90 return message_ports; | |
91 } | |
92 | |
93 // static | |
94 std::vector<int> | |
95 WebMessagePortChannelImpl::ExtractMessagePortIDs( | |
96 const WebMessagePortChannelArray& channels) { | |
97 std::vector<int> message_ports(channels.size()); | |
98 for (size_t i = 0; i < channels.size(); ++i) { | 45 for (size_t i = 0; i < channels.size(); ++i) { |
99 WebMessagePortChannelImpl* webchannel = | 46 WebMessagePortChannelImpl* channel_impl = |
100 static_cast<WebMessagePortChannelImpl*>(channels[i]); | 47 static_cast<WebMessagePortChannelImpl*>(channels[i].get()); |
101 // The message port ids might not be set up yet if this channel | 48 message_ports[i] = channel_impl->ReleaseMessagePort(); |
102 // wasn't created on the main thread. | 49 DCHECK(message_ports[i].GetHandle().is_valid()); |
103 DCHECK(webchannel->main_thread_task_runner_->BelongsToCurrentThread()); | |
104 message_ports[i] = webchannel->message_port_id(); | |
105 webchannel->QueueMessages(); | |
106 DCHECK(message_ports[i] != MSG_ROUTING_NONE); | |
107 } | 50 } |
108 return message_ports; | 51 return message_ports; |
109 } | 52 } |
110 | 53 |
111 // static | 54 // static |
112 std::vector<int> | 55 WebMessagePortChannelArray |
113 WebMessagePortChannelImpl::ExtractMessagePortIDsWithoutQueueing( | 56 WebMessagePortChannelImpl::CreateFromMessagePorts( |
114 std::unique_ptr<WebMessagePortChannelArray> channels) { | 57 const std::vector<MessagePort>& message_ports) { |
115 if (!channels) | 58 WebMessagePortChannelArray channels(message_ports.size()); |
116 return std::vector<int>(); | 59 for (size_t i = 0; i < message_ports.size(); ++i) |
117 | 60 channels[i] = base::MakeUnique<WebMessagePortChannelImpl>(message_ports[i]); |
118 std::vector<int> message_ports(channels->size()); | 61 return channels; |
119 for (size_t i = 0; i < channels->size(); ++i) { | |
120 WebMessagePortChannelImpl* webchannel = | |
121 static_cast<WebMessagePortChannelImpl*>((*channels)[i]); | |
122 // The message port ids might not be set up yet if this channel | |
123 // wasn't created on the main thread. | |
124 DCHECK(webchannel->main_thread_task_runner_->BelongsToCurrentThread()); | |
125 message_ports[i] = webchannel->message_port_id(); | |
126 // Don't queue messages, but do increase the child processes ref-count to | |
127 // ensure this child process stays alive long enough to receive all | |
128 // in-flight messages. | |
129 ChildProcess::current()->AddRefProcess(); | |
130 DCHECK(message_ports[i] != MSG_ROUTING_NONE); | |
131 } | |
132 return message_ports; | |
133 } | 62 } |
134 | 63 |
135 // static | 64 // static |
136 WebMessagePortChannelArray WebMessagePortChannelImpl::CreatePorts( | 65 WebMessagePortChannelArray |
137 const std::vector<int>& message_ports, | 66 WebMessagePortChannelImpl::CreateFromMessagePipeHandles( |
138 const std::vector<int>& new_routing_ids, | 67 std::vector<mojo::ScopedMessagePipeHandle> handles) { |
139 const scoped_refptr<base::SingleThreadTaskRunner>& | 68 WebMessagePortChannelArray channels(handles.size()); |
140 main_thread_task_runner) { | 69 for (size_t i = 0; i < handles.size(); ++i) { |
141 DCHECK_EQ(message_ports.size(), new_routing_ids.size()); | 70 channels[i] = base::MakeUnique<WebMessagePortChannelImpl>( |
142 WebMessagePortChannelArray channels(message_ports.size()); | 71 MessagePort(std::move(handles[i]))); |
143 for (size_t i = 0; i < message_ports.size() && i < new_routing_ids.size(); | |
144 ++i) { | |
145 channels[i] = new WebMessagePortChannelImpl( | |
146 new_routing_ids[i], message_ports[i], | |
147 main_thread_task_runner); | |
148 } | 72 } |
149 return channels; | 73 return channels; |
150 } | 74 } |
151 | 75 |
152 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) { | 76 MessagePort WebMessagePortChannelImpl::ReleaseMessagePort() { |
153 // Must lock here since client_ is called on the main thread. | 77 return MessagePort(port_.ReleaseHandle()); |
154 base::AutoLock auto_lock(lock_); | |
155 client_ = client; | |
156 } | 78 } |
157 | 79 |
158 void WebMessagePortChannelImpl::destroy() { | 80 WebMessagePortChannelImpl::WebMessagePortChannelImpl( |
159 setClient(NULL); | 81 mojo::ScopedMessagePipeHandle handle) |
| 82 : port_(std::move(handle)) { |
| 83 } |
160 | 84 |
161 // Release the object on the main thread, since the destructor might want to | 85 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) { |
162 // send an IPC, and that has to happen on the main thread. | 86 if (client) { |
163 main_thread_task_runner_->ReleaseSoon(FROM_HERE, this); | 87 port_.SetCallback( |
| 88 base::Bind(&WebMessagePortChannelClient::messageAvailable, |
| 89 base::Unretained(client))); |
| 90 } else { |
| 91 port_.ClearCallback(); |
| 92 } |
164 } | 93 } |
165 | 94 |
166 void WebMessagePortChannelImpl::postMessage( | 95 void WebMessagePortChannelImpl::postMessage( |
167 const WebString& message, | 96 const WebString& encoded_message, |
168 WebMessagePortChannelArray* channels_ptr) { | 97 WebMessagePortChannelArray channels) { |
169 std::unique_ptr<WebMessagePortChannelArray> channels(channels_ptr); | 98 std::vector<MessagePort> ports; |
170 if (!main_thread_task_runner_->BelongsToCurrentThread()) { | 99 if (!channels.isEmpty()) { |
171 // Note: we must construct the base::string16 here and pass that. Otherwise, | 100 ports.resize(channels.size()); |
172 // the WebString will be passed, leading to references to the StringImpl | 101 for (size_t i = 0; i < channels.size(); ++i) { |
173 // from two threads, which is a data race. | 102 ports[i] = static_cast<WebMessagePortChannelImpl*>(channels[i].get())-> |
174 main_thread_task_runner_->PostTask( | 103 ReleaseMessagePort(); |
175 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::SendPostMessage, this, | 104 } |
176 base::Passed(message.utf16()), | |
177 base::Passed(std::move(channels)))); | |
178 } else { | |
179 SendPostMessage(message.utf16(), std::move(channels)); | |
180 } | 105 } |
181 } | 106 port_.PostMessage(encoded_message.utf16(), std::move(ports)); |
182 | |
183 void WebMessagePortChannelImpl::SendPostMessage( | |
184 const base::string16& message, | |
185 std::unique_ptr<WebMessagePortChannelArray> channels) { | |
186 IPC::Message* msg = new MessagePortHostMsg_PostMessage( | |
187 message_port_id_, message, ExtractMessagePortIDs(std::move(channels))); | |
188 Send(msg); | |
189 } | 107 } |
190 | 108 |
191 bool WebMessagePortChannelImpl::tryGetMessage( | 109 bool WebMessagePortChannelImpl::tryGetMessage( |
192 WebString* message, | 110 WebString* encoded_message, |
193 WebMessagePortChannelArray& channels) { | 111 WebMessagePortChannelArray& channels) { |
194 base::AutoLock auto_lock(lock_); | 112 base::string16 buffer; |
195 if (message_queue_.empty()) | 113 std::vector<MessagePort> ports; |
| 114 if (!port_.GetMessage(&buffer, &ports)) |
196 return false; | 115 return false; |
197 | 116 |
198 *message = WebString::fromUTF16(message_queue_.front().message); | 117 *encoded_message = WebString::fromUTF16(buffer); |
199 channels = message_queue_.front().ports; | 118 |
200 message_queue_.pop(); | 119 if (!ports.empty()) { |
| 120 channels = WebMessagePortChannelArray(ports.size()); |
| 121 for (size_t i = 0; i < ports.size(); ++i) |
| 122 channels[i] = base::MakeUnique<WebMessagePortChannelImpl>(ports[i]); |
| 123 } |
201 return true; | 124 return true; |
202 } | 125 } |
203 | 126 |
204 void WebMessagePortChannelImpl::Init() { | |
205 if (!main_thread_task_runner_->BelongsToCurrentThread()) { | |
206 main_thread_task_runner_->PostTask( | |
207 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this)); | |
208 return; | |
209 } | |
210 | |
211 if (route_id_ == MSG_ROUTING_NONE) { | |
212 DCHECK(message_port_id_ == MSG_ROUTING_NONE); | |
213 Send(new MessagePortHostMsg_CreateMessagePort( | |
214 &route_id_, &message_port_id_)); | |
215 } else if (message_port_id_ != MSG_ROUTING_NONE) { | |
216 Send(new MessagePortHostMsg_ReleaseMessages(message_port_id_)); | |
217 } | |
218 | |
219 ChildThreadImpl::current()->GetRouter()->AddRoute(route_id_, this); | |
220 } | |
221 | |
222 void WebMessagePortChannelImpl::Entangle( | |
223 scoped_refptr<WebMessagePortChannelImpl> channel) { | |
224 // The message port ids might not be set up yet, if this channel wasn't | |
225 // created on the main thread. So need to wait until we're on the main thread | |
226 // before getting the other message port id. | |
227 if (!main_thread_task_runner_->BelongsToCurrentThread()) { | |
228 main_thread_task_runner_->PostTask( | |
229 FROM_HERE, | |
230 base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel)); | |
231 return; | |
232 } | |
233 | |
234 Send(new MessagePortHostMsg_Entangle( | |
235 message_port_id_, channel->message_port_id())); | |
236 } | |
237 | |
238 void WebMessagePortChannelImpl::QueueMessages() { | |
239 if (!main_thread_task_runner_->BelongsToCurrentThread()) { | |
240 main_thread_task_runner_->PostTask( | |
241 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this)); | |
242 return; | |
243 } | |
244 // This message port is being sent elsewhere (perhaps to another process). | |
245 // The new endpoint needs to receive the queued messages, including ones that | |
246 // could still be in-flight. So we tell the browser to queue messages, and it | |
247 // sends us an ack, whose receipt we know means that no more messages are | |
248 // in-flight. We then send the queued messages to the browser, which prepends | |
249 // them to the ones it queued and it sends them to the new endpoint. | |
250 Send(new MessagePortHostMsg_QueueMessages(message_port_id_)); | |
251 | |
252 // The process could potentially go away while we're still waiting for | |
253 // in-flight messages. Ensure it stays alive. | |
254 ChildProcess::current()->AddRefProcess(); | |
255 } | |
256 | |
257 void WebMessagePortChannelImpl::Send(IPC::Message* message) { | |
258 if (!main_thread_task_runner_->BelongsToCurrentThread()) { | |
259 DCHECK(!message->is_sync()); | |
260 main_thread_task_runner_->PostTask( | |
261 FROM_HERE, | |
262 base::Bind(&WebMessagePortChannelImpl::Send, this, message)); | |
263 return; | |
264 } | |
265 | |
266 ChildThreadImpl::current()->GetRouter()->Send(message); | |
267 } | |
268 | |
269 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) { | |
270 bool handled = true; | |
271 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message) | |
272 IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage) | |
273 IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued) | |
274 IPC_MESSAGE_UNHANDLED(handled = false) | |
275 IPC_END_MESSAGE_MAP() | |
276 return handled; | |
277 } | |
278 | |
279 void WebMessagePortChannelImpl::OnMessage( | |
280 const base::string16& message, | |
281 const std::vector<int>& sent_message_ports, | |
282 const std::vector<int>& new_routing_ids) { | |
283 base::AutoLock auto_lock(lock_); | |
284 Message msg; | |
285 msg.message = message; | |
286 msg.ports = CreatePorts(sent_message_ports, new_routing_ids, | |
287 main_thread_task_runner_.get()); | |
288 | |
289 bool was_empty = message_queue_.empty(); | |
290 message_queue_.push(msg); | |
291 if (client_ && was_empty) | |
292 client_->messageAvailable(); | |
293 } | |
294 | |
295 void WebMessagePortChannelImpl::OnMessagesQueued() { | |
296 std::vector<QueuedMessage> queued_messages; | |
297 | |
298 { | |
299 base::AutoLock auto_lock(lock_); | |
300 queued_messages.reserve(message_queue_.size()); | |
301 while (!message_queue_.empty()) { | |
302 base::string16 message = message_queue_.front().message; | |
303 std::vector<int> ports = | |
304 ExtractMessagePortIDs(message_queue_.front().ports); | |
305 queued_messages.push_back(std::make_pair(message, ports)); | |
306 message_queue_.pop(); | |
307 } | |
308 } | |
309 | |
310 Send(new MessagePortHostMsg_SendQueuedMessages( | |
311 message_port_id_, queued_messages)); | |
312 | |
313 message_port_id_ = MSG_ROUTING_NONE; | |
314 | |
315 Release(); | |
316 ChildProcess::current()->ReleaseProcess(); | |
317 } | |
318 | |
319 WebMessagePortChannelImpl::Message::Message() {} | |
320 | |
321 WebMessagePortChannelImpl::Message::Message(const Message& other) = default; | |
322 | |
323 WebMessagePortChannelImpl::Message::~Message() {} | |
324 | |
325 } // namespace content | 127 } // namespace content |
OLD | NEW |