Index: chrome/browser/extensions/api/messaging/message_service.cc |
diff --git a/chrome/browser/extensions/api/messaging/message_service.cc b/chrome/browser/extensions/api/messaging/message_service.cc |
index 6cde9dc2fc60ee2529c85a3aab5530b075dcc8fd..7ec23cd87002d59f35f07c153615c2619a987eec 100644 |
--- a/chrome/browser/extensions/api/messaging/message_service.cc |
+++ b/chrome/browser/extensions/api/messaging/message_service.cc |
@@ -16,7 +16,6 @@ |
#include "base/metrics/histogram.h" |
#include "base/prefs/pref_service.h" |
#include "base/stl_util.h" |
-#include "chrome/browser/chrome_notification_types.h" |
#include "chrome/browser/extensions/api/messaging/extension_message_port.h" |
#include "chrome/browser/extensions/api/messaging/incognito_connectability.h" |
#include "chrome/browser/extensions/api/messaging/native_message_port.h" |
@@ -28,7 +27,6 @@ |
#include "chrome/browser/tab_contents/tab_util.h" |
#include "components/guest_view/common/guest_view_constants.h" |
#include "content/public/browser/browser_thread.h" |
-#include "content/public/browser/notification_service.h" |
#include "content/public/browser/render_frame_host.h" |
#include "content/public/browser/render_process_host.h" |
#include "content/public/browser/render_view_host.h" |
@@ -38,6 +36,7 @@ |
#include "content/public/browser/web_contents.h" |
#include "content/public/common/child_process_host.h" |
#include "extensions/browser/event_router.h" |
+#include "extensions/browser/extension_api_frame_id_map.h" |
#include "extensions/browser/extension_host.h" |
#include "extensions/browser/extension_registry.h" |
#include "extensions/browser/extension_system.h" |
@@ -132,10 +131,9 @@ struct MessageService::MessageChannel { |
struct MessageService::OpenChannelParams { |
int source_process_id; |
+ int source_routing_id; |
scoped_ptr<base::DictionaryValue> source_tab; |
int source_frame_id; |
- int target_tab_id; |
- int target_frame_id; |
scoped_ptr<MessagePort> receiver; |
int receiver_port_id; |
std::string source_extension_id; |
@@ -148,10 +146,9 @@ struct MessageService::OpenChannelParams { |
// Takes ownership of receiver. |
OpenChannelParams(int source_process_id, |
+ int source_routing_id, |
scoped_ptr<base::DictionaryValue> source_tab, |
int source_frame_id, |
- int target_tab_id, |
- int target_frame_id, |
MessagePort* receiver, |
int receiver_port_id, |
const std::string& source_extension_id, |
@@ -161,9 +158,8 @@ struct MessageService::OpenChannelParams { |
bool include_tls_channel_id, |
bool include_guest_process_info) |
: source_process_id(source_process_id), |
+ source_routing_id(source_routing_id), |
source_frame_id(source_frame_id), |
- target_tab_id(target_tab_id), |
- target_frame_id(target_frame_id), |
receiver(receiver), |
receiver_port_id(receiver_port_id), |
source_extension_id(source_extension_id), |
@@ -195,11 +191,6 @@ static content::RenderProcessHost* GetExtensionProcess( |
} // namespace |
-content::RenderProcessHost* |
- MessageService::MessagePort::GetRenderProcessHost() { |
- return NULL; |
-} |
- |
// static |
void MessageService::AllocatePortIdPair(int* port1, int* port2) { |
DCHECK_CURRENTLY_ON(BrowserThread::IO); |
@@ -227,11 +218,6 @@ MessageService::MessageService(BrowserContext* context) |
LazyBackgroundTaskQueue::Get(context)), |
weak_factory_(this) { |
DCHECK_CURRENTLY_ON(BrowserThread::UI); |
- |
- registrar_.Add(this, content::NOTIFICATION_RENDERER_PROCESS_TERMINATED, |
- content::NotificationService::AllBrowserContextsAndSources()); |
- registrar_.Add(this, content::NOTIFICATION_RENDERER_PROCESS_CLOSED, |
- content::NotificationService::AllBrowserContextsAndSources()); |
} |
MessageService::~MessageService() { |
@@ -264,11 +250,11 @@ void MessageService::OpenChannelToExtension( |
bool include_tls_channel_id) { |
DCHECK_CURRENTLY_ON(BrowserThread::UI); |
- content::RenderProcessHost* source = |
- content::RenderProcessHost::FromID(source_process_id); |
+ content::RenderFrameHost* source = |
+ content::RenderFrameHost::FromID(source_process_id, source_routing_id); |
if (!source) |
return; |
- BrowserContext* context = source->GetBrowserContext(); |
+ BrowserContext* context = source->GetProcess()->GetBrowserContext(); |
ExtensionRegistry* registry = ExtensionRegistry::Get(context); |
const Extension* target_extension = |
@@ -339,9 +325,8 @@ void MessageService::OpenChannelToExtension( |
content::RenderFrameHost* rfh = |
content::RenderFrameHost::FromID(source_process_id, source_routing_id); |
- // Main frame's frameId is 0. |
if (rfh) |
- source_frame_id = !rfh->GetParent() ? 0 : source_routing_id; |
+ source_frame_id = ExtensionApiFrameIdMap::Get()->GetFrameId(rfh).frame_id; |
} else { |
// Check to see if it was a WebView making the request. |
// Sending messages from WebViews to extensions breaks webview isolation, |
@@ -350,20 +335,13 @@ void MessageService::OpenChannelToExtension( |
if (is_web_view && extensions::Manifest::IsComponentLocation( |
target_extension->location())) { |
include_guest_process_info = true; |
- auto* rfh = content::RenderFrameHost::FromID(source_process_id, |
- source_routing_id); |
- // Include |source_frame_id| so that we can retrieve the guest's frame |
- // routing id in OpenChannelImpl. |
- if (rfh) |
- source_frame_id = source_routing_id; |
} |
} |
scoped_ptr<OpenChannelParams> params(new OpenChannelParams( |
- source_process_id, source_tab.Pass(), source_frame_id, -1, |
- -1, // no target_tab_id/target_frame_id for connections to extensions |
- nullptr, receiver_port_id, source_extension_id, target_extension_id, |
- source_url, channel_name, include_tls_channel_id, |
+ source_process_id, source_routing_id, std::move(source_tab), |
+ source_frame_id, nullptr, receiver_port_id, source_extension_id, |
+ target_extension_id, source_url, channel_name, include_tls_channel_id, |
include_guest_process_info)); |
pending_incognito_channels_[GET_CHANNEL_ID(params->receiver_port_id)] = |
@@ -421,13 +399,14 @@ void MessageService::OpenChannelToNativeApp( |
const std::string& native_app_name) { |
DCHECK_CURRENTLY_ON(BrowserThread::UI); |
- content::RenderProcessHost* source = |
- content::RenderProcessHost::FromID(source_process_id); |
+ content::RenderFrameHost* source = |
+ content::RenderFrameHost::FromID(source_process_id, source_routing_id); |
if (!source) |
return; |
#if defined(OS_WIN) || defined(OS_MACOSX) || defined(OS_LINUX) |
- Profile* profile = Profile::FromBrowserContext(source->GetBrowserContext()); |
+ Profile* profile = |
+ Profile::FromBrowserContext(source->GetProcess()->GetBrowserContext()); |
ExtensionService* extension_service = |
ExtensionSystem::Get(profile)->extension_service(); |
bool has_permission = false; |
@@ -455,14 +434,13 @@ void MessageService::OpenChannelToNativeApp( |
} |
scoped_ptr<MessageChannel> channel(new MessageChannel()); |
- channel->opener.reset(new ExtensionMessagePort(source, MSG_ROUTING_CONTROL, |
- source_extension_id)); |
+ channel->opener.reset( |
+ new ExtensionMessagePort(weak_factory_.GetWeakPtr(), |
+ GET_OPPOSITE_PORT_ID(receiver_port_id), |
+ source_extension_id, source, false)); |
// Get handle of the native view and pass it to the native messaging host. |
- content::RenderFrameHost* render_frame_host = |
- content::RenderFrameHost::FromID(source_process_id, source_routing_id); |
- gfx::NativeView native_view = |
- render_frame_host ? render_frame_host->GetNativeView() : nullptr; |
+ gfx::NativeView native_view = source ? source->GetNativeView() : nullptr; |
std::string error = kReceivingEndDoesntExistError; |
scoped_ptr<NativeMessageHost> native_host = NativeMessageHost::Create( |
@@ -494,18 +472,21 @@ void MessageService::OpenChannelToNativeApp( |
} |
void MessageService::OpenChannelToTab(int source_process_id, |
+ int source_routing_id, |
int receiver_port_id, |
int tab_id, |
int frame_id, |
const std::string& extension_id, |
const std::string& channel_name) { |
DCHECK_CURRENTLY_ON(BrowserThread::UI); |
+ DCHECK_GE(frame_id, -1); |
- content::RenderProcessHost* source = |
- content::RenderProcessHost::FromID(source_process_id); |
+ content::RenderFrameHost* source = |
+ content::RenderFrameHost::FromID(source_process_id, source_routing_id); |
if (!source) |
return; |
- Profile* profile = Profile::FromBrowserContext(source->GetBrowserContext()); |
+ Profile* profile = |
+ Profile::FromBrowserContext(source->GetProcess()->GetBrowserContext()); |
WebContents* contents = NULL; |
scoped_ptr<MessagePort> receiver; |
@@ -518,30 +499,22 @@ void MessageService::OpenChannelToTab(int source_process_id, |
return; |
} |
- int receiver_routing_id; |
- if (frame_id > 0) { |
- // Positive frame ID is child frame. |
- int receiver_process_id = contents->GetRenderProcessHost()->GetID(); |
- if (!content::RenderFrameHost::FromID(receiver_process_id, frame_id)) { |
- // Frame does not exist. |
- DispatchOnDisconnect( |
- source, receiver_port_id, kReceivingEndDoesntExistError); |
- return; |
- } |
- receiver_routing_id = frame_id; |
- } else if (frame_id == 0) { |
- // Frame ID 0 is main frame. |
- receiver_routing_id = contents->GetMainFrame()->GetRoutingID(); |
- } else { |
- DCHECK_EQ(-1, frame_id); |
- // If the frame ID is not set (i.e. -1), then the channel has to be opened |
- // in every frame. |
- // TODO(robwu): Update logic so that frames that are not hosted in the main |
- // frame's process can also receive the port. |
- receiver_routing_id = MSG_ROUTING_CONTROL; |
+ // Frame ID -1 is every frame in the tab. |
+ bool include_child_frames = frame_id == -1; |
+ content::RenderFrameHost* receiver_rfh = |
+ include_child_frames |
+ ? contents->GetMainFrame() |
+ : ExtensionApiFrameIdMap::Get()->GetRenderFrameHostById(contents, |
+ frame_id); |
+ if (!receiver_rfh) { |
+ DispatchOnDisconnect( |
+ source, receiver_port_id, kReceivingEndDoesntExistError); |
+ return; |
} |
- receiver.reset(new ExtensionMessagePort(contents->GetRenderProcessHost(), |
- receiver_routing_id, extension_id)); |
+ receiver.reset( |
+ new ExtensionMessagePort(weak_factory_.GetWeakPtr(), |
+ receiver_port_id, extension_id, receiver_rfh, |
+ include_child_frames)); |
const Extension* extension = nullptr; |
if (!extension_id.empty()) { |
@@ -554,11 +527,11 @@ void MessageService::OpenChannelToTab(int source_process_id, |
scoped_ptr<OpenChannelParams> params(new OpenChannelParams( |
source_process_id, |
+ source_routing_id, |
scoped_ptr<base::DictionaryValue>(), // Source tab doesn't make sense |
// for opening to tabs. |
-1, // If there is no tab, then there is no frame either. |
- tab_id, frame_id, receiver.release(), receiver_port_id, extension_id, |
- extension_id, |
+ receiver.release(), receiver_port_id, extension_id, extension_id, |
GURL(), // Source URL doesn't make sense for opening to tabs. |
channel_name, |
false, // Connections to tabs don't get TLS channel IDs. |
@@ -574,44 +547,46 @@ void MessageService::OpenChannelImpl(BrowserContext* browser_context, |
DCHECK_CURRENTLY_ON(BrowserThread::UI); |
DCHECK_EQ(target_extension != nullptr, !params->target_extension_id.empty()); |
- content::RenderProcessHost* source = |
- content::RenderProcessHost::FromID(params->source_process_id); |
+ content::RenderFrameHost* source = |
+ content::RenderFrameHost::FromID(params->source_process_id, |
+ params->source_routing_id); |
if (!source) |
return; // Closed while in flight. |
- if (!params->receiver || !params->receiver->GetRenderProcessHost()) { |
+ if (!params->receiver || !params->receiver->IsValidPort()) { |
DispatchOnDisconnect(source, params->receiver_port_id, |
kReceivingEndDoesntExistError); |
return; |
} |
MessageChannel* channel(new MessageChannel); |
- channel->opener.reset(new ExtensionMessagePort(source, MSG_ROUTING_CONTROL, |
- params->source_extension_id)); |
+ channel->opener.reset( |
+ new ExtensionMessagePort(weak_factory_.GetWeakPtr(), |
+ GET_OPPOSITE_PORT_ID(params->receiver_port_id), |
+ params->source_extension_id, source, false)); |
channel->receiver.reset(params->receiver.release()); |
AddChannel(channel, params->receiver_port_id); |
+ // TODO(robwu): Could |guest_process_id| and |guest_render_frame_routing_id| |
+ // be removed? In the past extension message routing was process-based, but |
+ // now that extensions are routed from a specific RFH, the special casing for |
+ // guest views seems no longer necessary, because the ExtensionMessagePort can |
+ // simply obtain the source process & frame ID directly from the RFH. |
int guest_process_id = content::ChildProcessHost::kInvalidUniqueID; |
int guest_render_frame_routing_id = MSG_ROUTING_NONE; |
if (params->include_guest_process_info) { |
guest_process_id = params->source_process_id; |
- guest_render_frame_routing_id = params->source_frame_id; |
- auto* guest_rfh = content::RenderFrameHost::FromID( |
- guest_process_id, guest_render_frame_routing_id); |
- // Reset the |source_frame_id| parameter. |
- params->source_frame_id = -1; |
+ guest_render_frame_routing_id = params->source_routing_id; |
- DCHECK(guest_rfh == nullptr || |
- WebViewGuest::FromWebContents( |
- WebContents::FromRenderFrameHost(guest_rfh)) != nullptr); |
+ DCHECK(WebViewGuest::FromWebContents( |
+ WebContents::FromRenderFrameHost(source))); |
} |
// Send the connect event to the receiver. Give it the opener's port ID (the |
// opener has the opposite port ID). |
channel->receiver->DispatchOnConnect( |
- params->receiver_port_id, params->channel_name, params->source_tab.Pass(), |
- params->source_frame_id, params->target_tab_id, params->target_frame_id, |
- guest_process_id, guest_render_frame_routing_id, |
+ params->channel_name, std::move(params->source_tab), |
+ params->source_frame_id, guest_process_id, guest_render_frame_routing_id, |
params->source_extension_id, params->target_extension_id, |
params->source_url, params->tls_channel_id); |
@@ -660,10 +635,36 @@ void MessageService::AddChannel(MessageChannel* channel, int receiver_port_id) { |
pending_lazy_background_page_channels_.erase(channel_id); |
} |
+void MessageService::OpenPort(int port_id, int process_id, int routing_id) { |
+ DCHECK_CURRENTLY_ON(BrowserThread::UI); |
+ DCHECK(!IS_OPENER_PORT_ID(port_id)); |
+ |
+ int channel_id = GET_CHANNEL_ID(port_id); |
+ MessageChannelMap::iterator it = channels_.find(channel_id); |
+ if (it == channels_.end()) |
+ return; |
+ |
+ it->second->receiver->OpenPort(process_id, routing_id); |
+} |
+ |
+void MessageService::ClosePort( |
+ int port_id, int process_id, int routing_id, bool force_close) { |
+ DCHECK_CURRENTLY_ON(BrowserThread::UI); |
+ ClosePortImpl(port_id, process_id, routing_id, force_close, std::string()); |
+} |
+ |
void MessageService::CloseChannel(int port_id, |
const std::string& error_message) { |
DCHECK_CURRENTLY_ON(BrowserThread::UI); |
+ ClosePortImpl(port_id, content::ChildProcessHost::kInvalidUniqueID, |
+ MSG_ROUTING_NONE, true, error_message); |
+} |
+void MessageService::ClosePortImpl(int port_id, |
+ int process_id, |
+ int routing_id, |
+ bool force_close, |
+ const std::string& error_message) { |
// Note: The channel might be gone already, if the other side closed first. |
int channel_id = GET_CHANNEL_ID(port_id); |
MessageChannelMap::iterator it = channels_.find(channel_id); |
@@ -673,12 +674,23 @@ void MessageService::CloseChannel(int port_id, |
if (pending != pending_lazy_background_page_channels_.end()) { |
lazy_background_task_queue_->AddPendingTask( |
pending->second.first, pending->second.second, |
- base::Bind(&MessageService::PendingLazyBackgroundPageCloseChannel, |
- weak_factory_.GetWeakPtr(), port_id, error_message)); |
+ base::Bind(&MessageService::PendingLazyBackgroundPageClosePort, |
+ weak_factory_.GetWeakPtr(), port_id, process_id, |
+ routing_id, force_close, error_message)); |
} |
return; |
} |
- CloseChannelImpl(it, port_id, error_message, true); |
+ |
+ // The difference between closing a channel and port is that closing a port |
+ // does not necessarily have to destroy the channel if there are multiple |
+ // receivers, whereas closing a channel always forces all ports to be closed. |
+ if (force_close) { |
+ CloseChannelImpl(it, port_id, error_message, true); |
+ } else if (IS_OPENER_PORT_ID(port_id)) { |
+ it->second->opener->ClosePort(process_id, routing_id); |
+ } else { |
+ it->second->receiver->ClosePort(process_id, routing_id); |
+ } |
} |
void MessageService::CloseChannelImpl( |
@@ -694,8 +706,7 @@ void MessageService::CloseChannelImpl( |
if (notify_other_port) { |
MessagePort* port = IS_OPENER_PORT_ID(closing_port_id) ? |
channel->receiver.get() : channel->opener.get(); |
- port->DispatchOnDisconnect(GET_OPPOSITE_PORT_ID(closing_port_id), |
- error_message); |
+ port->DispatchOnDisconnect(error_message); |
} |
// Balance the IncrementLazyKeepaliveCount() in OpenChannelImpl. |
@@ -721,53 +732,6 @@ void MessageService::PostMessage(int source_port_id, const Message& message) { |
DispatchMessage(source_port_id, iter->second, message); |
} |
-void MessageService::Observe(int type, |
- const content::NotificationSource& source, |
- const content::NotificationDetails& details) { |
- DCHECK_CURRENTLY_ON(BrowserThread::UI); |
- |
- switch (type) { |
- case content::NOTIFICATION_RENDERER_PROCESS_TERMINATED: |
- case content::NOTIFICATION_RENDERER_PROCESS_CLOSED: { |
- content::RenderProcessHost* renderer = |
- content::Source<content::RenderProcessHost>(source).ptr(); |
- OnProcessClosed(renderer); |
- break; |
- } |
- default: |
- NOTREACHED(); |
- return; |
- } |
-} |
- |
-void MessageService::OnProcessClosed(content::RenderProcessHost* process) { |
- DCHECK_CURRENTLY_ON(BrowserThread::UI); |
- |
- // Close any channels that share this renderer. We notify the opposite |
- // port that its pair has closed. |
- for (MessageChannelMap::iterator it = channels_.begin(); |
- it != channels_.end(); ) { |
- MessageChannelMap::iterator current = it++; |
- |
- content::RenderProcessHost* opener_process = |
- current->second->opener->GetRenderProcessHost(); |
- content::RenderProcessHost* receiver_process = |
- current->second->receiver->GetRenderProcessHost(); |
- |
- // Only notify the other side if it has a different porocess host. |
- bool notify_other_port = opener_process && receiver_process && |
- opener_process != receiver_process; |
- |
- if (opener_process == process) { |
- CloseChannelImpl(current, GET_CHANNEL_OPENER_ID(current->first), |
- std::string(), notify_other_port); |
- } else if (receiver_process == process) { |
- CloseChannelImpl(current, GET_CHANNEL_RECEIVERS_ID(current->first), |
- std::string(), notify_other_port); |
- } |
- } |
-} |
- |
void MessageService::EnqueuePendingMessage(int source_port_id, |
int channel_id, |
const Message& message) { |
@@ -825,7 +789,7 @@ void MessageService::DispatchMessage(int source_port_id, |
MessagePort* port = IS_OPENER_PORT_ID(dest_port_id) ? |
channel->opener.get() : channel->receiver.get(); |
- port->DispatchOnMessage(message, dest_port_id); |
+ port->DispatchOnMessage(message); |
} |
bool MessageService::MaybeAddPendingLazyBackgroundPageOpenChannelTask( |
@@ -880,8 +844,9 @@ void MessageService::OnOpenChannelAllowed(scoped_ptr<OpenChannelParams> params, |
pending_incognito_channels_.erase(pending_for_incognito); |
// Re-lookup the source process since it may no longer be valid. |
- content::RenderProcessHost* source = |
- content::RenderProcessHost::FromID(params->source_process_id); |
+ content::RenderFrameHost* source = |
+ content::RenderFrameHost::FromID(params->source_process_id, |
+ params->source_routing_id); |
if (!source) { |
return; |
} |
@@ -892,14 +857,20 @@ void MessageService::OnOpenChannelAllowed(scoped_ptr<OpenChannelParams> params, |
return; |
} |
- BrowserContext* context = source->GetBrowserContext(); |
+ BrowserContext* context = source->GetProcess()->GetBrowserContext(); |
// Note: we use the source's profile here. If the source is an incognito |
// process, we will use the incognito EPM to find the right extension process, |
// which depends on whether the extension uses spanning or split mode. |
- params->receiver.reset(new ExtensionMessagePort( |
- GetExtensionProcess(context, params->target_extension_id), |
- MSG_ROUTING_CONTROL, params->target_extension_id)); |
+ if (content::RenderProcessHost* extension_process = |
+ GetExtensionProcess(context, params->target_extension_id)) { |
+ params->receiver.reset( |
+ new ExtensionMessagePort( |
+ weak_factory_.GetWeakPtr(), params->receiver_port_id, |
+ params->target_extension_id, extension_process)); |
+ } else { |
+ params->receiver.reset(); |
+ } |
// If the target requests the TLS channel id, begin the lookup for it. |
// The target might also be a lazy background page, checked next, but the |
@@ -955,13 +926,14 @@ void MessageService::GotChannelID(scoped_ptr<OpenChannelParams> params, |
pending_tls_channel_id_channels_.erase(pending_for_tls_channel_id); |
// Re-lookup the source process since it may no longer be valid. |
- content::RenderProcessHost* source = |
- content::RenderProcessHost::FromID(params->source_process_id); |
+ content::RenderFrameHost* source = |
+ content::RenderFrameHost::FromID(params->source_process_id, |
+ params->source_routing_id); |
if (!source) { |
return; |
} |
- BrowserContext* context = source->GetBrowserContext(); |
+ BrowserContext* context = source->GetProcess()->GetBrowserContext(); |
ExtensionRegistry* registry = ExtensionRegistry::Get(context); |
const Extension* target_extension = |
registry->enabled_extensions().GetByID(params->target_extension_id); |
@@ -988,20 +960,22 @@ void MessageService::PendingLazyBackgroundPageOpenChannel( |
if (!host) |
return; // TODO(mpcomplete): notify source of disconnect? |
- params->receiver.reset(new ExtensionMessagePort(host->render_process_host(), |
- MSG_ROUTING_CONTROL, |
- params->target_extension_id)); |
+ params->receiver.reset( |
+ new ExtensionMessagePort( |
+ weak_factory_.GetWeakPtr(), params->receiver_port_id, |
+ params->target_extension_id, host->render_process_host())); |
OpenChannelImpl(host->browser_context(), params.Pass(), host->extension(), |
true /* did_enqueue */); |
} |
-void MessageService::DispatchOnDisconnect(content::RenderProcessHost* source, |
+void MessageService::DispatchOnDisconnect(content::RenderFrameHost* source, |
int port_id, |
const std::string& error_message) { |
DCHECK_CURRENTLY_ON(BrowserThread::UI); |
- ExtensionMessagePort port(source, MSG_ROUTING_CONTROL, ""); |
- port.DispatchOnDisconnect(GET_OPPOSITE_PORT_ID(port_id), error_message); |
+ ExtensionMessagePort port(weak_factory_.GetWeakPtr(), |
+ GET_OPPOSITE_PORT_ID(port_id), "", source, false); |
+ port.DispatchOnDisconnect(error_message); |
} |
void MessageService::DispatchPendingMessages(const PendingMessagesQueue& queue, |