Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(491)

Side by Side Diff: chrome/browser/worker_host/message_port_dispatcher.cc

Issue 6055002: Create a message filter for message port messages. This allows a nice cleanu... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 10 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/worker_host/message_port_dispatcher.h"
6
7 #include "base/callback.h"
8 #include "base/singleton.h"
9 #include "chrome/browser/renderer_host/render_message_filter.h"
10 #include "chrome/browser/worker_host/worker_process_host.h"
11 #include "chrome/common/notification_service.h"
12 #include "chrome/common/worker_messages.h"
13
14 struct MessagePortDispatcher::MessagePort {
15 // sender and route_id are what we need to send messages to the port.
16 IPC::Message::Sender* sender;
17 int route_id;
18 // A function pointer to generate a new route id for the sender above.
19 // Owned by "sender" above, so don't delete.
20 CallbackWithReturnValue<int>::Type* next_routing_id;
21 // A globally unique id for this message port.
22 int message_port_id;
23 // The globally unique id of the entangled message port.
24 int entangled_message_port_id;
25 // If true, all messages to this message port are queued and not delivered.
26 bool queue_messages;
27 QueuedMessages queued_messages;
28 };
29
30 MessagePortDispatcher* MessagePortDispatcher::GetInstance() {
31 return Singleton<MessagePortDispatcher>::get();
32 }
33
34 MessagePortDispatcher::MessagePortDispatcher()
35 : next_message_port_id_(0),
36 sender_(NULL),
37 next_routing_id_(NULL) {
38 // Receive a notification if a message filter or WorkerProcessHost is deleted.
39 registrar_.Add(this, NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN,
40 NotificationService::AllSources());
41
42 registrar_.Add(this, NotificationType::WORKER_PROCESS_HOST_SHUTDOWN,
43 NotificationService::AllSources());
44 }
45
46 MessagePortDispatcher::~MessagePortDispatcher() {
47 }
48
49 bool MessagePortDispatcher::OnMessageReceived(
50 const IPC::Message& message,
51 IPC::Message::Sender* sender,
52 CallbackWithReturnValue<int>::Type* next_routing_id,
53 bool* message_was_ok) {
54 sender_ = sender;
55 next_routing_id_ = next_routing_id;
56
57 bool handled = true;
58 *message_was_ok = true;
59
60 IPC_BEGIN_MESSAGE_MAP_EX(MessagePortDispatcher, message, *message_was_ok)
61 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_CreateMessagePort, OnCreate)
62 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_DestroyMessagePort, OnDestroy)
63 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_Entangle, OnEntangle)
64 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_PostMessage, OnPostMessage)
65 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_QueueMessages, OnQueueMessages)
66 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_SendQueuedMessages,
67 OnSendQueuedMessages)
68 IPC_MESSAGE_UNHANDLED(handled = false)
69 IPC_END_MESSAGE_MAP_EX()
70
71 sender_ = NULL;
72 next_routing_id_ = NULL;
73
74 return handled;
75 }
76
77 void MessagePortDispatcher::UpdateMessagePort(
78 int message_port_id,
79 IPC::Message::Sender* sender,
80 int routing_id,
81 CallbackWithReturnValue<int>::Type* next_routing_id) {
82 if (!message_ports_.count(message_port_id)) {
83 NOTREACHED();
84 return;
85 }
86
87 MessagePort& port = message_ports_[message_port_id];
88 port.sender = sender;
89 port.route_id = routing_id;
90 port.next_routing_id = next_routing_id;
91 }
92
93 bool MessagePortDispatcher::Send(IPC::Message* message) {
94 return sender_->Send(message);
95 }
96
97 void MessagePortDispatcher::OnCreate(int *route_id,
98 int* message_port_id) {
99 *message_port_id = ++next_message_port_id_;
100 *route_id = next_routing_id_->Run();
101
102 MessagePort port;
103 port.sender = sender_;
104 port.route_id = *route_id;
105 port.next_routing_id = next_routing_id_;
106 port.message_port_id = *message_port_id;
107 port.entangled_message_port_id = MSG_ROUTING_NONE;
108 port.queue_messages = false;
109 message_ports_[*message_port_id] = port;
110 }
111
112 void MessagePortDispatcher::OnDestroy(int message_port_id) {
113 if (!message_ports_.count(message_port_id)) {
114 NOTREACHED();
115 return;
116 }
117
118 DCHECK(message_ports_[message_port_id].queued_messages.empty());
119 Erase(message_port_id);
120 }
121
122 void MessagePortDispatcher::OnEntangle(int local_message_port_id,
123 int remote_message_port_id) {
124 if (!message_ports_.count(local_message_port_id) ||
125 !message_ports_.count(remote_message_port_id)) {
126 NOTREACHED();
127 return;
128 }
129
130 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id ==
131 MSG_ROUTING_NONE);
132 message_ports_[remote_message_port_id].entangled_message_port_id =
133 local_message_port_id;
134 }
135
136 void MessagePortDispatcher::OnPostMessage(
137 int sender_message_port_id,
138 const string16& message,
139 const std::vector<int>& sent_message_port_ids) {
140 if (!message_ports_.count(sender_message_port_id)) {
141 NOTREACHED();
142 return;
143 }
144
145 int entangled_message_port_id =
146 message_ports_[sender_message_port_id].entangled_message_port_id;
147 if (entangled_message_port_id == MSG_ROUTING_NONE)
148 return; // Process could have crashed.
149
150 if (!message_ports_.count(entangled_message_port_id)) {
151 NOTREACHED();
152 return;
153 }
154
155 PostMessageTo(entangled_message_port_id, message, sent_message_port_ids);
156 }
157
158 void MessagePortDispatcher::PostMessageTo(
159 int message_port_id,
160 const string16& message,
161 const std::vector<int>& sent_message_port_ids) {
162 if (!message_ports_.count(message_port_id)) {
163 NOTREACHED();
164 return;
165 }
166 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
167 if (!message_ports_.count(sent_message_port_ids[i])) {
168 NOTREACHED();
169 return;
170 }
171 }
172
173 MessagePort& entangled_port = message_ports_[message_port_id];
174
175 std::vector<MessagePort*> sent_ports(sent_message_port_ids.size());
176 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
177 sent_ports[i] = &message_ports_[sent_message_port_ids[i]];
178 sent_ports[i]->queue_messages = true;
179 }
180
181 if (entangled_port.queue_messages) {
182 entangled_port.queued_messages.push_back(
183 std::make_pair(message, sent_message_port_ids));
184 } else {
185 // If a message port was sent around, the new location will need a routing
186 // id. Instead of having the created port send us a sync message to get it,
187 // send along with the message.
188 std::vector<int> new_routing_ids(sent_message_port_ids.size());
189 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
190 new_routing_ids[i] = entangled_port.next_routing_id->Run();
191 sent_ports[i]->sender = entangled_port.sender;
192
193 // Update the entry for the sent port as it can be in a different process.
194 sent_ports[i]->route_id = new_routing_ids[i];
195 }
196
197 if (entangled_port.sender) {
198 // Now send the message to the entangled port.
199 IPC::Message* ipc_msg = new WorkerProcessMsg_Message(
200 entangled_port.route_id, message, sent_message_port_ids,
201 new_routing_ids);
202 entangled_port.sender->Send(ipc_msg);
203 }
204 }
205 }
206
207 void MessagePortDispatcher::OnQueueMessages(int message_port_id) {
208 if (!message_ports_.count(message_port_id)) {
209 NOTREACHED();
210 return;
211 }
212
213 MessagePort& port = message_ports_[message_port_id];
214 if (port.sender) {
215 port.sender->Send(new WorkerProcessMsg_MessagesQueued(port.route_id));
216 port.queue_messages = true;
217 port.sender = NULL;
218 }
219 }
220
221 void MessagePortDispatcher::OnSendQueuedMessages(
222 int message_port_id,
223 const QueuedMessages& queued_messages) {
224 if (!message_ports_.count(message_port_id)) {
225 NOTREACHED();
226 return;
227 }
228
229 // Send the queued messages to the port again. This time they'll reach the
230 // new location.
231 MessagePort& port = message_ports_[message_port_id];
232 port.queue_messages = false;
233 port.queued_messages.insert(port.queued_messages.begin(),
234 queued_messages.begin(),
235 queued_messages.end());
236 SendQueuedMessagesIfPossible(message_port_id);
237 }
238
239 void MessagePortDispatcher::SendQueuedMessagesIfPossible(int message_port_id) {
240 if (!message_ports_.count(message_port_id)) {
241 NOTREACHED();
242 return;
243 }
244
245 MessagePort& port = message_ports_[message_port_id];
246 if (port.queue_messages || !port.sender)
247 return;
248
249 for (QueuedMessages::iterator iter = port.queued_messages.begin();
250 iter != port.queued_messages.end(); ++iter) {
251 PostMessageTo(message_port_id, iter->first, iter->second);
252 }
253 port.queued_messages.clear();
254 }
255
256 void MessagePortDispatcher::Observe(NotificationType type,
257 const NotificationSource& source,
258 const NotificationDetails& details) {
259 IPC::Message::Sender* sender = NULL;
260 if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) {
261 sender = Source<RenderMessageFilter>(source).ptr();
262 } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) {
263 sender = Source<WorkerProcessHost>(source).ptr();
264 } else {
265 NOTREACHED();
266 }
267
268 // Check if the (possibly) crashed process had any message ports.
269 for (MessagePorts::iterator iter = message_ports_.begin();
270 iter != message_ports_.end();) {
271 MessagePorts::iterator cur_item = iter++;
272 if (cur_item->second.sender == sender) {
273 Erase(cur_item->first);
274 }
275 }
276 }
277
278 void MessagePortDispatcher::Erase(int message_port_id) {
279 MessagePorts::iterator erase_item = message_ports_.find(message_port_id);
280 DCHECK(erase_item != message_ports_.end());
281
282 int entangled_id = erase_item->second.entangled_message_port_id;
283 if (entangled_id != MSG_ROUTING_NONE) {
284 // Do the disentanglement (and be paranoid about the other side existing
285 // just in case something unusual happened during entanglement).
286 if (message_ports_.count(entangled_id)) {
287 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE;
288 }
289 }
290 message_ports_.erase(erase_item);
291 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698