Chromium Code Reviews| Index: chrome/browser/worker_host/message_port_service.cc |
| =================================================================== |
| --- chrome/browser/worker_host/message_port_service.cc (revision 69328) |
| +++ chrome/browser/worker_host/message_port_service.cc (working copy) |
| @@ -2,22 +2,17 @@ |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| -#include "chrome/browser/worker_host/message_port_dispatcher.h" |
| +#include "chrome/browser/worker_host/message_port_service.h" |
| -#include "base/callback.h" |
| -#include "base/singleton.h" |
| -#include "chrome/browser/renderer_host/render_message_filter.h" |
| -#include "chrome/browser/worker_host/worker_process_host.h" |
| -#include "chrome/common/notification_service.h" |
| +#include "chrome/browser/worker_host/worker_message_filter.h" |
| #include "chrome/common/worker_messages.h" |
| -struct MessagePortDispatcher::MessagePort { |
| - // sender and route_id are what we need to send messages to the port. |
| - IPC::Message::Sender* sender; |
| +struct MessagePortService::MessagePort { |
| + // |filter| and |route_id| are what we need to send messages to the port. |
| + // |filter| is just a weak pointer since we get notified when its process has |
| + // gone away and remove it. |
| + WorkerMessageFilter* filter; |
| int route_id; |
| - // A function pointer to generate a new route id for the sender above. |
| - // Owned by "sender" above, so don't delete. |
| - CallbackWithReturnValue<int>::Type* next_routing_id; |
| // A globally unique id for this message port. |
| int message_port_id; |
| // The globally unique id of the entangled message port. |
| @@ -27,89 +22,58 @@ |
| QueuedMessages queued_messages; |
| }; |
| -MessagePortDispatcher* MessagePortDispatcher::GetInstance() { |
| - return Singleton<MessagePortDispatcher>::get(); |
| +MessagePortService* MessagePortService::GetInstance() { |
| + return Singleton<MessagePortService>::get(); |
| } |
| -MessagePortDispatcher::MessagePortDispatcher() |
| - : next_message_port_id_(0), |
| - sender_(NULL), |
| - next_routing_id_(NULL) { |
| - // Receive a notification if a message filter or WorkerProcessHost is deleted. |
| - registrar_.Add(this, NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN, |
| - NotificationService::AllSources()); |
| - |
| - registrar_.Add(this, NotificationType::WORKER_PROCESS_HOST_SHUTDOWN, |
| - NotificationService::AllSources()); |
| +MessagePortService::MessagePortService() |
| + : next_message_port_id_(0) { |
| } |
| -MessagePortDispatcher::~MessagePortDispatcher() { |
| +MessagePortService::~MessagePortService() { |
| } |
| -bool MessagePortDispatcher::OnMessageReceived( |
| - const IPC::Message& message, |
| - IPC::Message::Sender* sender, |
| - CallbackWithReturnValue<int>::Type* next_routing_id, |
| - bool* message_was_ok) { |
| - sender_ = sender; |
| - next_routing_id_ = next_routing_id; |
| - |
| - bool handled = true; |
| - *message_was_ok = true; |
| - |
| - IPC_BEGIN_MESSAGE_MAP_EX(MessagePortDispatcher, message, *message_was_ok) |
| - IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_CreateMessagePort, OnCreate) |
| - IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_DestroyMessagePort, OnDestroy) |
| - IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_Entangle, OnEntangle) |
| - IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_PostMessage, OnPostMessage) |
| - IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_QueueMessages, OnQueueMessages) |
| - IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_SendQueuedMessages, |
| - OnSendQueuedMessages) |
| - IPC_MESSAGE_UNHANDLED(handled = false) |
| - IPC_END_MESSAGE_MAP_EX() |
| - |
| - sender_ = NULL; |
| - next_routing_id_ = NULL; |
| - |
| - return handled; |
| -} |
| - |
| -void MessagePortDispatcher::UpdateMessagePort( |
| +void MessagePortService::UpdateMessagePort( |
| int message_port_id, |
| - IPC::Message::Sender* sender, |
| - int routing_id, |
| - CallbackWithReturnValue<int>::Type* next_routing_id) { |
| + WorkerMessageFilter* filter, |
| + int routing_id) { |
| if (!message_ports_.count(message_port_id)) { |
| NOTREACHED(); |
| return; |
| } |
| MessagePort& port = message_ports_[message_port_id]; |
| - port.sender = sender; |
| + port.filter = filter; |
| port.route_id = routing_id; |
| - port.next_routing_id = next_routing_id; |
| } |
| -bool MessagePortDispatcher::Send(IPC::Message* message) { |
| - return sender_->Send(message); |
| +void MessagePortService::OnWorkerMessageFilterClosing( |
| + WorkerMessageFilter* filter) { |
| + // Check if the (possibly) crashed process had any message ports. |
| + for (MessagePorts::iterator iter = message_ports_.begin(); |
| + iter != message_ports_.end();) { |
| + MessagePorts::iterator cur_item = iter++; |
| + if (cur_item->second.filter == filter) { |
| + Erase(cur_item->first); |
| + } |
| + } |
| } |
| -void MessagePortDispatcher::OnCreate(int *route_id, |
| - int* message_port_id) { |
| +void MessagePortService::Create(int route_id, |
| + WorkerMessageFilter* filter, |
| + int* message_port_id) { |
| *message_port_id = ++next_message_port_id_; |
| - *route_id = next_routing_id_->Run(); |
| MessagePort port; |
| - port.sender = sender_; |
| - port.route_id = *route_id; |
| - port.next_routing_id = next_routing_id_; |
| + port.filter = filter; |
| + port.route_id = route_id; |
| port.message_port_id = *message_port_id; |
| port.entangled_message_port_id = MSG_ROUTING_NONE; |
| port.queue_messages = false; |
| message_ports_[*message_port_id] = port; |
| } |
| -void MessagePortDispatcher::OnDestroy(int message_port_id) { |
| +void MessagePortService::Destroy(int message_port_id) { |
| if (!message_ports_.count(message_port_id)) { |
| NOTREACHED(); |
| return; |
| @@ -119,8 +83,8 @@ |
| Erase(message_port_id); |
| } |
| -void MessagePortDispatcher::OnEntangle(int local_message_port_id, |
| - int remote_message_port_id) { |
| +void MessagePortService::Entangle(int local_message_port_id, |
| + int remote_message_port_id) { |
| if (!message_ports_.count(local_message_port_id) || |
| !message_ports_.count(remote_message_port_id)) { |
| NOTREACHED(); |
| @@ -133,7 +97,7 @@ |
| local_message_port_id; |
| } |
| -void MessagePortDispatcher::OnPostMessage( |
| +void MessagePortService::PostMessage( |
| int sender_message_port_id, |
| const string16& message, |
| const std::vector<int>& sent_message_port_ids) { |
| @@ -155,7 +119,7 @@ |
| PostMessageTo(entangled_message_port_id, message, sent_message_port_ids); |
| } |
| -void MessagePortDispatcher::PostMessageTo( |
| +void MessagePortService::PostMessageTo( |
| int message_port_id, |
| const string16& message, |
| const std::vector<int>& sent_message_port_ids) { |
| @@ -181,44 +145,41 @@ |
| if (entangled_port.queue_messages) { |
| entangled_port.queued_messages.push_back( |
| std::make_pair(message, sent_message_port_ids)); |
| - } else { |
| + } else if (entangled_port.filter) { // TODO: can filter ever be NULL? |
|
Andrew T Wilson (Slow)
2010/12/21 19:28:23
Rather than leave this as a TODO, can we make it:
|
| // If a message port was sent around, the new location will need a routing |
| // id. Instead of having the created port send us a sync message to get it, |
| // send along with the message. |
| std::vector<int> new_routing_ids(sent_message_port_ids.size()); |
| for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { |
| - new_routing_ids[i] = entangled_port.next_routing_id->Run(); |
| - sent_ports[i]->sender = entangled_port.sender; |
| + new_routing_ids[i] = entangled_port.filter->GetNextRoutingID(); |
| + sent_ports[i]->filter = entangled_port.filter; |
| // Update the entry for the sent port as it can be in a different process. |
| sent_ports[i]->route_id = new_routing_ids[i]; |
| } |
| - if (entangled_port.sender) { |
| - // Now send the message to the entangled port. |
| - IPC::Message* ipc_msg = new WorkerProcessMsg_Message( |
| - entangled_port.route_id, message, sent_message_port_ids, |
| - new_routing_ids); |
| - entangled_port.sender->Send(ipc_msg); |
| - } |
| + // Now send the message to the entangled port. |
| + entangled_port.filter->Send(new WorkerProcessMsg_Message( |
| + entangled_port.route_id, message, sent_message_port_ids, |
| + new_routing_ids)); |
| } |
| } |
| -void MessagePortDispatcher::OnQueueMessages(int message_port_id) { |
| +void MessagePortService::QueueMessages(int message_port_id) { |
| if (!message_ports_.count(message_port_id)) { |
| NOTREACHED(); |
| return; |
| } |
| MessagePort& port = message_ports_[message_port_id]; |
| - if (port.sender) { |
| - port.sender->Send(new WorkerProcessMsg_MessagesQueued(port.route_id)); |
| + if (port.filter) { |
| + port.filter->Send(new WorkerProcessMsg_MessagesQueued(port.route_id)); |
| port.queue_messages = true; |
| - port.sender = NULL; |
| + port.filter = NULL; |
| } |
| } |
| -void MessagePortDispatcher::OnSendQueuedMessages( |
| +void MessagePortService::SendQueuedMessages( |
| int message_port_id, |
| const QueuedMessages& queued_messages) { |
| if (!message_ports_.count(message_port_id)) { |
| @@ -236,14 +197,14 @@ |
| SendQueuedMessagesIfPossible(message_port_id); |
| } |
| -void MessagePortDispatcher::SendQueuedMessagesIfPossible(int message_port_id) { |
| +void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) { |
| if (!message_ports_.count(message_port_id)) { |
| NOTREACHED(); |
| return; |
| } |
| MessagePort& port = message_ports_[message_port_id]; |
| - if (port.queue_messages || !port.sender) |
| + if (port.queue_messages || !port.filter) |
| return; |
| for (QueuedMessages::iterator iter = port.queued_messages.begin(); |
| @@ -253,29 +214,7 @@ |
| port.queued_messages.clear(); |
| } |
| -void MessagePortDispatcher::Observe(NotificationType type, |
| - const NotificationSource& source, |
| - const NotificationDetails& details) { |
| - IPC::Message::Sender* sender = NULL; |
| - if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) { |
| - sender = Source<RenderMessageFilter>(source).ptr(); |
| - } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) { |
| - sender = Source<WorkerProcessHost>(source).ptr(); |
| - } else { |
| - NOTREACHED(); |
| - } |
| - |
| - // Check if the (possibly) crashed process had any message ports. |
| - for (MessagePorts::iterator iter = message_ports_.begin(); |
| - iter != message_ports_.end();) { |
| - MessagePorts::iterator cur_item = iter++; |
| - if (cur_item->second.sender == sender) { |
| - Erase(cur_item->first); |
| - } |
| - } |
| -} |
| - |
| -void MessagePortDispatcher::Erase(int message_port_id) { |
| +void MessagePortService::Erase(int message_port_id) { |
| MessagePorts::iterator erase_item = message_ports_.find(message_port_id); |
| DCHECK(erase_item != message_ports_.end()); |