| Index: components/copresence/copresence_manager_impl.cc
|
| diff --git a/components/copresence/copresence_manager_impl.cc b/components/copresence/copresence_manager_impl.cc
|
| index 63ad503225d9b58221c1440ea3205eb5c0de482c..5434b4f20d7117973abb802cb4081f277dc4a2ca 100644
|
| --- a/components/copresence/copresence_manager_impl.cc
|
| +++ b/components/copresence/copresence_manager_impl.cc
|
| @@ -4,6 +4,7 @@
|
|
|
| #include "components/copresence/copresence_manager_impl.h"
|
|
|
| +#include <map>
|
| #include <vector>
|
|
|
| #include "base/bind.h"
|
| @@ -22,10 +23,23 @@ namespace {
|
| const int kPollTimerIntervalMs = 3000; // milliseconds.
|
| const int kAudioCheckIntervalMs = 1000; // milliseconds.
|
|
|
| +const int kQueuedMessageTimeout = 10; // seconds.
|
| +const int kMaxQueuedMessages = 1000;
|
| +
|
| } // namespace
|
|
|
| namespace copresence {
|
|
|
| +bool SupportedTokenMedium(const TokenObservation& token) {
|
| + for (const TokenSignals& signals : token.signals()) {
|
| + if (signals.medium() == AUDIO_ULTRASOUND_PASSBAND ||
|
| + signals.medium() == AUDIO_AUDIBLE_DTMF)
|
| + return true;
|
| + }
|
| + return false;
|
| +}
|
| +
|
| +
|
| // Public functions.
|
|
|
| CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate)
|
| @@ -42,20 +56,31 @@ CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate)
|
| base::Bind(&CopresenceStateImpl::UpdateDirectives,
|
| base::Unretained(state_.get())))),
|
| poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>),
|
| - audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>) {
|
| + audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>),
|
| + queued_messages_by_token_(
|
| + base::TimeDelta::FromSeconds(kQueuedMessageTimeout),
|
| + kMaxQueuedMessages) {
|
| DCHECK(delegate_);
|
| DCHECK(delegate_->GetWhispernetClient());
|
| delegate_->GetWhispernetClient()->Initialize(
|
| whispernet_init_callback_.callback());
|
|
|
| + MessagesCallback messages_callback = base::Bind(
|
| + &CopresenceManagerImpl::DispatchMessages<
|
| + google::protobuf::RepeatedPtrField<SubscribedMessage>>,
|
| + // This will only be passed to objects that we own.
|
| + base::Unretained(this));
|
| +
|
| if (delegate->GetGCMDriver())
|
| gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(),
|
| - directive_handler_.get()));
|
| + directive_handler_.get(),
|
| + messages_callback));
|
|
|
| rpc_handler_.reset(new RpcHandler(delegate,
|
| state_.get(),
|
| directive_handler_.get(),
|
| - gcm_handler_.get()));
|
| + gcm_handler_.get(),
|
| + messages_callback));
|
| }
|
|
|
| CopresenceManagerImpl::~CopresenceManagerImpl() {
|
| @@ -113,19 +138,57 @@ void CopresenceManagerImpl::ReceivedTokens(
|
| const std::vector<AudioToken>& tokens) {
|
| rpc_handler_->ReportTokens(tokens);
|
|
|
| - // Update the CopresenceState.
|
| for (const AudioToken audio_token : tokens) {
|
| - DVLOG(3) << "Heard token: " << audio_token.token;
|
| + const std::string& token_id = audio_token.token;
|
| + DVLOG(3) << "Heard token: " << token_id;
|
| +
|
| + // Update the CopresenceState.
|
| ReceivedToken token(
|
| - audio_token.token,
|
| + token_id,
|
| audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND,
|
| base::Time::Now());
|
| state_->UpdateReceivedToken(token);
|
| +
|
| + // Deliver messages that were pre-sent on this token.
|
| + if (queued_messages_by_token_.HasKey(token_id)) {
|
| + // Not const because we have to remove the required tokens for delivery.
|
| + // We're going to delete this whole vector at the end anyway.
|
| + std::vector<SubscribedMessage>* messages =
|
| + queued_messages_by_token_.GetMutableValue(token_id);
|
| + DCHECK(!messages->empty()) << "Empty entry in queued_messages_by_token_";
|
| +
|
| + // These messages still have their required tokens stored, and
|
| + // DispatchMessages() will still check for them. If we don't remove
|
| + // the tokens before delivery, we'll just end up re-queuing the message.
|
| + for (SubscribedMessage& message : *messages)
|
| + message.mutable_required_token()->Clear();
|
| +
|
| + DVLOG(3) << "Delivering " << messages->size()
|
| + << " message(s) pre-sent on token " << token_id;
|
| + DispatchMessages(*messages);
|
| +
|
| + // The messages have been delivered, so we don't need to keep them
|
| + // in the queue. Note that the token will still be reported
|
| + // to the server (above), so we'll keep getting the message.
|
| + // But we can now drop our local copy of it.
|
| + int erase_count = queued_messages_by_token_.Erase(token_id);
|
| + DCHECK_GT(erase_count, 0);
|
| + }
|
| }
|
| }
|
|
|
| +void CopresenceManagerImpl::AudioCheck() {
|
| + if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() &&
|
| + !directive_handler_->IsAudioTokenHeard(AUDIBLE)) {
|
| + delegate_->HandleStatusUpdate(AUDIO_FAIL);
|
| + } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() &&
|
| + !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) {
|
| + delegate_->HandleStatusUpdate(AUDIO_FAIL);
|
| + }
|
| +}
|
| +
|
| +// Report our currently playing tokens to the server.
|
| void CopresenceManagerImpl::PollForMessages() {
|
| - // Report our currently playing tokens.
|
| const std::string& audible_token =
|
| directive_handler_->GetCurrentAudioToken(AUDIBLE);
|
| const std::string& inaudible_token =
|
| @@ -141,13 +204,55 @@ void CopresenceManagerImpl::PollForMessages() {
|
| rpc_handler_->ReportTokens(tokens);
|
| }
|
|
|
| -void CopresenceManagerImpl::AudioCheck() {
|
| - if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() &&
|
| - !directive_handler_->IsAudioTokenHeard(AUDIBLE)) {
|
| - delegate_->HandleStatusUpdate(AUDIO_FAIL);
|
| - } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() &&
|
| - !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) {
|
| - delegate_->HandleStatusUpdate(AUDIO_FAIL);
|
| +template<typename IterableSubscribedMessages>
|
| +void CopresenceManagerImpl::DispatchMessages(
|
| + const IterableSubscribedMessages& messages) {
|
| + if (messages.size() == 0)
|
| + return;
|
| +
|
| + // Index the messages by subscription id.
|
| + std::map<std::string, std::vector<Message>> messages_by_subscription;
|
| + DVLOG(3) << "Processing " << messages.size() << " received message(s).";
|
| + int immediate_message_count = 0;
|
| + for (const SubscribedMessage& message : messages) {
|
| + // If tokens are required for this message, queue it.
|
| + // Otherwise stage it for delivery.
|
| + if (message.required_token_size() > 0) {
|
| + int supported_token_count = 0;
|
| + for (const TokenObservation& token : message.required_token()) {
|
| + if (SupportedTokenMedium(token)) {
|
| + queued_messages_by_token_.GetMutableValue(token.token_id())
|
| + ->push_back(message);
|
| + supported_token_count++;
|
| + }
|
| + }
|
| +
|
| + if (supported_token_count > 0) {
|
| + DVLOG(3) << "Queued message under " << supported_token_count
|
| + << "token(s).";
|
| + } else {
|
| + VLOG(2) << "Discarded message that requires one of "
|
| + << message.required_token_size()
|
| + << " token(s), all on unsupported mediums.";
|
| + }
|
| + } else {
|
| + immediate_message_count++;
|
| + for (const std::string& subscription_id : message.subscription_id()) {
|
| + messages_by_subscription[subscription_id].push_back(
|
| + message.published_message());
|
| + }
|
| + }
|
| + }
|
| +
|
| + // Send the messages for each subscription.
|
| + DVLOG(3) << "Dispatching " << immediate_message_count << "message(s) for "
|
| + << messages_by_subscription.size() << " subscription(s).";
|
| + for (const auto& map_entry : messages_by_subscription) {
|
| + // TODO(ckehoe): Once we have the app ID from the server, we need to pass
|
| + // it in here and get rid of the app id registry from the main API class.
|
| + const std::string& subscription = map_entry.first;
|
| + const std::vector<Message>& messages = map_entry.second;
|
| + delegate_->HandleMessages(std::string(), subscription, messages);
|
| }
|
| }
|
|
|
|
|