Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(178)

Side by Side Diff: chrome/browser/worker_host/message_port_service.cc

Issue 6055002: Create a message filter for message port messages. This allows a nice cleanu... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 10 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/worker_host/message_port_dispatcher.h" 5 #include "chrome/browser/worker_host/message_port_service.h"
6 6
7 #include "base/callback.h" 7 #include "chrome/browser/worker_host/worker_message_filter.h"
8 #include "base/singleton.h"
9 #include "chrome/browser/renderer_host/render_message_filter.h"
10 #include "chrome/browser/worker_host/worker_process_host.h"
11 #include "chrome/common/notification_service.h"
12 #include "chrome/common/worker_messages.h" 8 #include "chrome/common/worker_messages.h"
13 9
14 struct MessagePortDispatcher::MessagePort { 10 struct MessagePortService::MessagePort {
15 // sender and route_id are what we need to send messages to the port. 11 // |filter| and |route_id| are what we need to send messages to the port.
16 IPC::Message::Sender* sender; 12 // |filter| is just a weak pointer since we get notified when its process has
13 // gone away and remove it.
14 WorkerMessageFilter* filter;
17 int route_id; 15 int route_id;
18 // A function pointer to generate a new route id for the sender above.
19 // Owned by "sender" above, so don't delete.
20 CallbackWithReturnValue<int>::Type* next_routing_id;
21 // A globally unique id for this message port. 16 // A globally unique id for this message port.
22 int message_port_id; 17 int message_port_id;
23 // The globally unique id of the entangled message port. 18 // The globally unique id of the entangled message port.
24 int entangled_message_port_id; 19 int entangled_message_port_id;
25 // If true, all messages to this message port are queued and not delivered. 20 // If true, all messages to this message port are queued and not delivered.
26 bool queue_messages; 21 bool queue_messages;
27 QueuedMessages queued_messages; 22 QueuedMessages queued_messages;
28 }; 23 };
29 24
30 MessagePortDispatcher* MessagePortDispatcher::GetInstance() { 25 MessagePortService* MessagePortService::GetInstance() {
31 return Singleton<MessagePortDispatcher>::get(); 26 return Singleton<MessagePortService>::get();
32 } 27 }
33 28
34 MessagePortDispatcher::MessagePortDispatcher() 29 MessagePortService::MessagePortService()
35 : next_message_port_id_(0), 30 : next_message_port_id_(0) {
36 sender_(NULL),
37 next_routing_id_(NULL) {
38 // Receive a notification if a message filter or WorkerProcessHost is deleted.
39 registrar_.Add(this, NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN,
40 NotificationService::AllSources());
41
42 registrar_.Add(this, NotificationType::WORKER_PROCESS_HOST_SHUTDOWN,
43 NotificationService::AllSources());
44 } 31 }
45 32
46 MessagePortDispatcher::~MessagePortDispatcher() { 33 MessagePortService::~MessagePortService() {
47 } 34 }
48 35
49 bool MessagePortDispatcher::OnMessageReceived( 36 void MessagePortService::UpdateMessagePort(
50 const IPC::Message& message,
51 IPC::Message::Sender* sender,
52 CallbackWithReturnValue<int>::Type* next_routing_id,
53 bool* message_was_ok) {
54 sender_ = sender;
55 next_routing_id_ = next_routing_id;
56
57 bool handled = true;
58 *message_was_ok = true;
59
60 IPC_BEGIN_MESSAGE_MAP_EX(MessagePortDispatcher, message, *message_was_ok)
61 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_CreateMessagePort, OnCreate)
62 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_DestroyMessagePort, OnDestroy)
63 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_Entangle, OnEntangle)
64 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_PostMessage, OnPostMessage)
65 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_QueueMessages, OnQueueMessages)
66 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_SendQueuedMessages,
67 OnSendQueuedMessages)
68 IPC_MESSAGE_UNHANDLED(handled = false)
69 IPC_END_MESSAGE_MAP_EX()
70
71 sender_ = NULL;
72 next_routing_id_ = NULL;
73
74 return handled;
75 }
76
77 void MessagePortDispatcher::UpdateMessagePort(
78 int message_port_id, 37 int message_port_id,
79 IPC::Message::Sender* sender, 38 WorkerMessageFilter* filter,
80 int routing_id, 39 int routing_id) {
81 CallbackWithReturnValue<int>::Type* next_routing_id) {
82 if (!message_ports_.count(message_port_id)) { 40 if (!message_ports_.count(message_port_id)) {
83 NOTREACHED(); 41 NOTREACHED();
84 return; 42 return;
85 } 43 }
86 44
87 MessagePort& port = message_ports_[message_port_id]; 45 MessagePort& port = message_ports_[message_port_id];
88 port.sender = sender; 46 port.filter = filter;
89 port.route_id = routing_id; 47 port.route_id = routing_id;
90 port.next_routing_id = next_routing_id;
91 } 48 }
92 49
93 bool MessagePortDispatcher::Send(IPC::Message* message) { 50 void MessagePortService::OnWorkerMessageFilterClosing(
94 return sender_->Send(message); 51 WorkerMessageFilter* filter) {
52 // Check if the (possibly) crashed process had any message ports.
53 for (MessagePorts::iterator iter = message_ports_.begin();
54 iter != message_ports_.end();) {
55 MessagePorts::iterator cur_item = iter++;
56 if (cur_item->second.filter == filter) {
57 Erase(cur_item->first);
58 }
59 }
95 } 60 }
96 61
97 void MessagePortDispatcher::OnCreate(int *route_id, 62 void MessagePortService::Create(int route_id,
98 int* message_port_id) { 63 WorkerMessageFilter* filter,
64 int* message_port_id) {
99 *message_port_id = ++next_message_port_id_; 65 *message_port_id = ++next_message_port_id_;
100 *route_id = next_routing_id_->Run();
101 66
102 MessagePort port; 67 MessagePort port;
103 port.sender = sender_; 68 port.filter = filter;
104 port.route_id = *route_id; 69 port.route_id = route_id;
105 port.next_routing_id = next_routing_id_;
106 port.message_port_id = *message_port_id; 70 port.message_port_id = *message_port_id;
107 port.entangled_message_port_id = MSG_ROUTING_NONE; 71 port.entangled_message_port_id = MSG_ROUTING_NONE;
108 port.queue_messages = false; 72 port.queue_messages = false;
109 message_ports_[*message_port_id] = port; 73 message_ports_[*message_port_id] = port;
110 } 74 }
111 75
112 void MessagePortDispatcher::OnDestroy(int message_port_id) { 76 void MessagePortService::Destroy(int message_port_id) {
113 if (!message_ports_.count(message_port_id)) { 77 if (!message_ports_.count(message_port_id)) {
114 NOTREACHED(); 78 NOTREACHED();
115 return; 79 return;
116 } 80 }
117 81
118 DCHECK(message_ports_[message_port_id].queued_messages.empty()); 82 DCHECK(message_ports_[message_port_id].queued_messages.empty());
119 Erase(message_port_id); 83 Erase(message_port_id);
120 } 84 }
121 85
122 void MessagePortDispatcher::OnEntangle(int local_message_port_id, 86 void MessagePortService::Entangle(int local_message_port_id,
123 int remote_message_port_id) { 87 int remote_message_port_id) {
124 if (!message_ports_.count(local_message_port_id) || 88 if (!message_ports_.count(local_message_port_id) ||
125 !message_ports_.count(remote_message_port_id)) { 89 !message_ports_.count(remote_message_port_id)) {
126 NOTREACHED(); 90 NOTREACHED();
127 return; 91 return;
128 } 92 }
129 93
130 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == 94 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id ==
131 MSG_ROUTING_NONE); 95 MSG_ROUTING_NONE);
132 message_ports_[remote_message_port_id].entangled_message_port_id = 96 message_ports_[remote_message_port_id].entangled_message_port_id =
133 local_message_port_id; 97 local_message_port_id;
134 } 98 }
135 99
136 void MessagePortDispatcher::OnPostMessage( 100 void MessagePortService::PostMessage(
137 int sender_message_port_id, 101 int sender_message_port_id,
138 const string16& message, 102 const string16& message,
139 const std::vector<int>& sent_message_port_ids) { 103 const std::vector<int>& sent_message_port_ids) {
140 if (!message_ports_.count(sender_message_port_id)) { 104 if (!message_ports_.count(sender_message_port_id)) {
141 NOTREACHED(); 105 NOTREACHED();
142 return; 106 return;
143 } 107 }
144 108
145 int entangled_message_port_id = 109 int entangled_message_port_id =
146 message_ports_[sender_message_port_id].entangled_message_port_id; 110 message_ports_[sender_message_port_id].entangled_message_port_id;
147 if (entangled_message_port_id == MSG_ROUTING_NONE) 111 if (entangled_message_port_id == MSG_ROUTING_NONE)
148 return; // Process could have crashed. 112 return; // Process could have crashed.
149 113
150 if (!message_ports_.count(entangled_message_port_id)) { 114 if (!message_ports_.count(entangled_message_port_id)) {
151 NOTREACHED(); 115 NOTREACHED();
152 return; 116 return;
153 } 117 }
154 118
155 PostMessageTo(entangled_message_port_id, message, sent_message_port_ids); 119 PostMessageTo(entangled_message_port_id, message, sent_message_port_ids);
156 } 120 }
157 121
158 void MessagePortDispatcher::PostMessageTo( 122 void MessagePortService::PostMessageTo(
159 int message_port_id, 123 int message_port_id,
160 const string16& message, 124 const string16& message,
161 const std::vector<int>& sent_message_port_ids) { 125 const std::vector<int>& sent_message_port_ids) {
162 if (!message_ports_.count(message_port_id)) { 126 if (!message_ports_.count(message_port_id)) {
163 NOTREACHED(); 127 NOTREACHED();
164 return; 128 return;
165 } 129 }
166 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { 130 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
167 if (!message_ports_.count(sent_message_port_ids[i])) { 131 if (!message_ports_.count(sent_message_port_ids[i])) {
168 NOTREACHED(); 132 NOTREACHED();
169 return; 133 return;
170 } 134 }
171 } 135 }
172 136
173 MessagePort& entangled_port = message_ports_[message_port_id]; 137 MessagePort& entangled_port = message_ports_[message_port_id];
174 138
175 std::vector<MessagePort*> sent_ports(sent_message_port_ids.size()); 139 std::vector<MessagePort*> sent_ports(sent_message_port_ids.size());
176 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { 140 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
177 sent_ports[i] = &message_ports_[sent_message_port_ids[i]]; 141 sent_ports[i] = &message_ports_[sent_message_port_ids[i]];
178 sent_ports[i]->queue_messages = true; 142 sent_ports[i]->queue_messages = true;
179 } 143 }
180 144
181 if (entangled_port.queue_messages) { 145 if (entangled_port.queue_messages) {
182 entangled_port.queued_messages.push_back( 146 entangled_port.queued_messages.push_back(
183 std::make_pair(message, sent_message_port_ids)); 147 std::make_pair(message, sent_message_port_ids));
184 } else { 148 } else if (entangled_port.filter) { // TODO: can filter ever be NULL?
Andrew T Wilson (Slow) 2010/12/21 19:28:23 Rather than leave this as a TODO, can we make it:
185 // If a message port was sent around, the new location will need a routing 149 // If a message port was sent around, the new location will need a routing
186 // id. Instead of having the created port send us a sync message to get it, 150 // id. Instead of having the created port send us a sync message to get it,
187 // send along with the message. 151 // send along with the message.
188 std::vector<int> new_routing_ids(sent_message_port_ids.size()); 152 std::vector<int> new_routing_ids(sent_message_port_ids.size());
189 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { 153 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
190 new_routing_ids[i] = entangled_port.next_routing_id->Run(); 154 new_routing_ids[i] = entangled_port.filter->GetNextRoutingID();
191 sent_ports[i]->sender = entangled_port.sender; 155 sent_ports[i]->filter = entangled_port.filter;
192 156
193 // Update the entry for the sent port as it can be in a different process. 157 // Update the entry for the sent port as it can be in a different process.
194 sent_ports[i]->route_id = new_routing_ids[i]; 158 sent_ports[i]->route_id = new_routing_ids[i];
195 } 159 }
196 160
197 if (entangled_port.sender) { 161 // Now send the message to the entangled port.
198 // Now send the message to the entangled port. 162 entangled_port.filter->Send(new WorkerProcessMsg_Message(
199 IPC::Message* ipc_msg = new WorkerProcessMsg_Message( 163 entangled_port.route_id, message, sent_message_port_ids,
200 entangled_port.route_id, message, sent_message_port_ids, 164 new_routing_ids));
201 new_routing_ids);
202 entangled_port.sender->Send(ipc_msg);
203 }
204 } 165 }
205 } 166 }
206 167
207 void MessagePortDispatcher::OnQueueMessages(int message_port_id) { 168 void MessagePortService::QueueMessages(int message_port_id) {
208 if (!message_ports_.count(message_port_id)) { 169 if (!message_ports_.count(message_port_id)) {
209 NOTREACHED(); 170 NOTREACHED();
210 return; 171 return;
211 } 172 }
212 173
213 MessagePort& port = message_ports_[message_port_id]; 174 MessagePort& port = message_ports_[message_port_id];
214 if (port.sender) { 175 if (port.filter) {
215 port.sender->Send(new WorkerProcessMsg_MessagesQueued(port.route_id)); 176 port.filter->Send(new WorkerProcessMsg_MessagesQueued(port.route_id));
216 port.queue_messages = true; 177 port.queue_messages = true;
217 port.sender = NULL; 178 port.filter = NULL;
218 } 179 }
219 } 180 }
220 181
221 void MessagePortDispatcher::OnSendQueuedMessages( 182 void MessagePortService::SendQueuedMessages(
222 int message_port_id, 183 int message_port_id,
223 const QueuedMessages& queued_messages) { 184 const QueuedMessages& queued_messages) {
224 if (!message_ports_.count(message_port_id)) { 185 if (!message_ports_.count(message_port_id)) {
225 NOTREACHED(); 186 NOTREACHED();
226 return; 187 return;
227 } 188 }
228 189
229 // Send the queued messages to the port again. This time they'll reach the 190 // Send the queued messages to the port again. This time they'll reach the
230 // new location. 191 // new location.
231 MessagePort& port = message_ports_[message_port_id]; 192 MessagePort& port = message_ports_[message_port_id];
232 port.queue_messages = false; 193 port.queue_messages = false;
233 port.queued_messages.insert(port.queued_messages.begin(), 194 port.queued_messages.insert(port.queued_messages.begin(),
234 queued_messages.begin(), 195 queued_messages.begin(),
235 queued_messages.end()); 196 queued_messages.end());
236 SendQueuedMessagesIfPossible(message_port_id); 197 SendQueuedMessagesIfPossible(message_port_id);
237 } 198 }
238 199
239 void MessagePortDispatcher::SendQueuedMessagesIfPossible(int message_port_id) { 200 void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) {
240 if (!message_ports_.count(message_port_id)) { 201 if (!message_ports_.count(message_port_id)) {
241 NOTREACHED(); 202 NOTREACHED();
242 return; 203 return;
243 } 204 }
244 205
245 MessagePort& port = message_ports_[message_port_id]; 206 MessagePort& port = message_ports_[message_port_id];
246 if (port.queue_messages || !port.sender) 207 if (port.queue_messages || !port.filter)
247 return; 208 return;
248 209
249 for (QueuedMessages::iterator iter = port.queued_messages.begin(); 210 for (QueuedMessages::iterator iter = port.queued_messages.begin();
250 iter != port.queued_messages.end(); ++iter) { 211 iter != port.queued_messages.end(); ++iter) {
251 PostMessageTo(message_port_id, iter->first, iter->second); 212 PostMessageTo(message_port_id, iter->first, iter->second);
252 } 213 }
253 port.queued_messages.clear(); 214 port.queued_messages.clear();
254 } 215 }
255 216
256 void MessagePortDispatcher::Observe(NotificationType type, 217 void MessagePortService::Erase(int message_port_id) {
257 const NotificationSource& source,
258 const NotificationDetails& details) {
259 IPC::Message::Sender* sender = NULL;
260 if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) {
261 sender = Source<RenderMessageFilter>(source).ptr();
262 } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) {
263 sender = Source<WorkerProcessHost>(source).ptr();
264 } else {
265 NOTREACHED();
266 }
267
268 // Check if the (possibly) crashed process had any message ports.
269 for (MessagePorts::iterator iter = message_ports_.begin();
270 iter != message_ports_.end();) {
271 MessagePorts::iterator cur_item = iter++;
272 if (cur_item->second.sender == sender) {
273 Erase(cur_item->first);
274 }
275 }
276 }
277
278 void MessagePortDispatcher::Erase(int message_port_id) {
279 MessagePorts::iterator erase_item = message_ports_.find(message_port_id); 218 MessagePorts::iterator erase_item = message_ports_.find(message_port_id);
280 DCHECK(erase_item != message_ports_.end()); 219 DCHECK(erase_item != message_ports_.end());
281 220
282 int entangled_id = erase_item->second.entangled_message_port_id; 221 int entangled_id = erase_item->second.entangled_message_port_id;
283 if (entangled_id != MSG_ROUTING_NONE) { 222 if (entangled_id != MSG_ROUTING_NONE) {
284 // Do the disentanglement (and be paranoid about the other side existing 223 // Do the disentanglement (and be paranoid about the other side existing
285 // just in case something unusual happened during entanglement). 224 // just in case something unusual happened during entanglement).
286 if (message_ports_.count(entangled_id)) { 225 if (message_ports_.count(entangled_id)) {
287 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE; 226 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE;
288 } 227 }
289 } 228 }
290 message_ports_.erase(erase_item); 229 message_ports_.erase(erase_item);
291 } 230 }
OLDNEW
« no previous file with comments | « chrome/browser/worker_host/message_port_service.h ('k') | chrome/browser/worker_host/worker_document_set.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698