Chromium Code Reviews| Index: components/copresence/copresence_manager_impl.cc |
| diff --git a/components/copresence/copresence_manager_impl.cc b/components/copresence/copresence_manager_impl.cc |
| index 63ad503225d9b58221c1440ea3205eb5c0de482c..f3c507eb9dddda7a794910a2860dcb1f53a63cba 100644 |
| --- a/components/copresence/copresence_manager_impl.cc |
| +++ b/components/copresence/copresence_manager_impl.cc |
| @@ -26,6 +26,16 @@ const int kAudioCheckIntervalMs = 1000; // milliseconds. |
| namespace copresence { |
| +bool SupportedTokenMedium(const TokenObservation& token) { |
| + for (const TokenSignals& signals : token.signals()) { |
| + if (signals.medium() == AUDIO_ULTRASOUND_PASSBAND || |
| + signals.medium() == AUDIO_AUDIBLE_DTMF) |
| + return true; |
| + } |
| + return false; |
| +} |
| + |
| + |
| // Public functions. |
| CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) |
| @@ -48,14 +58,21 @@ CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) |
| delegate_->GetWhispernetClient()->Initialize( |
| whispernet_init_callback_.callback()); |
| + MessagesCallback messages_callback = base::Bind( |
| + &CopresenceManagerImpl::DispatchMessages< |
| + google::protobuf::RepeatedPtrField<SubscribedMessage>>, |
| + base::Unretained(this)); |
|
rkc
2015/01/07 21:31:43
Add a one liner about why unretained is safe here.
Charlie
2015/01/07 22:06:07
Done.
|
| + |
| if (delegate->GetGCMDriver()) |
| gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(), |
| - directive_handler_.get())); |
| + directive_handler_.get(), |
| + messages_callback)); |
| rpc_handler_.reset(new RpcHandler(delegate, |
| state_.get(), |
| directive_handler_.get(), |
| - gcm_handler_.get())); |
| + gcm_handler_.get(), |
| + messages_callback)); |
| } |
| CopresenceManagerImpl::~CopresenceManagerImpl() { |
| @@ -113,19 +130,53 @@ void CopresenceManagerImpl::ReceivedTokens( |
| const std::vector<AudioToken>& tokens) { |
| rpc_handler_->ReportTokens(tokens); |
| - // Update the CopresenceState. |
| for (const AudioToken audio_token : tokens) { |
| - DVLOG(3) << "Heard token: " << audio_token.token; |
| + const std::string& token_id = audio_token.token; |
| + DVLOG(3) << "Heard token: " << token_id; |
| + |
| + // Update the CopresenceState. |
| ReceivedToken token( |
| - audio_token.token, |
| + token_id, |
| audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND, |
| base::Time::Now()); |
| state_->UpdateReceivedToken(token); |
| + |
| + // Deliver messages that were pre-sent on this token. |
| + if (queued_messages_by_token_.count(token_id) > 0) { |
| + // Not const because we have to remove the required tokens for delivery. |
| + // We're going to delete this whole vector at the end anyway. |
| + std::vector<SubscribedMessage>& messages = |
| + queued_messages_by_token_[token_id]; |
| + DCHECK(!messages.empty()) << "Empty entry in queued_messages_by_token_"; |
| + |
| + // These messages still have their required tokens stored, and |
| + // DispatchMessages() will still check for them. If we don't remove |
| + // the tokens before delivery, we'll just end up re-queuing the message. |
| + for (SubscribedMessage& message : messages) |
| + message.mutable_required_token()->Clear(); |
| + |
| + DVLOG(3) << "Delivering " << messages.size() |
| + << " message(s) pre-sent on token " << token_id; |
| + DispatchMessages(messages); |
| + |
| + int erase_count = queued_messages_by_token_.erase(token_id); |
|
rkc
2015/01/07 21:31:43
Is this the behavior we want? Currently for non-pr
Charlie
2015/01/07 22:06:07
The token will still be reported to the server and
|
| + DCHECK_GT(erase_count, 0); |
| + } |
| + } |
| +} |
| + |
| +void CopresenceManagerImpl::AudioCheck() { |
| + if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() && |
| + !directive_handler_->IsAudioTokenHeard(AUDIBLE)) { |
| + delegate_->HandleStatusUpdate(AUDIO_FAIL); |
| + } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() && |
| + !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) { |
| + delegate_->HandleStatusUpdate(AUDIO_FAIL); |
| } |
| } |
| +// Report our currently playing tokens to the server. |
| void CopresenceManagerImpl::PollForMessages() { |
| - // Report our currently playing tokens. |
| const std::string& audible_token = |
| directive_handler_->GetCurrentAudioToken(AUDIBLE); |
| const std::string& inaudible_token = |
| @@ -141,13 +192,54 @@ void CopresenceManagerImpl::PollForMessages() { |
| rpc_handler_->ReportTokens(tokens); |
| } |
| -void CopresenceManagerImpl::AudioCheck() { |
| - if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() && |
| - !directive_handler_->IsAudioTokenHeard(AUDIBLE)) { |
| - delegate_->HandleStatusUpdate(AUDIO_FAIL); |
| - } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() && |
| - !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) { |
| - delegate_->HandleStatusUpdate(AUDIO_FAIL); |
| +template<typename IterableSubscribedMessages> |
| +void CopresenceManagerImpl::DispatchMessages( |
| + const IterableSubscribedMessages& messages) { |
| + if (messages.size() == 0) |
| + return; |
| + |
| + // Index the messages by subscription id. |
| + std::map<std::string, std::vector<Message>> messages_by_subscription; |
| + DVLOG(3) << "Processing " << messages.size() << " received message(s)."; |
| + int immediate_message_count = 0; |
| + for (const SubscribedMessage& message : messages) { |
| + // If tokens are required for this message, queue it. |
| + // Otherwise stage it for delivery. |
| + if (message.required_token_size() > 0) { |
| + int supported_token_count = 0; |
| + for (const TokenObservation& token : message.required_token()) { |
| + if (SupportedTokenMedium(token)) { |
| + queued_messages_by_token_[token.token_id()].push_back(message); |
| + supported_token_count++; |
| + } |
| + } |
| + |
| + if (supported_token_count > 0) { |
| + DVLOG(3) << "Queued message under " << supported_token_count |
| + << "token(s)."; |
| + } else { |
| + VLOG(2) << "Discarded message that requires one of " |
| + << message.required_token_size() |
| + << " token(s), all on unsupported mediums."; |
| + } |
| + } else { |
| + immediate_message_count++; |
| + for (const std::string& subscription_id : message.subscription_id()) { |
| + messages_by_subscription[subscription_id].push_back( |
| + message.published_message()); |
| + } |
| + } |
| + } |
| + |
| + // Send the messages for each subscription. |
| + DVLOG(3) << "Dispatching " << immediate_message_count << "message(s) for " |
| + << messages_by_subscription.size() << " subscription(s)."; |
| + for (const auto& map_entry : messages_by_subscription) { |
| + // TODO(ckehoe): Once we have the app ID from the server, we need to pass |
| + // it in here and get rid of the app id registry from the main API class. |
| + const std::string& subscription = map_entry.first; |
| + const std::vector<Message>& messages = map_entry.second; |
| + delegate_->HandleMessages(std::string(), subscription, messages); |
| } |
| } |