Index: components/copresence/copresence_manager_impl.cc |
diff --git a/components/copresence/copresence_manager_impl.cc b/components/copresence/copresence_manager_impl.cc |
index 63ad503225d9b58221c1440ea3205eb5c0de482c..f3c507eb9dddda7a794910a2860dcb1f53a63cba 100644 |
--- a/components/copresence/copresence_manager_impl.cc |
+++ b/components/copresence/copresence_manager_impl.cc |
@@ -26,6 +26,16 @@ const int kAudioCheckIntervalMs = 1000; // milliseconds. |
namespace copresence { |
+bool SupportedTokenMedium(const TokenObservation& token) { |
+ for (const TokenSignals& signals : token.signals()) { |
+ if (signals.medium() == AUDIO_ULTRASOUND_PASSBAND || |
+ signals.medium() == AUDIO_AUDIBLE_DTMF) |
+ return true; |
+ } |
+ return false; |
+} |
+ |
+ |
// Public functions. |
CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) |
@@ -48,14 +58,21 @@ CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) |
delegate_->GetWhispernetClient()->Initialize( |
whispernet_init_callback_.callback()); |
+ MessagesCallback messages_callback = base::Bind( |
+ &CopresenceManagerImpl::DispatchMessages< |
+ google::protobuf::RepeatedPtrField<SubscribedMessage>>, |
+ base::Unretained(this)); |
rkc
2015/01/07 21:31:43
Add a one liner about why unretained is safe here.
Charlie
2015/01/07 22:06:07
Done.
|
+ |
if (delegate->GetGCMDriver()) |
gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(), |
- directive_handler_.get())); |
+ directive_handler_.get(), |
+ messages_callback)); |
rpc_handler_.reset(new RpcHandler(delegate, |
state_.get(), |
directive_handler_.get(), |
- gcm_handler_.get())); |
+ gcm_handler_.get(), |
+ messages_callback)); |
} |
CopresenceManagerImpl::~CopresenceManagerImpl() { |
@@ -113,19 +130,53 @@ void CopresenceManagerImpl::ReceivedTokens( |
const std::vector<AudioToken>& tokens) { |
rpc_handler_->ReportTokens(tokens); |
- // Update the CopresenceState. |
for (const AudioToken audio_token : tokens) { |
- DVLOG(3) << "Heard token: " << audio_token.token; |
+ const std::string& token_id = audio_token.token; |
+ DVLOG(3) << "Heard token: " << token_id; |
+ |
+ // Update the CopresenceState. |
ReceivedToken token( |
- audio_token.token, |
+ token_id, |
audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND, |
base::Time::Now()); |
state_->UpdateReceivedToken(token); |
+ |
+ // Deliver messages that were pre-sent on this token. |
+ if (queued_messages_by_token_.count(token_id) > 0) { |
+ // Not const because we have to remove the required tokens for delivery. |
+ // We're going to delete this whole vector at the end anyway. |
+ std::vector<SubscribedMessage>& messages = |
+ queued_messages_by_token_[token_id]; |
+ DCHECK(!messages.empty()) << "Empty entry in queued_messages_by_token_"; |
+ |
+ // These messages still have their required tokens stored, and |
+ // DispatchMessages() will still check for them. If we don't remove |
+ // the tokens before delivery, we'll just end up re-queuing the message. |
+ for (SubscribedMessage& message : messages) |
+ message.mutable_required_token()->Clear(); |
+ |
+ DVLOG(3) << "Delivering " << messages.size() |
+ << " message(s) pre-sent on token " << token_id; |
+ DispatchMessages(messages); |
+ |
+ int erase_count = queued_messages_by_token_.erase(token_id); |
rkc
2015/01/07 21:31:43
Is this the behavior we want? Currently for non-pr
Charlie
2015/01/07 22:06:07
The token will still be reported to the server and
|
+ DCHECK_GT(erase_count, 0); |
+ } |
+ } |
+} |
+ |
+void CopresenceManagerImpl::AudioCheck() { |
+ if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() && |
+ !directive_handler_->IsAudioTokenHeard(AUDIBLE)) { |
+ delegate_->HandleStatusUpdate(AUDIO_FAIL); |
+ } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() && |
+ !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) { |
+ delegate_->HandleStatusUpdate(AUDIO_FAIL); |
} |
} |
+// Report our currently playing tokens to the server. |
void CopresenceManagerImpl::PollForMessages() { |
- // Report our currently playing tokens. |
const std::string& audible_token = |
directive_handler_->GetCurrentAudioToken(AUDIBLE); |
const std::string& inaudible_token = |
@@ -141,13 +192,54 @@ void CopresenceManagerImpl::PollForMessages() { |
rpc_handler_->ReportTokens(tokens); |
} |
-void CopresenceManagerImpl::AudioCheck() { |
- if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() && |
- !directive_handler_->IsAudioTokenHeard(AUDIBLE)) { |
- delegate_->HandleStatusUpdate(AUDIO_FAIL); |
- } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() && |
- !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) { |
- delegate_->HandleStatusUpdate(AUDIO_FAIL); |
+template<typename IterableSubscribedMessages> |
+void CopresenceManagerImpl::DispatchMessages( |
+ const IterableSubscribedMessages& messages) { |
+ if (messages.size() == 0) |
+ return; |
+ |
+ // Index the messages by subscription id. |
+ std::map<std::string, std::vector<Message>> messages_by_subscription; |
+ DVLOG(3) << "Processing " << messages.size() << " received message(s)."; |
+ int immediate_message_count = 0; |
+ for (const SubscribedMessage& message : messages) { |
+ // If tokens are required for this message, queue it. |
+ // Otherwise stage it for delivery. |
+ if (message.required_token_size() > 0) { |
+ int supported_token_count = 0; |
+ for (const TokenObservation& token : message.required_token()) { |
+ if (SupportedTokenMedium(token)) { |
+ queued_messages_by_token_[token.token_id()].push_back(message); |
+ supported_token_count++; |
+ } |
+ } |
+ |
+ if (supported_token_count > 0) { |
+ DVLOG(3) << "Queued message under " << supported_token_count |
+ << "token(s)."; |
+ } else { |
+ VLOG(2) << "Discarded message that requires one of " |
+ << message.required_token_size() |
+ << " token(s), all on unsupported mediums."; |
+ } |
+ } else { |
+ immediate_message_count++; |
+ for (const std::string& subscription_id : message.subscription_id()) { |
+ messages_by_subscription[subscription_id].push_back( |
+ message.published_message()); |
+ } |
+ } |
+ } |
+ |
+ // Send the messages for each subscription. |
+ DVLOG(3) << "Dispatching " << immediate_message_count << "message(s) for " |
+ << messages_by_subscription.size() << " subscription(s)."; |
+ for (const auto& map_entry : messages_by_subscription) { |
+ // TODO(ckehoe): Once we have the app ID from the server, we need to pass |
+ // it in here and get rid of the app id registry from the main API class. |
+ const std::string& subscription = map_entry.first; |
+ const std::vector<Message>& messages = map_entry.second; |
+ delegate_->HandleMessages(std::string(), subscription, messages); |
} |
} |