OLD | NEW |
| (Empty) |
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "chrome/browser/worker_host/message_port_dispatcher.h" | |
6 | |
7 #include "base/callback.h" | |
8 #include "base/singleton.h" | |
9 #include "chrome/browser/renderer_host/render_message_filter.h" | |
10 #include "chrome/browser/worker_host/worker_process_host.h" | |
11 #include "chrome/common/notification_service.h" | |
12 #include "chrome/common/worker_messages.h" | |
13 | |
14 struct MessagePortDispatcher::MessagePort { | |
15 // sender and route_id are what we need to send messages to the port. | |
16 IPC::Message::Sender* sender; | |
17 int route_id; | |
18 // A function pointer to generate a new route id for the sender above. | |
19 // Owned by "sender" above, so don't delete. | |
20 CallbackWithReturnValue<int>::Type* next_routing_id; | |
21 // A globally unique id for this message port. | |
22 int message_port_id; | |
23 // The globally unique id of the entangled message port. | |
24 int entangled_message_port_id; | |
25 // If true, all messages to this message port are queued and not delivered. | |
26 bool queue_messages; | |
27 QueuedMessages queued_messages; | |
28 }; | |
29 | |
30 MessagePortDispatcher* MessagePortDispatcher::GetInstance() { | |
31 return Singleton<MessagePortDispatcher>::get(); | |
32 } | |
33 | |
34 MessagePortDispatcher::MessagePortDispatcher() | |
35 : next_message_port_id_(0), | |
36 sender_(NULL), | |
37 next_routing_id_(NULL) { | |
38 // Receive a notification if a message filter or WorkerProcessHost is deleted. | |
39 registrar_.Add(this, NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN, | |
40 NotificationService::AllSources()); | |
41 | |
42 registrar_.Add(this, NotificationType::WORKER_PROCESS_HOST_SHUTDOWN, | |
43 NotificationService::AllSources()); | |
44 } | |
45 | |
46 MessagePortDispatcher::~MessagePortDispatcher() { | |
47 } | |
48 | |
49 bool MessagePortDispatcher::OnMessageReceived( | |
50 const IPC::Message& message, | |
51 IPC::Message::Sender* sender, | |
52 CallbackWithReturnValue<int>::Type* next_routing_id, | |
53 bool* message_was_ok) { | |
54 sender_ = sender; | |
55 next_routing_id_ = next_routing_id; | |
56 | |
57 bool handled = true; | |
58 *message_was_ok = true; | |
59 | |
60 IPC_BEGIN_MESSAGE_MAP_EX(MessagePortDispatcher, message, *message_was_ok) | |
61 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_CreateMessagePort, OnCreate) | |
62 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_DestroyMessagePort, OnDestroy) | |
63 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_Entangle, OnEntangle) | |
64 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_PostMessage, OnPostMessage) | |
65 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_QueueMessages, OnQueueMessages) | |
66 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_SendQueuedMessages, | |
67 OnSendQueuedMessages) | |
68 IPC_MESSAGE_UNHANDLED(handled = false) | |
69 IPC_END_MESSAGE_MAP_EX() | |
70 | |
71 sender_ = NULL; | |
72 next_routing_id_ = NULL; | |
73 | |
74 return handled; | |
75 } | |
76 | |
77 void MessagePortDispatcher::UpdateMessagePort( | |
78 int message_port_id, | |
79 IPC::Message::Sender* sender, | |
80 int routing_id, | |
81 CallbackWithReturnValue<int>::Type* next_routing_id) { | |
82 if (!message_ports_.count(message_port_id)) { | |
83 NOTREACHED(); | |
84 return; | |
85 } | |
86 | |
87 MessagePort& port = message_ports_[message_port_id]; | |
88 port.sender = sender; | |
89 port.route_id = routing_id; | |
90 port.next_routing_id = next_routing_id; | |
91 } | |
92 | |
93 bool MessagePortDispatcher::Send(IPC::Message* message) { | |
94 return sender_->Send(message); | |
95 } | |
96 | |
97 void MessagePortDispatcher::OnCreate(int *route_id, | |
98 int* message_port_id) { | |
99 *message_port_id = ++next_message_port_id_; | |
100 *route_id = next_routing_id_->Run(); | |
101 | |
102 MessagePort port; | |
103 port.sender = sender_; | |
104 port.route_id = *route_id; | |
105 port.next_routing_id = next_routing_id_; | |
106 port.message_port_id = *message_port_id; | |
107 port.entangled_message_port_id = MSG_ROUTING_NONE; | |
108 port.queue_messages = false; | |
109 message_ports_[*message_port_id] = port; | |
110 } | |
111 | |
112 void MessagePortDispatcher::OnDestroy(int message_port_id) { | |
113 if (!message_ports_.count(message_port_id)) { | |
114 NOTREACHED(); | |
115 return; | |
116 } | |
117 | |
118 DCHECK(message_ports_[message_port_id].queued_messages.empty()); | |
119 Erase(message_port_id); | |
120 } | |
121 | |
122 void MessagePortDispatcher::OnEntangle(int local_message_port_id, | |
123 int remote_message_port_id) { | |
124 if (!message_ports_.count(local_message_port_id) || | |
125 !message_ports_.count(remote_message_port_id)) { | |
126 NOTREACHED(); | |
127 return; | |
128 } | |
129 | |
130 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == | |
131 MSG_ROUTING_NONE); | |
132 message_ports_[remote_message_port_id].entangled_message_port_id = | |
133 local_message_port_id; | |
134 } | |
135 | |
136 void MessagePortDispatcher::OnPostMessage( | |
137 int sender_message_port_id, | |
138 const string16& message, | |
139 const std::vector<int>& sent_message_port_ids) { | |
140 if (!message_ports_.count(sender_message_port_id)) { | |
141 NOTREACHED(); | |
142 return; | |
143 } | |
144 | |
145 int entangled_message_port_id = | |
146 message_ports_[sender_message_port_id].entangled_message_port_id; | |
147 if (entangled_message_port_id == MSG_ROUTING_NONE) | |
148 return; // Process could have crashed. | |
149 | |
150 if (!message_ports_.count(entangled_message_port_id)) { | |
151 NOTREACHED(); | |
152 return; | |
153 } | |
154 | |
155 PostMessageTo(entangled_message_port_id, message, sent_message_port_ids); | |
156 } | |
157 | |
158 void MessagePortDispatcher::PostMessageTo( | |
159 int message_port_id, | |
160 const string16& message, | |
161 const std::vector<int>& sent_message_port_ids) { | |
162 if (!message_ports_.count(message_port_id)) { | |
163 NOTREACHED(); | |
164 return; | |
165 } | |
166 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | |
167 if (!message_ports_.count(sent_message_port_ids[i])) { | |
168 NOTREACHED(); | |
169 return; | |
170 } | |
171 } | |
172 | |
173 MessagePort& entangled_port = message_ports_[message_port_id]; | |
174 | |
175 std::vector<MessagePort*> sent_ports(sent_message_port_ids.size()); | |
176 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | |
177 sent_ports[i] = &message_ports_[sent_message_port_ids[i]]; | |
178 sent_ports[i]->queue_messages = true; | |
179 } | |
180 | |
181 if (entangled_port.queue_messages) { | |
182 entangled_port.queued_messages.push_back( | |
183 std::make_pair(message, sent_message_port_ids)); | |
184 } else { | |
185 // If a message port was sent around, the new location will need a routing | |
186 // id. Instead of having the created port send us a sync message to get it, | |
187 // send along with the message. | |
188 std::vector<int> new_routing_ids(sent_message_port_ids.size()); | |
189 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | |
190 new_routing_ids[i] = entangled_port.next_routing_id->Run(); | |
191 sent_ports[i]->sender = entangled_port.sender; | |
192 | |
193 // Update the entry for the sent port as it can be in a different process. | |
194 sent_ports[i]->route_id = new_routing_ids[i]; | |
195 } | |
196 | |
197 if (entangled_port.sender) { | |
198 // Now send the message to the entangled port. | |
199 IPC::Message* ipc_msg = new WorkerProcessMsg_Message( | |
200 entangled_port.route_id, message, sent_message_port_ids, | |
201 new_routing_ids); | |
202 entangled_port.sender->Send(ipc_msg); | |
203 } | |
204 } | |
205 } | |
206 | |
207 void MessagePortDispatcher::OnQueueMessages(int message_port_id) { | |
208 if (!message_ports_.count(message_port_id)) { | |
209 NOTREACHED(); | |
210 return; | |
211 } | |
212 | |
213 MessagePort& port = message_ports_[message_port_id]; | |
214 if (port.sender) { | |
215 port.sender->Send(new WorkerProcessMsg_MessagesQueued(port.route_id)); | |
216 port.queue_messages = true; | |
217 port.sender = NULL; | |
218 } | |
219 } | |
220 | |
221 void MessagePortDispatcher::OnSendQueuedMessages( | |
222 int message_port_id, | |
223 const QueuedMessages& queued_messages) { | |
224 if (!message_ports_.count(message_port_id)) { | |
225 NOTREACHED(); | |
226 return; | |
227 } | |
228 | |
229 // Send the queued messages to the port again. This time they'll reach the | |
230 // new location. | |
231 MessagePort& port = message_ports_[message_port_id]; | |
232 port.queue_messages = false; | |
233 port.queued_messages.insert(port.queued_messages.begin(), | |
234 queued_messages.begin(), | |
235 queued_messages.end()); | |
236 SendQueuedMessagesIfPossible(message_port_id); | |
237 } | |
238 | |
239 void MessagePortDispatcher::SendQueuedMessagesIfPossible(int message_port_id) { | |
240 if (!message_ports_.count(message_port_id)) { | |
241 NOTREACHED(); | |
242 return; | |
243 } | |
244 | |
245 MessagePort& port = message_ports_[message_port_id]; | |
246 if (port.queue_messages || !port.sender) | |
247 return; | |
248 | |
249 for (QueuedMessages::iterator iter = port.queued_messages.begin(); | |
250 iter != port.queued_messages.end(); ++iter) { | |
251 PostMessageTo(message_port_id, iter->first, iter->second); | |
252 } | |
253 port.queued_messages.clear(); | |
254 } | |
255 | |
256 void MessagePortDispatcher::Observe(NotificationType type, | |
257 const NotificationSource& source, | |
258 const NotificationDetails& details) { | |
259 IPC::Message::Sender* sender = NULL; | |
260 if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) { | |
261 sender = Source<RenderMessageFilter>(source).ptr(); | |
262 } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) { | |
263 sender = Source<WorkerProcessHost>(source).ptr(); | |
264 } else { | |
265 NOTREACHED(); | |
266 } | |
267 | |
268 // Check if the (possibly) crashed process had any message ports. | |
269 for (MessagePorts::iterator iter = message_ports_.begin(); | |
270 iter != message_ports_.end();) { | |
271 MessagePorts::iterator cur_item = iter++; | |
272 if (cur_item->second.sender == sender) { | |
273 Erase(cur_item->first); | |
274 } | |
275 } | |
276 } | |
277 | |
278 void MessagePortDispatcher::Erase(int message_port_id) { | |
279 MessagePorts::iterator erase_item = message_ports_.find(message_port_id); | |
280 DCHECK(erase_item != message_ports_.end()); | |
281 | |
282 int entangled_id = erase_item->second.entangled_message_port_id; | |
283 if (entangled_id != MSG_ROUTING_NONE) { | |
284 // Do the disentanglement (and be paranoid about the other side existing | |
285 // just in case something unusual happened during entanglement). | |
286 if (message_ports_.count(entangled_id)) { | |
287 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE; | |
288 } | |
289 } | |
290 message_ports_.erase(erase_item); | |
291 } | |
OLD | NEW |