Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(16)

Unified Diff: components/copresence/copresence_manager_impl.cc

Issue 813553002: Adding support for pre-sent messages (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@state
Patch Set: Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « components/copresence/copresence_manager_impl.h ('k') | components/copresence/handlers/gcm_handler_impl.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: components/copresence/copresence_manager_impl.cc
diff --git a/components/copresence/copresence_manager_impl.cc b/components/copresence/copresence_manager_impl.cc
index 63ad503225d9b58221c1440ea3205eb5c0de482c..89d5e8a1bcad4a33203150033a22034c1635fb53 100644
--- a/components/copresence/copresence_manager_impl.cc
+++ b/components/copresence/copresence_manager_impl.cc
@@ -4,6 +4,7 @@
#include "components/copresence/copresence_manager_impl.h"
+#include <map>
#include <vector>
#include "base/bind.h"
@@ -17,15 +18,30 @@
#include "components/copresence/public/whispernet_client.h"
#include "components/copresence/rpc/rpc_handler.h"
+using google::protobuf::RepeatedPtrField;
+
namespace {
const int kPollTimerIntervalMs = 3000; // milliseconds.
const int kAudioCheckIntervalMs = 1000; // milliseconds.
+const int kQueuedMessageTimeout = 10; // seconds.
+const int kMaxQueuedMessages = 1000;
+
} // namespace
namespace copresence {
+bool SupportedTokenMedium(const TokenObservation& token) {
+ for (const TokenSignals& signals : token.signals()) {
+ if (signals.medium() == AUDIO_ULTRASOUND_PASSBAND ||
+ signals.medium() == AUDIO_AUDIBLE_DTMF)
+ return true;
+ }
+ return false;
+}
+
+
// Public functions.
CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate)
@@ -42,20 +58,30 @@ CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate)
base::Bind(&CopresenceStateImpl::UpdateDirectives,
base::Unretained(state_.get())))),
poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>),
- audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>) {
+ audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>),
+ queued_messages_by_token_(
+ base::TimeDelta::FromSeconds(kQueuedMessageTimeout),
+ kMaxQueuedMessages) {
DCHECK(delegate_);
DCHECK(delegate_->GetWhispernetClient());
delegate_->GetWhispernetClient()->Initialize(
whispernet_init_callback_.callback());
+ MessagesCallback messages_callback = base::Bind(
+ &CopresenceManagerImpl::DispatchMessages,
+ // This will only be passed to objects that we own.
+ base::Unretained(this));
+
if (delegate->GetGCMDriver())
gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(),
- directive_handler_.get()));
+ directive_handler_.get(),
+ messages_callback));
rpc_handler_.reset(new RpcHandler(delegate,
state_.get(),
directive_handler_.get(),
- gcm_handler_.get()));
+ gcm_handler_.get(),
+ messages_callback));
}
CopresenceManagerImpl::~CopresenceManagerImpl() {
@@ -113,19 +139,58 @@ void CopresenceManagerImpl::ReceivedTokens(
const std::vector<AudioToken>& tokens) {
rpc_handler_->ReportTokens(tokens);
- // Update the CopresenceState.
for (const AudioToken audio_token : tokens) {
- DVLOG(3) << "Heard token: " << audio_token.token;
+ const std::string& token_id = audio_token.token;
+ DVLOG(3) << "Heard token: " << token_id;
+
+ // Update the CopresenceState.
ReceivedToken token(
- audio_token.token,
+ token_id,
audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND,
base::Time::Now());
state_->UpdateReceivedToken(token);
+
+ // Deliver messages that were pre-sent on this token.
+ if (queued_messages_by_token_.HasKey(token_id)) {
+ // Not const because we have to remove the required tokens for delivery.
+ // We're going to delete this whole vector at the end anyway.
+ RepeatedPtrField<SubscribedMessage>* messages =
+ queued_messages_by_token_.GetMutableValue(token_id);
+ DCHECK_GT(messages->size(), 0)
+ << "Empty entry in queued_messages_by_token_";
+
+ // These messages still have their required tokens stored, and
+ // DispatchMessages() will still check for them. If we don't remove
+ // the tokens before delivery, we'll just end up re-queuing the message.
+ for (SubscribedMessage& message : *messages)
+ message.mutable_required_token()->Clear();
+
+ DVLOG(3) << "Delivering " << messages->size()
+ << " message(s) pre-sent on token " << token_id;
+ DispatchMessages(*messages);
+
+ // The messages have been delivered, so we don't need to keep them
+ // in the queue. Note that the token will still be reported
+ // to the server (above), so we'll keep getting the message.
+ // But we can now drop our local copy of it.
+ int erase_count = queued_messages_by_token_.Erase(token_id);
+ DCHECK_GT(erase_count, 0);
+ }
+ }
+}
+
+void CopresenceManagerImpl::AudioCheck() {
+ if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() &&
+ !directive_handler_->IsAudioTokenHeard(AUDIBLE)) {
+ delegate_->HandleStatusUpdate(AUDIO_FAIL);
+ } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() &&
+ !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) {
+ delegate_->HandleStatusUpdate(AUDIO_FAIL);
}
}
+// Report our currently playing tokens to the server.
void CopresenceManagerImpl::PollForMessages() {
- // Report our currently playing tokens.
const std::string& audible_token =
directive_handler_->GetCurrentAudioToken(AUDIBLE);
const std::string& inaudible_token =
@@ -141,13 +206,60 @@ void CopresenceManagerImpl::PollForMessages() {
rpc_handler_->ReportTokens(tokens);
}
-void CopresenceManagerImpl::AudioCheck() {
- if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() &&
- !directive_handler_->IsAudioTokenHeard(AUDIBLE)) {
- delegate_->HandleStatusUpdate(AUDIO_FAIL);
- } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() &&
- !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) {
- delegate_->HandleStatusUpdate(AUDIO_FAIL);
+void CopresenceManagerImpl::DispatchMessages(
+ const RepeatedPtrField<SubscribedMessage>& messages) {
+ if (messages.size() == 0)
+ return;
+
+ // Index the messages by subscription id.
+ std::map<std::string, std::vector<Message>> messages_by_subscription;
+ DVLOG(3) << "Processing " << messages.size() << " received message(s).";
+ int immediate_message_count = 0;
+ for (const SubscribedMessage& message : messages) {
+ // If tokens are required for this message, queue it.
+ // Otherwise stage it for delivery.
+ if (message.required_token_size() > 0) {
+ int supported_token_count = 0;
+ for (const TokenObservation& token : message.required_token()) {
+ if (SupportedTokenMedium(token)) {
+ if (!queued_messages_by_token_.HasKey(token.token_id())) {
+ queued_messages_by_token_.Add(
+ token.token_id(), RepeatedPtrField<SubscribedMessage>());
+ }
+ RepeatedPtrField<SubscribedMessage>* queued_messages =
+ queued_messages_by_token_.GetMutableValue(token.token_id());
+ DCHECK(queued_messages);
+ queued_messages->Add()->CopyFrom(message);
+ supported_token_count++;
+ }
+ }
+
+ if (supported_token_count > 0) {
+ DVLOG(3) << "Queued message under " << supported_token_count
+ << "token(s).";
+ } else {
+ VLOG(2) << "Discarded message that requires one of "
+ << message.required_token_size()
+ << " token(s), all on unsupported mediums.";
+ }
+ } else {
+ immediate_message_count++;
+ for (const std::string& subscription_id : message.subscription_id()) {
+ messages_by_subscription[subscription_id].push_back(
+ message.published_message());
+ }
+ }
+ }
+
+ // Send the messages for each subscription.
+ DVLOG(3) << "Dispatching " << immediate_message_count << "message(s) for "
+ << messages_by_subscription.size() << " subscription(s).";
+ for (const auto& map_entry : messages_by_subscription) {
+ // TODO(ckehoe): Once we have the app ID from the server, we need to pass
+ // it in here and get rid of the app id registry from the main API class.
+ const std::string& subscription = map_entry.first;
+ const std::vector<Message>& messages = map_entry.second;
+ delegate_->HandleMessages(std::string(), subscription, messages);
}
}
« no previous file with comments | « components/copresence/copresence_manager_impl.h ('k') | components/copresence/handlers/gcm_handler_impl.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698