OLD | NEW |
---|---|
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "chrome/browser/worker_host/message_port_dispatcher.h" | 5 #include "chrome/browser/worker_host/message_port_service.h" |
6 | 6 |
7 #include "base/callback.h" | 7 #include "chrome/browser/worker_host/worker_message_filter.h" |
8 #include "base/singleton.h" | |
9 #include "chrome/browser/renderer_host/render_message_filter.h" | |
10 #include "chrome/browser/worker_host/worker_process_host.h" | |
11 #include "chrome/common/notification_service.h" | |
12 #include "chrome/common/worker_messages.h" | 8 #include "chrome/common/worker_messages.h" |
13 | 9 |
14 struct MessagePortDispatcher::MessagePort { | 10 struct MessagePortService::MessagePort { |
15 // sender and route_id are what we need to send messages to the port. | 11 // |filter| and |route_id| are what we need to send messages to the port. |
16 IPC::Message::Sender* sender; | 12 // |filter| is just a weak pointer since we get notified when its process has |
13 // gone away and remove it. | |
14 WorkerMessageFilter* filter; | |
17 int route_id; | 15 int route_id; |
18 // A function pointer to generate a new route id for the sender above. | |
19 // Owned by "sender" above, so don't delete. | |
20 CallbackWithReturnValue<int>::Type* next_routing_id; | |
21 // A globally unique id for this message port. | 16 // A globally unique id for this message port. |
22 int message_port_id; | 17 int message_port_id; |
23 // The globally unique id of the entangled message port. | 18 // The globally unique id of the entangled message port. |
24 int entangled_message_port_id; | 19 int entangled_message_port_id; |
25 // If true, all messages to this message port are queued and not delivered. | 20 // If true, all messages to this message port are queued and not delivered. |
26 bool queue_messages; | 21 bool queue_messages; |
27 QueuedMessages queued_messages; | 22 QueuedMessages queued_messages; |
28 }; | 23 }; |
29 | 24 |
30 MessagePortDispatcher* MessagePortDispatcher::GetInstance() { | 25 MessagePortService* MessagePortService::GetInstance() { |
31 return Singleton<MessagePortDispatcher>::get(); | 26 return Singleton<MessagePortService>::get(); |
32 } | 27 } |
33 | 28 |
34 MessagePortDispatcher::MessagePortDispatcher() | 29 MessagePortService::MessagePortService() |
35 : next_message_port_id_(0), | 30 : next_message_port_id_(0) { |
36 sender_(NULL), | |
37 next_routing_id_(NULL) { | |
38 // Receive a notification if a message filter or WorkerProcessHost is deleted. | |
39 registrar_.Add(this, NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN, | |
40 NotificationService::AllSources()); | |
41 | |
42 registrar_.Add(this, NotificationType::WORKER_PROCESS_HOST_SHUTDOWN, | |
43 NotificationService::AllSources()); | |
44 } | 31 } |
45 | 32 |
46 MessagePortDispatcher::~MessagePortDispatcher() { | 33 MessagePortService::~MessagePortService() { |
47 } | 34 } |
48 | 35 |
49 bool MessagePortDispatcher::OnMessageReceived( | 36 void MessagePortService::UpdateMessagePort( |
50 const IPC::Message& message, | |
51 IPC::Message::Sender* sender, | |
52 CallbackWithReturnValue<int>::Type* next_routing_id, | |
53 bool* message_was_ok) { | |
54 sender_ = sender; | |
55 next_routing_id_ = next_routing_id; | |
56 | |
57 bool handled = true; | |
58 *message_was_ok = true; | |
59 | |
60 IPC_BEGIN_MESSAGE_MAP_EX(MessagePortDispatcher, message, *message_was_ok) | |
61 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_CreateMessagePort, OnCreate) | |
62 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_DestroyMessagePort, OnDestroy) | |
63 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_Entangle, OnEntangle) | |
64 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_PostMessage, OnPostMessage) | |
65 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_QueueMessages, OnQueueMessages) | |
66 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_SendQueuedMessages, | |
67 OnSendQueuedMessages) | |
68 IPC_MESSAGE_UNHANDLED(handled = false) | |
69 IPC_END_MESSAGE_MAP_EX() | |
70 | |
71 sender_ = NULL; | |
72 next_routing_id_ = NULL; | |
73 | |
74 return handled; | |
75 } | |
76 | |
77 void MessagePortDispatcher::UpdateMessagePort( | |
78 int message_port_id, | 37 int message_port_id, |
79 IPC::Message::Sender* sender, | 38 WorkerMessageFilter* filter, |
Andrew T Wilson (Slow)
2010/12/21 03:12:21
Remove extra space at front
jam
2010/12/21 07:41:51
Done.
| |
80 int routing_id, | 39 int routing_id) { |
81 CallbackWithReturnValue<int>::Type* next_routing_id) { | |
82 if (!message_ports_.count(message_port_id)) { | 40 if (!message_ports_.count(message_port_id)) { |
83 NOTREACHED(); | 41 NOTREACHED(); |
84 return; | 42 return; |
85 } | 43 } |
86 | 44 |
87 MessagePort& port = message_ports_[message_port_id]; | 45 MessagePort& port = message_ports_[message_port_id]; |
88 port.sender = sender; | 46 port.filter = filter; |
89 port.route_id = routing_id; | 47 port.route_id = routing_id; |
90 port.next_routing_id = next_routing_id; | |
91 } | 48 } |
92 | 49 |
93 bool MessagePortDispatcher::Send(IPC::Message* message) { | 50 void MessagePortService::OnWorkerMessageFilterClosing( |
94 return sender_->Send(message); | 51 WorkerMessageFilter* filter) { |
52 // Check if the (possibly) crashed process had any message ports. | |
53 for (MessagePorts::iterator iter = message_ports_.begin(); | |
54 iter != message_ports_.end();) { | |
55 MessagePorts::iterator cur_item = iter++; | |
56 if (cur_item->second.filter == filter) { | |
57 Erase(cur_item->first); | |
58 } | |
59 } | |
95 } | 60 } |
96 | 61 |
97 void MessagePortDispatcher::OnCreate(int *route_id, | 62 void MessagePortService::Create(int route_id, |
98 int* message_port_id) { | 63 WorkerMessageFilter* filter, |
64 int* message_port_id) { | |
99 *message_port_id = ++next_message_port_id_; | 65 *message_port_id = ++next_message_port_id_; |
100 *route_id = next_routing_id_->Run(); | |
101 | 66 |
102 MessagePort port; | 67 MessagePort port; |
103 port.sender = sender_; | 68 port.filter = filter; |
104 port.route_id = *route_id; | 69 port.route_id = route_id; |
105 port.next_routing_id = next_routing_id_; | |
106 port.message_port_id = *message_port_id; | 70 port.message_port_id = *message_port_id; |
107 port.entangled_message_port_id = MSG_ROUTING_NONE; | 71 port.entangled_message_port_id = MSG_ROUTING_NONE; |
108 port.queue_messages = false; | 72 port.queue_messages = false; |
109 message_ports_[*message_port_id] = port; | 73 message_ports_[*message_port_id] = port; |
110 } | 74 } |
111 | 75 |
112 void MessagePortDispatcher::OnDestroy(int message_port_id) { | 76 void MessagePortService::Destroy(int message_port_id) { |
113 if (!message_ports_.count(message_port_id)) { | 77 if (!message_ports_.count(message_port_id)) { |
114 NOTREACHED(); | 78 NOTREACHED(); |
115 return; | 79 return; |
116 } | 80 } |
117 | 81 |
118 DCHECK(message_ports_[message_port_id].queued_messages.empty()); | 82 DCHECK(message_ports_[message_port_id].queued_messages.empty()); |
119 Erase(message_port_id); | 83 Erase(message_port_id); |
120 } | 84 } |
121 | 85 |
122 void MessagePortDispatcher::OnEntangle(int local_message_port_id, | 86 void MessagePortService::Entangle(int local_message_port_id, |
123 int remote_message_port_id) { | 87 int remote_message_port_id) { |
124 if (!message_ports_.count(local_message_port_id) || | 88 if (!message_ports_.count(local_message_port_id) || |
125 !message_ports_.count(remote_message_port_id)) { | 89 !message_ports_.count(remote_message_port_id)) { |
126 NOTREACHED(); | 90 NOTREACHED(); |
127 return; | 91 return; |
128 } | 92 } |
129 | 93 |
130 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == | 94 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == |
131 MSG_ROUTING_NONE); | 95 MSG_ROUTING_NONE); |
132 message_ports_[remote_message_port_id].entangled_message_port_id = | 96 message_ports_[remote_message_port_id].entangled_message_port_id = |
133 local_message_port_id; | 97 local_message_port_id; |
134 } | 98 } |
135 | 99 |
136 void MessagePortDispatcher::OnPostMessage( | 100 void MessagePortService::PostMessage( |
137 int sender_message_port_id, | 101 int sender_message_port_id, |
138 const string16& message, | 102 const string16& message, |
139 const std::vector<int>& sent_message_port_ids) { | 103 const std::vector<int>& sent_message_port_ids) { |
140 if (!message_ports_.count(sender_message_port_id)) { | 104 if (!message_ports_.count(sender_message_port_id)) { |
141 NOTREACHED(); | 105 NOTREACHED(); |
142 return; | 106 return; |
143 } | 107 } |
144 | 108 |
145 int entangled_message_port_id = | 109 int entangled_message_port_id = |
146 message_ports_[sender_message_port_id].entangled_message_port_id; | 110 message_ports_[sender_message_port_id].entangled_message_port_id; |
147 if (entangled_message_port_id == MSG_ROUTING_NONE) | 111 if (entangled_message_port_id == MSG_ROUTING_NONE) |
148 return; // Process could have crashed. | 112 return; // Process could have crashed. |
149 | 113 |
150 if (!message_ports_.count(entangled_message_port_id)) { | 114 if (!message_ports_.count(entangled_message_port_id)) { |
151 NOTREACHED(); | 115 NOTREACHED(); |
152 return; | 116 return; |
153 } | 117 } |
154 | 118 |
155 PostMessageTo(entangled_message_port_id, message, sent_message_port_ids); | 119 PostMessageTo(entangled_message_port_id, message, sent_message_port_ids); |
156 } | 120 } |
157 | 121 |
158 void MessagePortDispatcher::PostMessageTo( | 122 void MessagePortService::PostMessageTo( |
159 int message_port_id, | 123 int message_port_id, |
160 const string16& message, | 124 const string16& message, |
161 const std::vector<int>& sent_message_port_ids) { | 125 const std::vector<int>& sent_message_port_ids) { |
162 if (!message_ports_.count(message_port_id)) { | 126 if (!message_ports_.count(message_port_id)) { |
163 NOTREACHED(); | 127 NOTREACHED(); |
164 return; | 128 return; |
165 } | 129 } |
166 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | 130 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { |
167 if (!message_ports_.count(sent_message_port_ids[i])) { | 131 if (!message_ports_.count(sent_message_port_ids[i])) { |
168 NOTREACHED(); | 132 NOTREACHED(); |
(...skipping 11 matching lines...) Expand all Loading... | |
180 | 144 |
181 if (entangled_port.queue_messages) { | 145 if (entangled_port.queue_messages) { |
182 entangled_port.queued_messages.push_back( | 146 entangled_port.queued_messages.push_back( |
183 std::make_pair(message, sent_message_port_ids)); | 147 std::make_pair(message, sent_message_port_ids)); |
184 } else { | 148 } else { |
185 // If a message port was sent around, the new location will need a routing | 149 // If a message port was sent around, the new location will need a routing |
186 // id. Instead of having the created port send us a sync message to get it, | 150 // id. Instead of having the created port send us a sync message to get it, |
187 // send along with the message. | 151 // send along with the message. |
188 std::vector<int> new_routing_ids(sent_message_port_ids.size()); | 152 std::vector<int> new_routing_ids(sent_message_port_ids.size()); |
189 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | 153 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { |
190 new_routing_ids[i] = entangled_port.next_routing_id->Run(); | 154 new_routing_ids[i] = entangled_port.filter->GetNextRoutingID(); |
191 sent_ports[i]->sender = entangled_port.sender; | 155 sent_ports[i]->filter = entangled_port.filter; |
192 | 156 |
193 // Update the entry for the sent port as it can be in a different process. | 157 // Update the entry for the sent port as it can be in a different process. |
194 sent_ports[i]->route_id = new_routing_ids[i]; | 158 sent_ports[i]->route_id = new_routing_ids[i]; |
195 } | 159 } |
196 | 160 |
197 if (entangled_port.sender) { | 161 if (entangled_port.filter) { |
Andrew T Wilson (Slow)
2010/12/21 03:12:21
If entangled_port.filter is null here, then we wou
jam
2010/12/21 07:41:51
good point, made the "else" be an "else if" and ad
| |
198 // Now send the message to the entangled port. | 162 // Now send the message to the entangled port. |
199 IPC::Message* ipc_msg = new WorkerProcessMsg_Message( | 163 entangled_port.filter->Send(new WorkerProcessMsg_Message( |
200 entangled_port.route_id, message, sent_message_port_ids, | 164 entangled_port.route_id, message, sent_message_port_ids, |
201 new_routing_ids); | 165 new_routing_ids)); |
202 entangled_port.sender->Send(ipc_msg); | |
203 } | 166 } |
204 } | 167 } |
205 } | 168 } |
206 | 169 |
207 void MessagePortDispatcher::OnQueueMessages(int message_port_id) { | 170 void MessagePortService::QueueMessages(int message_port_id) { |
208 if (!message_ports_.count(message_port_id)) { | 171 if (!message_ports_.count(message_port_id)) { |
209 NOTREACHED(); | 172 NOTREACHED(); |
210 return; | 173 return; |
211 } | 174 } |
212 | 175 |
213 MessagePort& port = message_ports_[message_port_id]; | 176 MessagePort& port = message_ports_[message_port_id]; |
214 if (port.sender) { | 177 if (port.filter) { |
215 port.sender->Send(new WorkerProcessMsg_MessagesQueued(port.route_id)); | 178 port.filter->Send(new WorkerProcessMsg_MessagesQueued(port.route_id)); |
216 port.queue_messages = true; | 179 port.queue_messages = true; |
217 port.sender = NULL; | 180 port.filter = NULL; |
Andrew T Wilson (Slow)
2010/12/21 03:12:21
Coming back to this code after a several-month hia
jam
2010/12/21 07:41:51
I'd love to add some good comments, but I haven't
| |
218 } | 181 } |
219 } | 182 } |
220 | 183 |
221 void MessagePortDispatcher::OnSendQueuedMessages( | 184 void MessagePortService::SendQueuedMessages( |
222 int message_port_id, | 185 int message_port_id, |
223 const QueuedMessages& queued_messages) { | 186 const QueuedMessages& queued_messages) { |
224 if (!message_ports_.count(message_port_id)) { | 187 if (!message_ports_.count(message_port_id)) { |
225 NOTREACHED(); | 188 NOTREACHED(); |
226 return; | 189 return; |
227 } | 190 } |
228 | 191 |
229 // Send the queued messages to the port again. This time they'll reach the | 192 // Send the queued messages to the port again. This time they'll reach the |
230 // new location. | 193 // new location. |
231 MessagePort& port = message_ports_[message_port_id]; | 194 MessagePort& port = message_ports_[message_port_id]; |
232 port.queue_messages = false; | 195 port.queue_messages = false; |
233 port.queued_messages.insert(port.queued_messages.begin(), | 196 port.queued_messages.insert(port.queued_messages.begin(), |
234 queued_messages.begin(), | 197 queued_messages.begin(), |
235 queued_messages.end()); | 198 queued_messages.end()); |
236 SendQueuedMessagesIfPossible(message_port_id); | 199 SendQueuedMessagesIfPossible(message_port_id); |
237 } | 200 } |
238 | 201 |
239 void MessagePortDispatcher::SendQueuedMessagesIfPossible(int message_port_id) { | 202 void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) { |
240 if (!message_ports_.count(message_port_id)) { | 203 if (!message_ports_.count(message_port_id)) { |
241 NOTREACHED(); | 204 NOTREACHED(); |
242 return; | 205 return; |
243 } | 206 } |
244 | 207 |
245 MessagePort& port = message_ports_[message_port_id]; | 208 MessagePort& port = message_ports_[message_port_id]; |
246 if (port.queue_messages || !port.sender) | 209 if (port.queue_messages || !port.filter) |
247 return; | 210 return; |
248 | 211 |
249 for (QueuedMessages::iterator iter = port.queued_messages.begin(); | 212 for (QueuedMessages::iterator iter = port.queued_messages.begin(); |
250 iter != port.queued_messages.end(); ++iter) { | 213 iter != port.queued_messages.end(); ++iter) { |
251 PostMessageTo(message_port_id, iter->first, iter->second); | 214 PostMessageTo(message_port_id, iter->first, iter->second); |
252 } | 215 } |
253 port.queued_messages.clear(); | 216 port.queued_messages.clear(); |
254 } | 217 } |
255 | 218 |
256 void MessagePortDispatcher::Observe(NotificationType type, | 219 void MessagePortService::Erase(int message_port_id) { |
257 const NotificationSource& source, | |
258 const NotificationDetails& details) { | |
259 IPC::Message::Sender* sender = NULL; | |
260 if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) { | |
261 sender = Source<RenderMessageFilter>(source).ptr(); | |
262 } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) { | |
263 sender = Source<WorkerProcessHost>(source).ptr(); | |
264 } else { | |
265 NOTREACHED(); | |
266 } | |
267 | |
268 // Check if the (possibly) crashed process had any message ports. | |
269 for (MessagePorts::iterator iter = message_ports_.begin(); | |
270 iter != message_ports_.end();) { | |
271 MessagePorts::iterator cur_item = iter++; | |
272 if (cur_item->second.sender == sender) { | |
273 Erase(cur_item->first); | |
274 } | |
275 } | |
276 } | |
277 | |
278 void MessagePortDispatcher::Erase(int message_port_id) { | |
279 MessagePorts::iterator erase_item = message_ports_.find(message_port_id); | 220 MessagePorts::iterator erase_item = message_ports_.find(message_port_id); |
280 DCHECK(erase_item != message_ports_.end()); | 221 DCHECK(erase_item != message_ports_.end()); |
281 | 222 |
282 int entangled_id = erase_item->second.entangled_message_port_id; | 223 int entangled_id = erase_item->second.entangled_message_port_id; |
283 if (entangled_id != MSG_ROUTING_NONE) { | 224 if (entangled_id != MSG_ROUTING_NONE) { |
284 // Do the disentanglement (and be paranoid about the other side existing | 225 // Do the disentanglement (and be paranoid about the other side existing |
285 // just in case something unusual happened during entanglement). | 226 // just in case something unusual happened during entanglement). |
286 if (message_ports_.count(entangled_id)) { | 227 if (message_ports_.count(entangled_id)) { |
287 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE; | 228 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE; |
288 } | 229 } |
289 } | 230 } |
290 message_ports_.erase(erase_item); | 231 message_ports_.erase(erase_item); |
291 } | 232 } |
OLD | NEW |