| OLD | NEW |
| 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "chrome/browser/worker_host/message_port_dispatcher.h" | 5 #include "chrome/browser/worker_host/message_port_dispatcher.h" |
| 6 | 6 |
| 7 #include "base/singleton.h" | 7 #include "base/singleton.h" |
| 8 #include "chrome/browser/chrome_thread.h" |
| 8 #include "chrome/browser/renderer_host/resource_message_filter.h" | 9 #include "chrome/browser/renderer_host/resource_message_filter.h" |
| 9 #include "chrome/browser/worker_host/worker_process_host.h" | 10 #include "chrome/browser/worker_host/worker_process_host.h" |
| 10 #include "chrome/common/notification_service.h" | 11 #include "chrome/common/notification_service.h" |
| 11 #include "chrome/common/worker_messages.h" | 12 #include "chrome/common/worker_messages.h" |
| 12 | 13 |
| 13 | 14 |
| 14 MessagePortDispatcher* MessagePortDispatcher::GetInstance() { | 15 MessagePortDispatcher* MessagePortDispatcher::GetInstance() { |
| 15 return Singleton<MessagePortDispatcher>::get(); | 16 return Singleton<MessagePortDispatcher>::get(); |
| 16 } | 17 } |
| 17 | 18 |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 56 next_routing_id_ = NULL; | 57 next_routing_id_ = NULL; |
| 57 | 58 |
| 58 return handled; | 59 return handled; |
| 59 } | 60 } |
| 60 | 61 |
| 61 void MessagePortDispatcher::UpdateMessagePort( | 62 void MessagePortDispatcher::UpdateMessagePort( |
| 62 int message_port_id, | 63 int message_port_id, |
| 63 IPC::Message::Sender* sender, | 64 IPC::Message::Sender* sender, |
| 64 int routing_id, | 65 int routing_id, |
| 65 CallbackWithReturnValue<int>::Type* next_routing_id) { | 66 CallbackWithReturnValue<int>::Type* next_routing_id) { |
| 67 DCHECK(CheckMessagePortMap(true)); |
| 66 if (!message_ports_.count(message_port_id)) { | 68 if (!message_ports_.count(message_port_id)) { |
| 67 NOTREACHED(); | 69 NOTREACHED(); |
| 68 return; | 70 return; |
| 69 } | 71 } |
| 70 | 72 |
| 71 MessagePort& port = message_ports_[message_port_id]; | 73 MessagePort& port = message_ports_[message_port_id]; |
| 72 port.sender = sender; | 74 port.sender = sender; |
| 73 port.route_id = routing_id; | 75 port.route_id = routing_id; |
| 74 port.next_routing_id = next_routing_id; | 76 port.next_routing_id = next_routing_id; |
| 77 DCHECK(CheckMessagePortMap(true)); |
| 75 } | 78 } |
| 76 | 79 |
| 77 bool MessagePortDispatcher::Send(IPC::Message* message) { | 80 bool MessagePortDispatcher::Send(IPC::Message* message) { |
| 81 DCHECK(CheckMessagePortMap(true)); |
| 78 return sender_->Send(message); | 82 return sender_->Send(message); |
| 79 } | 83 } |
| 80 | 84 |
| 81 void MessagePortDispatcher::OnCreate(int *route_id, | 85 void MessagePortDispatcher::OnCreate(int *route_id, |
| 82 int* message_port_id) { | 86 int* message_port_id) { |
| 87 DCHECK(CheckMessagePortMap(true)); |
| 83 *message_port_id = ++next_message_port_id_; | 88 *message_port_id = ++next_message_port_id_; |
| 84 *route_id = next_routing_id_->Run(); | 89 *route_id = next_routing_id_->Run(); |
| 85 | 90 |
| 86 MessagePort port; | 91 MessagePort port; |
| 87 port.sender = sender_; | 92 port.sender = sender_; |
| 88 port.route_id = *route_id; | 93 port.route_id = *route_id; |
| 89 port.next_routing_id = next_routing_id_; | 94 port.next_routing_id = next_routing_id_; |
| 90 port.message_port_id = *message_port_id; | 95 port.message_port_id = *message_port_id; |
| 91 port.entangled_message_port_id = MSG_ROUTING_NONE; | 96 port.entangled_message_port_id = MSG_ROUTING_NONE; |
| 92 port.queue_messages = false; | 97 port.queue_messages = false; |
| 93 message_ports_[*message_port_id] = port; | 98 message_ports_[*message_port_id] = port; |
| 99 DCHECK(CheckMessagePortMap(true)); |
| 94 } | 100 } |
| 95 | 101 |
| 96 void MessagePortDispatcher::OnDestroy(int message_port_id) { | 102 void MessagePortDispatcher::OnDestroy(int message_port_id) { |
| 103 DCHECK(CheckMessagePortMap(true)); |
| 97 if (!message_ports_.count(message_port_id)) { | 104 if (!message_ports_.count(message_port_id)) { |
| 98 NOTREACHED(); | 105 NOTREACHED(); |
| 99 return; | 106 return; |
| 100 } | 107 } |
| 101 | 108 |
| 102 DCHECK(message_ports_[message_port_id].queued_messages.empty()); | 109 DCHECK(message_ports_[message_port_id].queued_messages.empty()); |
| 103 message_ports_.erase(message_port_id); | 110 Erase(message_port_id); |
| 111 DCHECK(CheckMessagePortMap(true)); |
| 104 } | 112 } |
| 105 | 113 |
| 106 void MessagePortDispatcher::OnEntangle(int local_message_port_id, | 114 void MessagePortDispatcher::OnEntangle(int local_message_port_id, |
| 107 int remote_message_port_id) { | 115 int remote_message_port_id) { |
| 116 DCHECK(CheckMessagePortMap(false)); |
| 108 if (!message_ports_.count(local_message_port_id) || | 117 if (!message_ports_.count(local_message_port_id) || |
| 109 !message_ports_.count(remote_message_port_id)) { | 118 !message_ports_.count(remote_message_port_id)) { |
| 110 NOTREACHED(); | 119 NOTREACHED(); |
| 111 return; | 120 return; |
| 112 } | 121 } |
| 113 | 122 |
| 114 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == | 123 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == |
| 115 MSG_ROUTING_NONE); | 124 MSG_ROUTING_NONE); |
| 116 message_ports_[remote_message_port_id].entangled_message_port_id = | 125 message_ports_[remote_message_port_id].entangled_message_port_id = |
| 117 local_message_port_id; | 126 local_message_port_id; |
| 127 DCHECK(CheckMessagePortMap(false)); |
| 118 } | 128 } |
| 119 | 129 |
| 120 void MessagePortDispatcher::OnPostMessage( | 130 void MessagePortDispatcher::OnPostMessage( |
| 121 int sender_message_port_id, | 131 int sender_message_port_id, |
| 122 const string16& message, | 132 const string16& message, |
| 123 const std::vector<int>& sent_message_port_ids) { | 133 const std::vector<int>& sent_message_port_ids) { |
| 134 DCHECK(CheckMessagePortMap(true)); |
| 124 if (!message_ports_.count(sender_message_port_id)) { | 135 if (!message_ports_.count(sender_message_port_id)) { |
| 125 NOTREACHED(); | 136 NOTREACHED(); |
| 126 return; | 137 return; |
| 127 } | 138 } |
| 128 | 139 |
| 129 int entangled_message_port_id = | 140 int entangled_message_port_id = |
| 130 message_ports_[sender_message_port_id].entangled_message_port_id; | 141 message_ports_[sender_message_port_id].entangled_message_port_id; |
| 131 if (entangled_message_port_id == MSG_ROUTING_NONE) | 142 if (entangled_message_port_id == MSG_ROUTING_NONE) |
| 132 return; // Process could have crashed. | 143 return; // Process could have crashed. |
| 133 | 144 |
| 134 if (!message_ports_.count(entangled_message_port_id)) { | 145 if (!message_ports_.count(entangled_message_port_id)) { |
| 135 NOTREACHED(); | 146 NOTREACHED(); |
| 136 return; | 147 return; |
| 137 } | 148 } |
| 138 | 149 |
| 139 PostMessageTo(entangled_message_port_id, message, sent_message_port_ids); | 150 PostMessageTo(entangled_message_port_id, message, sent_message_port_ids); |
| 151 DCHECK(CheckMessagePortMap(true)); |
| 140 } | 152 } |
| 141 | 153 |
| 142 void MessagePortDispatcher::PostMessageTo( | 154 void MessagePortDispatcher::PostMessageTo( |
| 143 int message_port_id, | 155 int message_port_id, |
| 144 const string16& message, | 156 const string16& message, |
| 145 const std::vector<int>& sent_message_port_ids) { | 157 const std::vector<int>& sent_message_port_ids) { |
| 158 DCHECK(CheckMessagePortMap(true)); |
| 146 if (!message_ports_.count(message_port_id)) { | 159 if (!message_ports_.count(message_port_id)) { |
| 147 NOTREACHED(); | 160 NOTREACHED(); |
| 148 return; | 161 return; |
| 149 } | 162 } |
| 150 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { | 163 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { |
| 151 if (!message_ports_.count(sent_message_port_ids[i])) { | 164 if (!message_ports_.count(sent_message_port_ids[i])) { |
| 152 NOTREACHED(); | 165 NOTREACHED(); |
| 153 return; | 166 return; |
| 154 } | 167 } |
| 155 } | 168 } |
| (...skipping 21 matching lines...) Expand all Loading... |
| 177 // Update the entry for the sent port as it can be in a different process. | 190 // Update the entry for the sent port as it can be in a different process. |
| 178 sent_ports[i]->route_id = new_routing_ids[i]; | 191 sent_ports[i]->route_id = new_routing_ids[i]; |
| 179 } | 192 } |
| 180 | 193 |
| 181 // Now send the message to the entangled port. | 194 // Now send the message to the entangled port. |
| 182 IPC::Message* ipc_msg = new WorkerProcessMsg_Message( | 195 IPC::Message* ipc_msg = new WorkerProcessMsg_Message( |
| 183 entangled_port.route_id, message, sent_message_port_ids, | 196 entangled_port.route_id, message, sent_message_port_ids, |
| 184 new_routing_ids); | 197 new_routing_ids); |
| 185 entangled_port.sender->Send(ipc_msg); | 198 entangled_port.sender->Send(ipc_msg); |
| 186 } | 199 } |
| 200 DCHECK(CheckMessagePortMap(true)); |
| 187 } | 201 } |
| 188 | 202 |
| 189 void MessagePortDispatcher::OnQueueMessages(int message_port_id) { | 203 void MessagePortDispatcher::OnQueueMessages(int message_port_id) { |
| 204 DCHECK(CheckMessagePortMap(true)); |
| 190 if (!message_ports_.count(message_port_id)) { | 205 if (!message_ports_.count(message_port_id)) { |
| 191 NOTREACHED(); | 206 NOTREACHED(); |
| 192 return; | 207 return; |
| 193 } | 208 } |
| 194 | 209 |
| 195 MessagePort& port = message_ports_[message_port_id]; | 210 MessagePort& port = message_ports_[message_port_id]; |
| 196 port.sender->Send(new WorkerProcessMsg_MessagesQueued(port.route_id)); | 211 port.sender->Send(new WorkerProcessMsg_MessagesQueued(port.route_id)); |
| 197 port.queue_messages = true; | 212 port.queue_messages = true; |
| 198 port.sender = NULL; | 213 port.sender = NULL; |
| 214 DCHECK(CheckMessagePortMap(true)); |
| 199 } | 215 } |
| 200 | 216 |
| 201 void MessagePortDispatcher::OnSendQueuedMessages( | 217 void MessagePortDispatcher::OnSendQueuedMessages( |
| 202 int message_port_id, | 218 int message_port_id, |
| 203 const QueuedMessages& queued_messages) { | 219 const QueuedMessages& queued_messages) { |
| 220 DCHECK(CheckMessagePortMap(true)); |
| 204 if (!message_ports_.count(message_port_id)) { | 221 if (!message_ports_.count(message_port_id)) { |
| 205 NOTREACHED(); | 222 NOTREACHED(); |
| 206 return; | 223 return; |
| 207 } | 224 } |
| 208 | 225 |
| 209 // Send the queued messages to the port again. This time they'll reach the | 226 // Send the queued messages to the port again. This time they'll reach the |
| 210 // new location. | 227 // new location. |
| 211 MessagePort& port = message_ports_[message_port_id]; | 228 MessagePort& port = message_ports_[message_port_id]; |
| 212 port.queue_messages = false; | 229 port.queue_messages = false; |
| 213 port.queued_messages.insert(port.queued_messages.begin(), | 230 port.queued_messages.insert(port.queued_messages.begin(), |
| 214 queued_messages.begin(), | 231 queued_messages.begin(), |
| 215 queued_messages.end()); | 232 queued_messages.end()); |
| 216 SendQueuedMessagesIfPossible(message_port_id); | 233 SendQueuedMessagesIfPossible(message_port_id); |
| 234 DCHECK(CheckMessagePortMap(true)); |
| 217 } | 235 } |
| 218 | 236 |
| 219 void MessagePortDispatcher::SendQueuedMessagesIfPossible(int message_port_id) { | 237 void MessagePortDispatcher::SendQueuedMessagesIfPossible(int message_port_id) { |
| 238 DCHECK(CheckMessagePortMap(true)); |
| 220 MessagePort& port = message_ports_[message_port_id]; | 239 MessagePort& port = message_ports_[message_port_id]; |
| 221 if (port.queue_messages || !port.sender) | 240 if (port.queue_messages || !port.sender) |
| 222 return; | 241 return; |
| 223 | 242 |
| 224 for (QueuedMessages::iterator iter = port.queued_messages.begin(); | 243 for (QueuedMessages::iterator iter = port.queued_messages.begin(); |
| 225 iter != port.queued_messages.end(); ++iter) { | 244 iter != port.queued_messages.end(); ++iter) { |
| 226 PostMessageTo(message_port_id, iter->first, iter->second); | 245 PostMessageTo(message_port_id, iter->first, iter->second); |
| 227 } | 246 } |
| 228 port.queued_messages.clear(); | 247 port.queued_messages.clear(); |
| 248 DCHECK(CheckMessagePortMap(true)); |
| 229 } | 249 } |
| 230 | 250 |
| 231 void MessagePortDispatcher::Observe(NotificationType type, | 251 void MessagePortDispatcher::Observe(NotificationType type, |
| 232 const NotificationSource& source, | 252 const NotificationSource& source, |
| 233 const NotificationDetails& details) { | 253 const NotificationDetails& details) { |
| 254 DCHECK(CheckMessagePortMap(true)); |
| 255 |
| 234 IPC::Message::Sender* sender = NULL; | 256 IPC::Message::Sender* sender = NULL; |
| 235 if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) { | 257 if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) { |
| 236 sender = Source<ResourceMessageFilter>(source).ptr(); | 258 sender = Source<ResourceMessageFilter>(source).ptr(); |
| 237 } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) { | 259 } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) { |
| 238 sender = Source<WorkerProcessHost>(source).ptr(); | 260 sender = Source<WorkerProcessHost>(source).ptr(); |
| 239 } else { | 261 } else { |
| 240 NOTREACHED(); | 262 NOTREACHED(); |
| 241 } | 263 } |
| 242 | 264 |
| 243 // Check if the (possibly) crashed process had any message ports. | 265 // Check if the (possibly) crashed process had any message ports. |
| 244 for (MessagePorts::iterator iter = message_ports_.begin(); | 266 for (MessagePorts::iterator iter = message_ports_.begin(); |
| 245 iter != message_ports_.end();) { | 267 iter != message_ports_.end();) { |
| 246 MessagePorts::iterator cur_item = iter++; | 268 MessagePorts::iterator cur_item = iter++; |
| 247 if (cur_item->second.sender == sender) { | 269 if (cur_item->second.sender == sender) { |
| 248 if (cur_item->second.entangled_message_port_id != MSG_ROUTING_NONE) { | 270 Erase(cur_item->first); |
| 249 message_ports_[cur_item->second.entangled_message_port_id]. | |
| 250 entangled_message_port_id = MSG_ROUTING_NONE; | |
| 251 } | |
| 252 message_ports_.erase(cur_item); | |
| 253 } | 271 } |
| 254 } | 272 } |
| 273 |
| 274 DCHECK(CheckMessagePortMap(true)); |
| 255 } | 275 } |
| 276 |
| 277 void MessagePortDispatcher::Erase(int message_port_id) { |
| 278 MessagePorts::iterator erase_item = message_ports_.find(message_port_id); |
| 279 DCHECK(erase_item != message_ports_.end()); |
| 280 |
| 281 int entangled_id = erase_item->second.entangled_message_port_id; |
| 282 if (entangled_id != MSG_ROUTING_NONE) { |
| 283 // Do the disentanglement (and be paranoid about the other side existing |
| 284 // just in case something unusual happened during entanglement). |
| 285 if (message_ports_.count(entangled_id)) { |
| 286 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE; |
| 287 } |
| 288 } |
| 289 message_ports_.erase(erase_item); |
| 290 } |
| 291 |
| 292 #ifndef NDEBUG |
| 293 bool MessagePortDispatcher::CheckMessagePortMap(bool check_entanglements) { |
| 294 DCHECK(ChromeThread::CurrentlyOn(ChromeThread::IO)); |
| 295 |
| 296 for (MessagePorts::iterator iter = message_ports_.begin(); |
| 297 iter != message_ports_.end(); iter++) { |
| 298 DCHECK(iter->first <= next_message_port_id_); |
| 299 DCHECK(iter->first == iter->second.message_port_id); |
| 300 |
| 301 int entangled_id = iter->second.entangled_message_port_id; |
| 302 if (check_entanglements && entangled_id != MSG_ROUTING_NONE) { |
| 303 MessagePorts::iterator entangled_item = message_ports_.find(entangled_id); |
| 304 DCHECK(entangled_item != message_ports_.end()); |
| 305 DCHECK(entangled_item->second.entangled_message_port_id == iter->first); |
| 306 } |
| 307 } |
| 308 return true; |
| 309 } |
| 310 #endif // NDEBUG |
| OLD | NEW |