| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "chrome/browser/worker_host/message_port_dispatcher.h" | |
| 6 | |
| 7 #include "base/callback.h" | |
| 8 #include "base/singleton.h" | |
| 9 #include "chrome/browser/renderer_host/render_message_filter.h" | |
| 10 #include "chrome/browser/worker_host/worker_process_host.h" | |
| 11 #include "chrome/common/notification_service.h" | |
| 12 #include "chrome/common/worker_messages.h" | |
| 13 | |
| 14 struct MessagePortDispatcher::MessagePort { | |
| 15 // sender and route_id are what we need to send messages to the port. | |
| 16 IPC::Message::Sender* sender; | |
| 17 int route_id; | |
| 18 // A function pointer to generate a new route id for the sender above. | |
| 19 // Owned by "sender" above, so don't delete. | |
| 20 CallbackWithReturnValue<int>::Type* next_routing_id; | |
| 21 // A globally unique id for this message port. | |
| 22 int message_port_id; | |
| 23 // The globally unique id of the entangled message port. | |
| 24 int entangled_message_port_id; | |
| 25 // If true, all messages to this message port are queued and not delivered. | |
| 26 bool queue_messages; | |
| 27 QueuedMessages queued_messages; | |
| 28 }; | |
| 29 | |
| 30 MessagePortDispatcher* MessagePortDispatcher::GetInstance() { | |
| 31 return Singleton<MessagePortDispatcher>::get(); | |
| 32 } | |
| 33 | |
| 34 MessagePortDispatcher::MessagePortDispatcher() | |
| 35 : next_message_port_id_(0), | |
| 36 sender_(NULL), | |
| 37 next_routing_id_(NULL) { | |
| 38 // Receive a notification if a message filter or WorkerProcessHost is deleted. | |
| 39 registrar_.Add(this, NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN, | |
| 40 NotificationService::AllSources()); | |
| 41 | |
| 42 registrar_.Add(this, NotificationType::WORKER_PROCESS_HOST_SHUTDOWN, | |
| 43 NotificationService::AllSources()); | |
| 44 } | |
| 45 | |
| 46 MessagePortDispatcher::~MessagePortDispatcher() { | |
| 47 } | |
| 48 | |
| 49 bool MessagePortDispatcher::OnMessageReceived( | |
| 50 const IPC::Message& message, | |
| 51 IPC::Message::Sender* sender, | |
| 52 CallbackWithReturnValue<int>::Type* next_routing_id, | |
| 53 bool* message_was_ok) { | |
| 54 sender_ = sender; | |
| 55 next_routing_id_ = next_routing_id; | |
| 56 | |
| 57 bool handled = true; | |
| 58 *message_was_ok = true; | |
| 59 | |
| 60 IPC_BEGIN_MESSAGE_MAP_EX(MessagePortDispatcher, message, *message_was_ok) | |
| 61 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_CreateMessagePort, OnCreate) | |
| 62 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_DestroyMessagePort, OnDestroy) | |
| 63 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_Entangle, OnEntangle) | |
| 64 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_PostMessage, OnPostMessage) | |
| 65 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_QueueMessages, OnQueueMessages) | |
| 66 IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_SendQueuedMessages, | |
| 67 OnSendQueuedMessages) | |
| 68 IPC_MESSAGE_UNHANDLED(handled = false) | |
| 69 IPC_END_MESSAGE_MAP_EX() | |
| 70 | |
| 71 sender_ = NULL; | |
| 72 next_routing_id_ = NULL; | |
| 73 | |
| 74 return handled; | |
| 75 } | |
| 76 | |
| 77 void MessagePortDispatcher::UpdateMessagePort( | |
| 78 int message_port_id, | |
| 79 IPC::Message::Sender* sender, | |
| 80 int routing_id, | |
| 81 CallbackWithReturnValue<int>::Type* next_routing_id) { | |
| 82 if (!message_ports_.count(message_port_id)) { | |
| 83 NOTREACHED(); | |
| 84 return; | |
| 85 } | |
| 86 | |
| 87 MessagePort& port = message_ports_[message_port_id]; | |
| 88 port.sender = sender; | |
| 89 port.route_id = routing_id; | |
| 90 port.next_routing_id = next_routing_id; | |
| 91 } | |
| 92 | |
| 93 bool MessagePortDispatcher::Send(IPC::Message* message) { | |
| 94 return sender_->Send(message); | |
| 95 } | |
| 96 | |
| 97 void MessagePortDispatcher::OnCreate(int *route_id, | |
| 98 int* message_port_id) { | |
| 99 *message_port_id = ++next_message_port_id_; | |
| 100 *route_id = next_routing_id_->Run(); | |
| 101 | |
| 102 MessagePort port; | |
| 103 port.sender = sender_; | |
| 104 port.route_id = *route_id; | |
| 105 port.next_routing_id = next_routing_id_; | |
| 106 port.message_port_id = *message_port_id; | |
| 107 port.entangled_message_port_id = MSG_ROUTING_NONE; | |
| 108 port.queue_messages = false; | |
| 109 message_ports_[*message_port_id] = port; | |
| 110 } | |
| 111 | |
| 112 void MessagePortDispatcher::OnDestroy(int message_port_id) { | |
| 113 if (!message_ports_.count(message_port_id)) { | |
| 114 NOTREACHED(); | |
| 115 return; | |
| 116 } | |
| 117 | |
| 118 DCHECK(message_ports_[message_port_id].queued_messages.empty()); | |
| 119 Erase(message_port_id); | |
| 120 } | |
| 121 | |
| 122 void MessagePortDispatcher::OnEntangle(int local_message_port_id, | |
| 123 int remote_message_port_id) { | |
| 124 if (!message_ports_.count(local_message_port_id) || | |
| 125 !message_ports_.count(remote_message_port_id)) { | |
| 126 NOTREACHED(); | |
| 127 return; | |
| 128 } | |
| 129 | |
| 130 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == | |
| 131 MSG_ROUTING_NONE); | |
| 132 message_ports_[remote_message_port_id].entangled_message_port_id = | |
| 133 local_message_port_id; | |
| 134 } | |
| 135 | |
| 136 void MessagePortDispatcher::OnPostMessage( | |
| 137 int sender_message_port_id, | |
| 138 const string16& message, | |
| 139 const std::vector<int>& sent_message_port_ids) { | |
| 140 if (!message_ports_.count(sender_message_port_id)) { | |
| 141 NOTREACHED(); | |
| 142 return; | |
| 143 } | |
| 144 | |
| 145 int entangled_message_port_id = | |
| 146 message_ports_[sender_message_port_id].entangled_message_port_id; | |
| 147 if (entangled_message_port_id == MSG_ROUTING_NONE) | |
| 148 return; // Process could have crashed. | |
| 149 | |
| 150 if (!message_ports_.count(entangled_message_port_id)) { | |
| 151 NOTREACHED(); | |
| 152 return; | |
| 153 } | |
| 154 | |
| 155 PostMessageTo(entangled_message_port_id, message, sent_message_port_ids); | |
| 156 } | |
| 157 | |
| 158 void MessagePortDispatcher::PostMessageTo( | |
| 159 int message_port_id, | |
| 160 const string16& message, | |
| 161 const std::vector<int>& sent_message_port_ids) { | |
| 162 if (!message_ports_.count(message_port_id)) { | |
| 163 NOTREACHED(); | |
| 164 return; | |
| 165 } | |
| 166 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | |
| 167 if (!message_ports_.count(sent_message_port_ids[i])) { | |
| 168 NOTREACHED(); | |
| 169 return; | |
| 170 } | |
| 171 } | |
| 172 | |
| 173 MessagePort& entangled_port = message_ports_[message_port_id]; | |
| 174 | |
| 175 std::vector<MessagePort*> sent_ports(sent_message_port_ids.size()); | |
| 176 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | |
| 177 sent_ports[i] = &message_ports_[sent_message_port_ids[i]]; | |
| 178 sent_ports[i]->queue_messages = true; | |
| 179 } | |
| 180 | |
| 181 if (entangled_port.queue_messages) { | |
| 182 entangled_port.queued_messages.push_back( | |
| 183 std::make_pair(message, sent_message_port_ids)); | |
| 184 } else { | |
| 185 // If a message port was sent around, the new location will need a routing | |
| 186 // id. Instead of having the created port send us a sync message to get it, | |
| 187 // send along with the message. | |
| 188 std::vector<int> new_routing_ids(sent_message_port_ids.size()); | |
| 189 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | |
| 190 new_routing_ids[i] = entangled_port.next_routing_id->Run(); | |
| 191 sent_ports[i]->sender = entangled_port.sender; | |
| 192 | |
| 193 // Update the entry for the sent port as it can be in a different process. | |
| 194 sent_ports[i]->route_id = new_routing_ids[i]; | |
| 195 } | |
| 196 | |
| 197 if (entangled_port.sender) { | |
| 198 // Now send the message to the entangled port. | |
| 199 IPC::Message* ipc_msg = new WorkerProcessMsg_Message( | |
| 200 entangled_port.route_id, message, sent_message_port_ids, | |
| 201 new_routing_ids); | |
| 202 entangled_port.sender->Send(ipc_msg); | |
| 203 } | |
| 204 } | |
| 205 } | |
| 206 | |
| 207 void MessagePortDispatcher::OnQueueMessages(int message_port_id) { | |
| 208 if (!message_ports_.count(message_port_id)) { | |
| 209 NOTREACHED(); | |
| 210 return; | |
| 211 } | |
| 212 | |
| 213 MessagePort& port = message_ports_[message_port_id]; | |
| 214 if (port.sender) { | |
| 215 port.sender->Send(new WorkerProcessMsg_MessagesQueued(port.route_id)); | |
| 216 port.queue_messages = true; | |
| 217 port.sender = NULL; | |
| 218 } | |
| 219 } | |
| 220 | |
| 221 void MessagePortDispatcher::OnSendQueuedMessages( | |
| 222 int message_port_id, | |
| 223 const QueuedMessages& queued_messages) { | |
| 224 if (!message_ports_.count(message_port_id)) { | |
| 225 NOTREACHED(); | |
| 226 return; | |
| 227 } | |
| 228 | |
| 229 // Send the queued messages to the port again. This time they'll reach the | |
| 230 // new location. | |
| 231 MessagePort& port = message_ports_[message_port_id]; | |
| 232 port.queue_messages = false; | |
| 233 port.queued_messages.insert(port.queued_messages.begin(), | |
| 234 queued_messages.begin(), | |
| 235 queued_messages.end()); | |
| 236 SendQueuedMessagesIfPossible(message_port_id); | |
| 237 } | |
| 238 | |
| 239 void MessagePortDispatcher::SendQueuedMessagesIfPossible(int message_port_id) { | |
| 240 if (!message_ports_.count(message_port_id)) { | |
| 241 NOTREACHED(); | |
| 242 return; | |
| 243 } | |
| 244 | |
| 245 MessagePort& port = message_ports_[message_port_id]; | |
| 246 if (port.queue_messages || !port.sender) | |
| 247 return; | |
| 248 | |
| 249 for (QueuedMessages::iterator iter = port.queued_messages.begin(); | |
| 250 iter != port.queued_messages.end(); ++iter) { | |
| 251 PostMessageTo(message_port_id, iter->first, iter->second); | |
| 252 } | |
| 253 port.queued_messages.clear(); | |
| 254 } | |
| 255 | |
| 256 void MessagePortDispatcher::Observe(NotificationType type, | |
| 257 const NotificationSource& source, | |
| 258 const NotificationDetails& details) { | |
| 259 IPC::Message::Sender* sender = NULL; | |
| 260 if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) { | |
| 261 sender = Source<RenderMessageFilter>(source).ptr(); | |
| 262 } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) { | |
| 263 sender = Source<WorkerProcessHost>(source).ptr(); | |
| 264 } else { | |
| 265 NOTREACHED(); | |
| 266 } | |
| 267 | |
| 268 // Check if the (possibly) crashed process had any message ports. | |
| 269 for (MessagePorts::iterator iter = message_ports_.begin(); | |
| 270 iter != message_ports_.end();) { | |
| 271 MessagePorts::iterator cur_item = iter++; | |
| 272 if (cur_item->second.sender == sender) { | |
| 273 Erase(cur_item->first); | |
| 274 } | |
| 275 } | |
| 276 } | |
| 277 | |
| 278 void MessagePortDispatcher::Erase(int message_port_id) { | |
| 279 MessagePorts::iterator erase_item = message_ports_.find(message_port_id); | |
| 280 DCHECK(erase_item != message_ports_.end()); | |
| 281 | |
| 282 int entangled_id = erase_item->second.entangled_message_port_id; | |
| 283 if (entangled_id != MSG_ROUTING_NONE) { | |
| 284 // Do the disentanglement (and be paranoid about the other side existing | |
| 285 // just in case something unusual happened during entanglement). | |
| 286 if (message_ports_.count(entangled_id)) { | |
| 287 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE; | |
| 288 } | |
| 289 } | |
| 290 message_ports_.erase(erase_item); | |
| 291 } | |
| OLD | NEW |