 Chromium Code Reviews
 Chromium Code Reviews Issue 6055002:
  Create a message filter for message port messages.  This allows a nice cleanu...  (Closed) 
  Base URL: svn://chrome-svn/chrome/trunk/src/
    
  
    Issue 6055002:
  Create a message filter for message port messages.  This allows a nice cleanu...  (Closed) 
  Base URL: svn://chrome-svn/chrome/trunk/src/| OLD | NEW | 
|---|---|
| 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | 
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be | 
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. | 
| 4 | 4 | 
| 5 #include "chrome/browser/worker_host/message_port_dispatcher.h" | 5 #include "chrome/browser/worker_host/message_port_service.h" | 
| 6 | 6 | 
| 7 #include "base/callback.h" | 7 #include "chrome/browser/worker_host/worker_message_filter.h" | 
| 8 #include "base/singleton.h" | |
| 9 #include "chrome/browser/renderer_host/render_message_filter.h" | |
| 10 #include "chrome/browser/worker_host/worker_process_host.h" | |
| 11 #include "chrome/common/notification_service.h" | |
| 12 #include "chrome/common/worker_messages.h" | 8 #include "chrome/common/worker_messages.h" | 
| 13 | 9 | 
| 14 struct MessagePortDispatcher::MessagePort { | 10 struct MessagePortService::MessagePort { | 
| 15 // sender and route_id are what we need to send messages to the port. | 11 // |filter| and |route_id| are what we need to send messages to the port. | 
| 16 IPC::Message::Sender* sender; | 12 // |filter| is just a weak pointer since we get notified when its process has | 
| 13 // gone away and remove it. | |
| 14 WorkerMessageFilter* filter; | |
| 17 int route_id; | 15 int route_id; | 
| 18 // A function pointer to generate a new route id for the sender above. | |
| 19 // Owned by "sender" above, so don't delete. | |
| 20 CallbackWithReturnValue<int>::Type* next_routing_id; | |
| 21 // A globally unique id for this message port. | 16 // A globally unique id for this message port. | 
| 22 int message_port_id; | 17 int message_port_id; | 
| 23 // The globally unique id of the entangled message port. | 18 // The globally unique id of the entangled message port. | 
| 24 int entangled_message_port_id; | 19 int entangled_message_port_id; | 
| 25 // If true, all messages to this message port are queued and not delivered. | 20 // If true, all messages to this message port are queued and not delivered. | 
| 26 bool queue_messages; | 21 bool queue_messages; | 
| 27 QueuedMessages queued_messages; | 22 QueuedMessages queued_messages; | 
| 28 }; | 23 }; | 
| 29 | 24 | 
| 30 MessagePortDispatcher* MessagePortDispatcher::GetInstance() { | 25 MessagePortService* MessagePortService::GetInstance() { | 
| 31 return Singleton<MessagePortDispatcher>::get(); | 26 return Singleton<MessagePortService>::get(); | 
| 32 } | 27 } | 
| 33 | 28 | 
| 34 MessagePortDispatcher::MessagePortDispatcher() | 29 MessagePortService::MessagePortService() | 
| 35 : next_message_port_id_(0), | 30 : next_message_port_id_(0) { | 
| 36 sender_(NULL), | |
| 37 next_routing_id_(NULL) { | |
| 38 // Receive a notification if a message filter or WorkerProcessHost is deleted. | |
| 39 registrar_.Add(this, NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN, | |
| 40 NotificationService::AllSources()); | |
| 41 | |
| 42 registrar_.Add(this, NotificationType::WORKER_PROCESS_HOST_SHUTDOWN, | |
| 43 NotificationService::AllSources()); | |
| 44 } | 31 } | 
| 45 | 32 | 
| 46 MessagePortDispatcher::~MessagePortDispatcher() { | 33 MessagePortService::~MessagePortService() { | 
| 47 } | 34 } | 
| 48 | 35 | 
| 49 bool MessagePortDispatcher::OnMessageReceived( | 36 void MessagePortService::UpdateMessagePort( | 
| 50 const IPC::Message& message, | |
| 51 IPC::Message::Sender* sender, | |
| 52 CallbackWithReturnValue<int>::Type* next_routing_id, | |
| 53 bool* message_was_ok) { | |
| 54 sender_ = sender; | |
| 55 next_routing_id_ = next_routing_id; | |
| 56 | |
| 57 bool handled = true; | |
| 58 *message_was_ok = true; | |
| 59 | |
| 60 IPC_BEGIN_MESSAGE_MAP_EX(MessagePortDispatcher, message, *message_was_ok) | |
| 61 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_CreateMessagePort, OnCreate) | |
| 62 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_DestroyMessagePort, OnDestroy) | |
| 63 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_Entangle, OnEntangle) | |
| 64 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_PostMessage, OnPostMessage) | |
| 65 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_QueueMessages, OnQueueMessages) | |
| 66 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_SendQueuedMessages, | |
| 67 OnSendQueuedMessages) | |
| 68 IPC_MESSAGE_UNHANDLED(handled = false) | |
| 69 IPC_END_MESSAGE_MAP_EX() | |
| 70 | |
| 71 sender_ = NULL; | |
| 72 next_routing_id_ = NULL; | |
| 73 | |
| 74 return handled; | |
| 75 } | |
| 76 | |
| 77 void MessagePortDispatcher::UpdateMessagePort( | |
| 78 int message_port_id, | 37 int message_port_id, | 
| 79 IPC::Message::Sender* sender, | 38 WorkerMessageFilter* filter, | 
| 80 int routing_id, | 39 int routing_id) { | 
| 81 CallbackWithReturnValue<int>::Type* next_routing_id) { | |
| 82 if (!message_ports_.count(message_port_id)) { | 40 if (!message_ports_.count(message_port_id)) { | 
| 83 NOTREACHED(); | 41 NOTREACHED(); | 
| 84 return; | 42 return; | 
| 85 } | 43 } | 
| 86 | 44 | 
| 87 MessagePort& port = message_ports_[message_port_id]; | 45 MessagePort& port = message_ports_[message_port_id]; | 
| 88 port.sender = sender; | 46 port.filter = filter; | 
| 89 port.route_id = routing_id; | 47 port.route_id = routing_id; | 
| 90 port.next_routing_id = next_routing_id; | |
| 91 } | 48 } | 
| 92 | 49 | 
| 93 bool MessagePortDispatcher::Send(IPC::Message* message) { | 50 void MessagePortService::OnWorkerMessageFilterClosing( | 
| 94 return sender_->Send(message); | 51 WorkerMessageFilter* filter) { | 
| 52 // Check if the (possibly) crashed process had any message ports. | |
| 53 for (MessagePorts::iterator iter = message_ports_.begin(); | |
| 54 iter != message_ports_.end();) { | |
| 55 MessagePorts::iterator cur_item = iter++; | |
| 56 if (cur_item->second.filter == filter) { | |
| 57 Erase(cur_item->first); | |
| 58 } | |
| 59 } | |
| 95 } | 60 } | 
| 96 | 61 | 
| 97 void MessagePortDispatcher::OnCreate(int *route_id, | 62 void MessagePortService::Create(int route_id, | 
| 98 int* message_port_id) { | 63 WorkerMessageFilter* filter, | 
| 64 int* message_port_id) { | |
| 99 *message_port_id = ++next_message_port_id_; | 65 *message_port_id = ++next_message_port_id_; | 
| 100 *route_id = next_routing_id_->Run(); | |
| 101 | 66 | 
| 102 MessagePort port; | 67 MessagePort port; | 
| 103 port.sender = sender_; | 68 port.filter = filter; | 
| 104 port.route_id = *route_id; | 69 port.route_id = route_id; | 
| 105 port.next_routing_id = next_routing_id_; | |
| 106 port.message_port_id = *message_port_id; | 70 port.message_port_id = *message_port_id; | 
| 107 port.entangled_message_port_id = MSG_ROUTING_NONE; | 71 port.entangled_message_port_id = MSG_ROUTING_NONE; | 
| 108 port.queue_messages = false; | 72 port.queue_messages = false; | 
| 109 message_ports_[*message_port_id] = port; | 73 message_ports_[*message_port_id] = port; | 
| 110 } | 74 } | 
| 111 | 75 | 
| 112 void MessagePortDispatcher::OnDestroy(int message_port_id) { | 76 void MessagePortService::Destroy(int message_port_id) { | 
| 113 if (!message_ports_.count(message_port_id)) { | 77 if (!message_ports_.count(message_port_id)) { | 
| 114 NOTREACHED(); | 78 NOTREACHED(); | 
| 115 return; | 79 return; | 
| 116 } | 80 } | 
| 117 | 81 | 
| 118 DCHECK(message_ports_[message_port_id].queued_messages.empty()); | 82 DCHECK(message_ports_[message_port_id].queued_messages.empty()); | 
| 119 Erase(message_port_id); | 83 Erase(message_port_id); | 
| 120 } | 84 } | 
| 121 | 85 | 
| 122 void MessagePortDispatcher::OnEntangle(int local_message_port_id, | 86 void MessagePortService::Entangle(int local_message_port_id, | 
| 123 int remote_message_port_id) { | 87 int remote_message_port_id) { | 
| 124 if (!message_ports_.count(local_message_port_id) || | 88 if (!message_ports_.count(local_message_port_id) || | 
| 125 !message_ports_.count(remote_message_port_id)) { | 89 !message_ports_.count(remote_message_port_id)) { | 
| 126 NOTREACHED(); | 90 NOTREACHED(); | 
| 127 return; | 91 return; | 
| 128 } | 92 } | 
| 129 | 93 | 
| 130 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == | 94 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == | 
| 131 MSG_ROUTING_NONE); | 95 MSG_ROUTING_NONE); | 
| 132 message_ports_[remote_message_port_id].entangled_message_port_id = | 96 message_ports_[remote_message_port_id].entangled_message_port_id = | 
| 133 local_message_port_id; | 97 local_message_port_id; | 
| 134 } | 98 } | 
| 135 | 99 | 
| 136 void MessagePortDispatcher::OnPostMessage( | 100 void MessagePortService::PostMessage( | 
| 137 int sender_message_port_id, | 101 int sender_message_port_id, | 
| 138 const string16& message, | 102 const string16& message, | 
| 139 const std::vector<int>& sent_message_port_ids) { | 103 const std::vector<int>& sent_message_port_ids) { | 
| 140 if (!message_ports_.count(sender_message_port_id)) { | 104 if (!message_ports_.count(sender_message_port_id)) { | 
| 141 NOTREACHED(); | 105 NOTREACHED(); | 
| 142 return; | 106 return; | 
| 143 } | 107 } | 
| 144 | 108 | 
| 145 int entangled_message_port_id = | 109 int entangled_message_port_id = | 
| 146 message_ports_[sender_message_port_id].entangled_message_port_id; | 110 message_ports_[sender_message_port_id].entangled_message_port_id; | 
| 147 if (entangled_message_port_id == MSG_ROUTING_NONE) | 111 if (entangled_message_port_id == MSG_ROUTING_NONE) | 
| 148 return; // Process could have crashed. | 112 return; // Process could have crashed. | 
| 149 | 113 | 
| 150 if (!message_ports_.count(entangled_message_port_id)) { | 114 if (!message_ports_.count(entangled_message_port_id)) { | 
| 151 NOTREACHED(); | 115 NOTREACHED(); | 
| 152 return; | 116 return; | 
| 153 } | 117 } | 
| 154 | 118 | 
| 155 PostMessageTo(entangled_message_port_id, message, sent_message_port_ids); | 119 PostMessageTo(entangled_message_port_id, message, sent_message_port_ids); | 
| 156 } | 120 } | 
| 157 | 121 | 
| 158 void MessagePortDispatcher::PostMessageTo( | 122 void MessagePortService::PostMessageTo( | 
| 159 int message_port_id, | 123 int message_port_id, | 
| 160 const string16& message, | 124 const string16& message, | 
| 161 const std::vector<int>& sent_message_port_ids) { | 125 const std::vector<int>& sent_message_port_ids) { | 
| 162 if (!message_ports_.count(message_port_id)) { | 126 if (!message_ports_.count(message_port_id)) { | 
| 163 NOTREACHED(); | 127 NOTREACHED(); | 
| 164 return; | 128 return; | 
| 165 } | 129 } | 
| 166 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | 130 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | 
| 167 if (!message_ports_.count(sent_message_port_ids[i])) { | 131 if (!message_ports_.count(sent_message_port_ids[i])) { | 
| 168 NOTREACHED(); | 132 NOTREACHED(); | 
| 169 return; | 133 return; | 
| 170 } | 134 } | 
| 171 } | 135 } | 
| 172 | 136 | 
| 173 MessagePort& entangled_port = message_ports_[message_port_id]; | 137 MessagePort& entangled_port = message_ports_[message_port_id]; | 
| 174 | 138 | 
| 175 std::vector<MessagePort*> sent_ports(sent_message_port_ids.size()); | 139 std::vector<MessagePort*> sent_ports(sent_message_port_ids.size()); | 
| 176 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | 140 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | 
| 177 sent_ports[i] = &message_ports_[sent_message_port_ids[i]]; | 141 sent_ports[i] = &message_ports_[sent_message_port_ids[i]]; | 
| 178 sent_ports[i]->queue_messages = true; | 142 sent_ports[i]->queue_messages = true; | 
| 179 } | 143 } | 
| 180 | 144 | 
| 181 if (entangled_port.queue_messages) { | 145 if (entangled_port.queue_messages) { | 
| 182 entangled_port.queued_messages.push_back( | 146 entangled_port.queued_messages.push_back( | 
| 183 std::make_pair(message, sent_message_port_ids)); | 147 std::make_pair(message, sent_message_port_ids)); | 
| 184 } else { | 148 } else if (entangled_port.filter) { // TODO: can filter ever be NULL? | 
| 
Andrew T Wilson (Slow)
2010/12/21 19:28:23
Rather than leave this as a TODO, can we make it:
 | |
| 185 // If a message port was sent around, the new location will need a routing | 149 // If a message port was sent around, the new location will need a routing | 
| 186 // id. Instead of having the created port send us a sync message to get it, | 150 // id. Instead of having the created port send us a sync message to get it, | 
| 187 // send along with the message. | 151 // send along with the message. | 
| 188 std::vector<int> new_routing_ids(sent_message_port_ids.size()); | 152 std::vector<int> new_routing_ids(sent_message_port_ids.size()); | 
| 189 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | 153 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | 
| 190 new_routing_ids[i] = entangled_port.next_routing_id->Run(); | 154 new_routing_ids[i] = entangled_port.filter->GetNextRoutingID(); | 
| 191 sent_ports[i]->sender = entangled_port.sender; | 155 sent_ports[i]->filter = entangled_port.filter; | 
| 192 | 156 | 
| 193 // Update the entry for the sent port as it can be in a different process. | 157 // Update the entry for the sent port as it can be in a different process. | 
| 194 sent_ports[i]->route_id = new_routing_ids[i]; | 158 sent_ports[i]->route_id = new_routing_ids[i]; | 
| 195 } | 159 } | 
| 196 | 160 | 
| 197 if (entangled_port.sender) { | 161 // Now send the message to the entangled port. | 
| 198 // Now send the message to the entangled port. | 162 entangled_port.filter->Send(new WorkerProcessMsg_Message( | 
| 199 IPC::Message* ipc_msg = new WorkerProcessMsg_Message( | 163 entangled_port.route_id, message, sent_message_port_ids, | 
| 200 entangled_port.route_id, message, sent_message_port_ids, | 164 new_routing_ids)); | 
| 201 new_routing_ids); | |
| 202 entangled_port.sender->Send(ipc_msg); | |
| 203 } | |
| 204 } | 165 } | 
| 205 } | 166 } | 
| 206 | 167 | 
| 207 void MessagePortDispatcher::OnQueueMessages(int message_port_id) { | 168 void MessagePortService::QueueMessages(int message_port_id) { | 
| 208 if (!message_ports_.count(message_port_id)) { | 169 if (!message_ports_.count(message_port_id)) { | 
| 209 NOTREACHED(); | 170 NOTREACHED(); | 
| 210 return; | 171 return; | 
| 211 } | 172 } | 
| 212 | 173 | 
| 213 MessagePort& port = message_ports_[message_port_id]; | 174 MessagePort& port = message_ports_[message_port_id]; | 
| 214 if (port.sender) { | 175 if (port.filter) { | 
| 215 port.sender->Send(new WorkerProcessMsg_MessagesQueued(port.route_id)); | 176 port.filter->Send(new WorkerProcessMsg_MessagesQueued(port.route_id)); | 
| 216 port.queue_messages = true; | 177 port.queue_messages = true; | 
| 217 port.sender = NULL; | 178 port.filter = NULL; | 
| 218 } | 179 } | 
| 219 } | 180 } | 
| 220 | 181 | 
| 221 void MessagePortDispatcher::OnSendQueuedMessages( | 182 void MessagePortService::SendQueuedMessages( | 
| 222 int message_port_id, | 183 int message_port_id, | 
| 223 const QueuedMessages& queued_messages) { | 184 const QueuedMessages& queued_messages) { | 
| 224 if (!message_ports_.count(message_port_id)) { | 185 if (!message_ports_.count(message_port_id)) { | 
| 225 NOTREACHED(); | 186 NOTREACHED(); | 
| 226 return; | 187 return; | 
| 227 } | 188 } | 
| 228 | 189 | 
| 229 // Send the queued messages to the port again. This time they'll reach the | 190 // Send the queued messages to the port again. This time they'll reach the | 
| 230 // new location. | 191 // new location. | 
| 231 MessagePort& port = message_ports_[message_port_id]; | 192 MessagePort& port = message_ports_[message_port_id]; | 
| 232 port.queue_messages = false; | 193 port.queue_messages = false; | 
| 233 port.queued_messages.insert(port.queued_messages.begin(), | 194 port.queued_messages.insert(port.queued_messages.begin(), | 
| 234 queued_messages.begin(), | 195 queued_messages.begin(), | 
| 235 queued_messages.end()); | 196 queued_messages.end()); | 
| 236 SendQueuedMessagesIfPossible(message_port_id); | 197 SendQueuedMessagesIfPossible(message_port_id); | 
| 237 } | 198 } | 
| 238 | 199 | 
| 239 void MessagePortDispatcher::SendQueuedMessagesIfPossible(int message_port_id) { | 200 void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) { | 
| 240 if (!message_ports_.count(message_port_id)) { | 201 if (!message_ports_.count(message_port_id)) { | 
| 241 NOTREACHED(); | 202 NOTREACHED(); | 
| 242 return; | 203 return; | 
| 243 } | 204 } | 
| 244 | 205 | 
| 245 MessagePort& port = message_ports_[message_port_id]; | 206 MessagePort& port = message_ports_[message_port_id]; | 
| 246 if (port.queue_messages || !port.sender) | 207 if (port.queue_messages || !port.filter) | 
| 247 return; | 208 return; | 
| 248 | 209 | 
| 249 for (QueuedMessages::iterator iter = port.queued_messages.begin(); | 210 for (QueuedMessages::iterator iter = port.queued_messages.begin(); | 
| 250 iter != port.queued_messages.end(); ++iter) { | 211 iter != port.queued_messages.end(); ++iter) { | 
| 251 PostMessageTo(message_port_id, iter->first, iter->second); | 212 PostMessageTo(message_port_id, iter->first, iter->second); | 
| 252 } | 213 } | 
| 253 port.queued_messages.clear(); | 214 port.queued_messages.clear(); | 
| 254 } | 215 } | 
| 255 | 216 | 
| 256 void MessagePortDispatcher::Observe(NotificationType type, | 217 void MessagePortService::Erase(int message_port_id) { | 
| 257 const NotificationSource& source, | |
| 258 const NotificationDetails& details) { | |
| 259 IPC::Message::Sender* sender = NULL; | |
| 260 if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) { | |
| 261 sender = Source<RenderMessageFilter>(source).ptr(); | |
| 262 } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) { | |
| 263 sender = Source<WorkerProcessHost>(source).ptr(); | |
| 264 } else { | |
| 265 NOTREACHED(); | |
| 266 } | |
| 267 | |
| 268 // Check if the (possibly) crashed process had any message ports. | |
| 269 for (MessagePorts::iterator iter = message_ports_.begin(); | |
| 270 iter != message_ports_.end();) { | |
| 271 MessagePorts::iterator cur_item = iter++; | |
| 272 if (cur_item->second.sender == sender) { | |
| 273 Erase(cur_item->first); | |
| 274 } | |
| 275 } | |
| 276 } | |
| 277 | |
| 278 void MessagePortDispatcher::Erase(int message_port_id) { | |
| 279 MessagePorts::iterator erase_item = message_ports_.find(message_port_id); | 218 MessagePorts::iterator erase_item = message_ports_.find(message_port_id); | 
| 280 DCHECK(erase_item != message_ports_.end()); | 219 DCHECK(erase_item != message_ports_.end()); | 
| 281 | 220 | 
| 282 int entangled_id = erase_item->second.entangled_message_port_id; | 221 int entangled_id = erase_item->second.entangled_message_port_id; | 
| 283 if (entangled_id != MSG_ROUTING_NONE) { | 222 if (entangled_id != MSG_ROUTING_NONE) { | 
| 284 // Do the disentanglement (and be paranoid about the other side existing | 223 // Do the disentanglement (and be paranoid about the other side existing | 
| 285 // just in case something unusual happened during entanglement). | 224 // just in case something unusual happened during entanglement). | 
| 286 if (message_ports_.count(entangled_id)) { | 225 if (message_ports_.count(entangled_id)) { | 
| 287 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE; | 226 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE; | 
| 288 } | 227 } | 
| 289 } | 228 } | 
| 290 message_ports_.erase(erase_item); | 229 message_ports_.erase(erase_item); | 
| 291 } | 230 } | 
| OLD | NEW |